# Análises de transações bancarias utilizando Pyspark

### Objetivos
- Limpar e pré-processar os dados das transações PIX
- Analisar padrões de uso do PIX, tais como os canais mais utilizados e os valores de transação mais comuns
- Usar o PySpark MLlib para treinar e avaliar um modelo de detecção de fraude
- Avaliar o desempenho do modelo e fazer recomendações para melhorias futuras

### Dados

O conjunto de dados inclui as seguintes informações para cada transação:
- Detalhes da transação: valor, tempo, remetente e receptor CPF/CNPJ, tipo
- Etiqueta de fraude: uma variável binária que indica se a transação foi fraudulenta (1) ou não (0)

### STEPES
- Normalização dos dados:
  - O dataset está em formato json.
  ```json
  {
      "id_transacao": inteiro,
      "valor": texto,
      "remetente": {
          "nome": texto,
          "banco": texto,
          "tipo": texto
      },
      "destinatario": {
          "nome": texto,
          "banco":texto,
          "tipo": texto
      },          
      "categoria":texto,
      "chave_pix":texto,
      "transaction_date":texto,
      "fraude":inteiro,
  }
    ```
  - Fazer sua transformação para formato colunar
- Análise Exploratória de Dados: Use o PySpark para analisar padrões de uso do PIX:
  - chaves pix mais usadas;
  - os valores de transação mais comuns;
  - distribuição dos valores de transação por hora e dia;
  - quais bancos receberam mais transferências por dia;
  - para qual tipo de pessoa (PF ou PJ) foram realizadas mais transações
- Engenharia de Recursos: Apresentar novas características que podem ser úteis para a detecção de fraudes, tais como o número de transações feitas pelo mesmo remetente em um período de tempo específico.
- Modelagem: Use o PySpark MLlib para treinar e detectar possíveis transações que contenham fraude.

### Observação
É importante notar que este é um caso simplificado, e em cenários do mundo real você teria que lidar com dados mais complexos, usar técnicas mais avançadas como métodos de conjuntos e considerar o conhecimento de domínio, bem como leis e regulamentos das instituições financeiras no Brasil.


Para iniciar essa análise vamos iniciar instalando o Spark como estamos realizando no colab toda vez que rodar tem que instalar a apliação spark

In [1]:
!pip install pyspark #==3.3.1
# Instalar o NGROK




Apos instalada o spark inicie uma nova sessão

In [5]:
# Iniciar a sessão spark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
      .config('spark.ui.port', '4050')
      .appName("CaseFinal")
      .getOrCreate()
)

Nosso data.set é um json na hora de importar ja da pra setar os tipos de dados como esta descrito no codigo abaixo ou se preferir importe primeiro de um printSchema para verificar os tipos de variaveis depois prepare as colunas

In [7]:
from pyspark.sql.types import *

caminho_json = './case_final.json'

schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType())
])


schema_base_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('chave_pix', StringType()),
    StructField('categoria', StringType()),
    StructField('transaction_date', StringType()),
    StructField('fraude', IntegerType())
])


# 2022-10-20 10:57:36

df = spark.read.json(
    caminho_json,
    schema=schema_base_pix,
    timestampFormat="yyyy-MM-dd HH:mm:ss",
)

In [8]:
df.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- remetente: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- destinatario: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)



df.show()

In [11]:
from pyspark.sql.functions import col

df_flatten = df.withColumns({
    'remetente_nome':  col('remetente').getField('nome'),
    'remetente_banco':  col('remetente').getField('banco'),
    'remetente_tipo':  col('remetente').getField('tipo'),
    'destinatario_nome':  col('destinatario').getField('nome'),
    'destinatario_banco':  col('destinatario').getField('banco'),
    'destinatario_tipo':  col('destinatario').getField('tipo'),
}).drop('remetente', 'destinatario')

In [12]:
df_flatten.printSchema()

df_flatten.show()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- remetente_nome: string (nullable = true)
 |-- remetente_banco: string (nullable = true)
 |-- remetente_tipo: string (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)

+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|
+------------+------------------+---------+-------------+-------------

In [13]:
df_flatten.describe().show()

+-------+-----------------+------------------+---------+-----------+-------------------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|summary|     id_transacao|             valor|chave_pix|  categoria|   transaction_date|            fraude|    remetente_nome|remetente_banco|remetente_tipo|destinatario_nome|destinatario_banco|destinatario_tipo|
+-------+-----------------+------------------+---------+-----------+-------------------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|  count|           100000|            100000|   100000|     100000|             100000|            100000|            100000|         100000|        100000|           100000|            100000|           100000|
|   mean|          50999.5|10303.358732200059|     NULL|       NULL|               NULL|           0.15367|              NULL|           NULL|      

