In [60]:
# import libraries
import functools
import os
import zipfile
from datetime import datetime

import pandas as pd
import polars as pl
import requests

In [2]:
# Download URL for the UCI "Online Retail II" zip archive (passed to download_data below).
URL = "http://archive.ics.uci.edu/static/public/502/online+retail+ii.zip/"

https://gist.github.com/niftycode/a747648db1b79396b8e4814946a4dba2

https://docs.pola.rs/api/python/dev/reference/api/polars.read_excel.html

https://docs.pola.rs/py-polars/html/reference/dataframe/index.html

### **Task 2: Data Loading**

1. Create a script to load http://archive.ics.uci.edu/dataset/502/online+retail+ii.
2. Implement functions to read datasets using both **Pandas** and **Polars**.
3. Compare loading times and document findings by creating a benchmark function that returns:
    - data loading time
    - aggregation time.

In [61]:
# create a decorator that prints time
def time_execution():
    """Build a decorator that prints how long each call to a function takes.

    Usage: ``@time_execution()`` (note the parentheses, this is a decorator
    factory).

    Returns:
        A decorator. The wrapped function forwards all positional and keyword
        arguments, prints ``Execution time: <timedelta>`` after the call
        returns, and hands back the original return value unchanged. If the
        wrapped function raises, the exception propagates and nothing is
        printed.
    """
    def decorator(func):
        # functools.wraps keeps the wrapped function's __name__, __doc__, etc.,
        # so help(), tracebacks and introspection still show the real function
        # instead of "wrapper".
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = datetime.now()
            result = func(*args, **kwargs)
            elapsed = datetime.now() - start_time
            print(f"Execution time: {elapsed}")
            return result
        return wrapper
    return decorator

In [62]:
# Make sure the local 'data' directory exists before anything is written to it.
# exist_ok=True means re-running this cell does not raise if it is already there.
os.makedirs('data', exist_ok=True)

@time_execution()
def download_data(url, file_name, timeout=30):
    """Download ``url`` and write the response body to ``file_name``.

    This is a best-effort helper: failures are reported with ``print`` and
    never raised to the caller.

    Args:
        url: Address to fetch with an HTTP GET.
        file_name: Path of the file to create (opened in binary write mode).
        timeout: Seconds to wait for the server to connect/send data before
            giving up. Without a timeout ``requests.get`` can block forever
            on an unresponsive server.

    Returns:
        None. Prints ``Download complete: <file_name>`` on HTTP 200,
        ``Download failed`` on any other status code, and ``Error: <exc>``
        if the request or the file write raises.
    """
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code == 200:
            with open(file_name, 'wb') as f:
                f.write(response.content)
            print("Download complete:", file_name)
        else:
            print("Download failed")
    except Exception as e:
        # Deliberately broad: network errors, timeouts and file-system errors
        # are all reported the same way so the notebook keeps running.
        print("Error:", e)
        
@time_execution()
def extract_zip_file(zip_file, extract_to="data"):
    """Extract every member of a zip archive into a directory.

    This is a best-effort helper: failures are reported with ``print`` and
    never raised to the caller.

    Args:
        zip_file: Path to the ``.zip`` archive to read.
        extract_to: Directory to extract into. Defaults to ``"data"``, the
            location that was previously hard-coded.

    Returns:
        None. Prints ``Extraction complete`` on success or ``Error: <exc>``
        if the archive cannot be opened or extracted.
    """
    try:
        with zipfile.ZipFile(zip_file, 'r') as archive:
            archive.extractall(extract_to)
        print("Extraction complete")
    except Exception as e:
        # Covers a missing file, a corrupt archive (BadZipFile) and write errors.
        print("Error:", e)


In [63]:
# Local path where the downloaded archive is stored.
zip_path = "data/online_retail.zip"

# Fetch the archive, then unpack it; each call prints its own execution time.
download_data(URL, zip_path)
extract_zip_file(zip_path)

Download complete: data/online_retail.zip
Execution time: 0:00:30.332860
Extraction complete
Execution time: 0:00:00.121510


In [43]:
# Path to the Excel workbook inside the 'data' directory
# (presumably the file produced by the extraction step above).
data = os.path.join('data', 'online_retail_II.xlsx')

In [67]:
@time_execution()
def pandas_read_data(file_path:str) -> pd.DataFrame:
    """Load an Excel workbook into a pandas DataFrame.

    ``pd.read_excel`` is called with its defaults, so only the first
    worksheet of the workbook is read.

    Args:
        file_path: Path of the Excel file.

    Returns:
        The loaded DataFrame, or ``None`` when reading fails for any reason
        (the error is printed rather than raised).
    """
    try:
        return pd.read_excel(file_path)
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
    return None

In [68]:
@time_execution()
def polars_read_data(file_path: str) -> pl.DataFrame:
    """Load an Excel workbook into a Polars DataFrame.

    Args:
        file_path: Path of the Excel file; its first row is treated as the
            header (``has_header=True``).

    Returns:
        The loaded DataFrame, or ``None`` when reading fails for any reason
        (the error is printed rather than raised).

    NOTE(review): the notebook outputs show ``Invoice`` inferred as ``i64``
    with a null present, while pandas keeps it as ``object``. Non-numeric
    invoice IDs such as ``C489449`` may be turned into nulls here - confirm,
    and consider passing an explicit schema for that column.
    """
    try:
        return pl.read_excel(file_path, has_header=True)
    except Exception as e:
        print(f"Error loading file: {e}")
    return None

In [69]:
# Load the same workbook with both libraries. The timing decorator prints one
# "Execution time" line per call: pandas first, then Polars.
df_pandas = pandas_read_data(data)
df_polars = polars_read_data(data)

Execution time: 0:00:33.739795
Execution time: 0:00:04.326165


In [70]:
# Column names, non-null counts and dtypes of the raw pandas frame.
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 525461 entries, 0 to 525460
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype         
---  ------       --------------   -----         
 0   Invoice      525461 non-null  object        
 1   StockCode    525461 non-null  object        
 2   Description  522533 non-null  object        
 3   Quantity     525461 non-null  int64         
 4   InvoiceDate  525461 non-null  datetime64[ns]
 5   Price        525461 non-null  float64       
 6   Customer ID  417534 non-null  float64       
 7   Country      525461 non-null  object        
dtypes: datetime64[ns](1), float64(2), int64(1), object(4)
memory usage: 32.1+ MB


In [71]:
# Summary statistics; the negative minimums for Quantity and Price motivate the cleaning step.
df_pandas.describe()

Unnamed: 0,Quantity,InvoiceDate,Price,Customer ID
count,525461.0,525461,525461.0,417534.0
mean,10.337667,2010-06-28 11:37:36.845017856,4.688834,15360.645478
min,-9600.0,2009-12-01 07:45:00,-53594.36,12346.0
25%,1.0,2010-03-21 12:20:00,1.25,13983.0
50%,3.0,2010-07-06 09:51:00,2.1,15311.0
75%,10.0,2010-10-15 12:45:00,4.21,16799.0
max,19152.0,2010-12-09 20:01:00,25111.09,18287.0
std,107.42411,,146.126914,1680.811316


### **Task 3: Data Cleaning**

1. Implement functions to handle missing values (e.g., fill with mean, drop rows).
2. Apply cleaning transformations using **Pandas** and **Polars**.
3. Document data cleaning process and reason behind your methodology.

***Suggestion***

As a data engineer, it's crucial to consider the needs of end users, such as data analysts, and the types of calculations they will perform with the data. Upon reviewing the dataset, I noticed that some descriptions were missing. More importantly, certain customer IDs were absent. However, this does not necessarily mean that these transactions did not occur—it could be due to gaps in their system.

A proper approach would be to implement data modeling, ensuring a well-structured database by splitting the data into three key tables:

*  Products – Containing product details such as name, category, and price.
*  Transactions – Recording all sales and linking them to customers and products.
*  Customers – Storing customer details, ensuring each transaction can be properly attributed.

***Data Transformation Steps:***

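* Negative Values in Price and Quantity: Some values in these columns are negative, possibly due to refunds or data entry errors. My initial plan was to convert them to their absolute values so that all values are positive; after inspecting the affected rows (see below), I decided to keep only rows with positive Price and Quantity instead.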

* Canceled Transactions: Transactions where the "Invoice" number starts with "C" (for example `C489449`) indicate cancellations, but there is no clear label for this. I will create a new column, "TransactionStatus," labeling such transactions as "Canceled" and others as "Completed."

* Missing Customer IDs: Some rows have missing "Customer ID" values. Removing these rows would lose important transaction data, so instead of dropping them I will replace the missing values with "Unknown" to maintain data integrity.



In [76]:
# Preview the first rows of the raw pandas frame.
df_pandas.head()

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085.0,United Kingdom
1,489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
2,489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
3,489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01 07:45:00,2.1,13085.0,United Kingdom
4,489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01 07:45:00,1.25,13085.0,United Kingdom


In [78]:
# Sort by Price, highest first, to inspect the extreme values at both ends.
df_pandas.sort_values(by='Price',ascending=False)

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
241824,C512770,M,Manual,-1,2010-06-17 16:52:00,25111.09,17399.0,United Kingdom
241827,512771,M,Manual,1,2010-06-17 16:53:00,25111.09,,United Kingdom
320581,C520667,BANK CHARGES,Bank Charges,-1,2010-08-27 13:42:00,18910.69,,United Kingdom
519294,C537651,AMAZONFEE,AMAZON FEE,-1,2010-12-07 15:49:00,13541.33,,United Kingdom
517955,537632,AMAZONFEE,AMAZON FEE,1,2010-12-07 15:08:00,13541.33,,United Kingdom
...,...,...,...,...,...,...,...,...
66086,495297,37493B,,-37,2010-01-22 13:36:00,0.00,,United Kingdom
193101,507709,17033,sold as 17003?,-2200,2010-05-11 11:20:00,0.00,,United Kingdom
403472,A528059,B,Adjust bad debt,1,2010-10-20 12:04:00,-38925.87,,United Kingdom
276274,A516228,B,Adjust bad debt,1,2010-07-19 11:24:00,-44031.79,,United Kingdom


In [87]:
# Rows where Price OR Quantity is negative (either condition alone is enough).
nega = df_pandas[(df_pandas['Price'] < 0) | (df_pandas['Quantity'] < 0)]
nega

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
178,C489449,22087,PAPER BUNTING WHITE LACE,-12,2009-12-01 10:33:00,2.95,16321.0,Australia
179,C489449,85206A,CREAM FELT EASTER EGG BASKET,-6,2009-12-01 10:33:00,1.65,16321.0,Australia
180,C489449,21895,POTTING SHED SOW 'N' GROW SET,-4,2009-12-01 10:33:00,4.25,16321.0,Australia
181,C489449,21896,POTTING SHED TWINE,-6,2009-12-01 10:33:00,2.10,16321.0,Australia
182,C489449,22083,PAPER CHAIN KIT RETRO SPOT,-12,2009-12-01 10:33:00,2.95,16321.0,Australia
...,...,...,...,...,...,...,...,...
525231,538159,21324,,-18,2010-12-09 17:17:00,0.00,,United Kingdom
525232,538158,20892,,-32,2010-12-09 17:17:00,0.00,,United Kingdom
525234,538161,46000S,Dotcom sales,-100,2010-12-09 17:25:00,0.00,,United Kingdom
525235,538162,46000M,Dotcom sales,-100,2010-12-09 17:25:00,0.00,,United Kingdom


I considered keeping transactions with issues since they might help analyze cancellations and adjustments. However, since the requirement is to process only valid data, I will keep only transactions with positive **Price** and **Quantity**, ensuring clean and reliable data for analysis.


Create a new column to indicate canceled transactions

`df["TransactionStatus"] = df["Invoice"].astype(str).apply(lambda x: "Canceled" if x.startswith("C") else "Completed")`

In [105]:
@time_execution()
def pd_transform_data(df:pd.DataFrame) -> pd.DataFrame:
    """Clean the retail data with pandas.

    Steps, mirroring the Polars implementation:
      1. Keep only rows whose ``Price`` and ``Quantity`` are both strictly
         positive (cancellations, refunds, adjustments and zero-priced rows
         are dropped).
      2. Add ``Amount`` = ``Price`` * ``Quantity``.
      3. Turn ``Customer ID`` into a string column and replace missing IDs
         with ``"Unknown"``.

    Args:
        df: Raw frame with at least ``Price``, ``Quantity`` and
            ``Customer ID`` columns. It is not modified.

    Returns:
        A new DataFrame containing only the valid rows (original index labels
        are kept) plus the ``Amount`` column.
    """
    # Keep valid sales only. The previous condition `(Price < 0) | (Quantity < 0)`
    # selected exactly the rows that were meant to be removed.
    valid_rows = (df["Price"] > 0) & (df["Quantity"] > 0)

    # Copy after filtering: the result is independent of the input, so the
    # column assignments below never touch (or warn about) the caller's frame.
    cleaned = df.loc[valid_rows].copy()

    # Line amount is price * quantity
    cleaned["Amount"] = cleaned["Price"] * cleaned["Quantity"]

    # Filling a float column with "Unknown" would leave a mixed float/str object
    # column. Convert to one consistent string column instead: numeric IDs go
    # through nullable Int64 first so 13085.0 becomes "13085", not "13085.0".
    customer_ids = cleaned["Customer ID"]
    if pd.api.types.is_numeric_dtype(customer_ids):
        customer_ids = customer_ids.astype("Int64")
    cleaned["Customer ID"] = customer_ids.astype("string").fillna("Unknown")

    return cleaned

In [110]:
@time_execution()
def pl_transform_data(df:pl.DataFrame) -> pl.DataFrame:
    """Clean the retail data with Polars.

    Keeps rows whose ``Price`` and ``Quantity`` are both strictly positive,
    adds ``Amount`` (``Price`` * ``Quantity``) and replaces null
    ``Customer ID`` values with ``"Unknown"``.

    Args:
        df: Raw frame with ``Price``, ``Quantity`` and ``Customer ID`` columns.

    Returns:
        A new, filtered DataFrame with the extra ``Amount`` column.
    """
    is_valid_sale = (pl.col("Price")> 0) & (pl.col("Quantity")>0)
    amount = (pl.col("Price")* pl.col("Quantity")).alias("Amount")
    customer_id = pl.col("Customer ID").fill_null("Unknown")

    valid_sales = df.filter(is_valid_sale)
    return valid_sales.with_columns([amount, customer_id])

In [111]:
# run the two transformed functions
pd_transformed = pd_transform_data(df_pandas)
pl_transformed = pl_transform_data(df_polars)

Execution time: 0:00:00.032791
Execution time: 0:00:00.054188


In [112]:
# Preview the cleaned Polars frame, including the new Amount column.
pl_transformed.head()

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country,Amount
i64,str,str,i64,datetime[ms],f64,str,str,f64
489434,"""85048""","""15CM CHRISTMAS GLASS BALL 20 L…",12,2009-12-01 07:45:00,6.95,"""13085""","""United Kingdom""",83.4
489434,"""79323P""","""PINK CHERRY LIGHTS""",12,2009-12-01 07:45:00,6.75,"""13085""","""United Kingdom""",81.0
489434,"""79323W""",""" WHITE CHERRY LIGHTS""",12,2009-12-01 07:45:00,6.75,"""13085""","""United Kingdom""",81.0
489434,"""22041""","""RECORD FRAME 7"" SINGLE SIZE """,48,2009-12-01 07:45:00,2.1,"""13085""","""United Kingdom""",100.8
489434,"""21232""","""STRAWBERRY CERAMIC TRINKET BOX""",24,2009-12-01 07:45:00,1.25,"""13085""","""United Kingdom""",30.0


In [113]:
# Summary statistics of the cleaned Polars frame (minimum Price and Quantity are now positive).
pl_transformed.describe()

statistic,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country,Amount
str,f64,str,str,f64,str,f64,str,str,f64
"""count""",511565.0,"""511566""","""511566""",511566.0,"""511566""",511566.0,"""511566""","""511566""",511566.0
"""null_count""",1.0,"""0""","""0""",0.0,"""0""",0.0,"""0""","""0""",0.0
"""mean""",514521.144429,,,11.40015,"""2010-06-28 22:20:30.395000""",4.252563,,,20.146502
"""std""",14442.248562,,,86.761177,,63.664629,,,90.920077
"""min""",489434.0,"""10002""",""" DOORMAT UNION JACK GUNS AND …",1.0,"""2009-12-01 07:45:00""",0.001,"""12346""","""Australia""",0.001
"""25%""",501896.0,,,1.0,"""2010-03-21 15:22:00""",1.25,,,4.2
"""50%""",514865.0,,,3.0,"""2010-07-07 09:41:00""",2.1,,,10.14
"""75%""",527328.0,,,10.0,"""2010-10-15 15:06:00""",4.21,,,17.7
"""max""",538171.0,"""m""","""ZINC WILLIE WINKIE CANDLE STI…",19152.0,"""2010-12-09 20:01:00""",25111.09,"""Unknown""","""West Indies""",25111.09


1. **Removing Negative Price Values**  
   - If a transaction has a negative **Price**, it likely represents a **canceled order, refund, or adjusted bad debt**.  
   - Keeping these values could distort revenue calculations and business insights.  
   - To ensure accurate financial analysis, all rows with negative **Price** will be removed.  

2. **Removing Negative Quantity Values**  
   - A negative **Quantity** suggests an issue, such as **returns, data entry errors, or adjustments**.  
   - Transactions should typically have positive quantities, as negative values are not standard in sales records.  
   - To maintain data integrity, these rows will also be dropped.  

3. **Why Remove These Rows?**  
   - Any transaction where **Price** or **Quantity** is negative indicates **data issues** that could affect analysis.  
   - Removing them ensures that only valid and meaningful sales records remain.  
   - This step helps produce **more reliable** business insights, revenue tracking, and customer behavior analysis.

Adjusting for bad debt involves recording the debt as an expense and reducing the accounts receivable balance. This is done through a journal entry and an adjusting entry.
Journal entry: debit bad debt expense and credit allowance for doubtful accounts.

### **Task 4: Data Aggregation**

1. Group dataset by a categorical column (e.g., `category`).
2. Compute **mean, sum, count** for a numeric column.
3. Implement aggregation using both **Pandas** and **Polars**.
4. Expose results via a FastAPI endpoint (`/aggregate`).




https://www.geeksforgeeks.org/pandas-groupby/

TransactionStatus

Customer ID

Country

InvoiceDate - Year- day - month

### Pandas Data Aggregation Code

In [115]:
def pd_transaction_per_country(df: pd.DataFrame) -> pd.Series:
    """Count the non-null ``Invoice`` entries (rows) for each ``Country``.

    Returns a Series indexed by country.
    """
    invoices_by_country = df.groupby("Country")["Invoice"]
    return invoices_by_country.count()

def pd_transaction_revenue_per_country(df: pd.DataFrame) -> pd.Series:
    """Sum the ``Amount`` column for each ``Country``.

    Returns a Series indexed by country.
    """
    amounts_by_country = df.groupby("Country")["Amount"]
    return amounts_by_country.sum()

def pd_unique_customers_per_country(df: pd.DataFrame) -> pd.Series:
    """Count the distinct ``Customer ID`` values for each ``Country``.

    Returns a Series indexed by country.
    """
    customers_by_country = df.groupby("Country")["Customer ID"]
    return customers_by_country.nunique()

def pd_average_order_value_per_country(df: pd.DataFrame) -> pd.Series:
    """Average the ``Amount`` column for each ``Country``.

    The mean is taken over rows of ``df``, not over distinct invoices.
    Returns a Series indexed by country.
    """
    amounts_by_country = df.groupby("Country")["Amount"]
    return amounts_by_country.mean()

def pd_transactions_per_customer(df: pd.DataFrame) -> pd.Series:
    """Count the distinct ``Invoice`` values for each ``Customer ID``.

    Returns a Series indexed by customer ID.
    """
    invoices_by_customer = df.groupby("Customer ID")["Invoice"]
    return invoices_by_customer.nunique()

def pd_total_amount_spent_per_customer(df: pd.DataFrame) -> pd.Series:
    """Sum the ``Amount`` column for each ``Customer ID``.

    Returns a Series indexed by customer ID.
    """
    amounts_by_customer = df.groupby("Customer ID")["Amount"]
    return amounts_by_customer.sum()

def pd_average_order_value_per_customer(df: pd.DataFrame) -> pd.Series:
    """Average the ``Amount`` column for each ``Customer ID``.

    The mean is taken over rows of ``df``, not over distinct invoices.
    Returns a Series indexed by customer ID.
    """
    amounts_by_customer = df.groupby("Customer ID")["Amount"]
    return amounts_by_customer.mean()

In [116]:
# Row counts per country, computed on the raw frame (df_pandas), not on pd_transformed.
pd_transaction_per_country(df_pandas)

Country
Australia                  654
Austria                    537
Bahrain                    107
Belgium                   1054
Bermuda                     34
Brazil                      62
Canada                      77
Channel Islands            906
Cyprus                     554
Denmark                    428
EIRE                      9670
Finland                    354
France                    5772
Germany                   8129
Greece                     517
Hong Kong                   76
Iceland                     71
Israel                      74
Italy                      731
Japan                      224
Korea                       63
Lebanon                     13
Lithuania                  154
Malta                      172
Netherlands               2769
Nigeria                     32
Norway                     369
Poland                     194
Portugal                  1101
RSA                        111
Singapore                  117
Spain                     1278


### Polars Data Aggregation Code

https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.group_by.html

In [119]:
def pol_transaction_per_country(df: pl.DataFrame) -> pl.DataFrame:
    """Count the non-null ``Invoice`` values for each ``Country``.

    Returns a DataFrame with one row per country.
    """
    invoice_count = pl.col("Invoice").count()
    return df.group_by("Country").agg(invoice_count)

def pol_transaction_revenue_per_country(df: pl.DataFrame) -> pl.DataFrame:
    """Sum the ``Amount`` column for each ``Country``.

    Returns a DataFrame with one row per country.
    """
    total_amount = pl.col("Amount").sum()
    return df.group_by("Country").agg(total_amount)

def pol_unique_customers_per_country(df: pl.DataFrame) -> pl.DataFrame:
    """Count the distinct ``Customer ID`` values for each ``Country``.

    Returns a DataFrame with one row per country.
    """
    distinct_customers = pl.col("Customer ID").n_unique()
    return df.group_by("Country").agg(distinct_customers)

def pol_average_order_value_per_country(df: pl.DataFrame) -> pl.DataFrame:
    """Average the ``Amount`` column for each ``Country``.

    The mean is taken over rows of ``df``, not over distinct invoices.
    Returns a DataFrame with one row per country.
    """
    mean_amount = pl.col("Amount").mean()
    return df.group_by("Country").agg(mean_amount)

def pol_transactions_per_customer(df: pl.DataFrame) -> pl.DataFrame:
    """Count the distinct ``Invoice`` values for each ``Customer ID``.

    Returns a DataFrame with one row per customer ID.
    """
    distinct_invoices = pl.col("Invoice").n_unique()
    return df.group_by("Customer ID").agg(distinct_invoices)

def pol_total_amount_spent_per_customer(df: pl.DataFrame) -> pl.DataFrame:
    """Sum the ``Amount`` column for each ``Customer ID``.

    Returns a DataFrame with one row per customer ID.
    """
    total_amount = pl.col("Amount").sum()
    return df.group_by("Customer ID").agg(total_amount)

def pol_average_order_value_per_customer(df: pl.DataFrame) -> pl.DataFrame:
    """Average the ``Amount`` column for each ``Customer ID``.

    The mean is taken over rows of ``df``, not over distinct invoices.
    Returns a DataFrame with one row per customer ID.
    """
    mean_amount = pl.col("Amount").mean()
    return df.group_by("Customer ID").agg(mean_amount)


In [121]:
# Non-null Invoice counts per country, computed on the raw frame (df_polars), not on pl_transformed.
pol_transaction_per_country(df_polars)

Country,Invoice
str,u32
"""Finland""",347
"""Lithuania""",154
"""Italy""",710
"""Denmark""",418
"""Hong Kong""",74
…,…
"""United Arab Emirates""",399
"""Unspecified""",306
"""Malta""",170
"""Poland""",182


### **Task 6: Saving and Retrieving Processed Data**

1. Save processed data in **JSON** and **Parquet** formats.
2. Implement API endpoints to download these files (`/download-json`, `/download-parquet`).


In [None]:
# save data to JSON and Parquet using pandas
def pd_download_parquet(df: pd.DataFrame, filename: str) -> None:
    """Write a pandas DataFrame to ``filename`` in Parquet format.

    The DataFrame index is not stored (``index=False``).
    """
    df.to_parquet(path=filename, index=False)

def pd_download_json(df: pd.DataFrame, filename: str) -> None:
    """Write a pandas DataFrame to ``filename`` as line-delimited JSON.

    Each row becomes one JSON object on its own line
    (``orient="records"`` with ``lines=True``), so the file is JSON Lines
    rather than a single JSON array.
    """
    json_options = {"orient": "records", "lines": True}
    df.to_json(filename, **json_options)

In [None]:
# save data to JSON and Parquet using polars
def pl_download_parquet(df: pl.DataFrame, filename: str) -> None:
    """Write a Polars DataFrame to ``filename`` in Parquet format."""
    target_path = filename
    df.write_parquet(target_path)

def pl_download_json(df: pl.DataFrame, filename: str) -> None:
    """Write a Polars DataFrame to ``filename`` using ``DataFrame.write_json``."""
    target_path = filename
    df.write_json(target_path)
