In [111]:
import os

# Defina a variável de ambiente SPARK_VERSION com a versão do Spark que você está usando
os.environ["SPARK_VERSION"] = "3.0"
import pandas as pd
from pyspark.ml import Pipeline
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import (
    udf,
    col,
    year,
    round,
    date_format,
    to_timestamp,
)
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    DoubleType,
    StringType,
    TimestampType
)
from pydeequ.analyzers import (
    AnalysisRunner, 
    AnalyzerContext,
    ApproxCountDistinct,
    Completeness,
    Compliance,
    Mean,
    Size,
)
from pydeequ.checks import (
    Check,
    CheckLevel,
    ConstrainableDataTypes
)
from pydeequ.verification import (
    VerificationResult,
    VerificationSuite
)
from pydeequ.suggestions import (
    ConstraintSuggestionRunner,
    DEFAULT
)

Fonte: [spark.apache.org](https://spark.apache.org/)

# Spark - RDD:

O que é um RDD?

O que é o SparkContext?

In [112]:
sc = SparkContext(
    master='local',
    appName='Primeiro Contato com o spark'
)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Primeiro Contato com o spark, master=local) created by __init__ at /tmp/ipykernel_11341/121648590.py:1 

## RDD:

In [None]:
path = r'../../../data/rdd_dataset.txt'
rdd = sc.textFile(
    path
)

In [None]:
rdd.collect()

['Cenoura',
 'Otimista',
 'Solitário',
 'Imperfeição',
 'Descoberta',
 'Fantasia',
 'DNC',
 'Maravilhoso',
 'Criatividade',
 'Compreensão',
 'Atraente',
 'Festa',
 'Intenção',
 'Encontro',
 'Destino',
 'Sucesso',
 'Conquistar',
 'Simplicidade',
 'Paz',
 'Existência',
 'Poderoso',
 'DNC',
 'Conseguir',
 'Forte',
 'Alegria',
 'Espírito',
 'Mudança',
 'Coragem',
 'Determinação',
 'DNC']

## Código:

In [None]:
class SparkRDD:
    pass

if __name__ == '__main__':
    spark_rdd = SparkRDD()

## Output:

Filtrando por DNC?

In [None]:
rdd_dnc = rdd.filter(lambda linha: 'DNC' in linha)
rdd_dnc.collect()

                                                                                

['DNC', 'DNC', 'DNC']

Quantas vezes a palavra 'DNC' aparece?

In [None]:
rdd_dnc.count()

3

#  PySpark - Leitura de Dados:

O que é o SparkSession?

In [None]:
spark = SparkSession.builder.appName("Primeiro dataframe no spark").getOrCreate()

## DataFrame:

In [None]:
path = r'../../../data/base_de_dados.csv'
df = spark.read.csv(
    path,
    sep=';',
    header=True,
)

In [None]:
df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+----------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|  data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+----------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|18/02/2022 13:28|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|08/04/2022 01:47|
|  3|   57.58|    Arthur Goncalves|          

## Código:

In [None]:
class LeituraDeDados:
    pass

if __name__ == "__main__":
    leitura_de_dados = LeituraDeDados()

## Output:

Quais são os nomes das colunas?

In [None]:
df.schema.fieldNames()

['id',
 'valor',
 'parte_debitada_nome',
 'parte_debitada_conta',
 'parte_debitada_banco',
 'parte_creditada_nome',
 'parte_creditada_conta',
 'parte_creditada_banco',
 'chave_pix_tipo',
 'chave_pix_valor',
 'data_transacao']

Quais são os tipos?

In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- valor: string (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_conta: string (nullable = true)
 |-- parte_debitada_banco: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_conta: string (nullable = true)
 |-- parte_creditada_banco: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)
 |-- chave_pix_valor: string (nullable = true)
 |-- data_transacao: string (nullable = true)



Como alterar os tipos da colunas?

In [None]:
schemas_pix = StructType([
    StructField('id', IntegerType()),
    StructField('value', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType()),

])

In [None]:
df = spark.read.csv(
    path=path,
    header=True,
    schema=schemas_pix,
    sep=";",
    timestampFormat="dd/MM/yyy HH:mm"
)

In [None]:
df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   value| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

23/08/24 09:02:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, valor, parte_debitada_nome, parte_debitada_conta, parte_debitada_banco, parte_creditada_nome, parte_creditada_conta, parte_creditada_banco, chave_pix_tipo, chave_pix_valor, data_transacao
 Schema: id, value, parte_debitada_nome, parte_debitada_conta, parte_debitada_banco, parte_creditada_nome, parte_creditada_conta, parte_creditada_banco, chave_pix_tipo, chave_pix_valor, data_transacao
Expected: value but found: valor
CSV file: file:///home/guerrlr0/Documentos/010101110101/Programação/Samuel/Projetos/Python/outros/DataExpert/core/data/base_de_dados.csv


# PySpark- Manipulação de Dados I:

## DataFrame:

In [None]:
df = spark.read.csv(
    path=path,
    header=True,
    sep=';',
    # inferSchema=True
)

In [None]:
df.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+----------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|  data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+----------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|18/02/2022 13:28|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|08/04/2022 01:47|
|  3|   57.58|    Arthur Goncalves|          

## Código:

In [None]:
class ManipulaçãoDeDadosI:
    pass

## Output:

Quais são os tipos?

