# Projeto Final | Big Data
----
**Desenvolvimento e Avaliação de uma Arquitetura Distribuída para um Relatório de Saldo Mensal da Conta**

## Load Functions and variables

Funções para testar a qualidade dos dados (Great Expectations)

In [None]:
%run ./modules/data-quality

Loading function *create_path* to create folders

In [None]:
%run ./modules/utils

Strings JSON para criação do schema dos campos através da StructType (Bronze e Silver)

In [None]:
%run ./modules/json_strings

In [None]:
%run ./modules/json_strings_silver

## Load libraries

In [None]:
import json
import os
import zipfile

from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DecimalType, LongType, DataType, DateType, TimestampType, DoubleType
from pyspark.sql.functions import expr, last_day, col, min, max, to_date, current_date, sum, lit, concat, lpad

## Load Paths and create dirs

In [None]:
# Root folder of the project; each layer (landing, bronze, silver, gold)
# lives in its own sub-folder underneath it.
source_path = '/FileStore/project_report_balance/'

landing_path = f'{source_path}landing/'
bronze_path = f'{source_path}bronze/'
silver_path = f'{source_path}silver/'
gold_path = f'{source_path}gold/'

# Every folder the notebook creates, root first.
path_list = [
    source_path,
    landing_path,
    bronze_path,
    silver_path,
    gold_path,
]

In [None]:
# Recursively remove the derived layers so the run starts clean.
# The landing folder is not touched here.
dbutils.fs.rm(bronze_path, recurse=True)
dbutils.fs.rm(silver_path, recurse=True)
dbutils.fs.rm(gold_path, recurse=True)

In [None]:
# Create the root folder and every layer folder listed in path_list.
for layer_path in path_list:
    create_path(layer_path)

## Landing Zone

In [None]:
# Source tables handled by the pipeline. The order matters: later cells pair
# this list positionally with json_str_list via zip().
dir_path_list = [
    'accounts',
    'city',
    'country',
    'customers',
    'd_month',
    'd_time',
    'd_week',
    'd_weekday',
    'd_year',
    'pix_movements',
    'state',
    'transfer_ins',
    'transfer_outs',
]

In [None]:
# One landing sub-folder per source table.
for table_dir in dir_path_list:
    dbutils.fs.mkdirs(f'{landing_path}{table_dir}')

## Bronze layer

In [None]:
# Load each landing CSV with its declared schema and persist it in the Bronze
# layer as snappy-compressed parquet.
# dir_path_list and json_str_list are paired positionally, so both lists must
# keep the same order.

for dir_name, json_str in zip(dir_path_list, json_str_list):
    print(f'Criando dir {dir_name} na camada Bronze')

    dir_path = landing_path + dir_name
    # Case-insensitive extension match so files such as 'data.Csv' are not skipped.
    csv_file_path = [arquivos.path for arquivos in dbutils.fs.ls(dir_path) if arquivos.name.lower().endswith('.csv')]

    # Fail with an explicit message instead of a bare IndexError further down.
    if not csv_file_path:
        raise FileNotFoundError(f'No CSV file found in landing directory: {dir_path}')

    print(f'Salvando dados em parquet no dir {dir_name} com schema definido')

    # Build the StructType for this table from its JSON description.
    schema_json = StructType.fromJson(json.loads(json_str))

    # Only the first CSV returned by the listing is loaded.
    df_csv = spark.read.csv(csv_file_path[0], sep=',', header=True, schema=schema_json)

    path_dir_bronze = bronze_path + dir_name
    (df_csv
        .write
        .option("compression", "snappy")
        .mode("overwrite")
        .parquet(path_dir_bronze))
    print('Dados salvos! \n')

## Silver layer

#### Data Quality (Great Expectations)

###### Verificação de tipo

In [None]:
# Table whose datetime columns are checked.
tables_type = 'accounts'

# Read the Bronze parquet and wrap it in a Great Expectations dataset.
table_path = f'{bronze_path}{tables_type}'
df_table = spark.read.parquet(table_path)
ge_df_table = SparkDFDataset(df_table)

