<a href="https://colab.research.google.com/github/MarcosWBeltrami/Case-Engenheiro-de-Dados-Safra/blob/main/Case_Engenheiro_de_Dados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



#Case Engenheiro de Dados
##Projeto parta apresentar o saldo atualizado da conta corrente de todos os clientes separados por data onde seja possível reprocessar/identificar as alterações de saldos entre os dias.



## importando PySpark

In [1]:
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType


spark = SparkSession.builder.master("local").getOrCreate()



##Definindo caminho

In [2]:
# Definir o esquema dos DataFrames, para conseguir tratar os dados no formato correto
schema_movimentacao = "Nome STRING, CPF STRING, Movimentacao_dia DOUBLE, data STRING"
schema_saldo_inicial = "Nome STRING, CPF STRING, Saldo_Inicial_CC DOUBLE, data STRING"

#definição dos caminhos, pode ser tratado diretamente na leitura, porém decidi por colocar separado para facil manutenção
caminho_02_04_2022 = "movimentacao_dia_02_04_2022.txt"
caminho_03_04_2022 = "movimentacao_dia_03_04_2022.txt"
caminho_tabela_saldo_inicial = "tabela_saldo_inicial.txt"

# leitura dis arquivos de textos
movimentacao_dia_02_04_2022 = spark.read.csv(caminho_02_04_2022, sep=';', header=True, schema=schema_movimentacao)
movimentacao_dia_03_04_2022 = spark.read.csv(caminho_03_04_2022, sep=';', header=True, schema=schema_movimentacao)
tabela_saldo_inicial = spark.read.csv(caminho_tabela_saldo_inicial, sep=';', header=True, schema=schema_saldo_inicial)
tabela_saldo = tabela_saldo_inicial #criado para caso seja necessario alterar o tabela_saldo_inicial ter um historico dos dados iniciais

##Tratamento do DataFrame

In [3]:
# DataFrame com todos os CPFs únicos, foi necessario a criação para que possa ter o resultado de todos os CPFs mesmo os que não foram alterados para melhor vizualição e manuseio
todos_cpfs = tabela_saldo_inicial.select("CPF").distinct()

# DataFrame para a data 02/04/2022
df_saldo_02_04_2022 = (
    movimentacao_dia_02_04_2022
    .groupBy("CPF", "data")
    .agg(F.sum("Movimentacao_dia").alias("movimentacao_02_04_2022"))
)

# DataFrame para a data 03/04/2022
df_saldo_03_04_2022 = (
    movimentacao_dia_03_04_2022
    .groupBy("CPF", "data")
    .agg(F.sum("Movimentacao_dia").alias("movimentacao_03_04_2022"))
)

In [4]:
df_saldo_02_04_2022.show()

+-----------+----------+-----------------------+
|        CPF|      data|movimentacao_02_04_2022|
+-----------+----------+-----------------------+
|00000000010|02/04/2022|                 600.35|
|00000000007|02/04/2022|                 292.86|
|00000000006|02/04/2022|                 283.77|
|00000000004|02/04/2022|                 241.62|
|00000000005|02/04/2022|                  87.63|
+-----------+----------+-----------------------+



## Join com as 3 tabelas, baseando no CPF como chave primaria
#### Uma parte muito importante, onde realmente é feito o tratamento do codigo, e realizado os JOINs, que consiste em trabalhar em cada tabela da melhor maneira.

In [21]:
df_saldo_atualizado = (
    todos_cpfs
    .join(
        df_saldo_02_04_2022,
        on=["CPF"],
        how="left_outer"
    )
    .join(
        df_saldo_03_04_2022,
        on=["CPF"],
        how="full_outer"
    )
    .join(
        tabela_saldo_inicial.select("Nome", "CPF", "Saldo_Inicial_CC", F.col("data").alias("data_inicio")),
        on=["CPF"],
        how="left_outer"
    )
    .select(
        F.col("CPF"),
        F.col("data_inicio"),
        F.round(F.col("Saldo_Inicial_CC"), 2).alias("saldo_inicial"),
        F.round(F.when(F.col("movimentacao_02_04_2022").isNull(), 0).otherwise(F.col("movimentacao_02_04_2022")), 2).alias("movimentacao_02_04_2022"),
        F.round(
            F.when(F.col("movimentacao_02_04_2022") != 0.0, F.col("Saldo_Inicial_CC") + F.col("movimentacao_02_04_2022"))
            .otherwise(F.col("Saldo_Inicial_CC")), 2
        ).alias("saldo_total_02_04_2022"),
        F.round(F.when(F.col("movimentacao_03_04_2022").isNull(), 0).otherwise(F.col("movimentacao_03_04_2022")), 2).alias("movimentacao_03_04_2022")

    )
)

