## Instalação e preparação do Spark

In [1]:

!pip install pyspark

# NGROK
!wget -qnc https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip -n -q ngrok-stable-linux-amd64.zip


# Sessão Spark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
      .config('spark.ui.port', '4050')
      .appName("SparkML")
      .getOrCreate()
)

# A sessão do SparkUI com NGROK
!./ngrok authtoken cr_2bPPeoC6NJqG0Huey7fRe713uFd
get_ipython().system_raw('./ngrok http 4050 &')
!sleep 10
!curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url":"(?=https)\K[^"]*'

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=7339bead644f66e5e33c5032387e0a34739774230fe8ce35ca060b62d1a0a662
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


In [2]:
from pyspark.sql.types import *

schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType())
])


schema_base_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('chave_pix', StringType()),
    StructField('categoria', StringType()),
    StructField('transaction_date', StringType()),
    StructField('fraude', IntegerType())
])


caminho_json = '/content/case_final.json'

df = spark.read.json(
    caminho_json,
    schema=schema_base_pix,
    timestampFormat="yyyy-MM-dd HH:mm:ss"
)

In [3]:
df.show()
df.printSchema()

+------------+------------------+--------------------+--------------------+---------+-------------+-------------------+------+
|id_transacao|             valor|           remetente|        destinatario|chave_pix|    categoria|   transaction_date|fraude|
+------------+------------------+--------------------+--------------------+---------+-------------+-------------------+------+
|        1000|            588.08|{Jonathan Gonsalv...|{Calebe Melo, Cai...|aleatoria|       outros|2021-07-16 05:00:55|     0|
|        1001|           80682.5|{Jonathan Gonsalv...|{Davi Lucas Perei...|  celular|transferencia|2022-04-20 12:34:01|     1|
|        1002|             549.9|{Jonathan Gonsalv...|{Sabrina Castro, ...|      cpf|        lazer|2022-07-10 16:51:34|     0|
|        1003|             90.83|{Jonathan Gonsalv...|{Francisco da Con...|aleatoria|   transporte|2022-10-20 10:57:36|     0|
|        1004|13272.619999999999|{Jonathan Gonsalv...|{Isabelly Ferreir...|    email|transferencia|2021-04-06 2

## Preparação dos dados

In [4]:
from pyspark.sql.functions import col

df_flatten = df.withColumns({'remetente_nome': col('remetente').getField('nome'),
                                'remetente_banco': col('remetente').getField('banco'),
                                'remetente_tipo': col('remetente').getField('tipo'),
                                'destinatario_nome': col('destinatario').getField('nome'),
                                'destinatario_banco': col('destinatario').getField('banco'),
                                'destinatario_tipo': col('destinatario').getField('tipo')


}).drop('remetente','destinatario')

In [5]:
df_flatten.printSchema()

df_flatten.show()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- remetente_nome: string (nullable = true)
 |-- remetente_banco: string (nullable = true)
 |-- remetente_tipo: string (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)

+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|
+------------+------------------+---------+-------------+-------------

In [6]:
df_flatten.describe().show()

+-------+-----------------+------------------+---------+-----------+-------------------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|summary|     id_transacao|             valor|chave_pix|  categoria|   transaction_date|            fraude|    remetente_nome|remetente_banco|remetente_tipo|destinatario_nome|destinatario_banco|destinatario_tipo|
+-------+-----------------+------------------+---------+-----------+-------------------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|  count|           100000|            100000|   100000|     100000|             100000|            100000|            100000|         100000|        100000|           100000|            100000|           100000|
|   mean|          50999.5|10303.358732200059|     NULL|       NULL|               NULL|           0.15367|              NULL|           NULL|      

In [7]:
df_flatten.printSchema()

df_flatten.show()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- remetente_nome: string (nullable = true)
 |-- remetente_banco: string (nullable = true)
 |-- remetente_tipo: string (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)

+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|
+------------+------------------+---------+-------------+-------------

## Modelagem

Perguntas a serem respondidas?


- Para qual banco esse cliente mais transfere?
- Quantas transferências por período que esse cliente fez para cada banco?
- Baseando-se no valor das transferências, poderia dar um aumento de crédito?
- Para o que esse cliente mais usa as transferências?
- Executar um algoritmo de machine learning que identifique possíveis transações com fraude.


In [8]:
# Bancos que são os maiores destinos de transferências

df_flatten.groupBy('destinatario_banco').count().show()


+------------------+-----+
|destinatario_banco|count|
+------------------+-----+
|            Nubank|14297|
|                C6|14204|
|               BTG|14390|
|                XP|14401|
|             Caixa|14240|
|          Bradesco|14187|
|              Itau|14281|
+------------------+-----+



In [9]:
#Quaantas transferências por período que esse cliente fez para cada banco?
from pyspark.sql.functions import *

df_flatten.groupBy(date_format(col('transaction_date'),'yyyy-MM').alias('ano-mes'),'destinatario_banco').count().orderBy(col('ano-mes').asc()).show()

+-------+------------------+-----+
|ano-mes|destinatario_banco|count|
+-------+------------------+-----+
|2021-01|          Bradesco|  345|
|2021-01|             Caixa|  345|
|2021-01|            Nubank|  346|
|2021-01|               BTG|  326|
|2021-01|                XP|  362|
|2021-01|                C6|  349|
|2021-01|              Itau|  342|
|2021-02|               BTG|  552|
|2021-02|              Itau|  564|
|2021-02|             Caixa|  517|
|2021-02|                XP|  531|
|2021-02|          Bradesco|  530|
|2021-02|            Nubank|  536|
|2021-02|                C6|  564|
|2021-03|          Bradesco|  601|
|2021-03|             Caixa|  586|
|2021-03|                XP|  618|
|2021-03|              Itau|  592|
|2021-03|            Nubank|  589|
|2021-03|                C6|  560|
+-------+------------------+-----+
only showing top 20 rows



In [10]:
# Qual a média de envios por banco?
df_flatten.groupBy(col('destinatario_banco')).agg(avg('valor').alias('media_valor')).orderBy(col('media_valor').asc()).show()

+------------------+------------------+
|destinatario_banco|       media_valor|
+------------------+------------------+
|               BTG|10122.299803335622|
|              Itau|10230.876305580874|
|             Caixa|10254.864015449395|
|                C6|10309.499774711307|
|            Nubank|10316.475401133126|
|                XP|10328.071572113045|
|          Bradesco| 10564.19458870794|
+------------------+------------------+



In [11]:
# Descrição do tipo de gasto de acordo com o período e banco

df_flatten.groupBy(date_format(col('transaction_date'),'yyyy-MM').alias('ano-mes'),
                   'destinatario_banco',
                   'categoria').count().orderBy(col('ano-mes').asc()).show()

+-------+------------------+-------------+-----+
|ano-mes|destinatario_banco|    categoria|count|
+-------+------------------+-------------+-----+
|2021-01|             Caixa|     educacao|   24|
|2021-01|              Itau|        saude|   40|
|2021-01|            Nubank|        saude|   28|
|2021-01|                XP|    presentes|   38|
|2021-01|               BTG|   transporte|   29|
|2021-01|               BTG|  alimentacao|   31|
|2021-01|              Itau|transferencia|   75|
|2021-01|                XP|       outros|   34|
|2021-01|                XP|   transporte|   26|
|2021-01|          Bradesco|transferencia|   96|
|2021-01|              Itau|    vestuario|   28|
|2021-01|               BTG|        lazer|   34|
|2021-01|          Bradesco|   transporte|   40|
|2021-01|                XP|transferencia|   89|
|2021-01|                C6|  alimentacao|   38|
|2021-01|             Caixa|        lazer|   37|
|2021-01|          Bradesco|       outros|   25|
|2021-01|           

In [12]:
#Gasto anual por categoria
df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy').alias('ano'),
    'categoria'
).agg(
    sum('valor').cast(DecimalType(38, 2)).alias('valor')
).orderBy(col('ano').asc(), col('valor').desc()).show(30)

+----+-------------+------------+
| ano|    categoria|       valor|
+----+-------------+------------+
|2021|transferencia|416670741.70|
|2021|    presentes| 10496042.02|
|2021|    vestuario| 10412651.93|
|2021|        saude| 10384662.82|
|2021|  alimentacao| 10237928.25|
|2021|     educacao| 10106095.07|
|2021|       outros|  9737076.38|
|2021|        lazer|  9622503.14|
|2021|   transporte|  9497893.47|
|2022|transferencia|430191098.79|
|2022|    vestuario| 10696006.48|
|2022|       outros| 10566645.10|
|2022|   transporte| 10553408.29|
|2022|  alimentacao| 10545783.23|
|2022|     educacao| 10367124.44|
|2022|    presentes| 10311585.74|
|2022|        lazer| 10295747.63|
|2022|        saude| 10048245.07|
|2023|transferencia| 16148682.62|
|2023|       outros|   501308.34|
|2023|        lazer|   469671.41|
|2023|    vestuario|   459528.70|
|2023|     educacao|   432305.66|
|2023|   transporte|   427790.54|
|2023|        saude|   400683.64|
|2023|  alimentacao|   392078.31|
|2023|    pres

In [13]:
# média de transferencia por período

df_flatten.groupBy(

                   date_format(col('transaction_date'), 'yyyy').alias('Ano')

).agg(avg('id_transacao').cast(IntegerType()).alias('Media de transacoes')).orderBy(col('Ano').asc()).show()

+----+-------------------+
| Ano|Media de transacoes|
+----+-------------------+
|2021|              51204|
|2022|              50823|
|2023|              50481|
+----+-------------------+



In [14]:
# Número de Fraudes

df_flatten.withColumn("Tipo_de_Transacao", when(df_flatten['fraude'] == 1, 'Fraude').otherwise('Não Fraude')).groupBy('Tipo_de_Transacao').count().show()

+-----------------+-----+
|Tipo_de_Transacao|count|
+-----------------+-----+
|           Fraude|15367|
|       Não Fraude|84633|
+-----------------+-----+



In [15]:
# Número de fraudes por categoria
df_flatten.withColumn("Tipo_de_Transacao", when(df_flatten['fraude'] == 1, 'Fraude').otherwise('Não Fraude')).groupBy("Categoria", "Tipo_de_Transacao").count().show()

+-------------+-----------------+-----+
|    Categoria|Tipo_de_Transacao|count|
+-------------+-----------------+-----+
|    vestuario|       Não Fraude| 9503|
|        lazer|       Não Fraude| 9464|
|   transporte|       Não Fraude| 9174|
|     educacao|       Não Fraude| 9460|
|transferencia|           Fraude|15367|
|transferencia|       Não Fraude| 9377|
|        saude|       Não Fraude| 9476|
|    presentes|       Não Fraude| 9254|
|  alimentacao|       Não Fraude| 9548|
|       outros|       Não Fraude| 9377|
+-------------+-----------------+-----+



In [16]:
# Analisando as fraudes por transferências

df_flatten.withColumn("Tipo_de_Transacao", when(df_flatten['Fraude'] == 1, 'Fraude').otherwise('Não Fraude')).groupBy(
    "Categoria",
    "Tipo_de_Transacao").agg(count("*").alias("Contagem")).where("Categoria = 'transferencia'").show()

+-------------+-----------------+--------+
|    Categoria|Tipo_de_Transacao|Contagem|
+-------------+-----------------+--------+
|transferencia|           Fraude|   15367|
|transferencia|       Não Fraude|    9377|
+-------------+-----------------+--------+



In [17]:
#Frequências de valores nas transações fraudulentas

df_range = df_flatten.withColumn("Tipo_de_Transacao", when(df_flatten['Fraude'] == 1, 'Fraude').otherwise('Não Fraude'))

df_range = df_range.filter(col("Tipo_de_Transacao") == 'Fraude').withColumn("Range", floor(col('valor')/1000)*1000).groupBy("Range").count().orderBy(col("Range").desc())

df_range.show()

+-----+-----+
|Range|count|
+-----+-----+
|89000|  222|
|88000|  208|
|87000|  230|
|86000|  203|
|85000|  205|
|84000|  245|
|83000|  206|
|82000|  206|
|81000|  214|
|80000|  213|
|79000|  205|
|78000|  230|
|77000|  237|
|76000|  232|
|75000|  190|
|74000|  207|
|73000|  237|
|72000|  234|
|71000|  234|
|70000|  222|
+-----+-----+
only showing top 20 rows



In [18]:
# Range das fraudes

df_range.select(max('Range'), min('Range')).show()

+----------+----------+
|max(Range)|min(Range)|
+----------+----------+
|     89000|     19000|
+----------+----------+



## Modelo de Predição de Fraudes

In [60]:
from pyspark.sql.functions import col, udf

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [48]:
df = df_flatten.drop('remetente', 'id')

In [49]:
indexer = StringIndexer(
    inputCols=[
        "destinatario_nome",
        "destinatario_banco",
        "destinatario_tipo",
        "categoria",
        "chave_pix"
    ],
    outputCols=[
        "destinatario_nome_index",
        "destinatario_banco_index",
        "destinatario_tipo_index",
        "categoria_index",
        "chave_pix_index"
    ])

In [50]:
df_index = indexer.fit(df).transform(df)
df_index.show()

+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|
+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|        1000|            588.08|aleatoria|       outros|2021-07-16 05:00:55|     0|Jonathan Gonsalves|   

In [51]:
cols_filtradas = [

                     'valor',
                     'transaction_date',
                     'fraude',
                     'destinatario_nome_index',
                     'destinatario_banco_index',
                     'destinatario_tipo_index',
                     'categoria_index',
                     'chave_pix_index'

]

In [52]:
op_fraude = df_index.select(cols_filtradas).filter("fraude == 1")
op_nao_fraude = df_index.select(cols_filtradas).filter("fraude == 0")

op_nao_fraude = op_nao_fraude.sample(fraction=0.05, withReplacement=False,seed = 42 )

In [53]:
df_concat = op_nao_fraude.union(op_fraude)
df = df_concat.sort("transaction_date")
df.count()

19658

In [54]:
train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())

train = 13698  test = 5960


In [55]:
op_fraude = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("op_fraude", op_fraude(train.fraude))

In [56]:
#VectorAssembler

vetor = VectorAssembler(
    inputCols = [x for x in train.columns if x not in ['transaction_date', "fraude",'op_fraude']],
    outputCol = "features"
    )

#regressão logística:

lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "op_fraude",
    predictionCol = "prediction")

model = Pipeline(stages = [vetor, lr]).fit(train)


In [57]:
predicted = model.transform(test)

predicted = predicted.withColumn("op_fraude", op_fraude(predicted.fraude))
predicted.crosstab("op_fraude", "prediction").show()

+--------------------+----+----+
|op_fraude_prediction| 0.0| 1.0|
+--------------------+----+----+
|                 1.0|   0|4646|
|                 0.0|1314|   0|
+--------------------+----+----+