# Modelagem
Aqui você encontrará utilidade para os dados levantados.

Aqui será onde teremos insights e, a partir desses, novos conhecimentos sobre o business (se tudo até aqui foi feito corretamente).


- Para qual banco esse cliente mais transfere?
- Qual é a média de transferências por período que esse cliente faz?
- Baseando-se no valor das transferências, poderia dar um aumento de crédito?
- Para o que esse cliente mais usa as transferências?
- Executar um algoritmo de machine learning que identifique possíveis transações com fraude.


In [None]:
df_flatten.groupBy('destinatario_banco').count().orderBy('count').show()

Vamos tratar alguns agrupamentos como para qual banco que que ele mais tem transação recebida.

In [18]:
from pyspark.sql.functions import date_format, round, col

df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy-MM').alias('ano_mes'),
    'destinatario_banco'
).count().orderBy(col('ano_mes').desc()).show()

+-------+------------------+-----+
|ano_mes|destinatario_banco|count|
+-------+------------------+-----+
|2023-01|              Itau|  267|
|2023-01|             Caixa|  277|
|2023-01|                XP|  277|
|2023-01|          Bradesco|  280|
|2023-01|            Nubank|  290|
|2023-01|                C6|  290|
|2023-01|               BTG|  278|
|2022-12|                XP|  615|
|2022-12|               BTG|  603|
|2022-12|                C6|  576|
|2022-12|          Bradesco|  575|
|2022-12|            Nubank|  602|
|2022-12|              Itau|  633|
|2022-12|             Caixa|  616|
|2022-11|          Bradesco|  579|
|2022-11|               BTG|  580|
|2022-11|              Itau|  614|
|2022-11|            Nubank|  620|
|2022-11|             Caixa|  543|
|2022-11|                C6|  561|
+-------+------------------+-----+
only showing top 20 rows



Agora vamos avaliar a media de transferencia recebida por cada banco

In [19]:
df_flatten.groupBy('destinatario_banco') \
    .avg('valor') \
    .withColumn('avg(valor)', round(col('avg(valor)'), 2)) \
    .orderBy('avg(valor)') \
    .show()


+------------------+----------+
|destinatario_banco|avg(valor)|
+------------------+----------+
|               BTG|   10122.3|
|              Itau|  10230.88|
|             Caixa|  10254.86|
|                C6|   10309.5|
|            Nubank|  10316.48|
|                XP|  10328.07|
|          Bradesco|  10564.19|
+------------------+----------+



Podemos avaliar o comportamentos dessas dispesas e verificar a quantidade de transferências mensais e a categoria de cada uma destas trasferencia

In [20]:
df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy-MM').alias('ano_mes'),
    'destinatario_banco',
    'categoria'
).count().orderBy('ano_mes').show()

+-------+------------------+-------------+-----+
|ano_mes|destinatario_banco|    categoria|count|
+-------+------------------+-------------+-----+
|2021-01|             Caixa|     educacao|   24|
|2021-01|              Itau|        saude|   40|
|2021-01|            Nubank|        saude|   28|
|2021-01|                XP|    presentes|   38|
|2021-01|               BTG|   transporte|   29|
|2021-01|               BTG|  alimentacao|   31|
|2021-01|              Itau|transferencia|   75|
|2021-01|                XP|       outros|   34|
|2021-01|                XP|   transporte|   26|
|2021-01|          Bradesco|transferencia|   96|
|2021-01|              Itau|    vestuario|   28|
|2021-01|               BTG|        lazer|   34|
|2021-01|          Bradesco|   transporte|   40|
|2021-01|                XP|transferencia|   89|
|2021-01|                C6|  alimentacao|   38|
|2021-01|             Caixa|        lazer|   37|
|2021-01|          Bradesco|       outros|   25|
|2021-01|           

E podemos fazer a soma de quanto ele gasta por cada categoria
vamos exibir os 30 primeiros resultados

In [23]:
# soma de quanto ele gasta em cada categoria por mês
df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy').alias('ano'),
    'categoria'
).sum('valor').select('ano', 'categoria', col('sum(valor)').cast(DecimalType(38, 2)).alias('valor')).orderBy('valor').show(30)

