# Inicialização

O script começa importando as bibliotecas necessárias e definindo os esquemas de leitura para os arquivos de saldo inicial e movimentações diárias.

Uma sessão Spark é criada para permitir a leitura e processamento dos dados.

In [1]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from datetime import datetime, timedelta

DATA_PATH = "data/"
BALANCE_FILE = "tabela_saldo_inicial.txt"
MOV_FILE = "movimentacao_dia_{}.txt"

spark = SparkSession.builder.appName("SaldoClientes").getOrCreate()
spark

# Leitura dos Dados Iniciais

O saldo inicial é lido de um arquivo CSV. O schema define os tipos de dados esperados para cada coluna.

In [2]:
schema_saldo_inicial = "Nome STRING, CPF STRING, Saldo_Inicial_CC FLOAT, data STRING"
schema_movimentacao = "Nome STRING, CPF STRING, Movimentacao_dia FLOAT, data_mov STRING"

In [3]:
balance_df = spark.read.csv(f"{DATA_PATH}{BALANCE_FILE}", header=True, schema=schema_saldo_inicial, sep=";")
balance_df.show()

+-------+-----------+----------------+----------+
|   Nome|        CPF|Saldo_Inicial_CC|      data|
+-------+-----------+----------------+----------+
|  Maria|00000000001|          523.86|01/04/2022|
|   José|00000000002|          917.78|01/04/2022|
|   João|00000000003|          321.84|01/04/2022|
|  Paulo|00000000004|          271.51|01/04/2022|
|  Pedro|00000000005|          225.55|01/04/2022|
|Antonio|00000000006|           875.5|01/04/2022|
| Marcos|00000000007|          365.88|01/04/2022|
|   Luiz|00000000008|          832.63|01/04/2022|
| Arthur|00000000009|          221.12|01/04/2022|
+-------+-----------+----------------+----------+



# Preparação do processamento e DataFrame de Saldo Final

Inicia-se um DataFrame com o saldo inicial que será atualizado ao longo das movimentações diárias.

In [4]:
# Data inicial
initial_date_str = balance_df.select("data").first()[0].replace('/', '_')
initial_date = datetime.strptime(initial_date_str, "%d_%m_%Y")

# Nome da coluna de saldo final atual
current_balance_column = f"Saldo_Final_{initial_date_str}"

# Inicializar o DataFrame final com saldo inicial
final_df = balance_df.select(["CPF", "Saldo_Inicial_CC"]).withColumnRenamed("Saldo_Inicial_CC", current_balance_column)

# Funções de Processamento

Duas funções são definidas para atualizar os saldos com base nas movimentações diárias e aplicar estornos:

`atualizar_saldo`:

Atualiza o saldo final com base nas movimentações diárias.

In [5]:
# Função para atualizar saldos com base nas movimentações diárias
def atualizar_saldo(saldo_df, mov_df, current_balance_column, current_date_str):
    movimentacao = mov_df.filter(F.col(f"data_mov_{current_date_str}") == current_date_str.replace('_', '/'))
    mov_agrupada = movimentacao.groupBy("CPF", f"data_mov_{current_date_str}").agg(F.sum("Movimentacao_dia").alias(f"Total_mov_{current_date_str}")).orderBy("CPF")

    saldo_atualizado = saldo_df.join(mov_agrupada, on="CPF", how="fullouter").na.fill(0)
    saldo_atualizado = saldo_atualizado.withColumn("Saldo_Final", saldo_atualizado[current_balance_column] + saldo_atualizado[f"Total_mov_{current_date_str}"])

    return saldo_atualizado

`aplicar_estornos`:

Aplica estornos em movimentações de dias anteriores.