In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- valor: string (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_conta: string (nullable = true)
 |-- parte_debitada_banco: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_conta: string (nullable = true)
 |-- parte_creditada_banco: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)
 |-- chave_pix_valor: string (nullable = true)
 |-- data_transacao: string (nullable = true)



Como altera o tipo de uma coluna?

In [None]:
df_cast = (
    df
    .withColumn(
        'id', col('id').cast('int')
    ).withColumn(
        'valor', col('valor').cast('double')
    ).withColumn(
        'data_transacao', to_timestamp('data_transacao', 'dd/MM/yyy HH:mm')
    )
)

In [None]:
df_cast.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_conta: string (nullable = true)
 |-- parte_debitada_banco: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_conta: string (nullable = true)
 |-- parte_creditada_banco: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)
 |-- chave_pix_valor: string (nullable = true)
 |-- data_transacao: timestamp (nullable = true)



Como ficou o DataFrame?

In [None]:
df_cast.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

Como vejo somente as colunas 'id' e 'valor'?

In [None]:
df_cast.select(
    'id',
    'valor',
).show()

+---+--------+
| id|   valor|
+---+--------+
|  1|    9.93|
|  2|   15.38|
|  3|   57.58|
|  4|53705.13|
|  5|25299.69|
|  6| 7165.06|
|  7|    6.16|
|  8|  136.36|
|  9|  574.39|
| 10|   42.88|
| 11|33629.97|
| 12| 4374.56|
| 13|  507.18|
| 14|67758.87|
| 15|  815.53|
| 16|    2.73|
| 17|    0.54|
| 18|49836.72|
| 19|    9.68|
| 20| 9837.22|
+---+--------+
only showing top 20 rows



Como crio uma nova coluna?

In [None]:
df_dolar = df_cast.select(
    'id',
    'valor',
).withColumn(
    'valor_dolar',
    round(col('valor')*2)
)

In [None]:
df_dolar.show()

+---+--------+-----------+
| id|   valor|valor_dolar|
+---+--------+-----------+
|  1|    9.93|       20.0|
|  2|   15.38|       31.0|
|  3|   57.58|      115.0|
|  4|53705.13|   107410.0|
|  5|25299.69|    50599.0|
|  6| 7165.06|    14330.0|
|  7|    6.16|       12.0|
|  8|  136.36|      273.0|
|  9|  574.39|     1149.0|
| 10|   42.88|       86.0|
| 11|33629.97|    67260.0|
| 12| 4374.56|     8749.0|
| 13|  507.18|     1014.0|
| 14|67758.87|   135518.0|
| 15|  815.53|     1631.0|
| 16|    2.73|        5.0|
| 17|    0.54|        1.0|
| 18|49836.72|    99673.0|
| 19|    9.68|       19.0|
| 20| 9837.22|    19674.0|
+---+--------+-----------+
only showing top 20 rows



Como removo a coluna 'valor_dolar'?

In [None]:
df_dolar.drop('valor_dolar').show()

+---+--------+
| id|   valor|
+---+--------+
|  1|    9.93|
|  2|   15.38|
|  3|   57.58|
|  4|53705.13|
|  5|25299.69|
|  6| 7165.06|
|  7|    6.16|
|  8|  136.36|
|  9|  574.39|
| 10|   42.88|
| 11|33629.97|
| 12| 4374.56|
| 13|  507.18|
| 14|67758.87|
| 15|  815.53|
| 16|    2.73|
| 17|    0.54|
| 18|49836.72|
| 19|    9.68|
| 20| 9837.22|
+---+--------+
only showing top 20 rows



Como renomeo o nome da coluna 'valor_dolar'?

In [None]:
df_dolar.withColumnRenamed(
    'valor_dolar',
    'dolar_valor'
).show()

+---+--------+-----------+
| id|   valor|dolar_valor|
+---+--------+-----------+
|  1|    9.93|       20.0|
|  2|   15.38|       31.0|
|  3|   57.58|      115.0|
|  4|53705.13|   107410.0|
|  5|25299.69|    50599.0|
|  6| 7165.06|    14330.0|
|  7|    6.16|       12.0|
|  8|  136.36|      273.0|
|  9|  574.39|     1149.0|
| 10|   42.88|       86.0|
| 11|33629.97|    67260.0|
| 12| 4374.56|     8749.0|
| 13|  507.18|     1014.0|
| 14|67758.87|   135518.0|
| 15|  815.53|     1631.0|
| 16|    2.73|        5.0|
| 17|    0.54|        1.0|
| 18|49836.72|    99673.0|
| 19|    9.68|       19.0|
| 20| 9837.22|    19674.0|
+---+--------+-----------+
only showing top 20 rows



Como vejo todos os pix do tipo 'cpf'?

In [None]:
df_cast.select(
    'id',
    'valor',
    'chave_pix_tipo'
).filter(
    col('chave_pix_tipo') == 'cpf'
).show()