## Tratamento do resultado

In [22]:
# Ordenar por CPF
df_saldo_atualizado = df_saldo_atualizado.orderBy("CPF")


# Substituir valores nulos por zero, necessario para realizar os calculos, e ter um resultado final.
df_saldo_atualizado = df_saldo_atualizado.fillna(0, subset=["saldo_inicial", "movimentacao_02_04_2022", "movimentacao_03_04_2022"])


# Calcular o saldo total
df_saldo_atualizado = df_saldo_atualizado.withColumn(
    "saldo_total",
    F.round(F.expr("saldo_inicial + coalesce(movimentacao_02_04_2022, 0) + coalesce(movimentacao_03_04_2022, 0)"), 2)
)


# Coluna indicando se o saldo total é negativo, achei necessario já que é uma conta corrente considero uma informação impórtante e até facilita caso seja necesasario tomar alguma atitude
df_saldo_atualizado = df_saldo_atualizado.withColumn("saldo_negativo", F.when(F.col("saldo_total") < 0, "Sim").otherwise("Nao"))


## Salvando um CSV

In [7]:
# Definindo caminho de Salvamento
caminho_resultado_csv = "saldo_atualizado.csv"

# Salvando CVS
df_saldo_atualizado.write.csv(caminho_resultado_csv, header=True, mode="overwrite")

## Apresentando o resultado

In [23]:
#Apresentando DataFrame
df_saldo_atualizado.show(truncate=False)

+-----------+-----------+-------------+-----------------------+----------------------+-----------------------+-----------+--------------+
|CPF        |data_inicio|saldo_inicial|movimentacao_02_04_2022|saldo_total_02_04_2022|movimentacao_03_04_2022|saldo_total|saldo_negativo|
+-----------+-----------+-------------+-----------------------+----------------------+-----------------------+-----------+--------------+
|00000000001|01/04/2022 |523.86       |0.0                    |523.86                |891.32                 |1415.18    |Nao           |
|00000000002|01/04/2022 |917.78       |0.0                    |917.78                |548.93                 |1466.71    |Nao           |
|00000000003|01/04/2022 |321.84       |0.0                    |321.84                |-309.27                |12.57      |Nao           |
|00000000004|01/04/2022 |271.51       |241.62                 |513.13                |0.0                    |513.13     |Nao           |
|00000000005|01/04/2022 |225.55   

In [9]:
#Apresentando DataFrame
df_saldo_atualizado.show(truncate=False)

+-----------+-----------+-------------+-----------------------+----------------------+-----------------------+----------------------+-----------+--------------+
|CPF        |data_inicio|saldo_inicial|movimentacao_02_04_2022|saldo_total_02_04_2022|movimentacao_03_04_2022|saldo_total_03_04_2022|saldo_total|saldo_negativo|
+-----------+-----------+-------------+-----------------------+----------------------+-----------------------+----------------------+-----------+--------------+
|00000000001|01/04/2022 |523.86       |523.86                 |1047.72               |891.32                 |1415.18               |1939.04    |Nao           |
|00000000002|01/04/2022 |917.78       |917.78                 |1835.56               |548.93                 |1466.71               |2384.49    |Nao           |
|00000000003|01/04/2022 |321.84       |321.84                 |643.68                |-309.27                |12.57                 |334.41     |Nao           |
|00000000004|01/04/2022 |271.51   

In [10]:
# Demonstração dos tipos de dados, que foi definido no inicio do codigo
df_saldo_atualizado

DataFrame[CPF: string, data_inicio: string, saldo_inicial: double, movimentacao_02_04_2022: double, saldo_total_02_04_2022: double, movimentacao_03_04_2022: double, saldo_total_03_04_2022: double, saldo_total: double, saldo_negativo: string]