import polars as pl
import numpy as np
import os
import sys
from pathlib import Path

# Add project root to path for helpers
# Search the working directory and then each of its ancestors for a `.git`
# entry; the first directory that has one is treated as the project root.
_start_dir = Path(os.getcwd())
project_root = next(
    (
        candidate
        for candidate in (_start_dir, *_start_dir.parents)
        if (candidate / '.git').exists()
    ),
    # No repository found: stay in the working directory rather than ending
    # up at the filesystem root and putting '/' on the import path.
    _start_dir,
)
# Re-running this cell must not stack duplicate entries on sys.path.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

from src.utils.data_manager import load_from, save_to

# Confirmation banner shown once the helper imports above have succeeded.
# The check-mark emoji was stored as mis-decoded UTF-8 bytes; restored here.
print('✅ Preprocessing helpers loaded!')

In [None]:
import polars as pl
import numpy as np
from pathlib import Path

from src.utils.data_manager import load_from, save_to

## 1. Load Data
Loading raw data from `data/raw/car_sales_data.csv`.

In [None]:
# Raw star-schema tables, read through the project's `load_from` helper.
cars_raw = pl.read_csv(load_from("raw", "Cars.csv"))
customers_raw = pl.read_csv(load_from("raw", "Customers.csv"))
sales_raw = pl.read_csv(load_from("raw", "Sales.csv"))

# The inspection and cleaning cells further down operate on `df_raw`, which
# was never assigned anywhere in this notebook.
# NOTE(review): the file name is taken from this section's description
# (`data/raw/car_sales_data.csv`) - confirm it matches the file on disk.
df_raw = pl.read_csv(load_from("raw", "car_sales_data.csv"))

print(f"Cars shape: {cars_raw.shape}")
print(f"Customers shape: {customers_raw.shape}")
print(f"Sales shape: {sales_raw.shape}")
print(f"Car sales data shape: {df_raw.shape}")

## 2. Inspect Anomalies
Identifying duplicates and mismatched types before cleaning.

In [None]:
# 1. Check for Validation: Duplicates
# `is_duplicated()` marks every row that has an identical twin, so
# `duplicates` contains all copies of a repeated row, not just the extras.
duplicates = df_raw.filter(df_raw.is_duplicated())
if duplicates.height > 0:
    print(f"Found {duplicates.height} duplicate rows:")
    print(duplicates)
else:
    print("No duplicates found.")

# 2. Apply transformations WITHOUT dropping nulls initially to find bad data
df_casted = (
    df_raw
    # Handle duplicates first. maintain_order=True keeps the surviving rows in
    # their original order; a plain unique() may return them in a different
    # order on every run, which would reshuffle everything derived from row
    # position downstream.
    .unique(maintain_order=True)
    .with_columns([
        # String standardization: trim whitespace, normalise capitalisation.
        pl.col("Manufacturer").str.strip_chars().str.to_titlecase(),
        pl.col("Model").str.strip_chars(),
        pl.col("Fuel type").str.strip_chars().str.to_titlecase(),

        # Numeric casting (strict=False turns errors into nulls)
        pl.col("Engine size").cast(pl.Float64, strict=False),
        pl.col("Year of manufacture").cast(pl.Int64, strict=False),
        pl.col("Mileage").cast(pl.Int64, strict=False),
        pl.col("Price").cast(pl.Float64, strict=False)
    ])
)

# 3. Check for rows that became Null (indicating bad data)
# This selects every row with at least one null, so values that were already
# missing in the raw data show up here alongside failed casts.
nan_rows = df_casted.filter(pl.any_horizontal(pl.all().is_null()))

if nan_rows.height > 0:
    print(f"\nFound {nan_rows.height} rows with nulls (potential casting errors):")
    print(nan_rows)
else:
    print("\nNo null rows found after casting.")

## 3. Finalize Cleaning
Dropping identified invalid rows and saving.

In [None]:
# A row is kept only when none of its columns is null; this drops both values
# missing in the raw data and values nulled by the non-strict casts.
row_has_null = pl.any_horizontal(pl.all().is_null())
df_cleaned = df_casted.filter(~row_has_null)

# Before/after row counts show how much data the cleaning removed.
print(f"Original shape: {df_raw.shape}")
print(f"Cleaned shape:  {df_cleaned.shape}")

# Preview the first five cleaned rows as the cell output.
df_cleaned.head(5)

In [None]:
# --- CREATIVE DATA GENERATION: Transforming to Star Schema ---
# Derives three linked tables (cars, customers, sales) from the cleaned flat
# listing in `df_cleaned` and writes them, plus the cleaned listing itself,
# through the project's `save_to` helper.
print('🚀 Generating Star Schema datasets...')

# Fixed seed so the mock customers and the sale -> customer assignment come
# out identical on every run instead of changing each time the cell executes.
RANDOM_SEED = 42
rng = np.random.default_rng(RANDOM_SEED)

