In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=dc4fa7dd733a09fb7c377b5e9a239b23519833a656fb2449bab546cbe2a748dc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# ConfigureSparkUI
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
sc.stop()

spark = (
    SparkSession.builder                  # Método da classe que constrói a sessão spark
      .appName("Meu Primeiro App Spark")  # Nome do App Spark
      .getOrCreate())                     # Verifica se há uma sessão ativa, e se não há, cria uma nova sessão


In [34]:
#lendo os dados
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType()),
])


schema_base_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('transaction_date', TimestampType()),
    StructField('chave_pix', StringType()),
    StructField('fraude', IntegerType()),
    StructField('categoria',StringType())
])


caminho_json = '/content/case_final.json'

df = spark.read.json(
    caminho_json,
    schema=schema_base_pix,
    timestampFormat="yyyy-MM-dd HH:mm:ss"
)

In [35]:
df.show()

+------------+------------------+--------------------+--------------------+-------------------+---------+------+-------------+
|id_transacao|             valor|           remetente|        destinatario|   transaction_date|chave_pix|fraude|    categoria|
+------------+------------------+--------------------+--------------------+-------------------+---------+------+-------------+
|        1000|            588.08|{Jonathan Gonsalv...|{Calebe Melo, Cai...|2021-07-16 05:00:55|aleatoria|     0|       outros|
|        1001|           80682.5|{Jonathan Gonsalv...|{Davi Lucas Perei...|2022-04-20 12:34:01|  celular|     1|transferencia|
|        1002|             549.9|{Jonathan Gonsalv...|{Sabrina Castro, ...|2022-07-10 16:51:34|      cpf|     0|        lazer|
|        1003|             90.83|{Jonathan Gonsalv...|{Francisco da Con...|2022-10-20 10:57:36|aleatoria|     0|   transporte|
|        1004|13272.619999999999|{Jonathan Gonsalv...|{Isabelly Ferreir...|2021-04-06 20:26:51|    email|     0

In [36]:
df.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- remetente: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- destinatario: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- transaction_date: timestamp (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- categoria: string (nullable = true)



In [37]:
from pyspark.sql.functions import *
df_flatten = df.withColumns({
    'destinatario_nome':col('destinatario').getField('nome'),
    'destinatario_banco':col('destinatario').getField('banco'),
    'destinatario_tipo':col('destinatario').getField('tipo'),
}).drop('remetente','destinatario')

In [38]:
df_flatten.show()

+------------+------------------+-------------------+---------+------+-------------+--------------------+------------------+-----------------+
|id_transacao|             valor|   transaction_date|chave_pix|fraude|    categoria|   destinatario_nome|destinatario_banco|destinatario_tipo|
+------------+------------------+-------------------+---------+------+-------------+--------------------+------------------+-----------------+
|        1000|            588.08|2021-07-16 05:00:55|aleatoria|     0|       outros|         Calebe Melo|             Caixa|               PF|
|        1001|           80682.5|2022-04-20 12:34:01|  celular|     1|transferencia|  Davi Lucas Pereira|             Caixa|               PJ|
|        1002|             549.9|2022-07-10 16:51:34|      cpf|     0|        lazer|      Sabrina Castro|            Nubank|               PF|
|        1003|             90.83|2022-10-20 10:57:36|aleatoria|     0|   transporte|Francisco da Conc...|            Nubank|               PJ|

In [17]:
from pyspark.ml.feature import StringIndexer

In [39]:
indexer = StringIndexer(
    inputCols=[
        'destinatario_nome',
        'destinatario_banco',
        'destinatario_tipo',
        'categoria',
        'chave_pix'
    ],
    outputCols=[
        'destinatario_nome_index',
        'destinatario_banco_index',
        'destinatario_tipo_index',
        'categoria_index',
        'chave_pix_index'
    ]

)



In [40]:
df_index = indexer.fit(df_flatten).transform(df_flatten)

In [41]:
df_index.show()

+------------+------------------+-------------------+---------+------+-------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|id_transacao|             valor|   transaction_date|chave_pix|fraude|    categoria|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|
+------------+------------------+-------------------+---------+------+-------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|        1000|            588.08|2021-07-16 05:00:55|aleatoria|     0|       outros|         Calebe Melo|             Caixa|               PF|                12045.0|                     4.0|                    1.0|            6.0|            3.0|
|       

In [42]:
is_fraud = df_index.filter("fraude == 1")
no_fraud = df_index.filter("fraude == 0")

In [43]:
no_fraud = no_fraud.sample(False,0.01,seed=123)

In [44]:
df_concat = no_fraud.union(is_fraud)
df = df_concat.sort('transaction_date')
df.count()

16202

In [45]:
train,test = df.randomSplit([0.7,0.3],seed=123)

In [47]:
is_fraud = udf(lambda fraud: 1.0 if fraud >0 else 0.0, DoubleType())
train = train.withColumn('is_fraud',is_fraud(train.fraude))

In [50]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import pipeline

In [60]:
assembler = VectorAssembler(
    inputCols=[x for x in train.columns if x not in ['transaction_date','fraude','is_fraud','destinatario_nome','destinatario_tipo','destinatario_banco','chave_pix','categoria']],
    outputCol='features'
)

In [61]:
lr = LogisticRegression().setParams(
    maxIter=10000,
    labelCol = 'is_fraud',
    predictionCol= 'prediction'
)

In [62]:
model = pipeline.Pipeline(stages=[assembler,lr]).fit(train)

In [63]:
predicted = model.transform(test)

In [64]:
predicted.show()

+------------+--------+-------------------+---------+------+-------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------------------+--------------------+--------------------+----------+
|id_transacao|   valor|   transaction_date|chave_pix|fraude|    categoria|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|            features|       rawPrediction|         probability|prediction|
+------------+--------+-------------------+---------+------+-------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------------------+--------------------+--------------------+----------+
|        1011|21345.91|2021-10-31 04:31:51|      cpf|     1|tran

In [65]:
predicted = predicted.withColumn('is_fraud',is_fraud(predicted.fraude))

In [66]:
predicted.crosstab('is_fraud','prediction').show()

+-------------------+---+----+
|is_fraud_prediction|0.0| 1.0|
+-------------------+---+----+
|                1.0|  0|4682|
|                0.0|242|   0|
+-------------------+---+----+



In [71]:
df_teste_cols = [
    'id_transacao',
    'valor',
    'transaction_date',
    'destinatario_nome_index',
    'destinatario_banco_index',
    'destinatario_tipo_index',
    'chave_pix_index',
    'categoria_index',
    'fraude'
]
df_test_data = [
    (999,103.2,'2023-01-01 11:56:41',328.0,4.0,1.0,3.0,5.0,0),
    (124,500000.0,'2023-01-01 11:56:41',328.0,2.0,3.0,2.0,5.0,1)

]

df_teste = spark.createDataFrame(df_test_data).toDF(*df_teste_cols)

In [72]:
new_prediction = model.transform(df_teste)

In [73]:
new_prediction.show()

+------------+--------+-------------------+-----------------------+------------------------+-----------------------+---------------+---------------+------+--------------------+--------------------+-----------+----------+
|id_transacao|   valor|   transaction_date|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|chave_pix_index|categoria_index|fraude|            features|       rawPrediction|probability|prediction|
+------------+--------+-------------------+-----------------------+------------------------+-----------------------+---------------+---------------+------+--------------------+--------------------+-----------+----------+
|         999|   103.2|2023-01-01 11:56:41|                  328.0|                     4.0|                    1.0|            3.0|            5.0|     0|[999.0,103.2,328....|[426.491663444874...|  [1.0,0.0]|       0.0|
|         124|500000.0|2023-01-01 11:56:41|                  328.0|                     2.0|                    3.0|