##### Ingestão do extrato do cartão de crédito em .csv da landing layer para a bronze layer

01. Utilizar o spark.read.text() para coletar dados do cartão de crédito
02. Criar um dicionário com a relação do file_name: card_name
03. Definir schemas como StringType()
04. Realizar a leitura do dataframe com spark.read()
05. Alterar colunas para snake_case
06. Filtrar apenas datas com regex
07. Adicionar ingestion_timestamp e ingestion_date
08. Adicionar coluna com card_name
09. Salvar com mode overwrite na bronze layer: personalfinance.bronze.gastos

In [0]:
%run "./support"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, lit, trim, udf, to_date, upper, regexp_extract, count

In [0]:
dict_file_card = {}

for file in dbutils.fs.ls('/Volumes/personalfinance/landing/vol_landing/bradesco'):
    df = spark.read.text(file.path)
    card_final = df.collect()[4][0]
    card_final = str(card_final).split(';')[-1]
    card_final = card_final.strip()

    status_fatura = df.filter(df.value.contains('* Fatura em Aberto:')).count()
    status_fatura = 'aberta' if status_fatura > 0 else 'fechada'

    dict_file_card[file.name] = (card_final, status_fatura)

In [0]:
df_cards = spark.createDataFrame(
    [(file_name, card_final[0], card_final[1]) for file_name, card_final in dict_file_card.items()],
    ['file_name', 'card_final', 'status_fatura'])
    
df_cards.show()

In [0]:
schema_gastos = StructType(fields=[
    StructField('Data', StringType(), True),
    StructField('Histórico', StringType(), True),
    StructField('Valor(US$)', StringType(), True),
    StructField('Valor(R$)', StringType(), True),
])

In [0]:
df = spark.read \
    .format('csv') \
    .option('delimiter', ';') \
    .option('skipRows', 5) \
    .option('header', 'true') \
    .option('encoding', 'ISO-8859-1') \
    .schema(schema_gastos) \
    .load('dbfs:/Volumes/personalfinance/landing/vol_landing/bradesco')

In [0]:
replace_colunas = {
    'Data': 'data_da_compra',
    'Histórico': 'descricao',
    'Valor(US$)': 'valor_usd',
    'Valor(R$)': 'valor_brl'
}

In [0]:
df = df.withColumnsRenamed(replace_colunas)

In [0]:
df = add_ingestion_timestamp(df)
df = add_file_metadata(df)

In [0]:
df = df.join(df_cards, on='file_name', how='left')

In [0]:
colunas_ordem = [
 'card_final',
 'data_da_compra',
 'descricao',
 'valor_usd',
 'valor_brl',
 'status_fatura',
 'file_name',
 'file_path',
 'ingestion_timestamp']

df = df.select(*colunas_ordem)

In [0]:
df.write \
    .format('delta') \
    .mode('overwrite') \
    .option('overwriteSchema', 'true') \
    .saveAsTable('personalfinance.bronze.gastos_bradesco')