+----+-------------+------------+
| ano|    categoria|       valor|
+----+-------------+------------+
|2023|    presentes|   362584.45|
|2023|  alimentacao|   392078.31|
|2023|        saude|   400683.64|
|2023|   transporte|   427790.54|
|2023|     educacao|   432305.66|
|2023|    vestuario|   459528.70|
|2023|        lazer|   469671.41|
|2023|       outros|   501308.34|
|2021|   transporte|  9497893.47|
|2021|        lazer|  9622503.14|
|2021|       outros|  9737076.38|
|2022|        saude| 10048245.07|
|2021|     educacao| 10106095.07|
|2021|  alimentacao| 10237928.25|
|2022|        lazer| 10295747.63|
|2022|    presentes| 10311585.74|
|2022|     educacao| 10367124.44|
|2021|        saude| 10384662.82|
|2021|    vestuario| 10412651.93|
|2021|    presentes| 10496042.02|
|2022|  alimentacao| 10545783.23|
|2022|   transporte| 10553408.29|
|2022|       outros| 10566645.10|
|2022|    vestuario| 10696006.48|
|2023|transferencia| 16148682.62|
|2021|transferencia|416670741.70|
|2022|transfer

Outro insid que da para extrair é uma média de trasferencia por período

In [25]:
# média de transferencia por período
df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy').alias('ano')
).avg('id_transacao') \
 .select(
     col('ano'),
     round(col('avg(id_transacao)'), 2).alias('avg')
 ) \
 .orderBy('ano') \
 .show(30)

+----+--------+
| ano|     avg|
+----+--------+
|2021|51204.04|
|2022|50823.49|
|2023|50481.68|
+----+--------+



Podemos verificar quantas das transferencias são fraudes, esta é uma variável binária 1 indica transação fraudulenta e 0 corresponde não fraude

In [26]:
df_flatten.groupBy('fraude').count().show()

+------+-----+
|fraude|count|
+------+-----+
|     1|15367|
|     0|84633|
+------+-----+



Nota - se que mais de 84 mil registros são transações não fraudulenta e apenas 15 mil fraudes.

Tambem podemos verificar quais categoria possui essas transferencias fraudulentas

In [27]:
df_flatten.groupBy('categoria', 'fraude').count().show()

+-------------+------+-----+
|    categoria|fraude|count|
+-------------+------+-----+
|       outros|     0| 9377|
|     educacao|     0| 9460|
|transferencia|     0| 9377|
|transferencia|     1|15367|
|    presentes|     0| 9254|
|        saude|     0| 9476|
|        lazer|     0| 9464|
|   transporte|     0| 9174|
|    vestuario|     0| 9503|
|  alimentacao|     0| 9548|
+-------------+------+-----+



Nota que a única modalidade que tem fraude é transferencia.

In [28]:
df_flatten.filter(
    col('categoria') == 'transferencia'
).groupBy('categoria', 'fraude').count().show()

+-------------+------+-----+
|    categoria|fraude|count|
+-------------+------+-----+
|transferencia|     0| 9377|
|transferencia|     1|15367|
+-------------+------+-----+



Agora podemos fazer um range de valores das transações

In [29]:
from pyspark.sql.functions import floor

df_flatten.filter(col('fraude') == 1).withColumn(
    "range",
    floor(col("valor")/1000)*1000
).groupBy('range').count().orderBy(col('range').desc()).show()

+-----+-----+
|range|count|
+-----+-----+
|89000|  222|
|88000|  208|
|87000|  230|
|86000|  203|
|85000|  205|
|84000|  245|
|83000|  206|
|82000|  206|
|81000|  214|
|80000|  213|
|79000|  205|
|78000|  230|
|77000|  237|
|76000|  232|
|75000|  190|
|74000|  207|
|73000|  237|
|72000|  234|
|71000|  234|
|70000|  222|
+-----+-----+
only showing top 20 rows



Agora vamos verificar nesse range so os valores que tem fraude para verificar se as fraudes ocorrem dentro de um range.

In [30]:
from pyspark.sql.functions import floor, max, min

df_flatten.filter(
    col('fraude') == 1
).withColumn(
    "range", floor(col("valor")/1000)*1000
).select(max('range'), min('range')).show()

+----------+----------+
|max(range)|min(range)|
+----------+----------+
|     89000|     19000|
+----------+----------+



Note que a faixa de valores que possui fraude esta entre 19 mil a 89 mil não existe fraude em valores menores que 19 mil. Após esses insids em descritivas vamos ir para o Step de modelagem