# Columns handed to the datetime check.
colunas_datetime = ['created_at']
print(f'Tabela analisada: {tables_type} - COLUNAS DATETIME')
verifica_colunas_datetime(ge_df_table, colunas_datetime)

###### Verificação de colunas categóricas

In [None]:
# table -> {categorical column -> values passed to the categorical check}
tables_cat = {
    'accounts': {
        'status': ['active', 'inactive'],
    },
    'pix_movements': {
        'status': ['failed', 'completed'],
        'in_or_out': ['pix_in', 'pix_out'],
    },
}

In [None]:
# Run the categorical check for every (table, column) pair in tables_cat.
for table, cols in tables_cat.items():
    print(f'>>> Tabela analisada: {table}')

    # Bronze parquet -> Spark DataFrame -> Great Expectations dataset.
    table_path = f'{bronze_path}{table}'
    df_table = spark.read.parquet(table_path)
    ge_df_table = SparkDFDataset(df_table)

    for col_analisada in cols:
        valores_esperados = cols[col_analisada]
        verificar_colunas_categoricas(ge_df_table, col_analisada, valores_esperados)

###### Verificação de colunas ID

In [None]:
# table -> identifier columns passed to the ID check
tables_id = {
    'accounts': ['account_id', 'customer_id'],
    'city': ['state_id', 'city_id'],
    'country': ['country_id'],
    'customers': ['customer_id'],
    'pix_movements': ['id', 'account_id'],
}

In [None]:
# Run the ID check once per table listed in tables_id.
for table in tables_id:
    id_cols_list = tables_id[table]
    print(f'>>> Tabela analisada: {table}')

    # Load the Bronze table and wrap it for Great Expectations.
    table_path = f'{bronze_path}{table}'
    df_table = spark.read.parquet(table_path)
    ge_df_table = SparkDFDataset(df_table)

    verificar_colunas_id(ge_df_table, id_cols_list)

###### Verificação de colunas não vazias

In [None]:
# table -> columns passed to the "no empty values" check
tables_non_empty = {
    'accounts': [
        'account_id',
        'customer_id',
        'account_branch',
        'account_check_digit',
        'account_number',
    ],
    'city': ['city'],
    'country': ['country'],
    'customers': [
        'first_name',
        'last_name',
        'country_name',
        'customer_city',
        'cpf',
    ],
    'pix_movements': [
        'account_id',
        'id',
        'pix_amount',
        'pix_requested_at',
        'pix_completed_at',
    ],
}

In [None]:
# Run the empty-value check for every table in tables_non_empty.
for table, cols_no_empty in tables_non_empty.items():
    print(f'>>> Tabela analisada: {table}')

    # Bronze parquet -> Spark DataFrame -> Great Expectations dataset.
    table_path = f'{bronze_path}{table}'
    df_table = spark.read.parquet(table_path)
    ge_df_table = SparkDFDataset(df_table)

    verificar_colunas_com_none(ge_df_table, cols_no_empty)

###### Verificação de valores MIN e MAX

In [None]:
# table -> [columns to check, minimum, maximum]
tables_min_max = {
    'customers': [['cpf'], 0, 99999999999],
    'pix_movements': [['pix_amount'], 0, 10000],
}

In [None]:
# Run the min/max range check for every table in tables_min_max.
for table, values in tables_min_max.items():
    print(f'>>> Tabela analisada: {table}')

    # Load the Bronze table and wrap it for Great Expectations.
    table_path = f'{bronze_path}{table}'
    df_table = spark.read.parquet(table_path)
    ge_df_table = SparkDFDataset(df_table)

    # Each entry is laid out as [columns, minimum, maximum].
    list_of_columns, minimo, maximo = values
    verificar_valores_min_max(ge_df_table, list_of_columns, minimo, maximo)


#### Creating Silver layer

In [None]:
%sql
-- Create the silver schema if it does not exist yet.
-- The LOCATION literal repeats the value assigned to silver_path in the Python cells.
CREATE SCHEMA IF NOT EXISTS silver LOCATION '/FileStore/project_report_balance/silver'

