
## TECH TEST

In [0]:
# File location and type
file_location = "/FileStore/tables/sales.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

transaction_id,date,category,product,quantity,price
1,2024-07-01,Widget,Widget-A,10.0,9.99
2,2024-07-01,Gadget,Gadget-X,5.0,19.99
3,2024-07-02,Widget,Widget-B,7.0,9.99
4,2024-07-02,Doodad,Doodad-1,,4.99
5,2024-07-03,Widget,Widget-C,3.0,9.99
6,2024-07-03,Gadget,Gadget-Y,8.0,19.99
7,2024-07-04,Widget,Widget-A,2.0,9.99
8,2024-07-04,Doodad,Doodad-2,4.0,not_a_number
9,2024-07-05,Widget,Widget-B,6.0,9.99
10,2024-07-05,Gadget,Gadget-X,3.0,19.99


In [0]:

from pyspark.sql.functions import col, coalesce, date_format, when, row_number, mean, stddev, lit, abs
from pyspark.sql.window import Window

## 1. DATA CLEANING

>**Handle missing values:**

In [0]:


# Replace missing quantity with 0
df_1 = df.fillna({"quantity":0}) 
print("Replace missing quantity with 0")
display(df_1)

# Replace missing or invalid price values (not_a_number) with the median price for the same product category
df_tmp = df_1.replace(["not_a_number"], [None], "price")

median_price = df_tmp.groupBy("category").agg({"price": "median"}).withColumnRenamed("median(price)", "median_price").withColumnRenamed("category", "category_b")

df_joined = df_tmp.join(median_price, 
                    ((df_tmp["category"] == median_price["category_b"]) & df_tmp.price.isNull()), 
                    how="left")                 
                    
df_2 = df_joined.select(df_joined["transaction_id"],  
                    df_joined["date"],
                    df_joined["category"],
                    df_joined["product"],
                    df_joined["quantity"],
                    coalesce(df_joined["price"], df_joined["median_price"]).alias("price") 
)
print("Replace missing or invalid price values (not_a_number) with the median price for the same product category")
display(df_2)

Replace missing quantity with 0


transaction_id,date,category,product,quantity,price
1,2024-07-01,Widget,Widget-A,10,9.99
2,2024-07-01,Gadget,Gadget-X,5,19.99
3,2024-07-02,Widget,Widget-B,7,9.99
4,2024-07-02,Doodad,Doodad-1,0,4.99
5,2024-07-03,Widget,Widget-C,3,9.99
6,2024-07-03,Gadget,Gadget-Y,8,19.99
7,2024-07-04,Widget,Widget-A,2,9.99
8,2024-07-04,Doodad,Doodad-2,4,not_a_number
9,2024-07-05,Widget,Widget-B,6,9.99
10,2024-07-05,Gadget,Gadget-X,3,19.99


Replace missing or invalid price values (not_a_number) with the median price for the same product category


transaction_id,date,category,product,quantity,price
1,2024-07-01,Widget,Widget-A,10,9.99
2,2024-07-01,Gadget,Gadget-X,5,19.99
3,2024-07-02,Widget,Widget-B,7,9.99
4,2024-07-02,Doodad,Doodad-1,0,4.99
5,2024-07-03,Widget,Widget-C,3,9.99
6,2024-07-03,Gadget,Gadget-Y,8,19.99
7,2024-07-04,Widget,Widget-A,2,9.99
8,2024-07-04,Doodad,Doodad-2,4,4.99
9,2024-07-05,Widget,Widget-B,6,9.99
10,2024-07-05,Gadget,Gadget-X,3,19.99


> **Drop rows where both quantity and price are invalid or missing**

In [0]:
df_cleaned = df.withColumn("price", col("price").cast("double")).dropna(subset=["price"]).withColumn("quantity", col("quantity")).dropna(subset=["quantity"])
display(df_cleaned)