## Modelo de Predição de Fraudes
Vamos importar algumas bibliotecas para o nosso modelo

In [31]:
from pyspark.sql.functions import col, udf

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

Vamos dropar algumas colunas para que não serve para o nosso modelo em específico.

In [32]:
df = df_flatten.drop('remetente', 'id')

In [33]:
indexer = StringIndexer(
    inputCols=[
        "destinatario_nome",
        "destinatario_banco",
        "destinatario_tipo",
        "categoria",
        "chave_pix"
    ],
    outputCols=[
        "destinatario_nome_index",
        "destinatario_banco_index",
        "destinatario_tipo_index",
        "categoria_index",
        "chave_pix_index"
    ])

In [34]:
df_index = indexer.fit(df).transform(df)
df_index.show()

+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|
+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|        1000|            588.08|aleatoria|       outros|2021-07-16 05:00:55|     0|Jonathan Gonsalves|   

para esta análise podemos apenas usar algumas colunas então vamos selecionar abaixo.

In [35]:
cols_para_filtrar = [
  "valor",
  "transaction_date",
  "destinatario_nome_index",
  "destinatario_banco_index",
  "destinatario_tipo_index",
  "chave_pix_index",
  "categoria_index",
  "fraude"
]

Vamos separa dois data.frame o que possui fraude e o que não possui fraude. deposis vamos pegar uma amostra dos dados que não tem fraude e vamos reunir um data.frame posteriormente

In [36]:
is_fraud = df_index.select(cols_para_filtrar).filter("fraude == 1")
no_fraud = df_index.select(cols_para_filtrar).filter("fraude == 0")


no_fraud = no_fraud.sample(False, 0.01, seed = 123)

Vamos unir o dado que tem fraude que foi selecionado ao acaso juntamente com os que possui fraude

In [37]:
df_concat = no_fraud.union(is_fraud)
df = df_concat.sort("transaction_date")
df.count()

16202

In [38]:
train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())

train = 11278  test = 4924


vamos criar um rotulo de que é fraude.

In [39]:
is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("is_fraud", is_fraud(train.fraude))

vamos utilizar o VectorAssembler que combina a lista de colunas em um vetor e Pipline ira treinar o nosso modelo

In [40]:
# Create the feature vectors.
# VectorAssembler is a transformer that combines a given list of columns into a single vector column.
assembler = VectorAssembler(
  inputCols = [x for x in train.columns if x not in ["transaction_date", "fraude", "is_fraud"]],
  outputCol = "features")

# Use Logistic Regression.
# is a machine learning algorithm that is used for classification tasks
lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "is_fraud",
    predictionCol = "prediction")


# This will train a logistic regression model on the input data and return a
# LogisticRegressionModel object which can be used to make predictions on new data.
model = Pipeline(stages = [assembler, lr]).fit(train)

Com o modelo treinado podemos realizar um predict do nosso modelo.

In [41]:
predicted = model.transform(test)

predicted = predicted.withColumn("is_fraud", is_fraud(predicted.fraude))
predicted.crosstab("is_fraud", "prediction").show()

+-------------------+---+----+
|is_fraud_prediction|0.0| 1.0|
+-------------------+---+----+
|                1.0|  0|4660|
|                0.0|262|   2|
+-------------------+---+----+



# Avaliação do Modelo
Será que seu modelo atingiu todas as necessidades que foram definidas inicialmente?

---
# Resultados


Foi identificado que o cliente Jonathan Gonsalve tem uma alta taxa de transaferências pix mensal. Através de análise realizadas foi possível perceber que a maior categoria de transação é a transferência bancária.

Também foi possível notar que o segundo banco que o cliente mais transaciona é o banco BTG, porém, o mesmo possui o menor valor transacionado, o que indica que o cliente faz muitas transações de menor valor para esse banco.

Também foi possível verificar que há um alto índice de tentativas de fraude na conta desse cliente, sendo que todas as tentativas de fraude foram com valores acima de R$19.999,00 e com a categoria de transferência. Por isso, foi criado um algoritimo de machine learning que identifica esses tipos de transações que contém fraude.


Conclui-se que há uma alta tentativa de transações com fraude e uma ação possível seria diminuir o limite máximo de transferência de pix do cliente.Possivelmente o cliente esteja usando essa conta PF com propósitos de PJ, devido a alta taxa de transferências s altos valores.  

