Hi everyone! Glad you are here!

This is one of the test tasks that I was asked to complete as a part of the interview process. I had to:
1. Transform the file from xlsx to parquet;
2. Conduct data quality checks;
3. Rename columns and aggregate the data;
4. Convert one currency to another;
5. Save the file back to csv and load it into the database.

In [58]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd

spark = SparkSession.builder.appName("Test").getOrCreate()

### 1. Transformation
I ran into difficulties installing the library, so I used pandas to convert the xlsx file to csv first and then read the csv with Spark.

In [518]:
prep = pd.read_excel('/Users/Savelyev_Ilya/Downloads/Тестовое/Source/DetailSales.xlsx')
prep['Fecha'] = pd.to_datetime(prep['Fecha'])
prep['Creado'] = pd.to_datetime(prep['Creado'])
prep.to_csv('/Users/Savelyev_Ilya/Downloads/Тестовое/Source/DetailSales.csv', index=False)

In [520]:
df = spark.read.format("csv").option('header', True).option('inferSchema', True).load('/Users/Savelyev_Ilya/Downloads/Тестовое/Source/DetailSales.csv')
df.write.mode('overwrite').parquet('/Users/Savelyev_Ilya/Downloads/Тестовое/Stage/DetailSales.pqt')

                                                                                

### 2. Moving the source
A single shell command is enough here.

In [522]:
!mv /Users/Savelyev_Ilya/Downloads/Тестовое/Source/DetailSales.xlsx /Users/Savelyev_Ilya/Downloads/Тестовое/Archive/DetailSales.xlsx
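If the notebook has to run where `mv` is not available, the same step could be sketched with the standard library. The helper name `archive_file` is hypothetical and only assumes the Source/Archive folder layout used above.

```python
import shutil
from pathlib import Path


def archive_file(source: Path, archive_dir: Path) -> Path:
    """Move `source` into `archive_dir`, creating the folder if needed."""
    archive_dir.mkdir(parents=True, exist_ok=True)
    target = archive_dir / source.name
    # shutil.move works across file systems, unlike a plain rename.
    shutil.move(str(source), str(target))
    return target
```

Calling `archive_file(Path('.../Source/DetailSales.xlsx'), Path('.../Archive'))` with the real folders would then replace the shell cell.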

### 3. Working with parquet
To begin, let's just take a look at the dataset and at the columns' datatypes.

In [707]:
df = spark.read.format("parquet").load('/Users/Savelyev_Ilya/Downloads/Тестовое/Stage/DetailSales.pqt')
df.show(5)
df.printSchema()

+--------+--------+--------+---------------+--------+------+-----+--------+----------+-------+-------------------+--------+
|      id|Orden ID|Producto|         Nombre|Cantidad|Precio|Total|Vendedor|     Fecha|Usuario|             Creado|Impuesto|
+--------+--------+--------+---------------+--------+------+-----+--------+----------+-------+-------------------+--------+
|3b1c962d|e72de456|      50|COLOR RAIZ X50G|       3|   700| 2100|9f04044e|2021-01-08|  ADMIN|2021-12-06 06:41:00|    1.19|
|037b90c1|52ad5614|      50|COLOR RAIZ X50G|       4|   700| 2800|9f04044e|2021-01-10|  ADMIN|2021-12-06 08:11:00|    1.19|
|7addf8af|d1c1e8db|      50|COLOR RAIZ X50G|       6|   700| 4200|9f04044e|2021-02-08|  ADMIN|2021-12-06 08:56:00|    1.19|
|7c6dc67f|c7b6cf8c|      50|COLOR RAIZ X50G|      12|   700| 8400|9f04044e|2021-02-08|  ADMIN|2021-12-06 08:59:00|    1.19|
|69b6d156|1a248fee|      50|COLOR RAIZ X50G|       6|   700| 4200|c5abb107|2021-02-10|  ADMIN|2021-12-06 09:35:00|    1.19|
+-------

As we can see, all the columns have the correct datatypes.

Now, let's list the major data quality checks we would like to conduct:

1. Uniqueness of rows
2. Absence of null values
3. Reasonable date range
4. Absence of non-positive values in the numeric columns (Cantidad, Precio, Total)
5. Validity of the equation Total = Precio * Cantidad: since this is a natural rule for the data, I would like to check that it holds in every record
 

#### 1. Uniqueness of rows

In [531]:
df.count()

25158

The number of rows and the number of unique identifiers are the same: 25158 records.

In [533]:
df.agg(F.countDistinct(df.id)).collect()

[Row(count(DISTINCT id)=25158)]

#### 2. Absence of null values
We need to make sure that there are no cells with null values; otherwise we have to decide what to do with them.

In [709]:
# Count null values per column (this runs one Spark job per column).
nulls = {}

for column in df.columns:
    nulls[column] = df.filter(F.col(column).isNull()).count()

nulls

{'id': 0,
 'Orden ID': 0,
 'Producto': 0,
 'Nombre': 0,
 'Cantidad': 0,
 'Precio': 0,
 'Total': 0,
 'Vendedor': 0,
 'Fecha': 0,
 'Usuario': 0,
 'Creado': 0,
 'Impuesto': 0}

No null values were found, so we can move on.

#### 3. Reasonable date range
Let's make sure that there aren't any abnormal dates in the dataset.

In [535]:
df.agg(F.min(df.Fecha)).collect()

[Row(min(Fecha)=datetime.date(2021, 1, 8))]

In [537]:
df.agg(F.max(df.Fecha)).collect()

[Row(max(Fecha)=datetime.date(2023, 3, 15))]

In [539]:
df.agg(F.min(df.Creado)).collect()

[Row(min(Creado)=datetime.datetime(2021, 4, 8, 0, 0))]

In [541]:
df.agg(F.max(df.Creado)).collect()

[Row(max(Creado)=datetime.datetime(2023, 3, 15, 14, 40, 5))]

No abnormal dates were found. It does puzzle me that Fecha and Creado coincide in some records and differ by almost a year in others. In a real project this could be an anomaly, so I would analyze how the two columns relate to each other and clean or replace records if needed. Since I don't have that information now, I will assume that everything is okay and move on to the next check.
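As a rough illustration of how that follow-up analysis could start, the sketch below measures the gap between the two columns on two (Fecha, Creado) pairs copied from the `df.show(5)` output. It is plain Python, not the Spark job itself; in Spark, `F.datediff` could play the same role. The helper `gap_in_days` and the 30-day threshold are assumptions made only for this example.

```python
from datetime import date, datetime

# (Fecha, Creado) pairs copied from the first rows printed by df.show(5).
sample = [
    (date(2021, 1, 8), datetime(2021, 12, 6, 6, 41)),
    (date(2021, 2, 10), datetime(2021, 12, 6, 9, 35)),
]


def gap_in_days(fecha: date, creado: datetime) -> int:
    """Days between the sale date and the moment the record was created."""
    return (creado.date() - fecha).days


# The 30-day threshold is an arbitrary assumption for illustration.
MAX_GAP_DAYS = 30
suspicious = [(f, c) for f, c in sample if gap_in_days(f, c) > MAX_GAP_DAYS]

print([gap_in_days(f, c) for f, c in sample])  # [332, 299]
```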

#### 4. Absence of non-positive values in the numeric columns
It's reasonable to assume that it is impossible to order a negative or zero quantity of a product, or to buy it for a negative or zero amount of money. Thus, I have to check that there are no non-positive values in the Cantidad, Precio and Total columns.

In [543]:
df.filter('Precio <= 0 or Cantidad <= 0 or Total <= 0').show()

+---+--------+--------+------+--------+------+-----+--------+-----+-------+------+--------+
| id|Orden ID|Producto|Nombre|Cantidad|Precio|Total|Vendedor|Fecha|Usuario|Creado|Impuesto|
+---+--------+--------+------+--------+------+-----+--------+-----+-------+------+--------+
+---+--------+--------+------+--------+------+-----+--------+-----+-------+------+--------+



The business logic is not violated, so we can move forward.

#### 5. Validity of the equation Total = Precio * Cantidad
Since the total amount of money spent should equal the product of the price and the quantity, let's make sure that all the records meet this rule.

In [545]:
df.filter('Precio * Cantidad != Total').show()

+---+--------+--------+------+--------+------+-----+--------+-----+-------+------+--------+
| id|Orden ID|Producto|Nombre|Cantidad|Precio|Total|Vendedor|Fecha|Usuario|Creado|Impuesto|
+---+--------+--------+------+--------+------+-----+--------+-----+-------+------+--------+
+---+--------+--------+------+--------+------+-----+--------+-----+-------+------+--------+



There are no such records. Great!

All the major data quality checks are complete, so we can go to the next step of the task.
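For reference, the row-level rules (uniqueness, nulls, non-positive values, and the Total equation) can be summarized in one small function. This is a plain-Python sketch over three rows copied from the `df.show(5)` output, not the Spark code used in the notebook; the function name `run_checks` is hypothetical, and the date range check is left out for brevity.

```python
from collections import Counter

# Three rows copied from the df.show(5) output above (subset of columns).
rows = [
    {"id": "3b1c962d", "Cantidad": 3, "Precio": 700, "Total": 2100},
    {"id": "037b90c1", "Cantidad": 4, "Precio": 700, "Total": 2800},
    {"id": "7addf8af", "Cantidad": 6, "Precio": 700, "Total": 4200},
]


def run_checks(rows):
    """Return the number of violations per rule (0 means the check passed)."""
    id_counts = Counter(row["id"] for row in rows)
    numeric = ("Cantidad", "Precio", "Total")
    return {
        "duplicate_ids": sum(n - 1 for n in id_counts.values() if n > 1),
        "null_values": sum(v is None for row in rows for v in row.values()),
        "non_positive": sum(
            any(row[c] is not None and row[c] <= 0 for c in numeric)
            for row in rows
        ),
        "total_mismatch": sum(
            None not in (row["Precio"], row["Cantidad"], row["Total"])
            and row["Precio"] * row["Cantidad"] != row["Total"]
            for row in rows
        ),
    }


report = run_checks(rows)
print(report)
```

A report with all zeros corresponds to the empty result tables shown in the checks above.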

Next, we have to rename some of the columns and keep only those columns in the dataframe.

While working on the task, I noticed that it requires aggregating both the number of units and the price, which raises the question of how to aggregate a price. If a product's price is constant within a day, there is no problem, but if it changes during the day, a choice has to be made. A simple average of the day's prices could be misleading, because it ignores how many units were sold at each price. The most accurate approach is to divide the day's revenue by the number of units sold, which gives a quantity-weighted average price for the day. For this reason, I will allow myself a small deviation from the instructions during the calculation. The final dataset will still be in the required format.
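A toy example makes the difference between the two averages concrete. The quantities and prices below are made up for illustration and do not come from the dataset.

```python
# Hypothetical sales of one product on one day: (quantity, unit price).
sales = [(10, 700), (2, 900)]

# Simple mean of the prices ignores how many units were sold at each price.
simple_mean = sum(price for _, price in sales) / len(sales)

# Weighted mean: total revenue divided by total units sold.
total_revenue = sum(qty * price for qty, price in sales)
total_quantity = sum(qty for qty, _ in sales)
weighted_mean = total_revenue / total_quantity

print(simple_mean)              # 800.0
print(round(weighted_mean, 2))  # 733.33
```

Because most units were sold at 700, the weighted price stays close to 700, while the simple mean overstates it.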

In [711]:
agg = df.select(
    F.col('Producto').alias('Product_ID'), 
    F.col('Nombre').alias('Product_Name'), 
    F.col('Cantidad').alias('Quantity'), 
    F.col('Total'),
    F.col('Fecha').alias('date')
).groupby(
    'date',
    'Product_ID',
    'Product_Name'
).agg(
    {
        'Quantity': 'sum',
        'Total': 'sum'
    }
).withColumn(
    'Amount',
    F.col('sum(Total)') / F.col('sum(Quantity)')
).select(
    'date',
    'Product_ID',
    'Product_Name',
    F.col('sum(Quantity)').alias('Quantity'),
    'Amount',
)

agg.show()

+----------+----------+--------------------+--------+------------------+
|      date|Product_ID|        Product_Name|Quantity|            Amount|
+----------+----------+--------------------+--------+------------------+
|2022-10-17|        50|AZAFRAN DE RAIZ X50G|      14|             900.0|
|2022-11-28|        50|AZAFRAN DE RAIZ X50G|      18|             900.0|
|2022-12-03|        53| CANELA ASTILLA X20G|      30|            3200.0|
|2022-05-10|        54|     SAL DE AJO X20G|       3|             600.0|
|2022-01-26|        57|        CURCUMA X40G|       4|            1500.0|
|2022-03-10|        58|    COLOR SUPER X30G|      16|             500.0|
|2022-04-19|        58|    COLOR SUPER X30G|       8|             500.0|
|2021-05-08|        61|     UVAS PASAS X40G|      13|             920.0|
|2021-11-18|        67| CANELA ASTILLA X10G|      30|            1400.0|
|2021-01-08|        68|  CANELA MOLIDA X10G|       6|            1200.0|
|2022-08-02|        68|  CANELA MOLIDA X10G|       

In the original Excel file, the values in the Precio column (now Amount) are in USD. The task requires converting the prices to rubles at the exchange rate of the corresponding date. To get historical exchange rates, I will use the free open API from https://currencybeacon.com. I will take three years of data, 2021 through 2023, which covers the dataset with some margin, build a dataframe from it, and then join it with our data.

In [331]:
import os
from datetime import datetime, timedelta

import requests

token = os.getenv("CURRENCY_TOKEN")
rows = []
current_date = datetime(2021, 1, 1)

# Request one historical USD -> RUB rate per day for 2021-2023.
while current_date < datetime(2024, 1, 1):

    date_iso = current_date.strftime('%Y-%m-%d')

    response = requests.get(
        'https://api.currencybeacon.com/v1/historical',
        params={'api_key': token, 'base': 'USD', 'date': date_iso, 'symbols': 'RUB'},
        timeout=30,
    )
    response.raise_for_status()  # stop on HTTP errors instead of parsing an error body
    res = response.json()

    row = {'date': res['response']['date'], 'USDRUB': res['response']['rates']['RUB']}
    rows.append(row)

    current_date += timedelta(days=1)

usdrub = spark.createDataFrame(rows)
usdrub = usdrub.withColumn('date', F.to_date('date'))
usdrub.printSchema()
usdrub.show(5)

root
 |-- USDRUB: double (nullable = true)
 |-- date: date (nullable = true)

+-----------+----------+
|     USDRUB|      date|
+-----------+----------+
|74.00082406|2021-01-01|
|74.02350872|2021-01-02|
|73.82344403|2021-01-03|
|74.27699404|2021-01-04|
|74.00834907|2021-01-05|
+-----------+----------+
only showing top 5 rows
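The download loop assumes that every response has the expected shape. A small helper could make that assumption explicit and fail loudly when a field is missing. The function `parse_rate` is hypothetical, and the example payload is hand-written to mimic only the fields the loop reads (`response.date` and `response.rates.RUB`); the real API may return more fields or a different error body.

```python
def parse_rate(payload: dict, symbol: str = "RUB") -> dict:
    """Turn one API response into a row, failing loudly on missing fields."""
    try:
        body = payload["response"]
        return {"date": body["date"], f"USD{symbol}": float(body["rates"][symbol])}
    except (KeyError, TypeError) as exc:
        raise ValueError(f"Unexpected response shape: {payload!r}") from exc


# Hand-written payload mimicking only the fields the loop reads.
example = {"response": {"date": "2021-01-01", "rates": {"RUB": 74.00082406}}}
print(parse_rate(example))
```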



In [713]:
agg = agg.join(usdrub, how='left_outer', on='date')
agg.show(5)

+----------+----------+--------------------+--------+------+------------+
|      date|Product_ID|        Product_Name|Quantity|Amount|      USDRUB|
+----------+----------+--------------------+--------+------+------------+
|2022-11-28|        50|AZAFRAN DE RAIZ X50G|      18| 900.0| 61.11183603|
|2022-12-03|        53| CANELA ASTILLA X20G|      30|3200.0| 62.57337995|
|2022-05-10|        54|     SAL DE AJO X20G|       3| 600.0| 69.77218835|
|2022-10-17|        50|AZAFRAN DE RAIZ X50G|      14| 900.0| 62.07689851|
|2022-03-10|        58|    COLOR SUPER X30G|      16| 500.0|132.68608142|
+----------+----------+--------------------+--------+------+------------+
only showing top 5 rows
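A left join keeps every sales row, but any date without a rate would end up with a null USDRUB and, later, a null Amount. The sketch below shows the idea of checking coverage with plain Python sets; the dates are hypothetical. In Spark, counting the rows where `F.col('USDRUB').isNull()` after the join would be one way to do the same check.

```python
from datetime import date

# Hypothetical inputs: dates present in the sales data and in the rate table.
sales_dates = {date(2022, 11, 28), date(2022, 12, 3), date(2024, 1, 5)}
rate_dates = {date(2022, 11, 28), date(2022, 12, 3)}

# Dates that a left join would leave with a null USDRUB.
missing = sorted(sales_dates - rate_dates)
print(missing)
```

An empty `missing` list means every sales date has an exchange rate.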



We apply the final transformations and get the final dataframe. I rounded the prices in rubles to two decimal places purely for readability in this test task.

In [733]:
final_df = agg.withColumn(
    'Amount', F.round(F.col('Amount') * F.col('USDRUB'), 2)
).select(
    F.date_format(F.col('date'), 'dd.MM.yyyy').alias('date'),
    'Product_ID',
    'Product_Name',
    'Quantity',
    'Amount'
)

final_df.show()

+----------+----------+--------------------+--------+---------+
|      date|Product_ID|        Product_Name|Quantity|   Amount|
+----------+----------+--------------------+--------+---------+
|17.10.2022|        50|AZAFRAN DE RAIZ X50G|      14| 55869.21|
|28.11.2022|        50|AZAFRAN DE RAIZ X50G|      18| 55000.65|
|03.12.2022|        53| CANELA ASTILLA X20G|      30|200234.82|
|10.05.2022|        54|     SAL DE AJO X20G|       3| 41863.31|
|26.01.2022|        57|        CURCUMA X40G|       4| 119283.6|
|10.03.2022|        58|    COLOR SUPER X30G|      16| 66343.04|
|19.04.2022|        58|    COLOR SUPER X30G|       8| 40547.97|
|08.05.2021|        61|     UVAS PASAS X40G|      13|  67846.1|
|18.11.2021|        67| CANELA ASTILLA X10G|      30|102163.51|
|08.01.2021|        68|  CANELA MOLIDA X10G|       6| 88904.82|
|02.08.2022|        68|  CANELA MOLIDA X10G|       9|  48498.5|
|02.04.2022|        70|    CLAVO ENTERO X5G|       4| 68599.32|
|28.12.2021|        70|    CLAVO ENTERO 
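The conversion can be verified by hand on one row. The values below are copied from the joined output for product 50 on 2022-10-17; the code is a plain-Python check, not part of the Spark pipeline. One caveat: Spark's `F.round` rounds half up, while Python's `round` rounds ties to even, so the two can differ on exact ties (not the case here).

```python
from datetime import date

# Values copied from the joined output above: product 50 on 2022-10-17.
amount_usd = 900.0
usdrub = 62.07689851
sale_date = date(2022, 10, 17)

amount_rub = round(amount_usd * usdrub, 2)
formatted_date = sale_date.strftime("%d.%m.%Y")  # same layout as 'dd.MM.yyyy'

print(formatted_date, amount_rub)  # 17.10.2022 55869.21
```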

Saving the result to csv

In [648]:
final_df.write.mode('overwrite').csv('/Users/Savelyev_Ilya/Downloads/Тестовое/Business/Sales.csv', header=True)

The task also requires uploading the data to a DBMS of your choice. Because I am very limited in time, I will not connect Spark to the database via JDBC. Instead, I will convert the dataframe to pandas and upload it through the PostgreSQL instance I already have running in Docker.

In [735]:
from sqlalchemy import create_engine

df_pd = final_df.toPandas()
engine = create_engine('postgresql+psycopg2://admin:12345@localhost:5432/test_db')
df_pd.to_sql(name='sales', con=engine, if_exists='replace', index=False)

72
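Outside a test task, the credentials would normally not be written into the connection string. A minimal sketch, assuming the settings live in environment variables, is shown below. The variable names (`PG_USER`, `PG_PASSWORD`, and so on) and the helper `build_pg_url` are hypothetical; the defaults mirror the values used in the cell above.

```python
import os
from urllib.parse import quote_plus


def build_pg_url(env=os.environ) -> str:
    """Assemble a SQLAlchemy-style PostgreSQL URL from environment variables."""
    user = env.get("PG_USER", "admin")
    password = quote_plus(env["PG_PASSWORD"])  # escape special characters
    host = env.get("PG_HOST", "localhost")
    port = env.get("PG_PORT", "5432")
    database = env.get("PG_DATABASE", "test_db")
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"


# Usage with an explicit mapping instead of the real environment.
print(build_pg_url({"PG_PASSWORD": "p@ss/word"}))
```

The resulting string can be passed to `create_engine` in place of the literal URL.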

Let's make a test query to the table to check that everything is working.

In [742]:
pd.read_sql('select * from sales', con=engine)

Unnamed: 0,date,Product_ID,Product_Name,Quantity,Amount
0,17.10.2022,50,AZAFRAN DE RAIZ X50G,14,55869.21
1,28.11.2022,50,AZAFRAN DE RAIZ X50G,18,55000.65
2,03.12.2022,53,CANELA ASTILLA X20G,30,200234.82
3,10.05.2022,54,SAL DE AJO X20G,3,41863.31
4,26.01.2022,57,CURCUMA X40G,4,119283.60
...,...,...,...,...,...
13067,19.10.2021,177,LINAZA 250G,3,191434.42
13068,16.03.2022,188,EMPAQUE DE CHORIZO X4M,3,453542.16
13069,27.12.2022,193,CAJITA BICARBONATO X10G,14,28230.11
13070,10.10.2022,227,SALSA NEGRA X120ML,6,88737.48


The data has been successfully uploaded to the database. The last part of the task, a visualization of the results in Power BI, can be viewed at this link: https://app.powerbi.com/view?r=eyJrIjoiMzE5YzdmODYtNGFkMC00ZGVlLWI2ODUtMDQwYmFiMTIxMTE5IiwidCI6ImI3ZjgxMzlhLWEwYTUtNGU0Ny05MGExLTI1OGNjZjE5MTYxNSIsImMiOjl9

Thanks for your attention!

Sincerely,
Ilya Savelev