+---+--------+--------------+
| id|   valor|chave_pix_tipo|
+---+--------+--------------+
|  1|    9.93|           cpf|
|  2|   15.38|           cpf|
|  3|   57.58|           cpf|
|  4|53705.13|           cpf|
|  5|25299.69|           cpf|
|  6| 7165.06|           cpf|
|  7|    6.16|           cpf|
|  8|  136.36|           cpf|
|  9|  574.39|           cpf|
| 10|   42.88|           cpf|
| 11|33629.97|           cpf|
| 12| 4374.56|           cpf|
| 13|  507.18|           cpf|
| 14|67758.87|           cpf|
| 15|  815.53|           cpf|
| 16|    2.73|           cpf|
| 17|    0.54|           cpf|
| 18|49836.72|           cpf|
| 41|   60.55|           cpf|
| 42|    6.62|           cpf|
+---+--------+--------------+
only showing top 20 rows



Como vejo todos os pixs que não são do tipo 'cpf'?

In [None]:
df_cast.select(
    'id',
    'valor',
    'chave_pix_tipo',
).filter(
    col('chave_pix_tipo') != 'cpf' 
).show()

+---+--------+--------------+
| id|   valor|chave_pix_tipo|
+---+--------+--------------+
| 19|    9.68|       celular|
| 20| 9837.22|       celular|
| 21|    9.36|       celular|
| 22|   22.43|       celular|
| 23|    7.44|       celular|
| 24|   40.36|       celular|
| 25|   28.66|       celular|
| 26|  154.98|       celular|
| 27|35859.11|       celular|
| 28|   89.94|       celular|
| 29|  890.47|       celular|
| 30| 3035.83|       celular|
| 31|20875.64|       celular|
| 32| 1508.83|       celular|
| 33|    1.58|       celular|
| 34|58083.62|       celular|
| 35| 7944.02|       celular|
| 36|48714.95|       celular|
| 37|19799.16|       celular|
| 38|   32.79|       celular|
+---+--------+--------------+
only showing top 20 rows



Como vejo todos os pixs que são do tipo 'celular' & tem um valor maior que R$ 100,00?

In [None]:
df_cast.select(
    'id',
    'valor',
    'chave_pix_tipo',
).filter(
    (col('chave_pix_tipo') == 'celular') & 
    (col('valor') > 100)
).show()

+---+--------+--------------+
| id|   valor|chave_pix_tipo|
+---+--------+--------------+
| 20| 9837.22|       celular|
| 26|  154.98|       celular|
| 27|35859.11|       celular|
| 29|  890.47|       celular|
| 30| 3035.83|       celular|
| 31|20875.64|       celular|
| 32| 1508.83|       celular|
| 34|58083.62|       celular|
| 35| 7944.02|       celular|
| 36|48714.95|       celular|
| 37|19799.16|       celular|
| 40|  829.87|       celular|
+---+--------+--------------+



Como vejo todos os pixs que são do tipo 'celular' ou 'cpf'?

In [None]:
df_cast.select(
    'id',
    'valor',
    'chave_pix_tipo',
).filter(
    (col('chave_pix_tipo') == 'celular') | 
    (col('chave_pix_tipo') == 'cpf')
).show()

+---+--------+--------------+
| id|   valor|chave_pix_tipo|
+---+--------+--------------+
|  1|    9.93|           cpf|
|  2|   15.38|           cpf|
|  3|   57.58|           cpf|
|  4|53705.13|           cpf|
|  5|25299.69|           cpf|
|  6| 7165.06|           cpf|
|  7|    6.16|           cpf|
|  8|  136.36|           cpf|
|  9|  574.39|           cpf|
| 10|   42.88|           cpf|
| 11|33629.97|           cpf|
| 12| 4374.56|           cpf|
| 13|  507.18|           cpf|
| 14|67758.87|           cpf|
| 15|  815.53|           cpf|
| 16|    2.73|           cpf|
| 17|    0.54|           cpf|
| 18|49836.72|           cpf|
| 19|    9.68|       celular|
| 20| 9837.22|       celular|
+---+--------+--------------+
only showing top 20 rows



Como deletar todos os nulos?

In [None]:
df_cast.na.drop('any').show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

Como deletar somente as linhas que estão totalmente nulas?

In [None]:
df_cast.na.drop('all').show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

Como deletar somente os nulos nas colunas 'id' e 'valor'?

In [None]:
df_cast.na.drop('all', subset=['id', 'valor']).show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

Como substitui os valores nulos na coluna 'valor' por 0?

In [None]:
df_cast.na.fill(0, subset=['valor']).show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

Qual a quantidade de transações feitas por tipo de chave pix?

In [None]:
(
    df_cast.select('chave_pix_tipo')
    .groupby('chave_pix_tipo')
    .count()
    .show()
)

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|       celular|   22|
|         email|   29|
|           cpf|   49|
+--------------+-----+



Qual a montante de transações feitas por tipo de chave pix?

In [None]:
(
    df_cast.select('chave_pix_tipo', 'valor')
    .groupby('chave_pix_tipo')
    .sum('valor')
    .show()
)

+--------------+------------------+
|chave_pix_tipo|        sum(valor)|
+--------------+------------------+
|       celular|         207778.46|
|         email|499009.38000000006|
|           cpf| 659513.3499999997|
+--------------+------------------+



Qual o valor médio de transações feitas por tipo de chave pix?

In [None]:
(
    df_cast.select('chave_pix_tipo', 'valor')
    .groupby('chave_pix_tipo')
    .avg('valor')
    .show()
)

+--------------+------------------+
|chave_pix_tipo|        avg(valor)|
+--------------+------------------+
|       celular| 9444.475454545454|
|         email|          17207.22|
|           cpf|13459.456122448973|
+--------------+------------------+



