# Solution to Data Engineering Test

*by github.com/albertotav*

This is my attempt to solve the Data Engineering Test created by Raízen Analytics, which can be found at github.com/raizen-analytics/data-engineering-test.

It follows a two-stage approach. First, the underlying data of the two requested tables in the ANP pivot table (in the vendas-combustiveis-m3.xls workbook) is extracted with a VBA macro, whose code can be found at github.com/albertotav/raizen-data-engineering-test/blob/main/raw_data/pivot_data_macro.bas. This method was chosen for its simplicity. VBA's Range.ShowDetail property extracts the underlying data directly from the pivot table, so the workbook never has to be converted to another format (enabling macros is the only extra step), and both resulting sheets are then saved as .csv files.
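The layout of the extracted sheets is not described above. Judging from the names that translate_setdate() assigns further down, each row should hold 5 descriptive fields (product, year, region, UF, unit), 12 monthly volumes and the pivot table's TOTAL. Here is a minimal stdlib sketch that checks a stage-one .csv.gz has that 18-column shape before it is loaded. `check_layout` and `EXPECTED_COLUMNS` are hypothetical names. The ISO-8859-1 encoding and the comma separator are taken from the read_csv calls below.

```python
import csv
import gzip

# Layout implied by the English names assigned in translate_setdate():
# 5 descriptive fields + 12 monthly volumes + the pivot table's TOTAL.
EXPECTED_COLUMNS = 5 + 12 + 1


def check_layout(path, encoding='iso-8859-1'):
    """Hypothetical helper: confirm every row of a stage-one .csv.gz has 18 fields.

    Returns the number of data rows (header excluded); raises ValueError otherwise.
    """
    with gzip.open(path, mode='rt', encoding=encoding, newline='') as fh:
        reader = csv.reader(fh)
        header = next(reader)
        if len(header) != EXPECTED_COLUMNS:
            raise ValueError('expected {} columns, got {}'.format(EXPECTED_COLUMNS, len(header)))
        rows = 0
        for line_no, row in enumerate(reader, start=2):
            if len(row) != EXPECTED_COLUMNS:
                raise ValueError('line {}: {} fields'.format(line_no, len(row)))
            rows += 1
    return rows
```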

The second stage is split into two alternative solutions:

A) A Jupyter notebook (raizen-analytics_data-engineering-test.ipynb) that reads the raw csv files from stage one and exports them as refined data with the schema required by the challenge. It was written to deliver easily replicable results on Google's Colab platform and includes, as requested, a verification function that checks whether the sum of the refined data matches the TOTAL column of the raw data. The notebook is meant to show how each step was developed and how it works.

B) An Apache Airflow DAG. The pipeline reads the raw data from stage one, sets the required schema and saves the result in Parquet format as a single dataframe.

More info at https://github.com/albertotav/raizen-data-engineering-test

In [1]:
import urllib.request

# Get the raw csv files from the GitHub folder albertotav/raizen-data-engineering-test/tree/main/raw_data

target_url = 'https://github.com/albertotav/raizen-data-engineering-test/blob/main/raw_data/diesel_by_uf_and_type.csv.gz?raw=true'
filename = 'diesel_by_uf_and_type.csv.gz'

urllib.request.urlretrieve(target_url, filename)

target_url = 'https://github.com/albertotav/raizen-data-engineering-test/blob/main/raw_data/oil_derivative_fuels_by_uf_and_product.csv.gz?raw=true'
filename = 'oil_derivative_fuels_by_uf_and_product.csv.gz'

urllib.request.urlretrieve(target_url, filename)

('oil_derivative_fuels_by_uf_and_product.csv.gz',
 <http.client.HTTPMessage at 0x7fc92ee04910>)
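The two downloads above differ only in the file name. The sketch below writes the same step as a loop that also skips files already present, so re-running the Colab cell does not download them again. `RAW_BASE`, `raw_url` and `download_raw` are illustrative names and are not part of the repository. The `fetch` parameter exists only so the download function can be swapped out.

```python
import os
import urllib.request

# Repository folder of the stage-one files (taken from the URLs in the cell above).
RAW_BASE = ('https://github.com/albertotav/raizen-data-engineering-test'
            '/blob/main/raw_data/')
RAW_FILES = [
    'diesel_by_uf_and_type.csv.gz',
    'oil_derivative_fuels_by_uf_and_product.csv.gz',
]


def raw_url(filename):
    # '?raw=true' asks GitHub for the file itself instead of its HTML page.
    return RAW_BASE + filename + '?raw=true'


def download_raw(filenames=RAW_FILES, fetch=urllib.request.urlretrieve):
    """Download each file unless a local copy already exists; return the local paths."""
    paths = []
    for name in filenames:
        if not os.path.exists(name):
            fetch(raw_url(name), name)
        paths.append(name)
    return paths
```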

In [2]:
import pandas as pd
import datetime

# Creates dataframes with Pandas for both csv files. 

df_diesel_raw = pd.read_csv('diesel_by_uf_and_type.csv.gz',compression='gzip', encoding='iso-8859-1')
df_oil_raw = pd.read_csv('oil_derivative_fuels_by_uf_and_product.csv.gz',compression='gzip', encoding='iso-8859-1')

In [3]:
def translate_setdate(df):
  """
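  Translate the dataframe column names to English, unpivot the monthly columns into rows and create the year_month date column.
  Note: the column names of the input dataframe are renamed in place.

  :param pd.DataFrame df: Pandas dataframe read from diesel_by_uf_and_type.csv or oil_derivative_fuels_by_uf_and_product.csv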

  :returns: DataFrame with processed data.
  :rtype: pd.DataFrame
  """

  df.columns = ['product','year','region','uf','unit', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec','TOTAL']

  df = pd.melt(df, 
     id_vars=list(df.columns[0:5]), 
     value_vars=list(df.columns[5:-1]),
     var_name='month',
     value_name='volume')

  df['year_month'] = df['year'].astype(str)+'-'+df['month']
  df['year_month'] = pd.to_datetime(df['year_month'], format= '%Y-%b').dt.date

  df = df[['year_month','uf','product','unit','volume']].copy() # Copy avoids modifying a slice of the melted frame
  df['volume'] = df['volume'].fillna(0) # Fill volume NaNs with zero. They appear in the 2020 months not yet accounted for.

  return df
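To make the reshaping in translate_setdate() concrete, here is a sketch on one synthetic wide row (the values are invented for illustration). `pd.melt` turns the 12 month columns into 12 rows, TOTAL is left out of `value_vars`, and the `'%Y-%b'` format turns strings such as `'2020-Jan'` into the first day of that month. Because `%b` expects English abbreviations, this relies on the month columns having been renamed to `'Jan'` through `'Dec'`.

```python
import numpy as np
import pandas as pd

MONTHS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

# One synthetic wide row; Nov and Dec are empty, as in a partially reported year.
wide = pd.DataFrame(
    [['GASOLINA C (m3)', 2020, 'NORTE', 'ACRE', 'm3']
     + [10.0] * 10 + [np.nan, np.nan] + [100.0]],
    columns=['product', 'year', 'region', 'uf', 'unit'] + MONTHS + ['TOTAL'],
)

# Wide -> long: one row per month; TOTAL is not among the value_vars.
long_df = pd.melt(wide,
                  id_vars=['product', 'year', 'region', 'uf', 'unit'],
                  value_vars=MONTHS, var_name='month', value_name='volume')

# '2020-Jan' -> datetime.date(2020, 1, 1), and so on for every month.
long_df['year_month'] = pd.to_datetime(
    long_df['year'].astype(str) + '-' + long_df['month'], format='%Y-%b').dt.date
long_df['volume'] = long_df['volume'].fillna(0)
```

Summing `volume` over the 12 long rows gives back the TOTAL of the wide row. The verification cells further down rely on this property.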

In [4]:
# Create the Pandas dataframes that will be loaded into PySpark

df_oil = translate_setdate(df_oil_raw)
df_diesel = translate_setdate(df_diesel_raw)

print('df_oil row count: ',len(df_oil.index))
print('df_oil columns: ',list(df_oil.columns))
print('\ndf_diesel row count: ',len(df_diesel.index))
print('df_diesel columns: ',list(df_diesel.columns))

df_oil row count:  54432
df_oil columns:  ['year_month', 'uf', 'product', 'unit', 'volume']

df_diesel row count:  12960
df_diesel columns:  ['year_month', 'uf', 'product', 'unit', 'volume']
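The printed row counts can be cross-checked by hand, because each (year, UF, product) row of the pivot table becomes 12 monthly rows. The arithmetic sketch below makes several assumptions:

- 27 UFs (26 states plus the Distrito Federal).
- Years 2000 to 2020 for the oil table and 2013 to 2020 for the diesel table. The first years come from the dataframe previews further down and the last year from the 2020 note in translate_setdate().
- 8 oil products and 5 diesel types. These counts are not stated in the notebook; they are assumed because they reproduce the printed totals.

```python
# Expected long-format row counts: every (year, UF, product) combination of the
# pivot table becomes 12 monthly rows.
# Assumptions: 27 UFs, 2000-2020 (oil) and 2013-2020 (diesel), 8 oil products
# and 5 diesel types; the product counts are not stated in the notebook.
N_UFS = 27  # 26 states plus the Distrito Federal


def expected_rows(first_year, last_year, n_products, n_ufs=N_UFS, months=12):
    n_years = last_year - first_year + 1
    return n_years * n_ufs * n_products * months


oil_rows = expected_rows(2000, 2020, n_products=8)
diesel_rows = expected_rows(2013, 2020, n_products=5)
print(oil_rows, diesel_rows, oil_rows + diesel_rows)  # 54432 12960 67392
```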


## PySpark

#### Installing PySpark

In [5]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
     |████████████████████████████████| 212.4MB 67kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 20.6MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=dff0f0729d52f61fe4a464e0fd13ee1bc87531ee0882f6e6a15b334cc17cfe34
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


#### Starting the PySpark session

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, DoubleType
from pyspark.sql.functions import lit, unix_timestamp
from pyspark.sql import SparkSession

# Start Spark session

spark = SparkSession.builder \
.appName('Python-spark-session') \
.config('spark.sql.session.timeZone', 'America/Sao_Paulo') \
.getOrCreate()

#### Creating Spark dataframes

In [7]:
import time

# Defines schema as requested by the challenge

schema = StructType([
  StructField('year_month', DateType(), False),
  StructField('uf', StringType(), False),
  StructField('product', StringType(), False),
  StructField('unit', StringType(), False),
  StructField('volume', DoubleType(), True),
  ])

# Creates Spark Dataframes with required schema and timestamp column

spark_df_oil = spark.createDataFrame(df_oil, schema=schema) \
  .withColumn('created_at', unix_timestamp(lit(datetime.datetime.fromtimestamp(time.time()) \
  .strftime('%Y-%m-%d %H:%M:%S')),'yyyy-MM-dd HH:mm:ss') \
  .cast("timestamp")) 

spark_df_diesel = spark.createDataFrame(df_diesel, schema=schema) \
  .withColumn('created_at', unix_timestamp(lit(datetime.datetime.fromtimestamp(time.time()) \
  .strftime('%Y-%m-%d %H:%M:%S')),'yyyy-MM-dd HH:mm:ss') \
  .cast("timestamp"))
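The `created_at` expression above formats the driver's local clock as text. `unix_timestamp` then parses that text in the session time zone set when the Spark session was started (America/Sao_Paulo). The two agree only when the machine itself runs on São Paulo time. On a host whose local zone is UTC (as Colab VMs typically are), the stored instant would be 3 hours off, even though `show()` prints the expected wall-clock text.

The sketch below builds the string in São Paulo time explicitly. It assumes the fixed UTC-3 offset Brazil has used since abolishing daylight saving time in 2019. `load_timestamp_text` is a hypothetical helper.

```python
import datetime

# Assumption: São Paulo has kept a fixed UTC-3 offset since Brazil dropped
# daylight saving time in 2019, so a fixed-offset tzinfo is enough here.
SAO_PAULO = datetime.timezone(datetime.timedelta(hours=-3), 'America/Sao_Paulo')


def load_timestamp_text(now_utc=None):
    """Hypothetical helper: current time as 'yyyy-MM-dd HH:mm:ss' São Paulo wall-clock text.

    unix_timestamp() parses this text in spark.sql.session.timeZone
    ('America/Sao_Paulo' in the session above), so producing it in that zone
    keeps created_at independent of the driver machine's local time zone.
    now_utc must be timezone-aware; it defaults to the current UTC time.
    """
    if now_utc is None:
        now_utc = datetime.datetime.now(datetime.timezone.utc)
    return now_utc.astimezone(SAO_PAULO).strftime('%Y-%m-%d %H:%M:%S')


# Example: 00:41:02 UTC on 24 June 2021 is 21:41:02 of the previous day in São Paulo.
print(load_timestamp_text(datetime.datetime(2021, 6, 24, 0, 41, 2,
                                            tzinfo=datetime.timezone.utc)))
# prints 2021-06-23 21:41:02
```

In the Spark cell, `lit(load_timestamp_text())` would take the place of the `lit(datetime.datetime.fromtimestamp(time.time()).strftime(...))` argument, with the rest of the expression unchanged.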

#### Verifying schema

In [8]:
print('spark_df_oil schema:')
spark_df_oil.printSchema()

print('spark_df_diesel schema:')
spark_df_diesel.printSchema()

spark_df_oil schema:
root
 |-- year_month: date (nullable = false)
 |-- uf: string (nullable = false)
 |-- product: string (nullable = false)
 |-- unit: string (nullable = false)
 |-- volume: double (nullable = true)
 |-- created_at: timestamp (nullable = true)

spark_df_diesel schema:
root
 |-- year_month: date (nullable = false)
 |-- uf: string (nullable = false)
 |-- product: string (nullable = false)
 |-- unit: string (nullable = false)
 |-- volume: double (nullable = true)
 |-- created_at: timestamp (nullable = true)



#### Dataframe preview

In [9]:
spark_df_oil.show()

+----------+-------------------+---------------+----+------------------+-------------------+
|year_month|                 uf|        product|unit|            volume|         created_at|
+----------+-------------------+---------------+----+------------------+-------------------+
|2000-01-01|           RONDÔNIA|GASOLINA C (m3)|  m3|          9563.263|2021-06-24 00:41:02|
|2000-01-01|               ACRE|GASOLINA C (m3)|  m3|          3065.758|2021-06-24 00:41:02|
|2000-01-01|           AMAZONAS|GASOLINA C (m3)|  m3|         17615.604|2021-06-24 00:41:02|
|2000-01-01|            RORAIMA|GASOLINA C (m3)|  m3|            3259.3|2021-06-24 00:41:02|
|2000-01-01|               PARÁ|GASOLINA C (m3)|  m3|         28830.479|2021-06-24 00:41:02|
|2000-01-01|              AMAPÁ|GASOLINA C (m3)|  m3|           3456.35|2021-06-24 00:41:02|
|2000-01-01|          TOCANTINS|GASOLINA C (m3)|  m3|          6961.518|2021-06-24 00:41:02|
|2000-01-01|           MARANHÃO|GASOLINA C (m3)|  m3|          16751.0

In [10]:
spark_df_diesel.show()

+----------+-------------------+--------------------+----+-----------------+-------------------+
|year_month|                 uf|             product|unit|           volume|         created_at|
+----------+-------------------+--------------------+----+-----------------+-------------------+
|2013-01-01|           RONDÔNIA|ÓLEO DIESEL S-10 ...|  m3|           3517.6|2021-06-24 00:41:03|
|2013-01-01|               ACRE|ÓLEO DIESEL S-10 ...|  m3|            363.0|2021-06-24 00:41:03|
|2013-01-01|           AMAZONAS|ÓLEO DIESEL S-10 ...|  m3|         3190.585|2021-06-24 00:41:03|
|2013-01-01|            RORAIMA|ÓLEO DIESEL S-10 ...|  m3|            795.4|2021-06-24 00:41:03|
|2013-01-01|               PARÁ|ÓLEO DIESEL S-10 ...|  m3|          30137.8|2021-06-24 00:41:03|
|2013-01-01|              AMAPÁ|ÓLEO DIESEL S-10 ...|  m3|            252.5|2021-06-24 00:41:03|
|2013-01-01|          TOCANTINS|ÓLEO DIESEL S-10 ...|  m3|           6365.0|2021-06-24 00:41:03|
|2013-01-01|           MARANHÃ

#### Check if totals match raw data

In [11]:
import random

def rnd_query(df_raw, table_name):
  """
  Build a query that sums the volume column for a random year, product and UF,
  and return it together with the matching TOTAL value from the original raw data.

  :param pd.DataFrame df_raw: raw Pandas dataframe read from diesel_by_uf_and_type.csv or oil_derivative_fuels_by_uf_and_product.csv
  :param str table_name: name of the temporary view created from the refined Spark dataframe

  :returns: tuple (SUM query for the random parameters, expected TOTAL value from the raw data)
  """

  # Select random values for UF, Product and Year with corresponding Total value from Pandas raw Dataframe

  rnd_uf = str(random.choice(df_raw.iloc[:,3].unique()))
  rnd_product = str(random.choice(df_raw.iloc[:,0].unique()))
  rnd_year = random.choice(df_raw.iloc[:,1].unique())

  error = True
  while error: # Retry with another product until the (year, UF, product) combination exists
    try:
      rnd_total = df_raw.loc[(df_raw.iloc[:,1]==rnd_year) \
              & (df_raw.iloc[:,3]==rnd_uf) \
              & (df_raw.iloc[:,0]==rnd_product)].iloc[0,-1]
      error = False
    except IndexError: # No raw row for this combination
      rnd_product = str(random.choice(df_raw.iloc[:,0].unique()))
 
  print('Random UF: ',rnd_uf)
  print('Random Product:',rnd_product)
  print('Random Year: ',rnd_year)
  print('Expected Total: ', rnd_total)

  query = ("SELECT SUM(volume) FROM {table_name} \
          WHERE uf == '{rnd_uf}' AND \
          YEAR(year_month) == {rnd_year} AND \
          product == '{rnd_product}'").format(table_name=table_name, rnd_uf=rnd_uf, rnd_year=rnd_year, rnd_product=rnd_product)

  return query, rnd_total
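`rnd_query()` draws the UF, product and year independently and retries the product until the combination exists in the raw data. An alternative is to sample one whole raw row instead, which guarantees the combination exists and removes the retry loop. `pick_existing_combination` is a hypothetical name. This approach changes the distribution: combinations are drawn uniformly over raw rows rather than uniformly per UF, product and year.

```python
def pick_existing_combination(df_raw, seed=None):
    """Hypothetical helper: (uf, product, year, total) taken from one random raw row.

    Column positions follow rnd_query(): 0 = product, 1 = year, 3 = UF, -1 = TOTAL.
    Sampling a whole row guarantees the combination exists, so no retry is needed.
    """
    row = df_raw.sample(n=1, random_state=seed).iloc[0]
    return str(row.iloc[3]), str(row.iloc[0]), int(row.iloc[1]), float(row.iloc[-1])
```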

In [12]:
# Creates or replaces a local temporary view with selected DataFrame

spark_df_diesel.createOrReplaceTempView('diesel')

spark_df_oil.createOrReplaceTempView('oil')

In [13]:
def verify_match(df_raw, table_name):
  """
  Verify that the SUM of the volume column for random parameters matches the corresponding TOTAL in the original data.
  Raises ValueError if it does not.

  :param pd.DataFrame df_raw: raw Pandas dataframe read from diesel_by_uf_and_type.csv or oil_derivative_fuels_by_uf_and_product.csv
  :param str table_name: name of the temporary view created from the refined Spark dataframe
  """
  print('--------------------------------------------------')
  
  print('Table being verified: ',table_name,'\n')
  print('Query SUM result must match the expected random Total:\n')

  query, rnd_total = rnd_query(df_raw, table_name)
  
  results = spark.sql(query)

  print('\nQuery result from {table_name} table:'.format(table_name=table_name))
  results.show()

  query_sum = results.head()[0]
  if abs(query_sum - rnd_total) < 0.01: # Ignore differences in the last decimal digits
    print('Total values match')
  else:
    raise ValueError('Query sum {} does not match expected total {}'.format(query_sum, rnd_total))

  print('--------------------------------------------------')

In [14]:
# Run verify_match to validate the refined data against the raw totals

verify_match(df_diesel_raw, 'diesel')

verify_match(df_oil_raw, 'oil')

--------------------------------------------------
Table being verified:  diesel 

Query SUM result must match the expected random Total:

Random UF:  RIO DE JANEIRO
Random Product: ÓLEO DIESEL MARÍTIMO (m3)
Random Year:  2019
Expected Total:  243919.34699999998

Query result from diesel table:
+-----------+
|sum(volume)|
+-----------+
| 243919.347|
+-----------+

Total values match
--------------------------------------------------
--------------------------------------------------
Table being verified:  oil 

Query SUM result must match the expected random Total:

Random UF:  MARANHÃO
Random Product: ÓLEO DIESEL (m3)
Random Year:  2013
Expected Total:  1214221.924

Query result from oil table:
+-----------+
|sum(volume)|
+-----------+
|1214221.924|
+-----------+

Total values match
--------------------------------------------------
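`verify_match()` checks a single random (year, UF, product) combination per run. The refined tables are small (tens of thousands of rows), so Pandas can check every combination at once. The sketch below is an exhaustive alternative, not the function the challenge asks for. It assumes the raw dataframe already carries the English column names that translate_setdate() assigns to it in place, and it reuses the 0.01 tolerance from `verify_match()`. `verify_all_totals` is a hypothetical name.

```python
import pandas as pd


def verify_all_totals(df_raw, df_refined, tol=0.01):
    """Hypothetical helper: compare every (year, uf, product) volume sum with the raw TOTAL.

    Assumes df_raw has 'product', 'year', 'uf' and 'TOTAL' columns, and df_refined
    has 'year_month' (datetime.date values), 'uf', 'product' and 'volume'.
    Returns the rows whose difference exceeds tol or that exist on one side only.
    """
    refined = df_refined.assign(
        year=pd.to_datetime(df_refined['year_month']).dt.year)
    sums = (refined.groupby(['year', 'uf', 'product'], as_index=False)['volume']
            .sum())
    merged = sums.merge(df_raw[['year', 'uf', 'product', 'TOTAL']],
                        on=['year', 'uf', 'product'], how='outer')
    diff = (merged['volume'] - merged['TOTAL']).abs()
    return merged[(diff > tol) | diff.isna()]
```

For example, `verify_all_totals(df_diesel_raw, df_diesel).empty` should be `True` when every total matches.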


## Verifying results with both tables concatenated

In [15]:
# Concatenate the raw dataframes created earlier with Pandas (ignore_index gives the result a fresh 0..n-1 index)

df_concat_raw = pd.concat([df_oil_raw, df_diesel_raw], axis=0, ignore_index=True)

df_concat = translate_setdate(df_concat_raw)

print('df_concat columns: ',list(df_concat.columns))
print('df_concat row count: ',len(df_concat.index))

df_concat columns:  ['year_month', 'uf', 'product', 'unit', 'volume']
df_concat row count:  67392


In [16]:
# Creates Spark Dataframes with required schema and timestamp column

spark_df_concat = spark.createDataFrame(df_concat, schema=schema) \
  .withColumn('created_at', unix_timestamp(lit(datetime.datetime.fromtimestamp(time.time()) \
  .strftime('%Y-%m-%d %H:%M:%S')),'yyyy-MM-dd HH:mm:ss') \
  .cast("timestamp"))

print('spark_df_concat schema:')
spark_df_concat.printSchema()

spark_df_concat schema:
root
 |-- year_month: date (nullable = false)
 |-- uf: string (nullable = false)
 |-- product: string (nullable = false)
 |-- unit: string (nullable = false)
 |-- volume: double (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [17]:
# Creates or replaces a local temporary view with selected DataFrame
spark_df_concat.createOrReplaceTempView('concat')

# Run verify_match to validate the refined data against the raw totals
verify_match(df_concat_raw, 'concat')

--------------------------------------------------
Table being verified:  concat 

Query SUM result must match the expected random Total:

Random UF:  ACRE
Random Product: ÓLEO DIESEL S-10 (m3)
Random Year:  2018
Expected Total:  38150.815

Query result from concat table:
+-----------+
|sum(volume)|
+-----------+
|  38150.815|
+-----------+

Total values match
--------------------------------------------------


## Exporting data

In [18]:
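# Save the dataframes as gzip-compressed, ';'-separated CSV files in the refined_data folder
# (coalesce(1) makes Spark write a single part file inside each output directory)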

spark_df_oil.coalesce(1).write \
                .option('header', 'true') \
                .csv('refined_data/oil_derivative_fuels_by_uf_and_product_refined_data', mode='overwrite', sep=';', compression='gzip', encoding='iso-8859-1')

spark_df_diesel.coalesce(1).write \
                .option('header', 'true') \
                .csv('refined_data/diesel_by_uf_and_type_refined_data', mode='overwrite', sep=';', compression='gzip', encoding='iso-8859-1')

spark_df_concat.coalesce(1).write \
                .option('header', 'true') \
                .csv('refined_data/oil_and_diesel_refined_data', mode='overwrite', sep=';', compression='gzip', encoding='iso-8859-1')
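Spark writes each `.csv(...)` path as a directory rather than a single file. With `coalesce(1)`, that directory should contain one data file, typically named `part-*.csv.gz`, next to bookkeeping files such as `_SUCCESS`. The stdlib sketch below locates that part file and reads its first lines back, as a quick check that the `';'` separator and the ISO-8859-1 encoding came through. `read_single_part` is a hypothetical helper.

```python
import glob
import gzip
import itertools
import os


def read_single_part(output_dir, n_lines=3, encoding='iso-8859-1'):
    """Hypothetical helper: first n_lines of the single gzip part file in output_dir.

    With coalesce(1) the directory is expected to hold exactly one part-*.csv.gz
    data file (plus bookkeeping files such as _SUCCESS); anything else is an error.
    """
    parts = sorted(glob.glob(os.path.join(output_dir, 'part-*.csv.gz')))
    if len(parts) != 1:
        raise FileNotFoundError(
            'expected one part file in {}, found {}'.format(output_dir, len(parts)))
    with gzip.open(parts[0], mode='rt', encoding=encoding) as fh:
        return [line.rstrip('\n') for line in itertools.islice(fh, n_lines)]
```

For example, `read_single_part('refined_data/diesel_by_uf_and_type_refined_data')` should return the header line followed by the first two data rows.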