
## Teste Localiza - Engenharia de Dados
### Autor: Gustavo Lemos
### Data: 11/11/24
---

### 1. Ingestão e formato - Lendo arquivo csv do FileStore DBFS e transformando em parquet

In [0]:
# File location and type for CSV input
file_location = "/FileStore/tables/df_fraud_credit.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Carregar o arquivo CSV para um DataFrame
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Exibir as primeiras 5 linhas do DataFrame carregado
display(df.head(5))

# Contar o número total de registros
print(f"Total de registros: {df.count()}")

# Definir o local onde o arquivo Parquet será salvo
parquet_location = "/FileStore/tables/table_fraud_credit.parquet"

# Salvar o DataFrame como um arquivo Parquet
df.write.format("parquet").mode("overwrite").save(parquet_location)

# Ler o arquivo Parquet de volta para verificar
df_parquet = spark.read.format("parquet").load(parquet_location)

# Exibir as primeiras 5 linhas do DataFrame Parquet
display(df_parquet.head(5))

# Contar o número de registros no DataFrame Parquet
print(f"Total de registros no Parquet: {df_parquet.count()}")


timestamp,sending_address,receiving_address,amount,transaction_type,location_region,ip_prefix,login_frequency,session_duration,purchase_pattern,age_group,risk_score,anomaly
1618185002,0x9d32d0bf2c00f41ce7ca01b66e174cc4dcb0c1da,0x39f82e1c09bc6d7baccc1e79e5621ff812f50572,67435.0,transfer,Europe,192.0,3,48,focused,established,18.75,low_risk
1698642474,0xd6e251c23cbf52dbd472f079147873e655d8096f,0x51e8fbe24f124e0e30a614e14401b9bbfed5384c,1.0,purchase,South America,172.0,5,61,focused,established,25.0,low_risk
1619180066,0x2e0925b922fed01f6a85d213ae2718f54b8ca305,0x52c7911879f783d590af45bda0c0ef2b8536706f,66211.0,purchase,Asia,192.168,3,74,focused,established,31.25,low_risk
1591413882,0x93efefc25fcaf31d7695f28018d7a11ece55457f,0x8ac3b7bd531b3a833032f07d4e47c7af6ea7bace,14998.0,transfer,South America,172.0,8,111,high_value,veteran,36.75,low_risk
1611257295,0xad3b8de45d63f5cce28aef9a82cf30c397c6ceb9,0x6fdc047c2391615b3facd79b4588c7e9106e49f2,66002.0,sale,Africa,172.16,6,100,high_value,veteran,62.5,moderate_risk


Total de registros: 9291894


timestamp,sending_address,receiving_address,amount,transaction_type,location_region,ip_prefix,login_frequency,session_duration,purchase_pattern,age_group,risk_score,anomaly
1685650065,0x5297edd262e7084608cbbcb8d666c86b4740ce6c,0x84df8a4e93edd124a95b6e0c034cba4b783c9ed9,57785.0,sale,Europe,10.0,1,28,random,new,42.0,low_risk
1606499942,0x927f8801cf6073f5b6da2df4def5f548735e7062,0xcdcace0693e828baa1857ee5b127419bb5e7fc56,36016.0,transfer,North America,10.0,1,39,random,new,26.25,low_risk
1626687883,0xe5c91f67af87a0e20267b290e32eadab60cb044c,0x4ee64b1d1704498a40940040f28c6109d5b9c1ce,62972.0,sale,South America,192.168,8,136,high_value,veteran,84.375,moderate_risk
1658934355,0x29eb22d2110b8a73ee78542975d2fb701f31929a,0x11f40ae67f6b648e8b4bbc2d1a04c665214f7d25,71192.0,transfer,Africa,192.0,8,98,high_value,veteran,43.75,low_risk
1679189913,0xd5a98e35d92b478ad4dbefdf2b55b22f331043dc,0xb66b0ca73fe4f58fbc2485b46474688d9913753e,31251.0,sale,South America,192.0,2,29,random,new,42.0,low_risk


