# Teste NAVA

Simular os resultados em dataframes e apresentar o resultado

**Objetivo**: Apresentar o saldo atualizado da conta corrente de todos os clientes
separados por data em que seja possível reprocessar/identificar as alterações de
saldos entre os dias.


**Requisitos**

Pyspark

**Execução**

O código foi desenvolvido em Python 3.11.9 e utiliza o Pyspark para processamento dos dados, em ambiente local utilizando Windows Subsystem for Linux (WSL), com o Spark instalado.


**Contato**

Bruno Gonçalves Oliveira

bruoli3@gmail.com

## Importando bibliotecas necessárias

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, lag, lit, first, max
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("teste_nava").getOrCreate()

24/10/17 01:43:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Leitura dos arquivos

Leitura dos arquivos de entrada e criação dos dataframes iniciais.

União das movimentações dos dias 02 e 03/04/2022 para facilitar a análise.

In [11]:
initial_balance = spark.read.csv("data/tabela_saldo_inicial.txt", header=True, sep=";")
mov_02_04 = spark.read.csv("data/movimentacao_dia_02_04_2022.txt", header=True, sep=";")
mov_03_04 = spark.read.csv("data/movimentacao_dia_03_04_2022.txt", header=True, sep=";")
movimentations = mov_02_04.union(mov_03_04)

## Movimentações

Cria coluna Saldo Inicial com valor 0, para posteriormente calcular o saldo inicial, e colunas Credito e Debito se valor maior que 0 na coluna Movimentacao_dia e menor que 0, respectivamente.

Os valores da coluna Movimentacao_dia são substituidos com a soma de Credito e Debito. Por fim, cria-se a coluna Saldo_Final com valor 0. Renomeia a coluna data para Data e seleciona as colunas desejadas.

In [12]:
movimentations = movimentations.withColumn(
    "Saldo_Inicial",
    lit(0).cast("float")
).withColumn(
    "Credito",
    when(
        col("Movimentacao_dia") > 0,
        col("Movimentacao_dia").cast("float")
    ).otherwise(0)
).withColumn(
    "Debito",
    when(
        col("Movimentacao_dia") < 0,
        col("Movimentacao_dia").cast("float")
    ).otherwise(0)
).withColumn(
    "Movimentacao_dia",
    (col("Credito") + col("Debito")).cast("float")
).withColumn(
    "Saldo_Final",
    lit(0).cast("float")
).withColumnRenamed(
    "data",
    "Data"
).select(
    "Nome",
    "CPF",
    "Data",
    "Saldo_Inicial",
    "Credito",
    "Debito",
    "Movimentacao_dia",
    "Saldo_Final"
)

## Saldo Inicial

Renomeia a coluna Saldo_Inicial_CC para Saldo_Inicial, renomeia a coluna data para Data, cria colunas Credito e Debito com valor 0, cria coluna Movimentacao_dia com a soma de Credito e Debito, cria coluna Saldo_final com valor de Saldo_Inicial. Seleciona as colunas desejadas.

In [13]:
initial_balance = initial_balance.withColumnRenamed(
    "Saldo_Inicial_CC",
    "Saldo_Inicial"
).withColumnRenamed(
    "data",
    "Data"
).withColumn(
    "Credito",
    lit(0).cast("float")
).withColumn(
    "Debito",
    lit(0).cast("float")
).withColumn(
    "Movimentacao_dia",
    (col("Credito") + col("Debito")).cast("float")
).withColumn(
    "Saldo_final",
    col("Saldo_Inicial").cast("float")
).select(
    "Nome",
    "CPF",
    "Data",
    "Saldo_Inicial",
    "Credito",
    "Debito",
    "Movimentacao_dia",
    "Saldo_final"
)

## União de todas as movimentações

Concatena os dataframes movimentacoes e saldo_inicial

In [14]:
all_operations = initial_balance.union(movimentations)

## Calculo do Saldo Final

Cria uma janela de partionamento por CPF ordenada pela data da movimentação, calcula a soma da movimentação do dia (Crédito - Débito) e o saldo inicial para resultar no saldo final.

