SPARK

In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col, when, isnull, avg, count


In [3]:
spark = SparkSession.builder\
.appName('localiza')\
.getOrCreate()

CONEXÃO COM DRIVE

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


LEITURA DO CSV

In [5]:
csv_file = '/content/drive/MyDrive/localiza/df_fraud_credit.csv'

In [6]:

df = spark.read.csv(csv_file, header=True, inferSchema=True)



In [74]:
#df.show()

LIMPEZA DOS DADOS IMPORTADOS E QUALIDADE DOS DADOS



In [7]:
#COUNT TOTAL DA BASE
count_total = df.count()

In [8]:
count_total

9291894

In [9]:
#COUNT BASE SEM DUPLICAÇÃO
count_sem_duplicacao = df.dropDuplicates().count()

In [10]:
count_sem_duplicacao

9291892

DF SEM DUPLICADAS

In [9]:
df = df.dropDuplicates()

In [10]:
df.count()

9291892

In [11]:
#COLUNAS COM VALORES NULL

valores_null = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])


In [12]:
valores_null.show()


+---------+---------------+-----------------+------+----------------+---------------+---------+---------------+----------------+----------------+---------+----------+-------+
|timestamp|sending_address|receiving_address|amount|transaction_type|location_region|ip_prefix|login_frequency|session_duration|purchase_pattern|age_group|risk_score|anomaly|
+---------+---------------+-----------------+------+----------------+---------------+---------+---------------+----------------+----------------+---------+----------+-------+
|        0|              0|                0|     0|               0|              0|        0|              0|               0|               0|        0|         0|      0|
+---------+---------------+-----------------+------+----------------+---------------+---------+---------------+----------------+----------------+---------+----------+-------+



In [None]:
#COUNT COM A COLUNA LOCAL_REGION = 0

In [16]:
count_coluna_location_region_zerada = df.filter(df['location_region'] == '0').count()


In [17]:
count_coluna_location_region_zerada

50000

In [None]:
#COUNT COM A COLUNA AMOUNT = NONE

In [18]:
count_coluna_amount_none = df.filter(df['amount'] == 'none').count()

In [19]:
count_coluna_amount_none

50000

# DF FINAL

In [11]:
df_final = df.filter(df['location_region'] != '0')



In [12]:
df_final.count()

9241892

In [13]:
df_final = df_final.filter(df_final['amount'] != 'none')


In [14]:
df_final.count()

9192145

COUNT APÓS TIRAR DUPLICADAS, LOCATION_REGION ==0 E AMOUNT == NONE

In [44]:
count_final = df_final.count()

In [28]:
count_final

0

local_region = 0

# PERGUNTA 1

In [15]:

p1 = df_final.groupBy("location_region").agg(avg("risk_score").alias("media_risk_score"))




In [17]:
p1_ordenado = p1.orderBy(col("media_risk_score").desc())


In [18]:
p1_ordenado.show()

+---------------+------------------+
|location_region|  media_risk_score|
+---------------+------------------+
|  North America| 45.15483362554195|
|  South America|45.139408383849265|
|           Asia| 44.99454178762852|
|         Africa| 44.90222970134172|
|         Europe| 44.59870767602968|
+---------------+------------------+



In [19]:
p1_ordenado.printSchema()

root
 |-- location_region: string (nullable = true)
 |-- media_risk_score: double (nullable = true)



# PERGUNTA 2

In [20]:
df_p2 = df_final.filter(df_final["transaction_type"] == "sale")


In [21]:
#PARTICIONANDO PELA COLUNA RECEIVING_ADDRESS
window_spec = Window.partitionBy("receiving_address").orderBy(col("timestamp").desc())


In [23]:
df_p2_linhas_numeradas = df_p2.withColumn("row_number", row_number().over(window_spec))


In [24]:
oferta_atual = df_p2_linhas_numeradas.filter(col("row_number") == 1)


In [26]:
top_3_ofertas = oferta_atual.orderBy(col("amount").desc()).limit(3)


In [27]:
p2_final = top_3_ofertas.select("receiving_address", "amount", "timestamp")


In [28]:
p2_final.show()


+--------------------+------+----------+
|   receiving_address|amount| timestamp|
+--------------------+------+----------+
|0x9508352b847b315...|9991.0|1704140418|
|0x16c9903f99897fa...|9858.0|1704199083|
|0x9c90e60ef483c90...|9740.0|1704190903|
+--------------------+------+----------+



In [42]:
output_raw = "/content/drive/MyDrive/localiza/lake_localiza/raw/raw.parquet"

In [40]:
output_silver = "/content/drive/MyDrive/localiza/lake_localiza/silver/silver.parquet"

In [30]:
output_gold_p1 = "/content/drive/MyDrive/localiza/lake_localiza/gold/p1.parquet"

In [31]:
output_gold_p2 = "/content/drive/MyDrive/localiza/lake_localiza/gold/p2.parquet"

In [43]:
df_final.write.parquet(output_raw)

TRATAMENTO PARA CAMADA PRATA

In [36]:
df_final.columns

['timestamp',
 'sending_address',
 'receiving_address',
 'amount',
 'transaction_type',
 'location_region',
 'ip_prefix',
 'login_frequency',
 'session_duration',
 'purchase_pattern',
 'age_group',
 'risk_score',
 'anomaly']

In [37]:
# Renomeando as colunas do DataFrame para português
df_silver = df_final \
    .withColumnRenamed('timestamp', 'data_hora') \
    .withColumnRenamed('sending_address', 'endereco_envio') \
    .withColumnRenamed('receiving_address', 'endereco_recebimento') \
    .withColumnRenamed('amount', 'quantidade') \
    .withColumnRenamed('transaction_type', 'tipo_transacao') \
    .withColumnRenamed('location_region', 'regiao_localizacao') \
    .withColumnRenamed('ip_prefix', 'prefixo_ip') \
    .withColumnRenamed('login_frequency', 'frequencia_login') \
    .withColumnRenamed('session_duration', 'duracao_sessao') \
    .withColumnRenamed('purchase_pattern', 'padrao_compra') \
    .withColumnRenamed('age_group', 'faixa_etaria') \
    .withColumnRenamed('risk_score', 'pontuacao_risco') \
    .withColumnRenamed('anomaly', 'anomalia')



In [38]:
df_silver.show(2)

+----------+--------------------+--------------------+----------+--------------+------------------+----------+----------------+--------------+-------------+------------+---------------+--------+
| data_hora|      endereco_envio|endereco_recebimento|quantidade|tipo_transacao|regiao_localizacao|prefixo_ip|frequencia_login|duracao_sessao|padrao_compra|faixa_etaria|pontuacao_risco|anomalia|
+----------+--------------------+--------------------+----------+--------------+------------------+----------+----------------+--------------+-------------+------------+---------------+--------+
|1641908151|0xaa3037771832558...|0x94ed0de7f781533...|   18098.0|          sale|              Asia|    172.16|               1|            28|       random|         new|           42.0|low_risk|
|1587247264|0xd5a98e35d92b478...|0xb66b0ca73fe4f58...|   71703.0|      transfer|     North America|      10.0|               3|            42|      focused| established|          18.75|low_risk|
+----------+-------------

In [41]:
df_silver.write.parquet(output_silver)

TRATAMENTO CAMADO OURO

In [32]:
p1_ordenado.write.parquet(output_gold_p1)

In [33]:
p2_final.write.parquet(output_gold_p2)

In [21]:
spark.stop()