#### Creating Silver tables with StructType object from JSON file

In [None]:
# Bronze DataFrames keyed by table name; filled by the loop in the next cell.
lista_df_silver = dict()

In [None]:
# Expose every Bronze table as a temp view named after its folder and keep
# the DataFrame in lista_df_silver.
# dir_path_list and json_str_list are paired positionally.
for dir_name, json_str in zip(dir_path_list, json_str_list):
    print('Criando dataframe: ' + dir_name)

    path_dir_bronze = bronze_path + dir_name

    # Build the StructType for this table from its JSON description.
    schema_json = StructType.fromJson(json.loads(json_str))

    # The schema is set through DataFrameReader.schema(); keyword arguments to
    # parquet() such as sep/header are CSV options and were never applied.
    df_parquet = spark.read.schema(schema_json).parquet(path_dir_bronze)
    lista_df_silver[dir_name] = df_parquet

    df_parquet.createOrReplaceTempView(dir_name)


#### Creating Table silver.d_accounts

In [None]:
# Build the account dimension: concatenate branch, check digit and number into
# a single 'account' column and keep only the columns the table needs.
df_accounts = (
    spark.read.table('accounts')
    .withColumn(
        'account',
        concat(col('account_branch'), col('account_check_digit'), col('account_number')),
    )
    .select(['account_id', 'status', 'account', 'created_at'])
)

schema_json = StructType.fromJson(json.loads(json_str_list_silver[0]))
path_dir_silver_d_account = silver_path + 'd_account'

# NOTE(review): saveAsTable forwards extra keyword arguments as writer options,
# so `schema` is presumably not enforced on the DataFrame - confirm.
df_accounts.write.saveAsTable(
    'silver.d_accounts',
    compression="snappy",
    mode="overwrite",
    path=path_dir_silver_d_account,
    schema=schema_json,
)

#### Creating Table silver.f_movements


In [None]:
def _with_month_end_dates(df_moves, df_time, source_view, requested_col, completed_col):
    """Join a movements DataFrame to the time lookup twice.

    The request key is matched with an inner join (adds ``requested_at``) and
    the completion key with a left join (adds ``completed_at``).
    """
    requested_lookup = df_time.alias('d_time').withColumnRenamed('ultimo_dia_mes', 'requested_at')
    completed_lookup = df_time.alias('d_time_completed').withColumnRenamed('ultimo_dia_mes', 'completed_at')
    return (
        df_moves
        .join(requested_lookup, col('d_time.time_id') == col(f'{source_view}.{requested_col}'), 'inner')
        .join(completed_lookup, col('d_time_completed.time_id') == col(f'{source_view}.{completed_col}'), 'left')
    )


# Column layout shared by the three movement sources; unionAll matches by position.
movement_columns = ['account_id', 'amount', 'in_or_out', 'status', 'requested_at', 'completed_at']

# time_id -> last day of the month of action_timestamp
df_d_time = (
    spark.read.table('d_time')
    .select(['time_id', 'action_timestamp'])
    .withColumn('ultimo_dia_mes', last_day('action_timestamp'))
    .select(['time_id', 'ultimo_dia_mes'])
)

# PIX movements already carry in_or_out; only the amount column is renamed.
df_pix = spark.read.table('pix_movements').select(
    ['account_id', 'pix_amount', 'in_or_out', 'status', 'pix_requested_at', 'pix_completed_at']
)
df_pix = _with_month_end_dates(df_pix, df_d_time, 'pix_movements', 'pix_requested_at', 'pix_completed_at')
df_pix = df_pix.withColumnRenamed('pix_amount', 'amount')
df_pix = df_pix.select(movement_columns)

# Incoming transfers, tagged as 'transfer_in'.
df_transfer_ins = (
    spark.read.table('transfer_ins')
    .select(['account_id', 'amount', 'status', 'transaction_requested_at', 'transaction_completed_at'])
    .withColumn('in_or_out', lit('transfer_in'))
)
df_transfer_ins = _with_month_end_dates(
    df_transfer_ins, df_d_time, 'transfer_ins', 'transaction_requested_at', 'transaction_completed_at'
)
df_transfer_ins = df_transfer_ins.select(movement_columns)

