## Setup

In [1]:
import pandas as pd
import logging 
import sys
import os
# Put ./src at the front of the module search path so it is consulted before
# any other entry when imports are resolved.
# NOTE(review): the imports in this notebook use the `src.` prefix, so this
# entry presumably serves bare imports made inside the package — confirm.
sys.path.insert(0,'./src')


# Presumably the project's logger object; it is imported here but not
# referenced by any cell visible in this notebook.
from src.utils.logger import sys_logger

# Make sure the directory the report cells read their parquet tables from
# exists; exist_ok=True keeps this cell safe to re-run.
os.makedirs("./final_data", exist_ok=True)

## Landing Zone to Staging

In [2]:
# Entry point of the landing-zone step, provided by the project's splitter module.
from src.utils.splitter import split_files

# Per the log output recorded under this cell, this lists the files in the
# landing zone, matches each file to an entity (e.g. product, customer) and
# moves it into a per-entity folder such as ./data/products/ or ./data/customer/.
# NOTE(review): because files are moved rather than copied, a second run
# presumably finds nothing left to split — confirm before re-running.
split_files()

[splitter.py:9]                 Files in landing zone: ['products_20241012.json', 'customer_20241013.csv', 'products_20241011.json', 'sales_20241011.txt', 'sales_20241015.txt', 'customer_20241011.csv', 'products_20241016.json', 'sales_20241016.txt', 'sales_20241012.txt', 'customer_20241016.csv', 'sales_20241013.txt', 'products_20241010.json', 'customer_20241010.csv', 'sales_20241010.txt', 'products_20241009.json', 'customer_20241015.csv', 'customer_20241009.csv', 'customer_20241014.csv', 'products_20241014.json', 'products_20241015.json', 'products_20241013.json', 'customer_20241012.csv', 'sales_20241009.txt', 'sales_20241014.txt']
[splitter.py:14]                products_20241012.json matched with product
[splitter.py:17]                products_20241012.json moved to ./data/products/
[splitter.py:14]                customer_20241013.csv matched with customer
[splitter.py:17]                customer_20241013.csv moved to ./data/customer/
[splitter.py:14]                products_202410

## Staging to Tables

In [3]:
# Entry point of the staging-to-tables step, provided by the project's ingestion module.
from src.utils.ingestion import run_ingestion

# Per the log output recorded under this cell, this reads the watermark table
# at ./watermark.parquet (creating a new one when it is not found), looks up
# the last run date for an entity (customer in the recorded log), collects the
# files to process from that entity's staging folder (./data/customer/) and
# processes them one file at a time.
# NOTE(review): the report cells read ./final_data/*.parquet, presumably
# written by this step — confirm against src/utils/ingestion.
run_ingestion()

[watermark.py:8]                Reading watermark table from ./watermark.parquet
[watermark.py:12]               Watermark table not found at ./watermark.parquet. Creating new table
[watermark.py:14]               Watermark table has 0 rows
[ingestion.py:12]               Loading customer data
[watermark.py:18]               Getting last run date for customer
[watermark.py:21]               No previous runs found for customer
[files.py:10]                   Getting files to process from ./data/customer/
[files.py:14]                   Found 8 files to process
[ingestion.py:19]               Processing customer_20241009.csv
[files.py:26]                   Reading csv file from ./data/customer/customer_20241009.csv with kwargs {'sep': ','}
[dataframes.py:135]             Processing data for customer
[files.py:49]                   Getting saved file for customer
[files.py:18]                   Getting save filename for customer
[files.py:34]                   Reading parquet file from ./

## Report Generation

In [None]:
from matplotlib import pyplot as plt

### Customer Growth

In [None]:
# Customer growth: how many customer records are in force at each change date.
customers = pd.read_parquet('./final_data/customer.parquet')

# Every distinct effective_from value is used as a snapshot date.
dates = customers['effective_from'].unique()

# A record counts for a snapshot date when effective_from <= date < expiry_date.
# NOTE(review): a null expiry_date never satisfies the `>` comparison, so such
# rows are not counted — confirm that open-ended records carry a far-future
# expiry date rather than a null.
data = [
    {
        'date': snapshot_date,
        'count': len(
            customers[
                (customers['effective_from'] <= snapshot_date)
                & (customers['expiry_date'] > snapshot_date)
            ]
        ),
    }
    for snapshot_date in sorted(dates)
]

df = pd.DataFrame(data)

# DataFrame.plot returns the Axes it drew on; label that Axes directly.
ax = df.plot(x='date', y='count')
ax.set_title('Customer Growth')
ax.set_xlabel('Date')
ax.set_ylabel('Number of Customers')
plt.show()

### Sales By Membership

In [None]:
# Sales by membership status: total sales per sale date, one series per status.
customers = pd.read_parquet('./final_data/customer.parquet')
sales = pd.read_parquet('./final_data/sales.parquet')

# Inner join on customer_id. The validity filter below rejects every row that
# has no match on either side (comparisons against a null date are False), so
# an outer join would only build extra rows to throw away and could upcast
# integer columns to float along the way.
merged = sales.merge(customers, on='customer_id', how='inner')

# The `_y` suffix marks columns taken from the right-hand frame (customers)
# whose names also exist in sales. Keep only the customer record that was in
# force on the sale date: effective_from <= sale_date < expiry_date.
# NOTE(review): the sales-side effective_from/expiry_date columns (`_x`) are
# not filtered here — confirm sales.parquet holds a single row per sale.
valid_on_sale_date = (
    (merged['effective_from_y'] <= merged['sale_date'])
    & (merged['expiry_date_y'] > merged['sale_date'])
)
merged = merged[valid_on_sale_date]

# One row per sale date, one column per membership status; combinations with
# no sales are shown as 0 instead of NaN.
data = (
    merged.groupby(['sale_date', 'membership_status'])['total_price']
    .sum()
    .unstack()
    .fillna(0)
)

# DataFrame.plot returns the Axes it drew on; label that Axes directly.
ax = data.plot()
ax.set(title='Sales by Membership Status', xlabel='Date', ylabel='Total Sales')
plt.show()

### Product Sale Breakdown

In [None]:
# Pivot of sales value per product over time, split by membership status:
# rows are (sale_date, membership_status), columns are product names.

products = pd.read_parquet('./final_data/product.parquet')
customers = pd.read_parquet('./final_data/customer.parquet')
sales = pd.read_parquet('./final_data/sales.parquet')

# Inner join on customer_id. The validity filter below rejects every row that
# has no match on either side (comparisons against a null date are False), so
# an outer join would only build extra rows to throw away and could upcast
# integer columns to float along the way.
merged = sales.merge(customers, on='customer_id', how='inner')

# The `_y` suffix marks columns taken from the right-hand frame (customers)
# whose names also exist in sales. Keep only the customer record that was in
# force on the sale date: effective_from <= sale_date < expiry_date.
valid_on_sale_date = (
    (merged['effective_from_y'] <= merged['sale_date'])
    & (merged['expiry_date_y'] > merged['sale_date'])
)
merged = merged[valid_on_sale_date]

# Attach product attributes; a left join keeps sales whose product_id has no
# match in the product table.
# NOTE(review): if product.parquet holds several rows per product_id, this
# join repeats the sale once per row and inflates the totals — confirm.
merged_with_products = merged.merge(products, on='product_id', how='left')

# NOTE(review): `name` is assumed to be the product name coming from
# product.parquet — confirm no other table contributes a `name` column.
data = (
    merged_with_products
    .groupby(['sale_date', 'membership_status', 'name'])['total_price']
    .sum()
    .unstack()
    .fillna(0)
)

data