# Pyspark - Escrita de Dados:

## DataFrame:

In [None]:
df_not_null = df_cast.na.fill(0, subset=['valor'])

In [None]:
df_not_null.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

## Código:

## Output:

Quais são os tipos?

In [None]:
df_not_null.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = false)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_conta: string (nullable = true)
 |-- parte_debitada_banco: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_conta: string (nullable = true)
 |-- parte_creditada_banco: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)
 |-- chave_pix_valor: string (nullable = true)
 |-- data_transacao: timestamp (nullable = true)



Como filtra pelo ano de 2022?

In [None]:
df_2022 = df_not_null.filter(
    year(col('data_transacao')) == '2022'
)

Qual foi a quantidade de transações em 2022? 

In [None]:
(
    df_2022
    .withColumn(
        'year', year(col('data_transacao'))
    )
    .groupby('year')
    .count()
    .show()
)

+----+-----+
|year|count|
+----+-----+
|2022|   48|
+----+-----+



Como guarda os dados no formato csv?

In [None]:
df_2022.write.mode('overwrite').csv('base_de_2022.csv')

Como guarda os dados no formato parquet?

In [None]:
df_2022.write.mode('overwrite').parquet('base_de_2022.parquet')

                                                                                

# PySpark - Particionamento:

## DataFrame:

In [None]:
df_pp = df_cast

In [None]:
df_pp.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

## Código:

# output:

In [None]:
df_pp.printSchema()

root
 |-- id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- parte_debitada_nome: string (nullable = true)
 |-- parte_debitada_conta: string (nullable = true)
 |-- parte_debitada_banco: string (nullable = true)
 |-- parte_creditada_nome: string (nullable = true)
 |-- parte_creditada_conta: string (nullable = true)
 |-- parte_creditada_banco: string (nullable = true)
 |-- chave_pix_tipo: string (nullable = true)
 |-- chave_pix_valor: string (nullable = true)
 |-- data_transacao: timestamp (nullable = true)



Como verficar as partições?

In [None]:
df_pp.rdd.getNumPartitions()

1

Como verificar as especificações da marquina?

In [None]:
# !lscpu

Como particionar a coluna 'chave_pix_tipo'?

In [None]:
(
    df_pp
    .write
    .mode('overwrite')
    .partitionBy('chave_pix_tipo')
    .parquet('chave_pix_tipo')
)

Como criar uma coluna de data nesse formato 'yyy-mm-dd'?

In [None]:
df_final = df_cast.withColumn(
    'data_particao',
    date_format(col('data_transacao'), 'yyyy-mm-dd')
)

In [None]:
df_final.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+-------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|data_particao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+-------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|   2022-28-18|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    2714538

Como guarda os dados no formato parquet?

In [None]:
(
    df_final
    .write
    .mode('overwrite')
    .partitionBy('data_particao')
    .parquet('data_particao')
)

                                                                                

Como ler o df particionado?

In [None]:
df_particionado = (
    spark
    .read
    .parquet('data_particao')
)

                                                                                

In [None]:
df_particionado.show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+-------------------+-------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|     chave_pix_valor|     data_transacao|data_particao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+--------------------+-------------------+-------------+
| 87|    0.93|     Mariane da Mota|            84120695|            Bradesco|Dra. Alana das Neves|             96784904|             Bradesco|         email|alana.das.neves@h...|2021-06-20 16:41:00|   2021-41-20|
| 88|78347.58|     Lorenzo Ribeiro|            50434631|                 BTG|Maria Clara Rodri...|             26325899|                 Itau|      

Qual é a quantidade de partições?

In [None]:
df_particionado.rdd.getNumPartitions()

4

Como diminuir o número de partições?

In [None]:
df_particionado_coalesce = (
    df_particionado
    .coalesce(2)
)

In [None]:
df_particionado_coalesce.rdd.getNumPartitions()

2

Como criar uma coluna de data nesse formato 'yyyy-mm'?

In [None]:
df_final_ano_mes = df_cast.withColumn(
    'data_particao',
    date_format(col('data_transacao'), 'yyyy-MM')
)

Como guardar os dados no formato parquet?

In [None]:
(
    df_final_ano_mes
    .write
    .mode('overwrite')
    .partitionBy('data_particao')
    .parquet('data_particao')
)

# SparkUI - Prática:

## Schema:

In [None]:
schema_remetente_distinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType()),
])

schema = StructType([
    StructField('id_transacao', StringType()),
    StructField('chave_pix', StringType()),
    StructField('destinatario', schema_remetente_distinatario),
    StructField('remetente', schema_remetente_distinatario),
    StructField('valor', DoubleType()),
    StructField('transaction_date', TimestampType()),
])

## DataFrame:

In [None]:
path = r'../../../data/pix_transactions.json'
df = spark.read.json(
    path,
    schema=schema,
    timestampFormat='yyyy-MM-dd'
)

In [None]:
df.show()

[Stage 44:>                                                         (0 + 1) / 1]