# Outgoing transfers, tagged as 'transfer_out'.
df_transfer_outs = (
    spark.read.table('transfer_outs')
    .select(['account_id', 'amount', 'status', 'transaction_requested_at', 'transaction_completed_at'])
    .withColumn('in_or_out', lit('transfer_out'))
)
df_transfer_outs = _with_month_end_dates(
    df_transfer_outs, df_d_time, 'transfer_outs', 'transaction_requested_at', 'transaction_completed_at'
)
df_transfer_outs = df_transfer_outs.select(movement_columns)

# Stack the three sources into a single fact DataFrame.
df_f_movements = df_pix.unionAll(df_transfer_ins).unionAll(df_transfer_outs)

schema_json_f_movements = StructType.fromJson(json.loads(json_str_list_silver[1]))
path_dir_silver_f_movements = silver_path + 'f_movements'

# NOTE(review): saveAsTable forwards extra keyword arguments as writer options,
# so `schema` is presumably not enforced on the DataFrame - confirm.
df_f_movements.write.saveAsTable(
    'silver.f_movements',
    compression="snappy",
    mode="overwrite",
    path=path_dir_silver_f_movements,
    schema=schema_json_f_movements,
)

#### Creating Table silver.d_calendar

In [None]:
# Build the calendar dimension: one row per (year, month) with the last day of
# that month.
d_year_df = spark.read.table("d_year").drop(col("year_id")).dropDuplicates(['action_year'])
d_month_df = spark.read.table("d_month").drop(col("month_id")).dropDuplicates(['action_month'])

# Every year is paired with every month; crossJoin states the cartesian
# product explicitly instead of relying on a join without a condition.
d_calendar_df = d_year_df.crossJoin(d_month_df)
d_calendar_df = d_calendar_df.withColumnRenamed("action_year", "year").withColumnRenamed("action_month", "month")

# Assemble 'yyyy-MM-01' (month left-padded to two digits) and take the last day of that month.
d_calendar_df = d_calendar_df.withColumn(
    "ultimo_dia_mes",
    last_day(to_date(concat(d_calendar_df["year"], lit("-"), lpad(d_calendar_df["month"], 2, "0"), lit("-"), lit("01")), "yyyy-MM-dd")),
)

d_calendar_df = d_calendar_df.alias("d_calendar")

# ultimo_dia_mes is a date, so the field uses DateType (DataType is only the
# abstract base class of all Spark types).
schema_df_d_calendar = StructType([
    StructField("year", IntegerType(), False)
    , StructField("month", IntegerType(), False)
    , StructField("ultimo_dia_mes", DateType(), False)
])

d_calendar_path = silver_path + 'd_calendar'

# NOTE(review): saveAsTable forwards extra keyword arguments as writer options,
# so `schema` is presumably not enforced on the DataFrame - confirm.
(d_calendar_df
    .write
    .saveAsTable('silver.d_calendar', compression="snappy", mode="overwrite", path=d_calendar_path, schema=schema_df_d_calendar)
)

In [None]:
%sql
-- Preview the first rows of the calendar table written by the previous cell.
SELECT * FROM silver.d_calendar LIMIT 5

## Gold layer

### Tabela/View 1: Lancamentos por mes 

