In [1]:
import datetime
import pyspark.sql.functions as f
import pyspark.pandas as pd
from pyspark.sql import SparkSession

import os

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')

from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))



In [2]:
# Criação da sessão spark e definição da variável com a data e hora da execução

spark = SparkSession.builder.getOrCreate()

now = datetime.datetime.now()

In [3]:
# Leitura do arquivo xlsx em um dataframe pyspark.pandas


df_pandas = pd.read_parquet('datalake/bronze/oil')

month_dict = {
    "Jan": 1, 
    "Fev": 2, 
    "Mar": 3, 
    "Abr": 4, 
    "Mai": 5,
    "Jun": 6, 
    "Jul": 7, 
    "Ago": 8, 
    "Set": 9, 
    "Out": 10, 
    "Nov": 11, 
    "Dez": 12
}

# Transformação com pd.melt para realizar unpivot parcial do dataframe
df_pandas_melt = pd.melt(df_pandas, id_vars = ["ANO", "ESTADO", "COMBUSTÍVEL", "UNIDADE"], value_vars = month_dict.keys(), var_name = "MES", value_name = "VOLUME")
df_pandas_melt = df_pandas_melt.fillna(0)
df_pandas_melt = df_pandas_melt.replace({"MES": month_dict})



In [4]:
# Transformação do df pyspark.pandas para spark

df_spark = df_pandas_melt.to_spark()

# Aplicação da formatação das colunas

df_spark = df_spark.withColumn('ANO', f.col('ANO').cast('integer'))\
                   .withColumn('MES', f.when(f.length(f.col('MES')) == 1, f.concat(f.lit('0'), f.col('MES'))).otherwise(f.col('MES')).cast('integer'))\
                   .withColumn('ANO', f.col('ANO').cast('string'))\
                   .withColumn('year_month', f.to_date(f.concat_ws('/', f.col('MES'), f.col('ANO')), 'MM/yyyy'))\
                   .withColumn('uf', f.col('ESTADO').cast('string'))\
                   .withColumn('product', f.regexp_replace('COMBUSTÍVEL', '\([^\)]*\)', '').cast('string'))\
                   .withColumn('unit', f.col('UNIDADE').cast('string'))\
                   .withColumn('volume', f.col('VOLUME').cast('double'))\
                   .withColumn('created_at', f.lit(str(now.strftime("%Y-%m-%d %H:%M:%S"))).cast('timestamp'))\
                   .select('year_month', 'uf', 'product', 'unit', 'volume', 'created_at')
                



In [5]:
# Verificação do Schema do dataframe
df_spark.printSchema()

root
 |-- year_month: date (nullable = true)
 |-- uf: string (nullable = true)
 |-- product: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- volume: double (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [6]:
# Criação do dataframe para validação realizando join entre a base de dados inicial e final

df_pandas_validation = df_pandas.to_spark()

df_validation = df_spark.groupBy([f.substring(f.to_str('year_month'), 0,4).alias('year'), 'uf', 'product']).sum('volume').withColumnRenamed('sum(volume)', 'totall')

df_validation_f = df_validation.join(df_pandas_validation, (df_validation.year == df_pandas_validation.ANO) & (df_validation.uf == df_pandas_validation.ESTADO) & (f.regexp_replace('COMBUSTÍVEL', '\([^\)]*\)', '') == f.col('product')) ,'inner')

In [7]:
#Validação comparando com o dataframe origem do fluxo, procurando as diferenças entre os totais de origem e pós processo
df_validation_f.select('year', 'uf', 'product', f.col('TOTAL').alias('total pré processo').cast('float'), f.col('totall').alias('total pós processo').cast('float')).filter(f.col('totall').cast('float') != f.col('TOTAL').cast('float')).show()

+----+-------------------+--------------------+------------------+------------------+
|year|                 uf|             product|total pré processo|total pós processo|
+----+-------------------+--------------------+------------------+------------------+
|2020|       MINAS GERAIS|QUEROSENE ILUMINA...|            1250.0|            1400.0|
|2020|     RIO DE JANEIRO|QUEROSENE ILUMINA...|            97.996|           106.996|
|2020|  RIO GRANDE DO SUL|QUEROSENE ILUMINA...|             464.0|             534.0|
|2020|             PARANÁ|QUEROSENE ILUMINA...|              87.0|             112.0|
|2020|              BAHIA|QUEROSENE ILUMINA...|             155.4|             160.8|
|2020|          SÃO PAULO|QUEROSENE ILUMINA...|             139.8|             164.8|
|2020|     ESPÍRITO SANTO|QUEROSENE ILUMINA...|              21.0|              23.0|
|2020|RIO GRANDE DO NORTE|QUEROSENE ILUMINA...|              14.0|              18.0|
|2020|     SANTA CATARINA|QUEROSENE ILUMINA...|       

In [8]:
# Escrita da base em parquet, particionado pela coluna 'product'
df_spark.write.mode('overwrite').partitionBy('product').format('parquet').save('datalake/silver/oil')