# Entendimento do Negócio
Trablho em um banco e o principal meio de pagamento utilizado no seu banco é o Pix.

Através da base de transações do pix o banco deseja entender qual é o perfil dos clientes que utilizam o pix, além de verificar possíveis transações que tenham fraude. Porém, eles tem um cliente específico que tem um relacionamento muito bom, por isso, recebi base de transações de cliente dos últimos 2 anos e preciso a partir dela criar um relatório contendo as principais características das transações.


Então, resumindo, temos dois principais objetivos para esse case:
1. Obter valor a partir dos dados
  - Para qual banco esse cliente mais transfere?
  - Qual é a média de transferências por período que esse cliente faz?
  - Baseando-se no valor das transferências, poderia dar um aumento de crédito?
  - Para o que esse cliente mais usa as transferências?
2. Executar um algoritmo de machine learning que identifique possíveis transações com fraude.
3. Pós Processamento
  - Definir cinco métricas de qualidade para os dados
  - Explicar se os dados estão com uma boa qualidade

# Preparação do Ambiente de Desenvolvimento

In [None]:
# Instalar a última versão do PySpark
!pip install pyspark #==3.3.1

# Instalar o NGROK
!wget -qnc https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip -n -q ngrok-stable-linux-amd64.zip

# Autenticar a sessão do SparkUI com NGROK
!./ngrok authtoken 2KBeQEmmd1YNlQ86GGKf3KFOkb3_6sQH7JEnvEhDxwn9A7WnT
get_ipython().system_raw('./ngrok http 4050 &')
!sleep 10
!curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url":"(?=https)\K[^"]*'

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Data Undesrtanting

Primeiramente, devemos entender tudo sobre a fonte dos dados
- Como o dado chega até nós?
- Qual formato virá?
- Aonde o processamento será executado (AWS EMR, Cluster On-Premise)?
- De quanto em quanto tempo eu preciso gerar esse relatório (mensal, diário, near-real time)?

```json
{
  "id_transacao": inteiro,
  "valor": texto,
  "remetente": {
      "nome": texto,
      "banco": texto,
      "tipo": texto
  },
  "destinatario": {
      "nome": texto,
      "banco":texto,
      "tipo": texto
  },
  "categoria": texto,
  "transaction_date":texto,
  "chave_pix":texto,
  "fraude":inteiro,
}
```



# Preparação dos Dados


In [None]:
# Iniciar a sessão spark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
      .config('spark.ui.port', '4050')
      .appName("CaseFinal")
      .getOrCreate()
)

In [None]:
from pyspark.sql.types import *

caminho_json = '/content/case_final.json'

schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType())
])


schema_base_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('chave_pix', StringType()),
    StructField('categoria', StringType()),
    StructField('transaction_date', StringType()),
    StructField('fraude', IntegerType())
])


# 2022-10-20 10:57:36

df = spark.read.json(
    caminho_json,
    schema=schema_base_pix,
    timestampFormat="yyyy-MM-dd HH:mm:ss",
)

In [None]:
df.printSchema()

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import col

df_flatten = df.withColumns({
    'remetente_nome':  col('remetente').getField('nome'),
    'remetente_banco':  col('remetente').getField('banco'),
    'remetente_tipo':  col('remetente').getField('tipo'),
    'destinatario_nome':  col('destinatario').getField('nome'),
    'destinatario_banco':  col('destinatario').getField('banco'),
    'destinatario_tipo':  col('destinatario').getField('tipo'),
}).drop('remetente', 'destinatario')

In [None]:
df_flatten.printSchema()

df_flatten.show()

In [None]:
df_flatten.describe().show()

# Modelagem
Aqui você encontrará utilidade para os dados levantados.

Aqui será onde teremos insights e, a partir desses, novos conhecimentos sobre o business (se tudo até aqui foi feito corretamente).


- Para qual banco esse cliente mais transfere?
- Qual é a média de transferências por período que esse cliente faz?
- Baseando-se no valor das transferências, poderia dar um aumento de crédito?
- Para o que esse cliente mais usa as transferências?
- Executar um algoritmo de machine learning que identifique possíveis transações com fraude.


In [None]:
# Para Qual Banco Mais Transfere
df_flatten.groupBy('destinatario_banco').count().orderBy('count').show()

In [None]:
# Qual Banco mais usa no Mês
from pyspark.sql.functions import date_format

df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy-MM').alias('ano_mes'),
    'destinatario_banco'
).count().orderBy(col('ano_mes').desc()).show()

In [None]:
# Valor medio das transações
df_flatten.groupBy(
    'destinatario_banco'
).avg('valor').orderBy('avg(valor)').show()

In [None]:
# Categoria das Despesas
df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy-MM').alias('ano_mes'),
    'destinatario_banco',
    'categoria'
).count().orderBy('ano_mes').show()

In [None]:
# soma de quanto ele gasta em cada categoria por mês
from pyspark.sql.functions import date_format

df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy').alias('ano'),
    'categoria'
).sum('valor').select('ano', 'categoria', col('sum(valor)').cast(DecimalType(38, 3)).alias('valor')).orderBy('valor').show(30)

In [None]:
# média de transferencia por período