Total de registros no Parquet: 9291894


---
### 2. Limpeza dos Dados
Para limpeza, vamos tratar dados ausentes e fazer algumas transformações no formato de dados.

In [0]:
# Remover registros com valores nulos ou inconsistentes
df_cleaned = df_parquet.dropna()  # Remove as linhas com valores nulos
df_cleaned = df_cleaned.filter(df_cleaned["amount"].isNotNull())  # Garantir que 'amount' não seja nulo

# Converter tipos de dados onde necessário (exemplo: timestamp)
from pyspark.sql.functions import col

df_cleaned = df_cleaned.withColumn("timestamp", col("timestamp").cast("timestamp"))

display(df_cleaned.head(10))
print(df_cleaned.count())


timestamp,sending_address,receiving_address,amount,transaction_type,location_region,ip_prefix,login_frequency,session_duration,purchase_pattern,age_group,risk_score,anomaly
2023-06-01T20:07:45.000+0000,0x5297edd262e7084608cbbcb8d666c86b4740ce6c,0x84df8a4e93edd124a95b6e0c034cba4b783c9ed9,57785.0,sale,Europe,10.0,1,28,random,new,42.0,low_risk
2020-11-27T17:59:02.000+0000,0x927f8801cf6073f5b6da2df4def5f548735e7062,0xcdcace0693e828baa1857ee5b127419bb5e7fc56,36016.0,transfer,North America,10.0,1,39,random,new,26.25,low_risk
2021-07-19T09:44:43.000+0000,0xe5c91f67af87a0e20267b290e32eadab60cb044c,0x4ee64b1d1704498a40940040f28c6109d5b9c1ce,62972.0,sale,South America,192.168,8,136,high_value,veteran,84.375,moderate_risk
2022-07-27T15:05:55.000+0000,0x29eb22d2110b8a73ee78542975d2fb701f31929a,0x11f40ae67f6b648e8b4bbc2d1a04c665214f7d25,71192.0,transfer,Africa,192.0,8,98,high_value,veteran,43.75,low_risk
2023-03-19T01:38:33.000+0000,0xd5a98e35d92b478ad4dbefdf2b55b22f331043dc,0xb66b0ca73fe4f58fbc2485b46474688d9913753e,31251.0,sale,South America,192.0,2,29,random,new,42.0,low_risk
2023-07-17T12:00:32.000+0000,0x927eb5472ae6124e561287052ba08e71b72ab8c5,0x2c1a91f45c0decdbf8c2b84abdb0a349237f7f41,72296.0,purchase,Africa,10.0,3,71,focused,established,31.25,low_risk
2020-09-18T06:04:30.000+0000,0x992ff96a3bf0676cef2d26d0276b27456ce19e7b,0xe2464122a2d71df4324151c8f7572ef23f5dd0ab,1248.0,transfer,South America,192.168,3,64,focused,established,15.0,low_risk
2021-08-01T11:31:23.000+0000,0xd1b28593139022c70b1d5e370cc800226efbf072,0x927f8801cf6073f5b6da2df4def5f548735e7062,66542.0,purchase,Europe,192.168,5,67,focused,established,31.25,low_risk
2021-04-25T08:35:12.000+0000,0x0bdc9e8dd3cf34b207a2f547f45a23e5808d84fb,0xb69b3912979ac2571f86843000a21c6c5ce84342,74674.0,transfer,South America,192.0,5,54,focused,established,18.75,low_risk
2020-01-23T23:09:43.000+0000,0xf8455a61277886b577be973b0401a1fb3ded50e6,0x3108e1d0956526998fc5cc8a008ea4fed631162b,24966.0,sale,North America,192.168,7,131,high_value,veteran,52.5,low_risk


9291894


In [0]:
#Verificação de tipagem dos dados
df_cleaned.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- sending_address: string (nullable = true)
 |-- receiving_address: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- ip_prefix: double (nullable = true)
 |-- login_frequency: integer (nullable = true)
 |-- session_duration: integer (nullable = true)
 |-- purchase_pattern: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- risk_score: string (nullable = true)
 |-- anomaly: string (nullable = true)