transaction_id,date,category,product,quantity,price
1,2024-07-01,Widget,Widget-A,10,9.99
2,2024-07-01,Gadget,Gadget-X,5,19.99
3,2024-07-02,Widget,Widget-B,7,9.99
5,2024-07-03,Widget,Widget-C,3,9.99
6,2024-07-03,Gadget,Gadget-Y,8,19.99
7,2024-07-04,Widget,Widget-A,2,9.99
9,2024-07-05,Widget,Widget-B,6,9.99
10,2024-07-05,Gadget,Gadget-X,3,19.99
11,2024-07-06,Gadget,,5,19.99
12,2024-07-06,Doodad,Doodad-3,1,4.99



## 2. DERIVED COLUMNS

> **Calculate total_sales (quantity * price) for each transaction**

In [0]:
df_total_sales = df_cleaned.alias("a"). \
                  select(
                    "a.*", 
                    (col("quantity") * col("price")).alias("total_sales"))
display(df_total_sales)

transaction_id,date,category,product,quantity,price,total_sales
1,2024-07-01,Widget,Widget-A,10,9.99,99.9
2,2024-07-01,Gadget,Gadget-X,5,19.99,99.95
3,2024-07-02,Widget,Widget-B,7,9.99,69.93
5,2024-07-03,Widget,Widget-C,3,9.99,29.97
6,2024-07-03,Gadget,Gadget-Y,8,19.99,159.92
7,2024-07-04,Widget,Widget-A,2,9.99,19.98
9,2024-07-05,Widget,Widget-B,6,9.99,59.94
10,2024-07-05,Gadget,Gadget-X,3,19.99,59.97
11,2024-07-06,Gadget,,5,19.99,99.95
12,2024-07-06,Doodad,Doodad-3,1,4.99,4.99


> **Create a day_of_week column based on the date**

In [0]:
df_day_week = df_total_sales.alias("b").select(
    "b.*",
    date_format(col("date"), "EEEE").alias("day_of_week")
)
display(df_day_week)

transaction_id,date,category,product,quantity,price,total_sales,day_of_week
1,2024-07-01,Widget,Widget-A,10,9.99,99.9,Monday
2,2024-07-01,Gadget,Gadget-X,5,19.99,99.95,Monday
3,2024-07-02,Widget,Widget-B,7,9.99,69.93,Tuesday
5,2024-07-03,Widget,Widget-C,3,9.99,29.97,Wednesday
6,2024-07-03,Gadget,Gadget-Y,8,19.99,159.92,Wednesday
7,2024-07-04,Widget,Widget-A,2,9.99,19.98,Thursday
9,2024-07-05,Widget,Widget-B,6,9.99,59.94,Friday
10,2024-07-05,Gadget,Gadget-X,3,19.99,59.97,Friday
11,2024-07-06,Gadget,,5,19.99,99.95,Saturday
12,2024-07-06,Doodad,Doodad-3,1,4.99,4.99,Saturday


> **Add a high_volume flag: True if quantity > 10, otherwise False.**

In [0]:
df_flg = df_day_week.select(
    "*",
    when(col("quantity") > 10, True).otherwise(False).alias("high_volume")
)
display(df_flg)

transaction_id,date,category,product,quantity,price,total_sales,day_of_week,high_volume
1,2024-07-01,Widget,Widget-A,10,9.99,99.9,Monday,False
2,2024-07-01,Gadget,Gadget-X,5,19.99,99.95,Monday,False
3,2024-07-02,Widget,Widget-B,7,9.99,69.93,Tuesday,False
5,2024-07-03,Widget,Widget-C,3,9.99,29.97,Wednesday,False
6,2024-07-03,Gadget,Gadget-Y,8,19.99,159.92,Wednesday,False
7,2024-07-04,Widget,Widget-A,2,9.99,19.98,Thursday,False
9,2024-07-05,Widget,Widget-B,6,9.99,59.94,Friday,False
10,2024-07-05,Gadget,Gadget-X,3,19.99,59.97,Friday,False
11,2024-07-06,Gadget,,5,19.99,99.95,Saturday,False
12,2024-07-06,Doodad,Doodad-3,1,4.99,4.99,Saturday,False


