## ETL - Gera Tabela Silver

In [None]:
%python
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from datetime import datetime
import pyspark.sql.functions as f 

import DadosIO as Db
import Function as Ut_f

# Job identifier handed to DadosIO and written to the execution log.
process = "ETL_TECH_CHALLENGE_4"
# Load date read from the notebook's `date` widget; it selects the raw input
# folder and is stamped on every row as the `dt_carga` column.
date = dbutils.widgets.get("date")

class ETL():
  """Builds the silver DataFrame of Disney stock quotes from the raw CSV drop.

  The Spark session is obtained from the injected ``_dados_io`` object and is
  used for every Spark interaction performed by this class.
  """

  def __init__(self, _dados_io, dt_carga=None):
    """Store the IO helper, its Spark session and the load date.

    Args:
      _dados_io: object exposing ``spark_session()``.
      dt_carga: load date used for the input folder and the ``dt_carga``
        column. When omitted, the notebook-level ``date`` value is used.
    """
    self._dados_io = _dados_io
    self._spark_session = _dados_io.spark_session()
    # The module-level `date` is only looked up when no explicit value is
    # given, so the class can also be used outside the notebook.
    self._dt_carga = date if dt_carga is None else dt_carga

  def generate_rules(self):
    """Run the pipeline: schema -> read -> select/clean -> ensure table.

    Returns:
      The cleaned DataFrame produced by ``select_columns``.
    """
    df_schema = self.create_schema()
    df_raw = self.load_files(df_schema)
    df_acoes_disney = self.select_columns(df_schema, df_raw)
    self.create_location(df_acoes_disney)
    return df_acoes_disney

  def load_files(self, parameters_schema):
    """Read every CSV file under the raw folder of the load date.

    Args:
      parameters_schema: schema applied to the CSV reader.

    Returns:
      DataFrame loaded from ``/mnt/data/raw/projeto_fiap_4tc/<dt_carga>/*``.
    """
    path = f"/mnt/data/raw/projeto_fiap_4tc/{self._dt_carga}/*"
    reader = (
      self._spark_session.read.format('csv')
      .schema(parameters_schema)
      .option('header', 'true')
    )
    return reader.load(path)

  def create_schema(self):
    """Return the explicit schema used to read the raw CSV files."""
    parameters_schema = StructType([
      StructField('Adj Close', FloatType(), True),
      StructField('Close', FloatType(), True),
      StructField('High', FloatType(), True),
      StructField('Low', FloatType(), True),
      StructField('Open', FloatType(), True),
      StructField('Volume', IntegerType(), True)
    ])
    return parameters_schema

  def select_columns(self, schema, df_raw):
    """Rename the raw columns, stamp the load date and clean the rows.

    Args:
      schema: unused; kept so existing callers keep working.
      df_raw: DataFrame returned by ``load_files``.

    Returns:
      DataFrame with the renamed columns plus ``dt_carga``, without
      duplicated rows and without rows containing nulls.
    """
    df_select = df_raw.select(
      # NOTE(review): 'adj_cluster' looks like a typo for 'adj_close', and
      # 'High' is the only alias left capitalised. Both are kept as-is here
      # because renaming them changes the schema of the persisted table.
      f.col('Adj Close').alias('adj_cluster'),
      f.col('Close').alias('close'),
      f.col('High').alias('High'),
      f.col('Low').alias('low'),
      f.col('Open').alias('open'),
      f.col('Volume').alias('volume')
    ).withColumn("dt_carga", f.lit(self._dt_carga))
    df = df_select.dropDuplicates().na.drop()
    return df

  def create_location(self, df_acoes_disney):
    """Create ``sandbox.silver_acoes_disney`` from the DataFrame if missing.

    The DataFrame is registered as the temp view ``df_extracao_disney``. When
    the table does not exist it is created as a Delta table partitioned by
    ``dt_carga`` and populated from that view; otherwise only a message is
    printed.

    Args:
      df_acoes_disney: DataFrame used as the source of the CREATE TABLE AS.
    """
    # Use the injected session (the same one that read the data) instead of
    # relying on a notebook-global `spark` variable.
    spark_session = self._spark_session
    df_acoes_disney.createOrReplaceTempView("df_extracao_disney")

    tableExists = spark_session._jsparkSession.catalog().tableExists("sandbox.silver_acoes_disney")

    if tableExists:
      print("Tabela já existe. Considerar inserção de dados ou outra ação.")
      return

    # No DROP TABLE IF EXISTS is needed here: this point is only reached when
    # the table was just reported as missing.
    spark_session.sql("""
        CREATE TABLE sandbox.silver_acoes_disney
        USING DELTA
        PARTITIONED BY (dt_carga)
        LOCATION '/mnt/data/sandbox/silver/acoes_disney'
        TBLPROPERTIES (
        'delta.deletedFileRetentionDuration' = '30 days',
        'delta.logRetentionDuration'         = 'interval 4 week'
        )
        AS SELECT *
        FROM  df_extracao_disney
        """)

def _log_execution(dt_fim, log):
    """Write one execution-log entry for this run via ``Ut_f.gera_log_execucao``.

    Args:
      dt_fim: datetime at which the run finished (or failed).
      log: status text, either 'succeeded' or the formatted error.
    """
    Ut_f.gera_log_execucao(process,
                           dt_inicio.strftime('%Y-%m-%d %H:%M:%S'),
                           dt_fim.strftime('%Y-%m-%d %H:%M:%S'), str(dt_fim - dt_inicio),
                           log)


dt_inicio = datetime.now()

try:
    # The whole pipeline runs inside the try so that failures while reading or
    # transforming the data are also recorded in the execution log, not only
    # failures of the final write.
    dados_io = Db.DadosIO(process, delta = True)
    etl = ETL(dados_io)
    df_acoes_disney = etl.generate_rules()

    # NOTE(review): when the table does not exist yet, generate_rules() creates
    # it with CREATE TABLE ... AS SELECT from this same DataFrame; confirm that
    # the insert below does not load the rows a second time on the first run.
    dados_io.gravar_parquet_insert_table_delta(df = df_acoes_disney,
                                            nome = "sandbox.silver_acoes_disney",
                                            modo = False,
                                            ehParticionada=True)
except Exception as ex:
    dt_fim = datetime.now()
    log = "ERRO: {}".format(ex)
    _log_execution(dt_fim, log)
    # Bare raise keeps the original exception type and traceback instead of
    # wrapping it in a generic Exception.
    raise
else:
    dt_fim = datetime.now()
    log = 'succeeded'
    _log_execution(dt_fim, log)