In [50]:
import configparser
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine

# Load MySQL credentials
def get_sqlalchemy_engine(config_file, section):
    """Build a SQLAlchemy engine for the MySQL database described in an INI section.

    Args:
        config_file: Path to an INI file readable by ``configparser``.
        section: Name of the section that holds the ``user``, ``password``,
            ``host`` and ``database`` keys.

    Returns:
        Whatever ``create_engine`` returns for the assembled
        ``mysql+mysqlconnector://`` URL.

    Raises:
        KeyError: If ``section`` is not present (which is also what happens
            when ``config_file`` cannot be read), or if one of the four
            keys is missing from the section.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, so a wrong path
    # would otherwise surface as a bare KeyError naming only the section.
    config.read(config_file)
    if section not in config:
        raise KeyError(
            f"section {section!r} not found in config file {config_file!r}"
        )
    db = config[section]

    # Percent-encode the credentials: characters such as '@', '/', ':' or a
    # space inside a user name or password would otherwise be parsed as URL
    # structure. safe='' encodes everything reserved, including '/' and '+'.
    user = quote(db['user'], safe='')
    password = quote(db['password'], safe='')

    return create_engine(
        f"mysql+mysqlconnector://{user}:{password}@{db['host']}/{db['database']}"
    )

# Connect to source (operational DB) and target (data warehouse).
# Both engines are built from the same "querycrew.ini" file and differ only
# in the section they read: "querycrew_db" for the source, "querycrew_wh"
# for the target. The load loop at the end of the notebook reads tables
# through source_engine and writes them through target_engine.
source_engine = get_sqlalchemy_engine("querycrew.ini", "querycrew_db")
target_engine = get_sqlalchemy_engine("querycrew.ini", "querycrew_wh")

In [51]:
# CLEAN CUSTOMERS: Standardize names/emails, drop rows with missing key fields
def clean_customers(df):
    """Return a de-duplicated, normalised copy of the customers table.

    Keeps the first row per ``customer_id``, drops rows missing any of
    first_name, last_name, email, phone or address, then title-cases and
    trims the name and address columns and lower-cases and trims email.
    The ``phone`` column is required but left untouched.
    """
    required = ['first_name', 'last_name', 'email', 'phone', 'address']
    customers = df.drop_duplicates(subset='customer_id').dropna(subset=required)

    # Build the replacement columns first, then swap them in with one assign().
    tidy = {
        col: customers[col].str.title().str.strip()
        for col in ('first_name', 'last_name', 'address')
    }
    tidy['email'] = customers['email'].str.lower().str.strip()
    return customers.assign(**tidy)

In [52]:
# CLEAN CARS: Format make/model, remove incomplete rows
def clean_cars(df):
    """Return a de-duplicated, normalised copy of the cars table.

    Steps, in order:
      1. Keep the first row per ``car_id``.
      2. Coerce ``year`` and ``car_cost`` to numbers (unparseable -> NaN).
      3. Drop rows where car_make, car_model, year or car_cost is missing.
         This runs *after* the coercion so that values which failed to
         parse are removed too, instead of surviving as NaN.
      4. Trim make and model; title-case them, except that any
         case/whitespace variant of "BMW" becomes exactly ``'BMW'`` and
         BMW models keep their original casing.

    The input frame is not modified.
    """
    cars = df.drop_duplicates(subset='car_id').copy()

    cars['year'] = pd.to_numeric(cars['year'], errors='coerce')
    cars['car_cost'] = pd.to_numeric(cars['car_cost'], errors='coerce')
    cars = cars.dropna(subset=['car_make', 'car_model', 'year', 'car_cost']).copy()

    # Vectorised string handling (rather than row-wise apply) also works on an
    # empty frame. astype(str) is safe here because nulls were dropped above.
    make = cars['car_make'].astype(str).str.strip()
    is_bmw = make.str.upper() == 'BMW'
    # Canonicalise every BMW spelling so the model rule below sees all of them.
    cars['car_make'] = make.str.title().where(~is_bmw, 'BMW')

    model = cars['car_model'].astype(str).str.strip()
    # BMW model names keep their casing; everything else is title-cased.
    cars['car_model'] = model.str.title().where(~is_bmw, model)
    return cars

In [53]:
# CLEAN CAR SALES: Parse date, clean price, drop incomplete records
def clean_car_sales(df):
    """Return a de-duplicated, normalised copy of the car sales table.

    Steps, in order:
      1. Keep the first row per ``sale_id``.
      2. Coerce ``price`` and ``year`` to numbers and ``sale_date`` to
         datetimes (unparseable -> NaN / NaT).
      3. Drop rows where car_make, car_model, price, sale_date or year is
         missing. This runs *after* the coercion so that values which
         failed to parse are removed too, instead of surviving as nulls.
      4. Trim make and model; title-case them, except that any
         case/whitespace variant of "BMW" becomes exactly ``'BMW'`` and
         BMW models keep their original casing.

    The input frame is not modified.
    """
    sales = df.drop_duplicates(subset='sale_id').copy()

    sales['price'] = pd.to_numeric(sales['price'], errors='coerce')
    sales['sale_date'] = pd.to_datetime(sales['sale_date'], errors='coerce')
    sales['year'] = pd.to_numeric(sales['year'], errors='coerce')
    sales = sales.dropna(
        subset=['car_make', 'car_model', 'price', 'sale_date', 'year']
    ).copy()

    # Vectorised string handling (rather than row-wise apply) also works on an
    # empty frame. astype(str) is safe here because nulls were dropped above.
    make = sales['car_make'].astype(str).str.strip()
    is_bmw = make.str.upper() == 'BMW'
    # Canonicalise every BMW spelling so the model rule below sees all of them.
    sales['car_make'] = make.str.title().where(~is_bmw, 'BMW')

    model = sales['car_model'].astype(str).str.strip()
    # BMW model names keep their casing; everything else is title-cased.
    sales['car_model'] = model.str.title().where(~is_bmw, model)
    return sales

In [54]:
# CLEAN EV ARRIVALS: Capitalize makes, fix dates, drop null values
def clean_ev_arrivals(df):
    """Return a de-duplicated, normalised copy of the EV arrivals table.

    Steps, in order:
      1. Keep the first row per ``arrival_id``.
      2. Coerce ``arrival_date`` to datetimes (unparseable -> NaT).
      3. Drop rows where ev_make, ev_model or arrival_date is missing.
         This runs *after* the coercion so that dates which failed to
         parse are removed too, instead of surviving as NaT.
      4. Trim make and model; title-case them, except that any
         case/whitespace variant of "BMW" becomes exactly ``'BMW'`` and
         BMW models keep their original casing.

    The input frame is not modified.
    """
    arrivals = df.drop_duplicates(subset='arrival_id').copy()

    arrivals['arrival_date'] = pd.to_datetime(arrivals['arrival_date'], errors='coerce')
    arrivals = arrivals.dropna(subset=['ev_make', 'ev_model', 'arrival_date']).copy()

    # Vectorised string handling (rather than row-wise apply) also works on an
    # empty frame. astype(str) is safe here because nulls were dropped above.
    make = arrivals['ev_make'].astype(str).str.strip()
    is_bmw = make.str.upper() == 'BMW'
    # Canonicalise every BMW spelling so the model rule below sees all of them.
    arrivals['ev_make'] = make.str.title().where(~is_bmw, 'BMW')

    model = arrivals['ev_model'].astype(str).str.strip()
    # BMW model names keep their casing; everything else is title-cased.
    arrivals['ev_model'] = model.str.title().where(~is_bmw, model)
    return arrivals

In [55]:
# CLEAN DEALER SALES SUMMARY: Drop duplicates/nulls, parse numbers
def clean_dealer_sales_summary(df):
    """Return a de-duplicated, normalised copy of the dealer sales summary.

    Keeps the first row per ``dealer_id``, coerces ``total_sales`` to a
    number (unparseable -> NaN), drops rows missing dealer_name or
    total_sales, then title-cases and trims ``dealer_name``.

    The null check runs *after* the numeric coercion so that totals which
    failed to parse are removed too, instead of surviving as NaN.
    The input frame is not modified.
    """
    summary = df.drop_duplicates(subset='dealer_id').copy()
    summary['total_sales'] = pd.to_numeric(summary['total_sales'], errors='coerce')
    summary = summary.dropna(subset=['dealer_name', 'total_sales']).copy()
    summary['dealer_name'] = summary['dealer_name'].str.title().str.strip()
    return summary

In [56]:
# CLEAN DEALERS: Format names, drop if contact info is missing
def clean_dealers(df):
    """Return a de-duplicated copy of the dealers table with tidy names.

    Keeps the first row per ``dealer_id``, drops rows missing dealer_name,
    phone or email, then title-cases and trims ``dealer_name``. The phone
    and email columns are required but left untouched.
    """
    contact_fields = ['dealer_name', 'phone', 'email']
    dealers = df.drop_duplicates(subset='dealer_id').dropna(subset=contact_fields)
    tidy_name = dealers['dealer_name'].str.title().str.strip()
    return dealers.assign(dealer_name=tidy_name)

In [57]:
# PROCESS AND LOAD CLEANED TABLES INTO WAREHOUSE (querycrew_wh)
# Maps each source table name to the function that cleans it. The same name
# is used for the table written to the target, so keys must match the table
# names in the source database exactly.
tables = {
    "Customers": clean_customers,
    "Car": clean_cars,
    "car_sales": clean_car_sales,
    "EV_arrivals": clean_ev_arrivals,
    "dealer_sales_summary": clean_dealer_sales_summary,
    "Dealers": clean_dealers
}

# Extract -> clean -> load, one table at a time.
for table_name, clean_func in tables.items():
    # Reads the whole source table into a DataFrame.
    df = pd.read_sql_table(table_name, source_engine)
    df_clean = clean_func(df)
    # if_exists='replace' drops any existing target table and recreates it
    # from the DataFrame, so each run fully overwrites the previous load.
    # index=False keeps the DataFrame index out of the written table.
    df_clean.to_sql(table_name, con=target_engine, if_exists='replace', index=False)
    print(f"{table_name} cleaned and loaded.")

Customers cleaned and loaded.
Car cleaned and loaded.
car_sales cleaned and loaded.
EV_arrivals cleaned and loaded.
dealer_sales_summary cleaned and loaded.
Dealers cleaned and loaded.
