In [None]:
# AQUI COMEÇA A CONSTRUÇÂO DA CAMADA silver
from pyspark.sql import functions as f

# Funcao
def create_table(table_name:str, has_input:bool, in_fld:str='/FileStore/tables/raw_file/delta'):
    if has_input:
        # Deleta a tabela caso exista
        spark.sql(f"""
        DROP TABLE IF EXISTS {table_name}""")
        # cria uma nova tabela com os dados delta tratados anteriormente
        spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name}
        USING DELTA
        LOCATION '{in_fld}'
        """)
        # Por termos uma serie de arquivos pequenos, 'e necessario
        # rodar o optimize para reduzir arquivos pequenos em maiores
        spark.sql("""
        OPTIMIZE default.bronze
        """)
    else:
        # Deleta a tabela caso exista
        spark.sql(f"""
        DROP TABLE IF EXISTS {table_name};
        """)
        # cria uma nova tabela com os dados delta tratados anteriormente
        spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name}
            (
            dateTimeReference timestamp,
            modifiedDate timestamp,
            idRetailerSKU string,
            seller string,
            retailerProductCode string,
            retailerAverageRating float,
            retailerRatingCount int,
            retailerPrice float, 
            manufacturerPrice float, 
            priceVariation float,
            available boolean,
            unavailable boolean,
            notListed boolean,
            titleFlag boolean,
            titlePercentage float
            );
        """)


In [None]:
try:
    # Carrega as informações da camada bronze e reestrutura para 
    # que cada elemento dentro de assortment seja uma coluna
    df = spark.sql("select * from default.bronze")
    df_2 = df.withColumn('dateTimeReference', f.explode(f.col('assortment.dateTimeReference')))\
            .withColumn('modifiedDate', f.explode(f.col('assortment.modifiedDate')))\
            .withColumn('idRetailerSKU', f.explode(f.col('assortment.idRetailerSKU')))\
            .withColumn('seller', f.explode(f.col('assortment.seller')))\
            .withColumn('retailerProductCode', \
                        f.explode(f.col('assortment.retailerProductCode')))\
            .withColumn('retailerAverageRating', \
                        f.explode(f.col('assortment.retailerAverageRating')))\
            .withColumn('retailerRatingCount', \
                        f.explode(f.col('assortment.retailerRatingCount')))\
            .withColumn('retailerPrice', f.explode(f.col('assortment.retailerPrice')))\
            .withColumn('manufacturerPrice', f.explode(f.col('assortment.manufacturerPrice')))\
            .withColumn('priceVariation', f.explode(f.col('assortment.priceVariation')))\
            .withColumn('available', f.explode(f.col('assortment.available')))\
            .withColumn('unavailable', f.explode(f.col('assortment.unavailable')))\
            .withColumn('notListed', f.explode(f.col('assortment.notListed')))\
            .withColumn('titleFlag', f.explode(f.col('assortment.titleFlag')))\
            .withColumn('titlePercentage', f.explode(f.col('assortment.titlePercentage')))\
            .drop(f.col('assortment'))
                        
    # Essa parte é feita a tipagem das colunas
    # TODO - Remover pois a tipagem já foi feita na camada bronze                    
    df_2 = df_2.select(f.col('dateTimeReference')
                     , f.col('modifiedDate')
                     , f.col('idRetailerSKU')
                     , f.col('seller')
                     , f.col('retailerProductCode')
                     , f.col('retailerAverageRating')
                     , f.col('retailerRatingCount')
                     , f.col('retailerPrice')
                     , f.col('manufacturerPrice')
                     , f.col('priceVariation')
                     , f.col('available')
                     , f.col('unavailable')
                     , f.col('notListed')
                     , f.col('titleFlag')
                     , f.col('titlePercentage')
                    )
                        
except Exception as e:
    print("Nao foi possivel fazer a carga da tabela" + '\n' + f'Erro : {e}')
else:
    # Monitoramento dos dados
    # Nessa parte há a validação das informações antes e depois da modificação     
    # para monitorar se houve alguma perda de dado
    if df.count() - df_2.count() == 0:
        # se não houve, a gente cria a tabela atraves da função descrita
        # e posteriormente salva os valores na camada silver
        create_table(table_name='default.silver', has_input=False)
        df_2.write.format("delta").mode('overwrite').saveAsTable('default.silver')
        print('Dados carregados com sucesso!')
    # Caso haja inconsistencia dos dados, não salva a tabela e emite a divergência 
    # encontrada
    else: 
        print(f'Incosistencia nos dados da carga. \nTendo {df_2.count()} linhas na camada \
              silver e {df.count()} linhas na camada bronze')