## 3. COMPLEX TRANSFORMATIONS

> **Group data by category and calculate:**

In [0]:
# Average price per product in the category
df_avg_price = df_flg.groupBy("category", "product") \
    .agg({"price": "avg"}).withColumnRenamed("avg(price)", "avg_price") \
        .orderBy("category", "product")
display(df_avg_price)

# Total revenue for each category
df_total_revenue = df_flg.groupBy("category") \
    .agg({"total_sales": "sum"}).withColumnRenamed("sum(total_sales)", "total_revenue") \
        .orderBy("category")
display(df_total_revenue)

# Day with highest sales for the category

window_spec = Window.partitionBy("category").orderBy(col("total_sales").desc())
df_ranked = df_flg.withColumn("rank", row_number().over(window_spec))
 
df_max_sales = df_ranked.filter(col("rank") == 1).select(
    col("category"), 
    col("day_of_week"), 
    col("total_sales").alias("highest_sales")
    )
display(df_max_sales)
 


category,product,avg_price
Doodad,Doodad-1,4.99
Doodad,Doodad-2,4.99
Doodad,Doodad-3,4.99
Gadget,,19.99
Gadget,Gadget-X,19.99
Gadget,Gadget-Y,19.99
Widget,Widget-A,9.99
Widget,Widget-B,9.99
Widget,Widget-C,9.99


category,total_revenue
Doodad,154.69000000000003
Gadget,1639.1800000000003
Widget,859.1400000000001


category,day_of_week,highest_sales
Doodad,Wednesday,19.96
Gadget,Friday,179.91
Widget,Monday,99.9


> **Identify outliers in the data**

In [0]:
# 1. Transactions where quantity is more than 2 standard deviations from the category mean
print("Transactions where quantity is more than 2 standard deviations from the category mean")

# Generating the stats (mean and standard deviation) grouped by category
df_stats = df_cleaned.groupBy("category") \
    .agg(
        mean("quantity").alias("avg_quantity"), 
        stddev("quantity").alias("stddev_quantity"))

# Joining the stats to the original data 
df_joined_stats = df_cleaned.alias("a").join(df_stats.alias("b"), "category", "outer")

# display(df_stats)
# display(df_joined_stats)

# Calculating the difference between the quantity and the mean quantity for each row and the threshold to be compared to (I generated these as 2 new columns in order to verify the output is correct)
df_outliers = df_joined_stats.select(
    "*",
    lit(abs(col("quantity") - col("avg_quantity"))).alias("qty_diff"),
    lit(2 * col("stddev_quantity")).alias("qty_threshold")
)

# OUTPUT:
df_outliers_filtered = df_outliers.filter(col("qty_diff") > col("qty_threshold"))
display(df_outliers_filtered)




# 2. Mark these rows with an outlier flag
print("Mark these rows with an outlier flag")

df_outliers_flg = df_outliers.select(
    "*",
    when(col("qty_diff") > col("qty_threshold"), True).otherwise(False).alias("outlier_flag")
)

display(df_outliers_flg)





Transactions where quantity is more than 2 standard deviations from the category mean


category,transaction_id,date,product,quantity,price,avg_quantity,stddev_quantity,qty_diff,qty_threshold
Widget,1,2024-07-01,Widget-A,10,9.99,5.0588235294117645,2.435944073539408,4.941176470588236,4.871888147078816


Mark these rows with an outlier flag