In [None]:
# View 1 - movements per month (lancamentos_por_mes)
# The query sums outgoing and incoming amounts per account and month.
# - Outgoing branch: completed 'pix_out' / 'transfer_out' rows, grouped by requested_at.
# - Incoming branch: completed 'pix_in' / 'transfer_in' rows with a non-null
#   completed_at, grouped by completed_at.
# Each branch fills the opposite column with 0 so the outer SUMs merge both
# sides into one row per (account_id, ultimo_dia_mes).
sql_lancamentos_por_mes = " \
   SELECT \
      account_id \
      , ultimo_dia_mes \
      , SUM(valor_saida) AS VALOR_SAIDA \
      , SUM(valor_entrada) AS VALOR_ENTRADA \
   FROM ( \
      SELECT account_id \
         , requested_at ultimo_dia_mes \
         , SUM(amount) valor_saida \
         , 0 valor_entrada \
      FROM silver.f_movements \
      WHERE status = 'completed' \
         and in_or_out in ('pix_out', 'transfer_out') \
      GROUP BY account_id \
         , requested_at \
      UNION ALL \
      SELECT account_id \
         , completed_at ultimo_dia_mes \
         , 0 valor_saida \
         , SUM(amount) valor_entrada \
      FROM silver.f_movements \
      WHERE status = 'completed' \
         and in_or_out in ('pix_in', 'transfer_in') \
         and completed_at is not NULL \
      GROUP BY account_id \
         , completed_at \
   ) lancamentos_por_mes \
   GROUP BY account_id, ultimo_dia_mes \
"

# Register the result as a temp view so the following queries can reference it by name.
df_lancamentos_por_mes = spark.sql(sql_lancamentos_por_mes)
df_lancamentos_por_mes.createOrReplaceTempView("lancamentos_por_mes")


### Tabela/View 2: Total Por Mes

In [None]:
   # Tabela/View 2 - Total Por Mes
   # Essa consulta sql vai gerar uma tabela/view que vai listar a soma de todas as entrada e saida até o mes de analise.
   sql_total_por_mes = "\
      SELECT \
         d_calendar.ultimo_dia_mes As ultimo_dia_mes \
         , lancamentos_por_mes.account_id \
         , SUM(lancamentos_por_mes.VALOR_ENTRADA) TOTAL_ENTRADA \
         , SUM(lancamentos_por_mes.VALOR_SAIDA) TOTAL_SAIDA  \
          \
      FROM silver.d_calendar \
         LEFT JOIN lancamentos_por_mes  \
            ON lancamentos_por_mes.ultimo_dia_mes <= d_calendar.ultimo_dia_mes \
 \
      GROUP BY d_calendar.ultimo_dia_mes \
         , lancamentos_por_mes.account_id \
   "

df_total_por_mes =  spark.sql(sql_total_por_mes)    
df_total_por_mes.createOrReplaceTempView('total_por_mes') 

### Tabela/View 3: Acumulado por mes

In [None]:
# View 3 - accumulated balance per month (acumulado_por_mes)
# Pairs every account with every calendar month-end on or after its creation
# date, then left-joins the two previous views:
# - lancamentos_por_mes supplies that month's incoming/outgoing amounts
#   (exposed here under the TOTAL_ENTRADA / TOTAL_SAIDA aliases);
# - total_por_mes supplies the running totals used for SALDO_FINAL
#   (running incoming minus running outgoing, missing values treated as 0).
sql_acumulado_por_mes = "\
    SELECT \
        d_accounts.account_id \
        , d_calendar.ultimo_dia_mes As ultimo_dia_mes \
        , lancamentos_por_mes.VALOR_ENTRADA TOTAL_ENTRADA \
        , lancamentos_por_mes.VALOR_SAIDA TOTAL_SAIDA \
        , COALESCE(total_por_mes.TOTAL_ENTRADA,0) - COALESCE(total_por_mes.TOTAL_SAIDA,0) AS SALDO_FINAL \
    FROM silver.d_accounts \
    INNER JOIN silver.d_calendar \
        ON d_calendar.ultimo_dia_mes >= TO_DATE(d_accounts.created_at, 'yyyy-MM-dd') \
    LEFT JOIN lancamentos_por_mes \
        ON lancamentos_por_mes.account_id = d_accounts.account_id \
        AND lancamentos_por_mes.ultimo_dia_mes = d_calendar.ultimo_dia_mes \
    LEFT JOIN total_por_mes \
        ON total_por_mes.account_id = d_accounts.account_id \
        AND total_por_mes.ultimo_dia_mes = d_calendar.ultimo_dia_mes \
"
# Register the result as a temp view for the final query.
df_acumulado_por_mes = spark.sql(sql_acumulado_por_mes)
df_acumulado_por_mes.createOrReplaceTempView("acumulado_por_mes")