from pyspark.sql.functions import date_format

df_flatten.groupBy(
    date_format(col('transaction_date'), 'yyyy').alias('ano')
).avg('id_transacao').select('ano',col('avg(id_transacao)').alias('avg')).orderBy('ano').show(30)

In [None]:
# Quantidade de trasanções com fraude
df_flatten.groupBy('fraude').count().show()


In [None]:
# Qual categoria das trasanções com fraude
df_flatten.groupBy('categoria', 'fraude').count().show()

In [None]:
# Filtro de Transferencias com fraude
df_flatten.filter(
    col('categoria') == 'transferencia'
).groupBy('categoria', 'fraude').count().show()

In [None]:
# Coluna Range para indetificar numero de Tranferencias com Fraude por valor
from pyspark.sql.functions import floor

df_flatten.filter(col('fraude') == 1).withColumn(
    "range",
    floor(col("valor")/1000)*1000
).groupBy('range').count().orderBy(col('range').desc()).show()

In [None]:
# Coluna Range para indetificar numero de Tranferencias com Fraude por valor maximo e minimo
from pyspark.sql.functions import floor, max, min

df_flatten.filter(
    col('fraude') == 1
).withColumn(
    "range", floor(col("valor")/1000)*1000
).select(max('range'), min('range')).show()

## Modelo de Predição de Fraudes

In [None]:
# Importar Bibliotecas necessarias
from pyspark.sql.functions import col, udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [None]:
# Excluir colunas disnecessarias
df = df_flatten.drop('remetente', 'id')

In [None]:
# Criar Indexer
indexer = StringIndexer(
    inputCols=[
        "destinatario_nome",
        "destinatario_banco",
        "destinatario_tipo",
        "categoria",
        "chave_pix"
    ],
    outputCols=[
        "destinatario_nome_index",
        "destinatario_banco_index",
        "destinatario_tipo_index",
        "categoria_index",
        "chave_pix_index"
    ])

In [None]:
# Tranformar Dataframe apartir do index
df_index = indexer.fit(df).transform(df)
df_index.show()

In [None]:
# Filtrar Colunas
cols_para_filtrar = [
  "valor",
  "transaction_date",
  "destinatario_nome_index",
  "destinatario_banco_index",
  "destinatario_tipo_index",
  "chave_pix_index",
  "categoria_index",
  "fraude"
]

In [None]:
# Separ Dataframes com e sem Fraude
is_fraud = df_index.select(cols_para_filtrar).filter("fraude == 1")
no_fraud = df_index.select(cols_para_filtrar).filter("fraude == 0")


no_fraud = no_fraud.sample(False, 0.01, seed = 123)

In [None]:
# Conectar dataframes
df_concat = no_fraud.union(is_fraud)
df = df_concat.sort("transaction_date")
df.count()

In [None]:
# Dataframe de Treino e teste
train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())

In [None]:
# Criar função UDF
is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("is_fraud", is_fraud(train.fraude))

In [None]:
# Crie os vetores de características.
# VectorAssembler é um transformador que combina uma dada lista de colunas em uma única coluna de vetor.
assembler = VectorAssembler(
  inputCols = [x for x in train.columns if x not in ["transaction_date", "fraude", "is_fraud"]],
  outputCol = "features")

# Use Regressão Logística.
# é um algoritmo de aprendizado de máquina que é usado para tarefas de classificação.
lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "is_fraud",
    predictionCol = "prediction")


# Isto irá treinar um modelo de regressão logística nos dados de entrada e retornar um
# objeto LogisticRegressionModel que pode ser usado para fazer previsões em novos dados.
model = Pipeline(stages = [assembler, lr]).fit(train)

In [None]:
# Modelo Predição
predicted = model.transform(test)

predicted = predicted.withColumn("is_fraud", is_fraud(predicted.fraude))
predicted.crosstab("is_fraud", "prediction").show()

# Avaliação do Modelo

1.   O modelo identifica corretamente transações acima de R$ 20 mil como fraudulentas?  Sim

2.   Ele consegue responder quais são os bancos e categorias mais utilizados? Sim

3. A acurácia de 95% atende ao mínimo exigido? Sim

# Deployment
Apresente o relatório com os resultados obtidos.

Foi identificado que o cliente Jonathan Gonsalve tem uma alta taxa de transaferências pix mensal. Através de análise realizadas foi possível perceber que a maior categoria de transação é a transferência bancária.

Nota-se que o segundo banco que o cliente mais transaciona é o banco BTG, porém, o mesmo possui o menor valor transacionado, o que indica que o cliente faz muitas transações de menor valor para esse banco.

Também foi possível verificar que há um alto índice de tentativas de fraude na conta desse cliente, sendo que todas as tentativas de fraude foram com valores acima de R$19.999,00 e com a categoria de transferência. Por isso, foi criado um algoritimo de machine learning que identifica esses tipos de transações que contém fraude.


Conclui-se que há uma alta tentativa de transações com fraude e uma ação possível seria diminuir o limite máximo de transferência de pix do cliente.Possivelmente o cliente esteja usando essa conta PF com propósitos de PJ, devido a alta taxa de transferências s altos valores.  