category,transaction_id,date,product,quantity,price,avg_quantity,stddev_quantity,qty_diff,qty_threshold,outlier_flag
Doodad,12,2024-07-06,Doodad-3,1,4.99,2.583333333333333,1.0836246694508314,1.5833333333333337,2.1672493389016627,False
Doodad,16,2024-07-08,Doodad-1,2,4.99,2.583333333333333,1.0836246694508314,0.5833333333333335,2.1672493389016627,False
Doodad,19,2024-07-10,Doodad-2,4,4.99,2.583333333333333,1.0836246694508314,1.4166666666666663,2.1672493389016627,False
Doodad,22,2024-07-11,Doodad-3,3,4.99,2.583333333333333,1.0836246694508314,0.4166666666666665,2.1672493389016627,False
Doodad,25,2024-07-13,Doodad-1,2,4.99,2.583333333333333,1.0836246694508314,0.5833333333333335,2.1672493389016627,False
Doodad,28,2024-07-14,Doodad-2,4,4.99,2.583333333333333,1.0836246694508314,1.4166666666666663,2.1672493389016627,False
Doodad,31,2024-07-16,Doodad-3,1,4.99,2.583333333333333,1.0836246694508314,1.5833333333333337,2.1672493389016627,False
Doodad,34,2024-07-17,Doodad-1,2,4.99,2.583333333333333,1.0836246694508314,0.5833333333333335,2.1672493389016627,False
Doodad,37,2024-07-19,Doodad-2,3,4.99,2.583333333333333,1.0836246694508314,0.4166666666666665,2.1672493389016627,False
Doodad,43,2024-07-22,Doodad-1,2,4.99,2.583333333333333,1.0836246694508314,0.5833333333333335,2.1672493389016627,False


## 4. STORAGE

> **Store the cleaned and processed data in a SQLite database with separate tables for:**

• Transactions
• Aggregated metrics by category
• Outliers

In [0]:
# Importar librerías necesarias
import os
import sqlite3
import pandas as pd
 
# Transactions
df_transactions_pd = df_cleaned.toPandas()

# Aggregated metrics by category
df_aggregated_pd = df_flg.toPandas()

# Outliers
df_outliers_pd = df_outliers_flg.toPandas()
 
# Conectar a la base de datos SQLite (o crearla si no existe)
db_dir = '/tmp/mydata/'
if not os.path.exists(db_dir):
    os.makedirs(db_dir)

conn = sqlite3.connect("/tmp/mydata/sales_data.db")
cursor = conn.cursor()
 
# Escribir el DataFrame en una tabla SQLite
df_transactions_pd.to_sql("transactions_table", conn, if_exists="replace", index=False)
df_aggregated_pd.to_sql("aggregated_table", conn, if_exists="replace", index=False)
df_outliers_pd.to_sql("outliers_table", conn, if_exists="replace", index=False)
 
# Cerrar la conexión
conn.close()

In [0]:
conn = sqlite3.connect("/tmp/mydata/sales_data.db")
cursor = conn.cursor()

cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print(cursor.fetchall())

[('transactions_table',), ('aggregated_table',), ('outliers_table',)]


In [0]:
cursor.execute("SELECT * FROM transactions_table;")
rows = cursor.fetchall()

print("TABLA transactions_table:")
for row in rows:
    print(row)