# 1. Create CARS table
# One row per distinct (Brand, Model, Year, Price) combination.
car_key_columns = ['Brand', 'Model', 'Year', 'Price']
cars_base = (
    df_cleaned
    .select([
        pl.col('Manufacturer').alias('Brand'),
        pl.col('Model'),
        pl.col('Year of manufacture').alias('Year'),
        pl.col('Price'),
    ])
    .unique()
    # unique() gives no ordering guarantee; sorting on the full key makes the
    # positional Car_ID numbering below stable between runs.
    .sort(car_key_columns)
)

df_cars = cars_base.with_columns([
    # Car_ID is 'C' followed by the 1-based row number, zero-padded to 4 digits.
    pl.concat_str([pl.lit('C'), pl.int_range(1, cars_base.height + 1).cast(pl.String).str.zfill(4)]).alias('Car_ID'),
    # Placeholder attributes: every car gets the same constant values.
    pl.lit('White').alias('Color'),
    pl.lit('Petrol').alias('Engine_Type'),
    pl.lit('Automatic').alias('Transmission'),
    pl.lit(10).alias('Quantity_In_Stock'),
    pl.lit('Available').alias('Status'),
])

# 2. Create CUSTOMERS table
# Generate 500 mock customers
num_customers = 500
customer_numbers = range(1, num_customers + 1)
df_customers = pl.DataFrame({
    'Customer_ID': [f'CU{i:04d}' for i in customer_numbers],
    'First Name': ['Customer'] * num_customers,
    'Last Name': [str(i) for i in customer_numbers],
    'Gender': ['Other'] * num_customers,
    # Upper bound is exclusive, so ages fall in 18..69.
    'Age': rng.integers(18, 70, size=num_customers),
    'Job Role': ['Professional'] * num_customers,
    'Phone': ['555-0100'] * num_customers,
    'Email': [f'user{i}@example.com' for i in customer_numbers],
    'City': ['Bangkok'] * num_customers,
    'State': ['Thailand'] * num_customers,
    'Region': rng.choice(['North', 'South', 'East', 'West', 'Central'], size=num_customers),
})

# 3. Create SALES table
# Link raw records back to generated IDs
df_sales_raw = df_cleaned.join(
    df_cars.select(['Brand', 'Model', 'Year', 'Price', 'Car_ID']),
    left_on=['Manufacturer', 'Model', 'Year of manufacture', 'Price'],
    right_on=['Brand', 'Model', 'Year', 'Price'],
)

# Randomly assign a customer to each sale (sampling with replacement).
customer_ids = df_customers['Customer_ID'].to_numpy()
assigned_customers = customer_ids[rng.integers(0, num_customers, size=df_sales_raw.height)]

df_sales = df_sales_raw.with_columns([
    # Sale_ID is 'S' followed by the 1-based row number, zero-padded to 5 digits.
    pl.concat_str([pl.lit('S'), pl.int_range(1, df_sales_raw.height + 1).cast(pl.String).str.zfill(5)]).alias('Sale_ID'),
    pl.lit(assigned_customers).alias('Customer_ID'),
    pl.lit('2024-01-01').alias('Sale_Date'),  # Static for now
    pl.lit(1).alias('Quantity'),
    pl.col('Price').alias('Sale_Price'),
    pl.lit('Cash').alias('Payment_Method'),
    pl.lit('AI Bot').alias('Salesperson'),
]).select(['Sale_ID', 'Customer_ID', 'Car_ID', 'Sale_Date', 'Quantity', 'Sale_Price', 'Payment_Method', 'Salesperson'])

# 4. Save everything
df_cars.write_csv(save_to('cleaned', 'Cars_cleaned.csv'))
df_customers.write_csv(save_to('cleaned', 'Customers_cleaned.csv'))
df_sales.write_csv(save_to('cleaned', 'Sales_cleaned.csv'))
df_cleaned.write_csv(save_to('cleaned', 'car_sales_data_cleaned.csv'))

print(f'✅ Generated {df_cars.height} cars, {df_customers.height} customers, and {df_sales.height} sales records.')
print('✅ Star Schema files saved to data/cleaned/')

In [None]:
# Write the three star-schema tables to the cleaned data location.
# The names previously used here (`cars_cleaned`, `customers_cleaned`,
# `sales_cleaned`) are never assigned in this notebook, so the cell raised
# NameError; `df_cars`, `df_customers` and `df_sales` are the frames the
# notebook builds for these exact file names.
# NOTE(review): the star-schema generation cell already writes these same
# three files - confirm whether this cell is still needed.
df_cars.write_csv(save_to("cleaned", "Cars_cleaned.csv"))
df_customers.write_csv(save_to("cleaned", "Customers_cleaned.csv"))
df_sales.write_csv(save_to("cleaned", "Sales_cleaned.csv"))

print("All cleaned datasets saved successfully!")