In [6]:
# Função para aplicar estornos nas movimentações de dias anteriores
def aplicar_estornos(saldo_df, mov_df, current_date_str):
    estornos = mov_df.filter(F.col(f"data_mov_{current_date_str}") < current_date_str)
    
    for estorno_row in estornos.collect():
        cpf = estorno_row['CPF']
        estorno_date = estorno_row[f'data_mov_{current_date_str}']
        estorno_valor = estorno_row['Movimentacao_dia']

        estorno_balance_column = f"Saldo_Final_{estorno_date.replace('/', '_')}"

        if estorno_balance_column in saldo_df.columns:
            saldo_df = saldo_df.withColumn(
                estorno_balance_column,
                F.when(saldo_df["CPF"] == cpf, saldo_df[estorno_balance_column] + estorno_valor)
                .otherwise(saldo_df[estorno_balance_column])
            )
    return saldo_df

# Processamento das Movimentações Diárias

O script itera sobre os arquivos de movimentações diárias e aplica as atualizações de saldo e estornos conforme necessário de maneira dinâmica.

O progresso das movimentações e estornos são atualizados e mostrados a cada iteração.

In [7]:
number_of_files = len([file for file in os.listdir(DATA_PATH) if os.path.isfile(os.path.join(DATA_PATH, file))]) - 1

# Iterar sobre os dias e processar as movimentações
for i in range(1, number_of_files + 1):
    current_date = initial_date + timedelta(days=i)
    current_date_str = current_date.strftime("%d_%m_%Y")
    
    current_mov_file = DATA_PATH + MOV_FILE.format(current_date_str)
    
    if os.path.exists(current_mov_file):
        mov_df = spark.read.csv(current_mov_file, header=True, schema=schema_movimentacao, sep=";")
        mov_df = mov_df.withColumnRenamed("data_mov", f"data_mov_{current_date_str}")
        
        final_df = aplicar_estornos(final_df, mov_df, current_date_str)
        final_df = atualizar_saldo(final_df, mov_df, current_balance_column, current_date_str)

        current_balance_column = f"Saldo_Final_{current_date_str}"
        
        final_df = final_df.withColumnRenamed("Saldo_Final", f"Saldo_Final_{current_date_str}")
        print(f"Saldo final atualizado para o dia {current_date_str}:")
        final_df.orderBy("CPF").show()

Saldo final atualizado para o dia 02_04_2022:
+-----------+----------------------+-------------------+--------------------+----------------------+
|        CPF|Saldo_Final_01_04_2022|data_mov_02_04_2022|Total_mov_02_04_2022|Saldo_Final_02_04_2022|
+-----------+----------------------+-------------------+--------------------+----------------------+
|00000000001|                523.86|               NULL|                 0.0|     523.8599853515625|
|00000000002|                917.78|               NULL|                 0.0|      917.780029296875|
|00000000003|                321.84|               NULL|                 0.0|     321.8399963378906|
|00000000004|                271.51|         02/04/2022|  241.61999130249023|     513.1300010681152|
|00000000005|                225.55|         02/04/2022|    87.6300048828125|     313.1800079345703|
|00000000006|                 875.5|         02/04/2022|   283.7699890136719|    1159.2699890136719|
|00000000007|                365.88|         

# Exibição do Resultado Final

Após processar todos os arquivos, o saldo final acumulado é exibido.

In [8]:
final_df = final_df.orderBy("CPF")
print("Resultado final acumulado:")
final_df.select("CPF", f"Saldo_Final_{current_date_str}").show()

Resultado final acumulado:
+-----------+----------------------+
|        CPF|Saldo_Final_03_04_2022|
+-----------+----------------------+
|00000000001|    1415.1799926757812|
|00000000002|     1466.710033416748|
|00000000003|    12.569984436035156|
|00000000004|     513.1300010681152|
|00000000005|    482.69000244140625|
|00000000006|     1320.489990234375|
|00000000007|     792.4100189208984|
|00000000008|    1022.9900054931641|
|00000000009|     221.1199951171875|
|00000000010|     325.6099739074707|
|00000000011|     768.3599853515625|
+-----------+----------------------+



# Encerramento da Sessão Spark

A sessão Spark é encerrada para liberar os recursos.

In [9]:
spark.stop()