+------------+---------+--------------------+--------------------+--------+-------------------+
|id_transacao|chave_pix|        destinatario|           remetente|   valor|   transaction_date|
+------------+---------+--------------------+--------------------+--------+-------------------+
|        1000|      cpf|{Gabriel Cunha, I...|{Jonathan Gonsalv...|    7.05|2022-03-19 00:00:00|
|        1001|aleatoria|{Diego Souza, XP,...|{Jonathan Gonsalv...|   37.28|2021-01-26 00:00:00|
|        1002|aleatoria|{Nicole Nunes, BT...|{Jonathan Gonsalv...|  282.73|2022-05-31 00:00:00|
|        1003|aleatoria|{Maria Fernanda C...|{Jonathan Gonsalv...| 8447.92|2022-07-04 00:00:00|
|        1004|aleatoria|{Isabel Silva, C6...|{Jonathan Gonsalv...|   58.51|2021-09-11 00:00:00|
|        1005|  celular|{Anthony Carvalho...|{Jonathan Gonsalv...| 6655.12|2022-02-11 00:00:00|
|        1006|      cpf|{Eloah Monteiro, ...|{Jonathan Gonsalv...| 9912.25|2022-05-10 00:00:00|
|        1007|aleatoria|{Sophie Rocha, B

                                                                                

## Código:

## Output:

In [None]:
df.printSchema()

root
 |-- id_transacao: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- destinatario: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- remetente: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- valor: double (nullable = true)
 |-- transaction_date: timestamp (nullable = true)



# SparkSQL - Introdução:

## Schema:

In [None]:
schema = StructType([
    StructField('id', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType())
])

## DataFrame:

In [None]:
path = r'../../../data/base_de_dados.csv'
df = spark.read.csv(
    path=path,
    header=True,
    schema=schema,
    sep=';',
    timestampFormat='dd/MM/yyyy HH:mm'
).createOrReplaceTempView("base_pix")

In [None]:
spark.sql('select * from base_pix').show()

+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|   valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+--------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1|    9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|   15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|   57.58|    Arthur Gonca

## Código:

## Output:

Qual foi a quantidade de transações por tido de chave pix?

In [None]:
query = """
select
    chave_pix_tipo,
    count(id) as quantidade
from base_pix

group by chave_pix_tipo
"""
spark.sql(query).show()

+--------------+----------+
|chave_pix_tipo|quantidade|
+--------------+----------+
|       celular|        22|
|         email|        29|
|           cpf|        49|
+--------------+----------+



Qual foi o valor total transacionado por tipo de chave pix?

In [None]:
query = """
select
    chave_pix_tipo,
    round(sum(valor), 2) as valor_total
from base_pix

group by chave_pix_tipo
"""
spark.sql(query).show()

+--------------+-----------+
|chave_pix_tipo|valor_total|
+--------------+-----------+
|       celular|  207778.46|
|         email|  499009.38|
|           cpf|  659513.35|
+--------------+-----------+



Qual foi a média transacionado por tipo de chave pix?

In [None]:
query = """
select
    chave_pix_tipo,
    round(avg(valor), 2) as media
from base_pix

group by chave_pix_tipo
"""

spark.sql(query).show()

+--------------+--------+
|chave_pix_tipo|   media|
+--------------+--------+
|       celular| 9444.48|
|         email|17207.22|
|           cpf|13459.46|
+--------------+--------+



# SparkSQL - II:

## Schema:

In [None]:
schema_remetente_distinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType()),
])

schema = StructType([
    StructField('id_transacao', StringType()),
    StructField('chave_pix', StringType()),
    StructField('destinatario', schema_remetente_distinatario),
    StructField('remetente', schema_remetente_distinatario),
    StructField('valor', DoubleType()),
    StructField('transaction_date', TimestampType()),
])

## DataFrame:

In [None]:
path = r'../../../data/pix_transactions.json'
df = spark.read.json(
    path,
    schema=schema,
    timestampFormat='yyyy-MM-dd'
).createOrReplaceTempView("base_pix_json")

In [None]:
spark.sql('select * from base_pix_json').show()

[Stage 55:>                                                         (0 + 1) / 1]

+------------+---------+--------------------+--------------------+--------+-------------------+
|id_transacao|chave_pix|        destinatario|           remetente|   valor|   transaction_date|
+------------+---------+--------------------+--------------------+--------+-------------------+
|        1000|      cpf|{Gabriel Cunha, I...|{Jonathan Gonsalv...|    7.05|2022-03-19 00:00:00|
|        1001|aleatoria|{Diego Souza, XP,...|{Jonathan Gonsalv...|   37.28|2021-01-26 00:00:00|
|        1002|aleatoria|{Nicole Nunes, BT...|{Jonathan Gonsalv...|  282.73|2022-05-31 00:00:00|
|        1003|aleatoria|{Maria Fernanda C...|{Jonathan Gonsalv...| 8447.92|2022-07-04 00:00:00|
|        1004|aleatoria|{Isabel Silva, C6...|{Jonathan Gonsalv...|   58.51|2021-09-11 00:00:00|
|        1005|  celular|{Anthony Carvalho...|{Jonathan Gonsalv...| 6655.12|2022-02-11 00:00:00|
|        1006|      cpf|{Eloah Monteiro, ...|{Jonathan Gonsalv...| 9912.25|2022-05-10 00:00:00|
|        1007|aleatoria|{Sophie Rocha, B

                                                                                

## Código:

## Output:

In [None]:
query = """
with cte_base_window(
    select
        destinatario.banco,
        valor,
        row_number() over(partition by destinatario.banco order by valor desc) as row_number
    from base_pix_json
) 

select
    banco,
    valor
from cte_base_window
where row_number in (1, 2)
"""
spark.sql(query).show()

[Stage 56:>                                                         (0 + 1) / 1]

+--------+--------+
|   banco|   valor|
+--------+--------+
|     BTG|99946.78|
|     BTG| 99913.9|
|Bradesco|99910.87|
|Bradesco|99887.88|
|      C6|99980.03|
|      C6|99964.99|
|   Caixa|99969.06|
|   Caixa|99933.09|
|    Itau|99999.54|
|    Itau|99951.02|
|  Nubank|99935.45|
|  Nubank|99914.35|
|      XP|99961.28|
|      XP|99934.01|
+--------+--------+



                                                                                

# SparkML - Intodução e Preparação dos Dados:

# Schema:

In [None]:
schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType())
])

schema = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('chave_pix', StringType()),
    StructField('categoria', StringType()),
    StructField('transaction_date', StringType()),
    StructField('fraude', IntegerType())
])

# DataFrame:

In [None]:
path = r'../../../data/case_final.json'
df = spark.read.json(
    path=path,
    schema=schema,
    timestampFormat="yyyy-MM-dd HH:mm:ss"
)

In [None]:
df.show()

+------------+------------------+--------------------+--------------------+---------+-------------+-------------------+------+
|id_transacao|             valor|           remetente|        destinatario|chave_pix|    categoria|   transaction_date|fraude|
+------------+------------------+--------------------+--------------------+---------+-------------+-------------------+------+
|        1000|            588.08|{Jonathan Gonsalv...|{Calebe Melo, Cai...|aleatoria|       outros|2021-07-16 05:00:55|     0|
|        1001|           80682.5|{Jonathan Gonsalv...|{Davi Lucas Perei...|  celular|transferencia|2022-04-20 12:34:01|     1|
|        1002|             549.9|{Jonathan Gonsalv...|{Sabrina Castro, ...|      cpf|        lazer|2022-07-10 16:51:34|     0|
|        1003|             90.83|{Jonathan Gonsalv...|{Francisco da Con...|aleatoria|   transporte|2022-10-20 10:57:36|     0|
|        1004|13272.619999999999|{Jonathan Gonsalv...|{Isabelly Ferreir...|    email|transferencia|2021-04-06 2

                                                                                

## Código:

## Output:

### Entendimento dos dados:

In [None]:
df.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- remetente: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- destinatario: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)



### Preparação dos dados:

Como transforma a struct na coluna 'destinatario' em novas colunas?

In [None]:
df_flatten = df.withColumns({
    'destinatario_nome': col('destinatario').getField('nome'),
    'destinatario_banco': col('destinatario').getField('banco'),
    'destinatario_tipo': col('destinatario').getField('tipo'),
})

Como deletar as colunas 'remetente' e 'destinatario'?

In [None]:
df_flatten = df_flatten.drop('remetente', 'destinatario')

Qual foi o resultado final da preparação dos dados?

In [None]:
df_flatten.show()

+------------+------------------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|   destinatario_nome|destinatario_banco|destinatario_tipo|
+------------+------------------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+
|        1000|            588.08|aleatoria|       outros|2021-07-16 05:00:55|     0|         Calebe Melo|             Caixa|               PF|
|        1001|           80682.5|  celular|transferencia|2022-04-20 12:34:01|     1|  Davi Lucas Pereira|             Caixa|               PJ|
|        1002|             549.9|      cpf|        lazer|2022-07-10 16:51:34|     0|      Sabrina Castro|            Nubank|               PF|
|        1003|             90.83|aleatoria|   transporte|2022-10-20 10:57:36|     0|Francisco da Conc...|            Nubank|               PJ|

Como transforma as variáveis categoricas em númericas?

In [None]:
indexer = StringIndexer(
    inputCols=[
        'destinatario_nome',
        'destinatario_banco',
        'destinatario_tipo',
        'categoria',
        'chave_pix'
    ],
    outputCols=[
        'destinatario_nome_index',
        'destinatario_banco_index',
        'destinatario_tipo_index',
        'categoria_index',
        'chave_pix_index'        
    ]
)

df_index = indexer.fit(df_flatten).transform(df_flatten)

                                                                                

Qual foi o resultado?

In [None]:
df_index.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)
 |-- destinatario_nome_index: double (nullable = false)
 |-- destinatario_banco_index: double (nullable = false)
 |-- destinatario_tipo_index: double (nullable = false)
 |-- categoria_index: double (nullable = false)
 |-- chave_pix_index: double (nullable = false)



In [None]:
df_index.show()

23/08/24 09:02:33 WARN DAGScheduler: Broadcasting large task binary with size 1275.1 KiB


+------------+------------------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|
+------------+------------------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|        1000|            588.08|aleatoria|       outros|2021-07-16 05:00:55|     0|         Calebe Melo|             Caixa|               PF|                12045.0|                     4.0|                    1.0|            6.0|            3.0|
|       

                                                                                

Como separar a fraude da não fraude?

In [None]:
is_fraud = df_index.filter("fraude == 1")
no_fraud = df_index.filter("fraude == 0")

Como separar uma amostra dos dados?

In [None]:
no_fraud = no_fraud.sample(False, 0.01, seed=123) 

Como unir dois dataframes e ordernar pela data?

In [None]:
df_concat = no_fraud.union(is_fraud)
df = df_concat.sort("transaction_date")
df.count()

                                                                                

16202

### Modelagem:

Como separar os dados de traino dos dados de test?

In [None]:
train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())

23/08/24 09:02:35 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:02:37 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB
23/08/24 09:02:38 WARN DAGScheduler: Broadcasting large task binary with size 1295.9 KiB
23/08/24 09:02:39 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:02:40 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB

train = 11278  test = 4924


23/08/24 09:02:41 WARN DAGScheduler: Broadcasting large task binary with size 1295.9 KiB
                                                                                

Como indentificar fraudes?

In [None]:
is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("is_fraud", is_fraud(train.fraude))

In [None]:
train.show()

23/08/24 09:02:42 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:02:43 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB
23/08/24 09:02:44 WARN DAGScheduler: Broadcasting large task binary with size 1303.5 KiB
[Stage 85:>                                                         (0 + 1) / 1]

+------------+------------------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|is_fraud|
+------------+------------------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------+
|        1001|           80682.5|  celular|transferencia|2022-04-20 12:34:01|     1|  Davi Lucas Pereira|             Caixa|               PJ|                  259.0|                     4.0|                    0.0|            0.

                                                                                

Como ficou o Schema do dataframe train?

In [None]:
train.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)
 |-- destinatario_nome_index: double (nullable = false)
 |-- destinatario_banco_index: double (nullable = false)
 |-- destinatario_tipo_index: double (nullable = false)
 |-- categoria_index: double (nullable = false)
 |-- chave_pix_index: double (nullable = false)
 |-- is_fraud: double (nullable = true)



Quais são as colunas do dataframe train?

In [None]:
train.columns

['id_transacao',
 'valor',
 'chave_pix',
 'categoria',
 'transaction_date',
 'fraude',
 'destinatario_nome',
 'destinatario_banco',
 'destinatario_tipo',
 'destinatario_nome_index',
 'destinatario_banco_index',
 'destinatario_tipo_index',
 'categoria_index',
 'chave_pix_index',
 'is_fraud']

O que é um 'Vector Assembler'?

In [None]:
assembler = VectorAssembler(
    inputCols = [x for x in train.columns if x not in ['transaction_date', 'fraude', 'is_fraud', 'destinatario_nome', 'destinatario_banco', 'destinatario_tipo', 'chave_pix', 'categoria']],
    outputCol="features"
)

Como criar um módelo de regressão logística?

In [None]:
lr = LogisticRegression().setParams(
    maxIter=100000,
    labelCol = "is_fraud",
    predictionCol="prediction"
)

Como treinar o modelo?

In [None]:
model = Pipeline(stages=[assembler, lr]).fit(train)

23/08/24 09:02:46 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:02:47 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB
23/08/24 09:02:49 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:02:51 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB
23/08/24 09:02:52 WARN DAGScheduler: Broadcasting large task binary with size 1341.3 KiB
23/08/24 09:02:55 WARN DAGScheduler: Broadcasting large task binary with size 1342.0 KiB
23/08/24 09:02:56 WARN DAGScheduler: Broadcasting large task binary with size 1342.0 KiB
23/08/24 09:02:56 WARN DAGScheduler: Broadcasting large task binary with size 1342.0 KiB
23/08/24 09:02:56 WARN DAGScheduler: Broadcasting large task binary with size 1342.0 KiB
23/08/24 09:02:56 WARN DAGScheduler: Broadcasting large task binary with size 1342.0 KiB
23/08/24 09:02:56 WARN DAGScheduler: Broadcasting large task binary with size 1342.0 KiB
23/08/24 09:02:56 WAR

Como realizar uma predição?

In [None]:
predicted = model.transform(test)

Qual foi resultado da predição?

In [None]:
predicted.show()

23/08/24 09:03:05 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:03:06 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB

+------------+--------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------------------+--------------------+--------------------+----------+
|id_transacao|   valor|chave_pix|    categoria|   transaction_date|fraude|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|            features|       rawPrediction|         probability|prediction|
+------------+--------+---------+-------------+-------------------+------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+--------------------+--------------------+--------------------+----------+
|        1011|21345.91|      cpf|transferencia|2021-10-31 04:31:

23/08/24 09:03:07 WARN DAGScheduler: Broadcasting large task binary with size 1341.1 KiB
                                                                                

Como posso verificar a acurácia do modelo?

In [None]:
predicted = predicted.withColumn('is_fraud', is_fraud(predicted.fraude))
predicted.crosstab('is_fraud', 'prediction').show()

23/08/24 09:03:08 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:03:09 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB
23/08/24 09:03:11 WARN DAGScheduler: Broadcasting large task binary with size 1340.3 KiB
23/08/24 09:03:11 WARN DAGScheduler: Broadcasting large task binary with size 1330.7 KiB
23/08/24 09:03:11 WARN DAGScheduler: Broadcasting large task binary with size 1330.4 KiB
23/08/24 09:03:11 WARN DAGScheduler: Broadcasting large task binary with size 1328.0 KiB
23/08/24 09:03:11 WARN DAGScheduler: Broadcasting large task binary with size 1292.6 KiB
23/08/24 09:03:13 WARN DAGScheduler: Broadcasting large task binary with size 1304.9 KiB
23/08/24 09:03:14 WARN DAGScheduler: Broadcasting large task binary with size 1346.9 KiB
23/08/24 09:03:15 WARN DAGScheduler: Broadcasting large task binary with size 1334.5 KiB
23/08/24 09:03:15 WARN DAGScheduler: Broadcasting large task binary with size 1337.8 KiB


+-------------------+---+----+
|is_fraud_prediction|0.0| 1.0|
+-------------------+---+----+
|                1.0|  0|4682|
|                0.0|242|   0|
+-------------------+---+----+



                                                                                

Como criar novos valores para testando o nosso modelo?

In [None]:
df_teste_cols = [
    'id_transacao',
    'valor', 
    'transaction_date',
    'destinatario_nome_index',
    'destinatario_banco_index',
    'destinatario_tipo_index',
    'chave_pix_index',
    'categoria_index',
    'fraude'
]

df_teste_data = [
    (999,103.2, "2023-01-01 11:56:41", 328.0, 4.0, 1.0, 3.0, 5.0, 0),
    (998, 500000.0, "2023-01-01 11:56:41", 328.0, 2.0, 3.0, 2.0, 5.0, 1),
    (997, 19999.0, "2023-01-01 11:56:41", 328.0, 1.0, 2.0, 1.0, 5.0, 0),
]

df_teste = spark.createDataFrame(df_teste_data).toDF(*df_teste_cols)

Como realizar a predição com os novos valores?

In [None]:
new_prediction = model.transform(df_teste)

Qual foi o resultado da predição?

In [None]:
new_prediction.show()

+------------+--------+-------------------+-----------------------+------------------------+-----------------------+---------------+---------------+------+--------------------+--------------------+-----------+----------+
|id_transacao|   valor|   transaction_date|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|chave_pix_index|categoria_index|fraude|            features|       rawPrediction|probability|prediction|
+------------+--------+-------------------+-----------------------+------------------------+-----------------------+---------------+---------------+------+--------------------+--------------------+-----------+----------+
|         999|   103.2|2023-01-01 11:56:41|                  328.0|                     4.0|                    1.0|            3.0|            5.0|     0|[999.0,103.2,328....|[426.491663442539...|  [1.0,0.0]|       0.0|
|         998|500000.0|2023-01-01 11:56:41|                  328.0|                     2.0|                    3.0|

# Data Quality - Deequ:

Fonte: [Deequ AWS](https://aws.amazon.com/pt/blogs/big-data/test-data-quality-at-scale-with-deequ/)

Fonte: [Deequ Doc](https://github.com/awslabs/deequ)

## Schema:

In [None]:
schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType())
])

schema = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('chave_pix', StringType()),
    StructField('categoria', StringType()),
    StructField('transaction_date', StringType()),
    StructField('fraude', IntegerType())
])

## DataFrame:

In [None]:
path = '../../../data/case_final.json'

df = spark.read.json(
    path,
    schema=schema,
    timestampFormat="yyyy-MM-dd HH:mm:ss"
)

## Código:

## Output:

In [None]:
df.printSchema()

Como transformo uma Struct em uma coluna?

In [None]:
df = df.withColumn(
      'destinatario_nome', col('destinatario').getField('nome')
    ).withColumn(
      'destinatario_banco', col('destinatario').getField('banco')
    ).withColumn(
      'destinatario_tipo', col('destinatario').getField('tipo')
    ).withColumn(
      'remetente_nome', col('remetente').getField('nome')
    ).withColumn(
      'remetente_banco', col('remetente').getField('banco')
    ).withColumn(
      'remetente_tipo', col('remetente').getField('tipo')
)

Como removo as colunas 'destinatario' e 'remetente'?

In [None]:
df = df.drop('remetente', 'destinatario')

Como posso guardar o resultado das análises?

In [None]:
analysisResult = (
    AnalysisRunner(spark).onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness('id_transacao'))
    .addAnalyzer(Compliance("valor", "valor > 0"))
    .run()
)

O que é o 'analysisResult'?

In [None]:
analysisResult

Como transformar um 'JavaObject id=o89' em um DataFrame?

In [None]:
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

In [None]:
analysisResult_df.show()

Como posso ter sugestões de métricas de qualidade?

In [None]:
suggestionResult = ConstraintSuggestionRunner(spark).onData(df).addConstraintRule(DEFAULT()).run()

analysisResult_df.show()

Como vejo todas as sugestões?

In [None]:
for sugg in suggestionResult['constraint_suggestions']:
  print(f"Sugestao de Constraint: \'{sugg['column_name']}\': {sugg['description']}")
  print(f"PySpark Code: {sugg['code_for_constraint']}\n")

Como verifico 'error' e 'Warning'?

In [None]:
check = Check(spark, CheckLevel.Warning, "Review Check")
error = Check(spark, CheckLevel.Error, "Error")

Como posso criar métricas de qualidade?

In [None]:
checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check.hasDataType("id_transacao",ConstrainableDataTypes.Integral)
        .isNonNegative("id_transacao")
        .isComplete("id_transacao") 
        .isUnique('id_transcao')
    )
    .run()
)

Como posso verificar as métricas de qualidade?

In [None]:
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

Como posso testando se a verificação de qualidade está funcionando corretamente?  

In [None]:
checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        error
        .isContainedIn("remetente_tipo", ["CNPJ"])
    )
    .run()
)

Como posso verificar o teste?

In [None]:
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)