### Tabela/View Final: Saldo mensal

In [None]:
# Final table - monthly balance (agg_saldo_mensal)
# Returns, for every account, each month-end with that month's incoming and
# outgoing amounts and the final balance; missing values are shown as 0.
# NOTE(review): FORMAT_NUMBER produces formatted text, so total_entrada,
# total_saida and saldo_final are presumably strings rather than numbers in
# the resulting DataFrame - confirm this is intended for the gold table.
sql_saldo_mensal = "\
   SELECT \
       d_accounts.account_id \
       , d_accounts.account \
       , acumulado_por_mes.ultimo_dia_mes \
       , FORMAT_NUMBER(coalesce(acumulado_por_mes.total_entrada,0), 2) total_entrada \
       , FORMAT_NUMBER(coalesce(acumulado_por_mes.total_saida,0), 2) total_saida \
       , FORMAT_NUMBER(coalesce(acumulado_por_mes.saldo_final,0), 2) saldo_final \
   FROM silver.d_accounts \
      LEFT JOIN acumulado_por_mes \
         ON acumulado_por_mes.account_id = d_accounts.account_id \
   "
df_agg_saldo_mensal = spark.sql(sql_saldo_mensal)
# Give the DataFrame the alias 'saldo_mensal'.
df_agg_saldo_mensal = df_agg_saldo_mensal.alias('saldo_mensal')



#### Salvando o dataframe em tabela

In [None]:
%sql
-- Create the gold schema if it does not exist yet.
-- The LOCATION literal repeats the value assigned to gold_path in the Python cells.
CREATE SCHEMA IF NOT EXISTS gold LOCATION '/FileStore/project_report_balance/gold'

In [None]:
# Declared layout of gold.agg_saldo_mensal, aligned with the columns that the
# monthly-balance query actually produces:
# - the query aliases the amounts as total_entrada / total_saida;
# - ultimo_dia_mes is a date, hence DateType (DataType is only the abstract
#   base class of all Spark types);
# - the three amount columns come out of FORMAT_NUMBER, which returns
#   formatted text, so they are declared as strings.
schema_gold_agg_saldo_mensal = StructType([
    StructField("account_id", LongType(), False),
    StructField("account", StringType(), False),
    StructField("ultimo_dia_mes", DateType(), False),
    StructField("total_entrada", StringType(), False),
    StructField("total_saida", StringType(), False),
    StructField("saldo_final", StringType(), False)
])


agg_saldo_mensal_path = gold_path + 'agg_saldo_mensal'

# NOTE(review): saveAsTable forwards extra keyword arguments as writer options,
# so `schema` is presumably not enforced on the DataFrame - confirm.
(df_agg_saldo_mensal
    .write
    .saveAsTable('gold.agg_saldo_mensal', compression="snappy", mode="overwrite", path=agg_saldo_mensal_path, schema=schema_gold_agg_saldo_mensal)
)

In [None]:
%sql 
-- Preview up to 100 rows of the gold monthly-balance table.
SELECT * FROM gold.agg_saldo_mensal LIMIT 100

### Exemplos de Accounts e Saldo Mensal

#### Account com algum registro de entrada igual a zero

In [None]:
%sql 
-- Example account for the "entrada igual a zero" case, listed month by month.
SELECT * FROM gold.agg_saldo_mensal where account_id = 1910868644230470 order by account_id, ultimo_dia_mes

#### Account com algum mês com entrada e saída zeradas, ou seja, sem movimentação no mês

In [None]:
%sql 
-- Example account for the "sem movimentacao no mês" case, listed month by month.
SELECT * FROM gold.agg_saldo_mensal where account_id = 100642855136823056 order by account_id, ultimo_dia_mes

#### Account com algum mês com saldo negativo

In [None]:
%sql 
-- Example account for the "saldo negativo" case, listed month by month.
SELECT * FROM gold.agg_saldo_mensal where account_id = 1972174676324008704 order by account_id, ultimo_dia_mes