TABLA transactions_table:
(1, '2024-07-01', 'Widget', 'Widget-A', 10, 9.99)
(2, '2024-07-01', 'Gadget', 'Gadget-X', 5, 19.99)
(3, '2024-07-02', 'Widget', 'Widget-B', 7, 9.99)
(5, '2024-07-03', 'Widget', 'Widget-C', 3, 9.99)
(6, '2024-07-03', 'Gadget', 'Gadget-Y', 8, 19.99)
(7, '2024-07-04', 'Widget', 'Widget-A', 2, 9.99)
(9, '2024-07-05', 'Widget', 'Widget-B', 6, 9.99)
(10, '2024-07-05', 'Gadget', 'Gadget-X', 3, 19.99)
(11, '2024-07-06', 'Gadget', None, 5, 19.99)
(12, '2024-07-06', 'Doodad', 'Doodad-3', 1, 4.99)
(13, '2024-07-07', 'Widget', 'Widget-C', 8, 9.99)
(14, '2024-07-07', 'Gadget', 'Gadget-Y', 4, 19.99)
(15, '2024-07-08', 'Widget', 'Widget-A', 3, 9.99)
(16, '2024-07-08', 'Doodad', 'Doodad-1', 2, 4.99)
(18, '2024-07-09', 'Widget', 'Widget-B', 5, 9.99)
(19, '2024-07-10', 'Doodad', 'Doodad-2', 4, 4.99)
(20, '2024-07-10', 'Gadget', 'Gadget-Y', 7, 19.99)
(21, '2024-07-11', 'Widget', 'Widget-C', 6, 9.99)
(22, '2024-07-11', 'Doodad', 'Doodad-3', 3, 4.99)
(23, '2024-07-12', 'Gadget', '

In [0]:
cursor.execute("SELECT * FROM aggregated_table;")
rows = cursor.fetchall()

print("TABLA aggregated_table:")
for row in rows:
    print(row)


TABLA aggregated_table:
(1, '2024-07-01', 'Widget', 'Widget-A', 10, 9.99, 99.9, 'Monday', 0)
(2, '2024-07-01', 'Gadget', 'Gadget-X', 5, 19.99, 99.94999999999999, 'Monday', 0)
(3, '2024-07-02', 'Widget', 'Widget-B', 7, 9.99, 69.93, 'Tuesday', 0)
(5, '2024-07-03', 'Widget', 'Widget-C', 3, 9.99, 29.97, 'Wednesday', 0)
(6, '2024-07-03', 'Gadget', 'Gadget-Y', 8, 19.99, 159.92, 'Wednesday', 0)
(7, '2024-07-04', 'Widget', 'Widget-A', 2, 9.99, 19.98, 'Thursday', 0)
(9, '2024-07-05', 'Widget', 'Widget-B', 6, 9.99, 59.94, 'Friday', 0)
(10, '2024-07-05', 'Gadget', 'Gadget-X', 3, 19.99, 59.97, 'Friday', 0)
(11, '2024-07-06', 'Gadget', None, 5, 19.99, 99.94999999999999, 'Saturday', 0)
(12, '2024-07-06', 'Doodad', 'Doodad-3', 1, 4.99, 4.99, 'Saturday', 0)
(13, '2024-07-07', 'Widget', 'Widget-C', 8, 9.99, 79.92, 'Sunday', 0)
(14, '2024-07-07', 'Gadget', 'Gadget-Y', 4, 19.99, 79.96, 'Sunday', 0)
(15, '2024-07-08', 'Widget', 'Widget-A', 3, 9.99, 29.97, 'Monday', 0)
(16, '2024-07-08', 'Doodad', 'Doodad-

In [0]:
cursor.execute("SELECT * FROM outliers_table;")
rows = cursor.fetchall()

print("TABLA outliers_table:")
for row in rows:
    print(row)

conn.close()

TABLA outliers_table:
('Doodad', 12, '2024-07-06', 'Doodad-3', 1, 4.99, 2.5833333333333335, 1.0836246694508314, 1.5833333333333335, 2.1672493389016627, 0)
('Doodad', 16, '2024-07-08', 'Doodad-1', 2, 4.99, 2.5833333333333335, 1.0836246694508314, 0.5833333333333335, 2.1672493389016627, 0)
('Doodad', 19, '2024-07-10', 'Doodad-2', 4, 4.99, 2.5833333333333335, 1.0836246694508314, 1.4166666666666665, 2.1672493389016627, 0)
('Doodad', 22, '2024-07-11', 'Doodad-3', 3, 4.99, 2.5833333333333335, 1.0836246694508314, 0.4166666666666665, 2.1672493389016627, 0)
('Doodad', 25, '2024-07-13', 'Doodad-1', 2, 4.99, 2.5833333333333335, 1.0836246694508314, 0.5833333333333335, 2.1672493389016627, 0)
('Doodad', 28, '2024-07-14', 'Doodad-2', 4, 4.99, 2.5833333333333335, 1.0836246694508314, 1.4166666666666665, 2.1672493389016627, 0)
('Doodad', 31, '2024-07-16', 'Doodad-3', 1, 4.99, 2.5833333333333335, 1.0836246694508314, 1.5833333333333335, 2.1672493389016627, 0)
('Doodad', 34, '2024-07-17', 'Doodad-1', 2, 4.9