In [15]:
window_spec = Window.partitionBy("CPF").orderBy("Data").rowsBetween(Window.unboundedPreceding, Window.currentRow)

all_operations = all_operations.withColumn(
    "Saldo_Final",
    sum(col("Saldo_Inicial") + col("Movimentacao_dia")).over(window_spec)
)

## Preenche os valores de saldo inicial

Preenche os valores de saldo inicial das linhas de movimentação que naturalmente não possuiam valor de saldo inicial, se baseando no saldo final da linha anterior `lag("Saldo_Final", 1).over(window)`. E preenche o saldo inicial da primeira linha de cada CPF com o saldo inicial da conta corrente `when(col("Saldo_Inicial").isNull(), col("Saldo_Final")).otherwise(col("Saldo_Inicial"))`

In [16]:
window_spec = Window.partitionBy("CPF").orderBy("Data")

final_balance = all_operations.withColumn(
    "Saldo_Inicial",
    lag("Saldo_Final", 1).over(window_spec)
)

final_balance = final_balance.withColumn(
    "Saldo_Inicial",
    when(
        col("Saldo_Inicial").isNull(),
        col("Saldo_Final")
    ).otherwise(
        col("Saldo_Inicial")
    )
)

## Resultado

In [17]:
final_balance.show()

+-----+-----------+----------+-------------------+-------+-------+----------------+-------------------+
| Nome|        CPF|      Data|      Saldo_Inicial|Credito| Debito|Movimentacao_dia|        Saldo_Final|
+-----+-----------+----------+-------------------+-------+-------+----------------+-------------------+
|Maria|00000000001|01/04/2022|             523.86|    0.0|    0.0|             0.0|             523.86|
|Maria|00000000001|03/04/2022|             523.86| 157.62|    0.0|          157.62|  681.4799951171875|
|Maria|00000000001|03/04/2022|  681.4799951171875| 518.89|    0.0|          518.89| 1200.3700097656251|
|Maria|00000000001|03/04/2022| 1200.3700097656251| 214.81|    0.0|          214.81| 1415.1800073242189|
| José|00000000002|01/04/2022|             917.78|    0.0|    0.0|             0.0|             917.78|
| José|00000000002|03/04/2022|             917.78|   56.8|    0.0|            56.8|  974.5799992370605|
| José|00000000002|03/04/2022|  974.5799992370605| 492.13|    0.

                                                                                

## Resultado consolidado

In [24]:
consolidated_balance = final_balance.groupBy('CPF').agg(
    first('Saldo_Inicial').alias('Saldo_Inicial'),
    sum('Movimentacao_dia').alias('Total_Movimentacao_dia'),
    max('Data').alias('Last_Transaction_Date')
).withColumn(
    'Real_Balance', col('Saldo_Inicial') + col('Total_Movimentacao_dia')
)

In [25]:
consolidated_balance.show()

+-----------+-----------------+----------------------+---------------------+------------------+
|        CPF|    Saldo_Inicial|Total_Movimentacao_dia|Last_Transaction_Date|      Real_Balance|
+-----------+-----------------+----------------------+---------------------+------------------+
|00000000001|           523.86|     891.3200073242188|           03/04/2022|1415.1800073242189|
|00000000002|           917.78|      548.930004119873|           03/04/2022| 1466.710004119873|
|00000000003|           321.84|   -309.27001190185547|           03/04/2022|12.569988098144506|
|00000000004|           271.51|    241.61999130249023|           02/04/2022| 513.1299913024902|
|00000000005|           225.55|    257.13999938964844|           02/04/2022|482.68999938964845|
|00000000006|            875.5|      444.989990234375|           02/04/2022| 1320.489990234375|
|00000000007|           365.88|    426.53001403808594|           02/04/2022| 792.4100140380859|
|00000000008|           832.63|    190.3

In [22]:
final_balance.write.csv("data/final_balance.csv", header=True, mode="overwrite")

In [26]:
consolidated_balance.write.csv("data/consolidated_balance.csv", header=True, mode="overwrite")

In [9]:
spark.stop()