# Notebook responsável pela carga de dados da tabela movimento_flat

### Importação das libs necessárias

In [None]:
import pyspark
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

### Carrega as funções utilizadas no etl

In [102]:
%run ./utils.ipynb

### Define o diretório que o usuário quer salvar o arquivo

In [103]:
caminho_arquivo = 'output/tabela'

### Cria uma sessão do spark

In [104]:
spark = SparkSession.builder.appName('desafio').getOrCreate()

### Busca os dados do banco de dados

In [None]:
df_associado = read_table('associado')
df_conta = read_table('conta')
df_cartao = read_table('cartao')
df_movimento = read_table('movimento')

### Faz o cruzamento dos dados e seleciona as colunas que vão compor o arquivo 

In [106]:
df_movimento_flat = (
    df_movimento
    .join(df_cartao, df_cartao['id'] == df_movimento["id_cartao"])
    .join(df_conta, df_conta['id'] == df_cartao["id_conta"])
    .join(df_associado, df_associado['id'] == df_cartao["id_associado"])
    .select(
        df_associado['nome'].alias('nome_associado'),
        df_associado['sobrenome'].alias('sobrenome_associado'),
        df_associado['data_nascimento'],
        df_movimento['vlr_transacao'].alias('vlr_transacao_movimento'),
        df_movimento['des_transacao'].alias('des_transacao_movimento'),
        df_movimento['data_movimento'],
        df_cartao['num_cartao'],
        df_cartao['nom_impresso'].alias('nome_impresso_cartao'),
        df_cartao['data_criacao_cartao'],
        when(df_conta['tipo'] == 1, "Conta Corrente").otherwise("Conta Salário").alias('tipo_conta'),        
        df_conta['data_criacao'].alias('data_criacao_conta')        
    )
)

### Faz a persistência dos dados em um arquivo temporário (Necessário para deixar o arquivo flat final com o nome personalizado)

In [107]:
caminho_temp = 'output/temp'

#Executa com coalesce(1) para gravar em apenas 1 arquivo
df_movimento_flat.coalesce(1).write.mode("overwrite").option("header", "true").csv(caminho_temp)

### Ajusta para o nome final do arquivo juntamente no diretório que o usuário selecionou

In [108]:
final_csv = f"{caminho_arquivo}/movimento_flat.csv"

### Move o arquivo do diretório temporário para o diretório final, alterando o nome do arquivo

In [109]:
#Pega o arquivo gerado pelo Spark e renomeia para movimento_flat
for file_name in os.listdir(caminho_temp):
    if file_name.startswith("part-") and file_name.endswith(".csv"):
        temp_file = os.path.join(caminho_temp, file_name)
        shutil.move(temp_file, final_csv)
        break

### Finaliza a sessão do spark

In [110]:
spark.stop()