In [0]:
import sys
sys.path.append("../scripts") 

import os
from dotenv import load_dotenv
from datetime import datetime
from pyspark.sql import functions as F
from transformacoes import tratamento_uniao_df

In [0]:
# Load variables from a .env file (python-dotenv) into os.environ, then read
# the base locations used by this notebook.
load_dotenv()

DEFAULT_PATH = os.getenv("DEFAULT_PATH")
LOG_PATH = os.getenv("LOG_PATH")

# Fail fast: os.getenv returns None for unset variables, which would otherwise
# be interpolated as the literal string "None" into every path built below.
_missing_env = [
    name
    for name, value in (("DEFAULT_PATH", DEFAULT_PATH), ("LOG_PATH", LOG_PATH))
    if not value
]
if _missing_env:
    raise EnvironmentError(
        f"Variáveis de ambiente ausentes: {', '.join(_missing_env)}"
    )

In [0]:
# In-memory, timestamped log lines; they are joined and persisted to LOG_PATH
# at the end of the notebook.
log_messages = [f"{datetime.now()} - INFO - Iniciando ingestão"]

In [0]:
# Run parameters supplied through Databricks notebook widgets.
_current_datetime = dbutils.widgets.get('current_datetime')
_rate = dbutils.widgets.get('rate')
for _msg in (f"Rate: {_rate}", f"Current Datetime: {_current_datetime}"):
    log_messages.append(f"{datetime.now()} - INFO - {_msg}")

# Input/output file names are keyed on the date the notebook runs (YYYY-MM-DD).
# NOTE(review): this uses the driver clock, not the `current_datetime` widget —
# confirm that is intended for reruns/backfills.
current_date = datetime.today().date().isoformat()

raw_path, silver_path = (
    f"{DEFAULT_PATH}/Raw/{current_date}.json",
    f"{DEFAULT_PATH}/Silver/{current_date}.parquet",
)

for _msg in (f"Raw Path: {raw_path}", f"Silver Path: {silver_path}"):
    log_messages.append(f"{datetime.now()} - INFO - {_msg}")

In [0]:
# Load the raw JSON for the run date and the `moedas` lookup table from the
# metastore.
df = spark.read.format('json').load(raw_path)
df_moedas = spark.read.table('python_teste.mbaengenhariadedados.moedas')

log_messages.append(f"{datetime.now()} - INFO - Lendo dados do arquivo")


In [0]:

# Apply the shared cleaning step from ../scripts/transformacoes.py.
# NOTE(review): presumably this covers what the former inline version did
# (add base_currency/timestamp, rename currency->moeda and value->taxa, cast
# types, drop the base currency and non-positive/null rates) — confirm there.
df_tratado = tratamento_uniao_df(df, rate=_rate, current_datetime=_current_datetime)
log_messages.append(f"{datetime.now()} - INFO - Transformações realizadas")

# Enrich the *transformed* rates with the currency reference table.
# The original code joined the raw `df` here, discarding every transformation.
# A left join keeps rows whose `moeda` has no match in `moedas`.
# The lookup-side key column is dropped so the output has a single currency-code column.
df_join = (
    df_tratado.join(
        df_moedas,
        on=df_tratado['moeda'] == df_moedas['sigla_moeda'],
        how='left',
    )
    .drop(df_moedas['sigla_moeda'])
)
log_messages.append(f"{datetime.now()} - INFO - Join realizado")

In [0]:
def _path_exists(path: str) -> bool:
    """Return True if `path` exists, resolved the same way Spark resolves it.

    `os.path.exists` only inspects the driver's local filesystem.
    Spark and `dbutils.fs` resolve scheme-less paths against the default
    (distributed) filesystem, so the check must go through `dbutils.fs` to agree
    with where `save()` actually writes.
    """
    try:
        dbutils.fs.ls(path)
        return True
    except Exception as exc:
        # NOTE(review): dbutils surfaces a missing path as a wrapped
        # java.io.FileNotFoundException; anything else is a real error.
        if "java.io.FileNotFoundException" in str(exc):
            return False
        raise


try:
    if _path_exists(silver_path):
        # Later runs on the same day add to the existing dataset.
        df_join.write.mode('append').format('parquet').save(silver_path)
    else:
        # First run of the day: write the dataset as a single file.
        df_join.coalesce(1).write.mode('overwrite').format('parquet').save(silver_path)
    log_messages.append(f"{datetime.now()} - INFO - Escrita realizada")
except Exception as exc:
    log_messages.append(f"{datetime.now()} - ERROR - Falha na escrita: {exc}")
    raise
finally:
    # Persist the log even when the write fails, so failures are diagnosable.
    log_text = "\n".join(log_messages)
    dbutils.fs.put(f"{LOG_PATH}/transformacao.log", log_text, overwrite=True)