---
### 3. Criação das Tabelas-Resultado
**Tabela 1:** Média de "risk_score" por "location_region"

Calcular a média do "risk_score" para cada "location_region" e ordenar de forma decrescente.


In [0]:
from pyspark.sql import functions as F

location_risk_avg = df_cleaned.groupBy("location_region").agg(
    F.avg("risk_score").alias("avg_risk_score")
).orderBy("avg_risk_score", ascending=False)

display(location_risk_avg) 

print(location_risk_avg.count())


location_region,avg_risk_score
North America,45.155840047473845
South America,45.13682993481994
Asia,44.99409205308348
Africa,44.90124784509396
0,44.90044636570302
Europe,44.59929868093872


6


**Tabela 2:** Top 3 "receiving_address" com maior "amount" para transações mais recentes de tipo "sale"

Primeiro, vamos filtrar as transações do tipo "sale", e depois identificar as transações mais recentes para cada "receiving_address", considerando a coluna "timestamp". 

Finalmente, ordenaremos por "amount" e pegaremos os três maiores valores.

In [0]:
# Filtrando transações do tipo 'sale'
sale_transactions = df_cleaned.filter(df_cleaned["transaction_type"] == "sale")

# Encontrar a transação mais recente para cada 'receiving_address'
from pyspark.sql.window import Window

window_spec = Window.partitionBy("receiving_address").orderBy(F.col("timestamp").desc())

# Adicionar uma coluna para a ordem da transação mais recente
sale_transactions_recent = sale_transactions.withColumn("rank", F.row_number().over(window_spec)) \
    .filter(F.col("rank") == 1)  # Filtra apenas as transações mais recentes

# Obter os 3 maiores 'amount' para cada 'receiving_address'
top_3_receiving_addresses = sale_transactions_recent.orderBy("amount", ascending=False) \
    .select("receiving_address", "amount", "timestamp").limit(3)

display(top_3_receiving_addresses)


receiving_address,amount,timestamp
0xfa45921781154db0fcb8468b20a368e48cc8bf4f,none,2023-12-29T22:31:22.000+0000
0x858170b93f5c5c1a33417255bb3f3ff639454ee5,none,2024-01-02T05:22:27.000+0000
0x577754308538f4be10a41afb4f8900cd24d7098f,none,2024-01-02T12:28:20.000+0000


---
### 4. Monitoramento de Qualidade dos Dados
Agora, vamos calcular alguns indicadores de qualidade dos dados, como a quantidade total de registros, registros inseridos e registros com erro.

In [0]:
# Qualidade dos dados
total_records = df.count()  # Total de registros no DataFrame
cleaned_records = df_cleaned.count()  # Total de registros após limpeza
errors = total_records - cleaned_records  # Registros com erro

# Calcular a porcentagem de erros
error_percentage = (errors / total_records) * 100

# Mostrar métricas de qualidade
print(f"Total de Registros: {total_records}")
print(f"Registros Limpos: {cleaned_records}")
print(f"Registros com Erro: {errors}")
print(f"Porcentagem de Erros: {error_percentage:.2f}%")


Total de Registros: 9291894
Registros Limpos: 9291894
Registros com Erro: 0
Porcentagem de Erros: 0.00%


### 4.1 Monitoramento aprimorado

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, FloatType, LongType

