## Data Processing

### Set-Up

In [11]:
# import libraries
import os
import polars as pl

In [12]:
# Limit OpenMP to a single thread for this process. The original note gives the
# motivation: avoiding a memory leak seen with MKL on Windows.
# NOTE(review): this runs after the import cell; OpenMP runtimes usually read the
# variable when they initialise, so confirm this placement is early enough.
os.environ.update({"OMP_NUM_THREADS": "1"})

In [13]:
# file path to dataset
# The CSV sits in the kagglehub cache folder (".cache/kagglehub/...") for the
# "carrie1/ecommerce-data" dataset, version 1. The path is built from the current
# user's home directory instead of a hard-coded "C:\\Users\\agste" prefix, so the
# notebook resolves to the same file on the original machine and still works for
# other users / operating systems.
file_path = os.path.join(
    os.path.expanduser("~"),
    ".cache",
    "kagglehub",
    "datasets",
    "carrie1",
    "ecommerce-data",
    "versions",
    "1",
    "data.csv",
)

In [14]:
# Read the raw CSV with Polars, then parse the InvoiceDate text column into a datetime.
print("Loading dataset...")

# The file is decoded as ISO-8859-1.
# NOTE(review): ignore_errors=True lets the read continue when a value does not fit
# the inferred dtype instead of raising. InvoiceNo is inferred as i64 (see the
# printed schema), so any non-numeric invoice numbers would presumably be read as
# null and then removed by the later drop_nulls step — confirm this is intended.
df = pl.read_csv(file_path, encoding="ISO-8859-1", ignore_errors=True)

# Timestamps are expected as "month/day/year hour:minute"; strict=False means a
# value that does not match the format does not raise.
invoice_timestamp = pl.col("InvoiceDate").str.to_datetime("%m/%d/%Y %H:%M", strict=False)
df = df.with_columns(invoice_timestamp)

print(df.head())

Loading dataset...
shape: (5, 8)
┌───────────┬───────────┬──────────────┬──────────┬─────────────┬───────────┬────────────┬─────────┐
│ InvoiceNo ┆ StockCode ┆ Description  ┆ Quantity ┆ InvoiceDate ┆ UnitPrice ┆ CustomerID ┆ Country │
│ ---       ┆ ---       ┆ ---          ┆ ---      ┆ ---         ┆ ---       ┆ ---        ┆ ---     │
│ i64       ┆ str       ┆ str          ┆ i64      ┆ datetime[μs ┆ f64       ┆ i64        ┆ str     │
│           ┆           ┆              ┆          ┆ ]           ┆           ┆            ┆         │
╞═══════════╪═══════════╪══════════════╪══════════╪═════════════╪═══════════╪════════════╪═════════╡
│ 536365    ┆ 85123A    ┆ WHITE        ┆ 6        ┆ 2010-12-01  ┆ 2.55      ┆ 17850      ┆ United  │
│           ┆           ┆ HANGING      ┆          ┆ 08:26:00    ┆           ┆            ┆ Kingdom │
│           ┆           ┆ HEART        ┆          ┆             ┆           ┆            ┆         │
│           ┆           ┆ T-LIGHT HO…  ┆          ┆       

### Data Cleaning

In [15]:
# Remove rows missing an InvoiceNo or a CustomerID (nulls in other columns are
# kept), then collapse rows that are identical across every column.
required_columns = ["InvoiceNo", "CustomerID"]
df = df.drop_nulls(subset=required_columns)
df = df.unique()

In [16]:
# Monetary column for the RFM analysis: Quantity multiplied by UnitPrice, per row.
row_value = (pl.col("Quantity") * pl.col("UnitPrice")).alias("Monetary")
df = df.with_columns(row_value)

# Keep only rows whose Monetary value is not 0. Rows where Monetary is null are
# filtered out as well; negative values are left in place.
df = df.filter(pl.col("Monetary").ne(0))

In [17]:
# Reduce InvoiceDate from a datetime to a calendar date (time of day is discarded).
invoice_day = pl.col("InvoiceDate").cast(pl.Date)
df = df.with_columns(invoice_day)

In [18]:
# Keep only rows whose StockCode is 5, 6 or 7 characters long; the notebook treats
# any other length as an invalid code.
acceptable_number_of_chars = [5, 6, 7]
stock_code_length = pl.col("StockCode").str.len_chars()
df = df.filter(stock_code_length.is_in(acceptable_number_of_chars))

### Save

In [19]:
# define output path
# The Data_Lake folder is resolved under the current user's home directory rather
# than a hard-coded "C:\\Users\\agste" prefix: the result is the same directory on
# the original machine, and a valid location for any other user or OS.
output_dir = os.path.join(
    os.path.expanduser("~"),
    "Angelos Work Projects",
    "RFM & Clustering Project",
    "Data_Lake",
)
os.makedirs(output_dir, exist_ok=True)  # create the folder (and any parents) if missing
# Full path of the Parquet file that will hold the cleaned data.
data_output_path = os.path.join(output_dir, "cleaned_data.parquet")

In [20]:
# Persist the cleaned frame as a Parquet file, then report where it was written.
saved_message = f"Cleaned data saved to {data_output_path}"
df.write_parquet(data_output_path)
print(saved_message)

Cleaned data saved to C:\Users\agste\Angelos Work Projects\RFM & Clustering Project\Data_Lake\cleaned_data.parquet
