In [2]:
from subprocess import  Popen
from os import makedirs, path
import pandas as pd
import urllib
import pyarrow

In [37]:
def download_store_file():
    url = 'https://github.com/raizen-analytics/data-engineering-test/raw/master/assets/vendas-combustiveis-m3.xls'
    xls_filename = url.split("/")[-1]
    xls_dir = "./raw_data/"
    makedirs(xls_dir, exist_ok=True)
    response = urllib.request.urlretrieve(url, f'{xls_dir}/{xls_filename}') 

download_store_file()

def pre_process_file():
    convert_dir = './converted_data'
    file_path = "./raw_data/vendas-combustiveis-m3.xls"
    makedirs(convert_dir, exist_ok=True)
    # if you are using windows, libreoffice installation is required
    loffice = 'C:/Program Files/LibreOffice/program/soffice.exe'
    cmd = f"{loffice} --headless --convert-to xls --outdir {convert_dir} {file_path}"
    pr = Popen(cmd.split())
    pr.communicate()

pre_process_file()

def process_data(df):
    
    df.columns = ['combustivel', 'ano', 'região', 'uf', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', 'total']
    df = df.melt(id_vars=['combustivel', 'ano', 'região', 'uf'])
    #df_totals = df.loc[df['variable'] == 'total']
    df = df.loc[df['variable'] != 'total']
    df["year_month"] = pd.to_datetime(df["ano"].astype(str) + "-" + df["variable"], format="%Y-%m")
    df["product"] = df["combustivel"].str.split("(").str[0].str.strip()
    df["unit"] = df["combustivel"].str.split("(").str[1].replace('\)', '', regex = True).str.strip()
    df["created_at"] = pd.Timestamp.today()
    df = df.drop(labels=["combustivel", "ano", "região", 'variable'], axis=1)
    df.rename(columns={"estado": "uf", "value": "volume"}, inplace=True)
    df['volume'] = pd.to_numeric(df['volume'])
    df.fillna(0, inplace=True)
    df = df.iloc[:, [2,0,3,4,1,5]]

    return df

def store_parquet(xls_sheet_name):
    """
    Store the result in .parquet format
    in this scenario we dont used partition because of the small size of the files, if that is needed use the code bellow as example:
    df.to_parquet(f'{parquet_dir}', partition_cols=['year_month'])
    """
    df = pd.read_excel('./converted_data/vendas-combustiveis-m3.xls', sheet_name=xls_sheet_name)
    df = process_data(df)
    p_dir = "./partition_files/"
    #p_filename = "sales_oil_derivative_fuels.parquet"
    p_filename = "sales_diesel.parquet"
    makedirs(p_dir, exist_ok=True)
    df.to_parquet(p_dir + p_filename)


In [38]:
#store_parquet("DPCache_m3")
store_parquet("DPCache_m3_2")

## Validate Schema

In [39]:
sales_diesel = "./partition_files/sales_diesel.parquet"
sales_oil_derivative_fuels = "./partition_files/sales_oil_derivative_fuels.parquet"
df_sales_diesel = pd.read_parquet(sales_diesel)
df_sales_oil_derivative_fuels = pd.read_parquet(sales_oil_derivative_fuels)

In [40]:
df_sales_diesel

Unnamed: 0,year_month,uf,product,unit,volume,created_at
0,2013-01-01,RONDÔNIA,ÓLEO DIESEL S-10,m3,81453.67,2022-11-13 18:20:21.544073
1,2013-01-01,ACRE,ÓLEO DIESEL S-10,m3,1483.00,2022-11-13 18:20:21.544073
2,2013-01-01,AMAZONAS,ÓLEO DIESEL S-10,m3,6836.30,2022-11-13 18:20:21.544073
3,2013-01-01,RORAIMA,ÓLEO DIESEL S-10,m3,1475.30,2022-11-13 18:20:21.544073
4,2013-01-01,PARÁ,ÓLEO DIESEL S-10,m3,40913.48,2022-11-13 18:20:21.544073
...,...,...,...,...,...,...
12955,2020-12-01,RIO GRANDE DO SUL,ÓLEO DIESEL,OUTROS,10.00,2022-11-13 18:20:21.544073
12956,2020-12-01,MATO GROSSO DO SUL,ÓLEO DIESEL,OUTROS,0.00,2022-11-13 18:20:21.544073
12957,2020-12-01,MATO GROSSO,ÓLEO DIESEL,OUTROS,90.00,2022-11-13 18:20:21.544073
12958,2020-12-01,GOIÁS,ÓLEO DIESEL,OUTROS,0.00,2022-11-13 18:20:21.544073


In [41]:
df_sales_diesel.dtypes

year_month    datetime64[ns]
uf                    object
product               object
unit                  object
volume               float64
created_at    datetime64[ns]
dtype: object

## Verify Parquet with Spark

In [42]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import year
spark=SparkSession.builder.appName("parquetFile").getOrCreate()

In [43]:
parDFdiesel=spark.read.parquet("./partition_files/sales_diesel.parquet")
parDFdiesel.show(truncate=False)

+-------------------+-------------------+----------------+----+------------------+--------------------------+-----------------+
|year_month         |uf                 |product         |unit|volume            |created_at                |__index_level_0__|
+-------------------+-------------------+----------------+----+------------------+--------------------------+-----------------+
|2012-12-31 22:00:00|RONDÔNIA           |ÓLEO DIESEL S-10|m3  |81453.67          |2022-11-13 15:20:21.544073|0                |
|2012-12-31 22:00:00|ACRE               |ÓLEO DIESEL S-10|m3  |1483.0            |2022-11-13 15:20:21.544073|1                |
|2012-12-31 22:00:00|AMAZONAS           |ÓLEO DIESEL S-10|m3  |6836.3            |2022-11-13 15:20:21.544073|2                |
|2012-12-31 22:00:00|RORAIMA            |ÓLEO DIESEL S-10|m3  |1475.3            |2022-11-13 15:20:21.544073|3                |
|2012-12-31 22:00:00|PARÁ               |ÓLEO DIESEL S-10|m3  |40913.48          |2022-11-13 15:20:21.54

In [44]:
parDFderifuel=spark.read.parquet("./partition_files/sales_oil_derivative_fuels.parquet")
parDFderifuel.show(truncate=False)

+-------------------+-------------------+----------+----+------------------+--------------------------+-----------------+
|year_month         |uf                 |product   |unit|volume            |created_at                |__index_level_0__|
+-------------------+-------------------+----------+----+------------------+--------------------------+-----------------+
|1999-12-31 22:00:00|RONDÔNIA           |GASOLINA C|m3  |136073.25300000003|2022-11-13 15:19:53.291731|0                |
|1999-12-31 22:00:00|ACRE               |GASOLINA C|m3  |3358.346          |2022-11-13 15:19:53.291731|1                |
|1999-12-31 22:00:00|AMAZONAS           |GASOLINA C|m3  |20766.918         |2022-11-13 15:19:53.291731|2                |
|1999-12-31 22:00:00|RORAIMA            |GASOLINA C|m3  |3716.032          |2022-11-13 15:19:53.291731|3                |
|1999-12-31 22:00:00|PARÁ               |GASOLINA C|m3  |29755.907         |2022-11-13 15:19:53.291731|4                |
|1999-12-31 22:00:00|AMA