# Função para avaliar qualidade dos dados com validações mais completas
def assess_data_quality(df, expected_schema):
    quality_report = {}

    # 1. Quantidade total de registros
    total_records = df.count()
    quality_report['total_records'] = total_records

    # 2. Quantidade de registros com valores nulos
    null_counts = {}
    for col_name in df.columns:
        null_counts[col_name] = df.filter(F.col(col_name).isNull()).count()
    quality_report['null_counts'] = null_counts

    # 3. Erros de tipo de dados (dados fora do tipo esperado)
    type_errors = {}
    for col_name, expected_type in expected_schema.items():
        # Verifica se a coluna tem o tipo correto
        invalid_type_count = df.filter(F.col(col_name).cast(expected_type).isNull()).count()
        if invalid_type_count > 0:
            type_errors[col_name] = invalid_type_count
    quality_report['type_errors'] = type_errors

    # 4. Duplicação de registros (verifica duplicados)
    duplicate_count = df.count() - df.dropDuplicates().count()
    quality_report['duplicate_count'] = duplicate_count

    # 5. Erros de consistência: Exemplo: valores fora de intervalo, dados não formatados corretamente
    consistency_errors = {}
    
    # Verificar se 'amount' é maior que 0 e dentro de um intervalo razoável
    consistency_errors['invalid_amount'] = df.filter(F.col("amount") <= 0).count()
    consistency_errors['amount_out_of_range'] = df.filter((F.col("amount") > 1000000) | (F.col("amount") < 0)).count()  # Exemplo de intervalo
    
    # Verificar se 'risk_score' está dentro do intervalo de 0 a 100
    consistency_errors['invalid_risk_score'] = df.filter((F.col("risk_score") < 0) | (F.col("risk_score") > 100)).count()
    
    # Verificar se 'timestamp' não está no futuro
    consistency_errors['invalid_timestamp'] = df.filter(F.col("timestamp") > F.current_timestamp()).count()

    
    # Verificar se 'transaction_type' é válido (deve ser 'sale', 'purchase' ou 'transfer')
    consistency_errors['invalid_transaction_type'] = df.filter(~F.col("transaction_type").isin("sale", "purchase", "transfer")).count()
    
    # Verificar se o endereço 'receiving_address' tem o formato esperado (exemplo: comprimento fixo ou formato específico)
    consistency_errors['invalid_receiving_address'] = df.filter(F.length(F.col("receiving_address")) != 42).count()  # Exemplo de formato de endereço Ethereum

    # Verificar a validade do 'sending_address'
    consistency_errors['invalid_sending_address'] = df.filter(F.length(F.col("sending_address")) != 42).count()

    quality_report['consistency_errors'] = consistency_errors

    # 6. Conformidade dos dados (percentual de erros)
    total_errors = sum(null_counts.values()) + sum(type_errors.values()) + duplicate_count + sum(consistency_errors.values())
    error_percentage = (total_errors / total_records) * 100 if total_records > 0 else 0
    quality_report['error_percentage'] = error_percentage

    return quality_report

# Definindo o esquema esperado (exemplo)
expected_schema = {
    "timestamp": "long",
    "sending_address": "string",
    "receiving_address": "string",
    "amount": "float",
    "transaction_type": "string",
    "location_region": "string",
    "ip_prefix": "string",
    "login_frequency": "int",
    "session_duration": "int",
    "purchase_pattern": "string",
    "age_group": "string",
    "risk_score": "float",
    "anomaly": "string"
}

# Chame a função para avaliar a qualidade dos dados no DataFrame
quality_report = assess_data_quality(df_cleaned, expected_schema)

# Exibindo o relatório de qualidade
for key, value in quality_report.items():
    print(f"{key}: {value}")


total_records: 9291894
null_counts: {'timestamp': 0, 'sending_address': 0, 'receiving_address': 0, 'amount': 0, 'transaction_type': 0, 'location_region': 0, 'ip_prefix': 0, 'login_frequency': 0, 'session_duration': 0, 'purchase_pattern': 0, 'age_group': 0, 'risk_score': 0, 'anomaly': 0}
type_errors: {'amount': 50000, 'risk_score': 50226}
duplicate_count: 2
consistency_errors: {'invalid_amount': 0, 'amount_out_of_range': 0, 'invalid_risk_score': 0, 'invalid_timestamp': 0, 'invalid_transaction_type': 767639, 'invalid_receiving_address': 0, 'invalid_sending_address': 0}
error_percentage: 9.3400441287858
