
# 02_ETL_AutoSales.ipynb

Pipeline **ETL** pour charger les CSV Kaggle dans **SQL Server** :  
- `cars_dataset.csv` ‚Üí `dbo.stg_cars_dataset` ‚Üí `dim_model` + `fact_listings`  
- `BMW sales data (2010-2024).csv` ‚Üí `dbo.stg_bmw_sales` ‚Üí `dim_*` + `fact_sales`

> **Pr√©-requis** : Avoir ex√©cut√© le DDL (tables `stg_*`, `dim_*`, `fact_*` cr√©√©es) et install√© `pyodbc` + pilote ODBC SQL Server.


In [193]:

# === CONFIGURATION SQL SERVER  ===
from pathlib import Path
import pyodbc
from sqlalchemy import (create_engine, text)

dbms = 'mssql+pyodbc'
driver = 'ODBC+driver+17+for+SQL+Server'
server = r'GRACES_DIVINES\GRACE_DIVINES'
database = 'AutoSales'

con_string_trusted_windows_connection = f'{dbms}://{server}/{database}?trusted_connection=yes&driver={driver}'

engine = create_engine(con_string_trusted_windows_connection)
# === CHEMINS CSV (√† adapter si besoin) ===
BASE = Path(r'C:\Users\tchom\Desktop\project-web-data\bmw-uk-market-analysis\data')  # Remplacer par ton dossier local si besoin
CSV_CARS = BASE / 'cars_dataset.csv'
CSV_BMW  = BASE / 'BMW sales data (2010-2024).csv'

print('CSV cars_dataset path:', CSV_CARS)
print('CSV BMW sales path   :', CSV_BMW)

engine.connect()  # test connection


CSV cars_dataset path: C:\Users\tchom\Desktop\project-web-data\bmw-uk-market-analysis\data\cars_dataset.csv
CSV BMW sales path   : C:\Users\tchom\Desktop\project-web-data\bmw-uk-market-analysis\data\BMW sales data (2010-2024).csv


<sqlalchemy.engine.base.Connection at 0x1d121a1b450>

In [187]:
# Vider les tables de staging
def truncate_staging():
    sqls = [
        "IF OBJECT_ID('dbo.stg_cars_dataset') IS NOT NULL TRUNCATE TABLE dbo.stg_cars_dataset;",
        "IF OBJECT_ID('dbo.stg_bmw_sales')   IS NOT NULL TRUNCATE TABLE dbo.stg_bmw_sales;"
    ]

    # Utilise la connexion DBAPI pour appliquer TRUNCATE; si TRUNCATE √©choue (FKs/permissions), on fait un DELETE
    with engine.connect() as cur:
        try:
            for s in sqls:
                cur.execute(text(s))
            cur.commit()
            print("Staging tables truncated (TRUNCATE applied).")
        except Exception as e:
            cur.rollback()
            print("TRUNCATE failed, attempting DELETE fallback:", e)
            try:
                for s in sqls:
                    # extraire le nom de la table apr√®s 'TRUNCATE TABLE'
                    if 'TRUNCATE TABLE' in s:
                        tbl = s.split('TRUNCATE TABLE')[-1].strip().rstrip(';')
                        cur.execute(f"DELETE FROM {tbl}")
                cur.commit()
                print("Staging tables cleared with DELETE fallback.")
            except Exception as e2:
                cur.rollback()
                raise

# Ne pas ex√©cuter automatiquement; appel contr√¥l√© depuis l'orchestration

In [188]:
import pandas as pd

def load_csv_to_staging():
    insert_cars = (
        "INSERT INTO dbo.stg_cars_dataset(model, [year], price, transmission, mileage, fuelType, tax, mpg, engineSize, [Make]) "
        "VALUES (?,?,?,?,?,?,?,?,?,?)"
    )
    insert_bmw = (
        "INSERT INTO dbo.stg_bmw_sales([Model],[Year],[Region],[Color],[Fuel_Type],[Transmission],[Engine_Size_L],[Mileage_KM],[Price_USD],[Sales_Volume],[Sales_Classification]) "
        "VALUES (?,?,?,?,?,?,?,?,?,?,?)"
    )
    chunksize = 5000

    # utilise engine.raw_connection() pour r√©cup√©rer la connexion DBAPI (pyodbc)
    with engine.connect() as raw_conn:
        # active fast_executemany si le driver le supporte (pyodbc)
        try:
            raw_conn.fast_executemany = True
        except Exception:
            pass

        # cars_dataset
        print("Loading cars_dataset.csv -> stg_cars_dataset ...")
        cols_cars = ["model","year","price","transmission","mileage","fuelType","tax","mpg","engineSize","Make"]
        dtypes_cars = {
            "model":"string","year":"Int64","price":"float64","transmission":"string","mileage":"Int64",
            "fuelType":"string","tax":"float64","mpg":"float64","engineSize":"float64","Make":"string"
        }
        total_rows = 0
        for chunk in pd.read_csv(str(CSV_CARS), usecols=cols_cars, dtype=dtypes_cars, chunksize=chunksize):
            chunk = chunk.where(pd.notnull(chunk), None)
            rows = list(chunk.itertuples(index=False, name=None))
            if rows:
                raw_conn.executemany(insert_cars, rows)
                total_rows += len(rows)
        raw_conn.commit()
        print(f"Inserted {total_rows} rows into stg_cars_dataset.")

        # BMW sales
        print("Loading BMW sales CSV -> stg_bmw_sales ...")
        cols_bmw = ["Model","Year","Region","Color","Fuel_Type","Transmission","Engine_Size_L","Mileage_KM","Price_USD","Sales_Volume","Sales_Classification"]
        dtypes_bmw = {
            "Model":"string","Year":"Int64","Region":"string","Color":"string","Fuel_Type":"string",
            "Transmission":"string","Engine_Size_L":"float64","Mileage_KM":"Int64","Price_USD":"float64",
            "Sales_Volume":"Int64","Sales_Classification":"string"
        }
        total_rows = 0
        for chunk in pd.read_csv(str(CSV_BMW), usecols=cols_bmw, dtype=dtypes_bmw, chunksize=chunksize):
            chunk = chunk.where(pd.notnull(chunk), None)
            rows = list(chunk.itertuples(index=False, name=None))
            if rows:
                raw_conn.executemany(insert_bmw, rows)
                total_rows += len(rows)
        raw_conn.commit()
        print(f"Inserted {total_rows} rows into stg_bmw_sales.")

# Appel contr√¥l√© depuis l'orchestration


In [192]:
import pandas as pd
from sqlalchemy import text

def load_csv_to_staging():
    insert_cars = text("""
        INSERT INTO dbo.stg_cars_dataset
        (model, [year], price, transmission, mileage, fuelType, tax, mpg, engineSize, [Make])
        VALUES (:model, :year, :price, :transmission, :mileage, :fuelType, :tax, :mpg, :engineSize, :Make)
    """)

    insert_bmw = text("""
        INSERT INTO dbo.stg_bmw_sales
        ([Model],[Year],[Region],[Color],[Fuel_Type],[Transmission],
         [Engine_Size_L],[Mileage_KM],[Price_USD],[Sales_Volume],[Sales_Classification])
        VALUES (:Model, :Year, :Region, :Color, :Fuel_Type, :Transmission,
                :Engine_Size_L, :Mileage_KM, :Price_USD, :Sales_Volume, :Sales_Classification)
    """)

    chunksize = 5000

    # --- üöó cars_dataset
    print("Loading cars_dataset.csv -> stg_cars_dataset ...")
    cols_cars = ["model", "year", "price", "transmission", "mileage",
                 "fuelType", "tax", "mpg", "engineSize", "Make"]
    dtypes_cars = {
        "model": "string", "year": "Int64", "price": "float64",
        "transmission": "string", "mileage": "Int64", "fuelType": "string",
        "tax": "float64", "mpg": "float64", "engineSize": "float64", "Make": "string"
    }

    total_rows = 0
    with engine.connect() as conn:
        with conn.begin():  # transaction context
            for chunk in pd.read_csv(str(CSV_CARS), usecols=cols_cars, dtype=dtypes_cars, chunksize=chunksize):
                chunk = chunk.where(pd.notnull(chunk), None)
                rows = chunk.to_dict(orient="records")
                if rows:
                    conn.execute(insert_cars, rows)
                    total_rows += len(rows)
        print(f"Inserted {total_rows} rows into stg_cars_dataset.")

    # --- üöò BMW sales
    print("Loading BMW sales CSV -> stg_bmw_sales ...")
    cols_bmw = ["Model", "Year", "Region", "Color", "Fuel_Type",
                "Transmission", "Engine_Size_L", "Mileage_KM",
                "Price_USD", "Sales_Volume", "Sales_Classification"]
    dtypes_bmw = {
        "Model": "string", "Year": "Int64", "Region": "string", "Color": "string",
        "Fuel_Type": "string", "Transmission": "string", "Engine_Size_L": "float64",
        "Mileage_KM": "Int64", "Price_USD": "float64", "Sales_Volume": "Int64",
        "Sales_Classification": "string"
    }

    total_rows = 0
    with engine.connect() as conn:
        with conn.begin():
            for chunk in pd.read_csv(str(CSV_BMW), usecols=cols_bmw, dtype=dtypes_bmw, chunksize=chunksize):
                chunk = chunk.where(pd.notnull(chunk), None)
                rows = chunk.to_dict(orient="records")
                if rows:
                    conn.execute(insert_bmw, rows)
                    total_rows += len(rows)
        print(f"Inserted {total_rows} rows into stg_bmw_sales.")


In [169]:
from sqlalchemy import text

UPSERT_DIMS_SQL = [
    # dim_year (cars)
    "INSERT INTO dbo.dim_year(year_key, [year]) "
    "SELECT DISTINCT CAST([year] AS INT), CAST([year] AS INT) "
    "FROM dbo.stg_cars_dataset s "
    "WHERE s.[year] IS NOT NULL "
    "  AND NOT EXISTS (SELECT 1 FROM dbo.dim_year d WHERE d.year_key = s.[year])",

    # dim_year (bmw)
    "INSERT INTO dbo.dim_year(year_key, [year]) "
    "SELECT DISTINCT CAST([Year] AS INT), CAST([Year] AS INT) "
    "FROM dbo.stg_bmw_sales s "
    "WHERE s.[Year] IS NOT NULL "
    "  AND NOT EXISTS (SELECT 1 FROM dbo.dim_year d WHERE d.year_key = s.[Year])",

    # dim_model from cars
    "INSERT INTO dbo.dim_model(make, model, fuel_type, transmission, engine_size_l) "
    "SELECT DISTINCT "
    "    ISNULL(NULLIF([Make],''),'UNKNOWN'), "
    "    ISNULL(NULLIF([model],''),'UNKNOWN'), "
    "    NULLIF([fuelType],''), "
    "    NULLIF([transmission],''), "
    "    CAST([engineSize] AS DECIMAL(4,2)) "
    "FROM dbo.stg_cars_dataset s "
    "WHERE ISNULL(NULLIF([Make],''),'UNKNOWN') IS NOT NULL "
    "  AND ISNULL(NULLIF([model],''),'UNKNOWN') IS NOT NULL "
    "  AND NOT EXISTS ( "
    "      SELECT 1 FROM dbo.dim_model d "
    "      WHERE d.make = ISNULL(NULLIF(s.[Make],''),'UNKNOWN') "
    "        AND d.model = ISNULL(NULLIF(s.[model],''),'UNKNOWN') "
    "        AND ISNULL(d.fuel_type,'') = ISNULL(NULLIF(s.[fuelType],''),'') "
    "        AND ISNULL(d.transmission,'') = ISNULL(NULLIF(s.[transmission],''),'') "
    "        AND ISNULL(d.engine_size_l,-1.0) = ISNULL(CAST(s.[engineSize] AS DECIMAL(4,2)),-1.0) "
    "  )",

    # dim_model from bmw (make=BMW)
    "INSERT INTO dbo.dim_model(make, model, fuel_type, transmission, engine_size_l) "
    "SELECT DISTINCT "
    "    'BMW' AS make, "
    "    ISNULL(NULLIF([Model],''),'UNKNOWN') AS model, "
    "    NULLIF([Fuel_Type],''), "
    "    NULLIF([Transmission],''), "
    "    CAST([Engine_Size_L] AS DECIMAL(4,2)) "
    "FROM dbo.stg_bmw_sales s "
    "WHERE ISNULL(NULLIF([Model],''),'UNKNOWN') IS NOT NULL "
    "  AND NOT EXISTS ( "
    "      SELECT 1 FROM dbo.dim_model d "
    "      WHERE d.make = 'BMW' "
    "        AND d.model = ISNULL(NULLIF(s.[Model],''),'UNKNOWN') "
    "        AND ISNULL(d.fuel_type,'') = ISNULL(NULLIF(s.[Fuel_Type],''),'') "
    "        AND ISNULL(d.transmission,'') = ISNULL(NULLIF(s.[Transmission],''),'') "
    "        AND ISNULL(d.engine_size_l,-1.0) = ISNULL(CAST(s.[Engine_Size_L] AS DECIMAL(4,2)),-1.0) "
    "  )",

    # dim_region
    "INSERT INTO dbo.dim_region(region) "
    "SELECT DISTINCT ISNULL(NULLIF([Region],''),'UNKNOWN') "
    "FROM dbo.stg_bmw_sales s "
    "WHERE ISNULL(NULLIF([Region],''),'UNKNOWN') IS NOT NULL "
    "  AND NOT EXISTS (SELECT 1 FROM dbo.dim_region d WHERE d.region = ISNULL(NULLIF(s.[Region],''),'UNKNOWN'))",

    # dim_color
    "INSERT INTO dbo.dim_color(color) "
    "SELECT DISTINCT ISNULL(NULLIF([Color],''),'UNKNOWN') "
    "FROM dbo.stg_bmw_sales s "
    "WHERE ISNULL(NULLIF([Color],''),'UNKNOWN') IS NOT NULL "
    "  AND NOT EXISTS (SELECT 1 FROM dbo.dim_color d WHERE d.color = ISNULL(NULLIF(s.[Color],''),'UNKNOWN'))",
]

def upsert_dimensions():
    with engine.begin() as conn:
        for stmt in UPSERT_DIMS_SQL:
            conn.execute(text(stmt))
    print("Dimensions upserted.")

# ne pas ex√©cuter automatiquement
# appel contr√¥l√© depuis l'orchestration


In [170]:
# Insertions dans les tables de faits
from sqlalchemy import text

def insert_facts():
    with engine.begin() as conn:
        # fact_listings
        stmt_listings = (
            "INSERT INTO dbo.fact_listings "
            "(year_key, model_key, transmission, mileage, fuel_type, tax, mpg, engine_size_l, price, currency) "
            "SELECT "
            "    CAST(s.[year] AS INT) AS year_key, "
            "    d.model_key, "
            "    s.transmission, "
            "    s.mileage, "
            "    s.fuelType, "
            "    s.tax, "
            "    s.mpg, "
            "    CAST(s.engineSize AS DECIMAL(4,2)) AS engine_size_l, "
            "    s.price, "
            "    'USD' AS currency "
            "FROM dbo.stg_cars_dataset s "
            "JOIN dbo.dim_model d "
            "  ON d.make = ISNULL(NULLIF(s.[Make],''),'UNKNOWN') "
            " AND d.model = ISNULL(NULLIF(s.[model],''),'UNKNOWN') "
            " AND ISNULL(d.fuel_type,'') = ISNULL(NULLIF(s.[fuelType],''),'') "
            " AND ISNULL(d.transmission,'') = ISNULL(NULLIF(s.[transmission],''),'') "
            " AND ISNULL(d.engine_size_l,-1.0) = ISNULL(CAST(s.[engineSize] AS DECIMAL(4,2)),-1.0) "
            "JOIN dbo.dim_year y ON y.year_key = CAST(s.[year] AS INT)"
        )
        conn.execute(text(stmt_listings))
        print("Inserted fact_listings.")

        # fact_sales
        stmt_sales = (
            "INSERT INTO dbo.fact_sales "
            "(year_key, model_key, region_key, color_key, fuel_type, transmission, engine_size_l, mileage_km, price_usd, sales_volume, sales_classification) "
            "SELECT "
            "    CAST(s.[Year] AS INT) AS year_key, "
            "    d.model_key, "
            "    r.region_key, "
            "    c.color_key, "
            "    s.[Fuel_Type], "
            "    s.[Transmission], "
            "    CAST(s.[Engine_Size_L] AS DECIMAL(4,2)) AS engine_size_l, "
            "    s.[Mileage_KM], "
            "    s.[Price_USD], "
            "    s.[Sales_Volume], "
            "    s.[Sales_Classification] "
            "FROM dbo.stg_bmw_sales s "
            "LEFT JOIN dbo.dim_region r ON r.region = ISNULL(NULLIF(s.[Region],''),'UNKNOWN') "
            "LEFT JOIN dbo.dim_color  c ON c.color  = ISNULL(NULLIF(s.[Color],''),'UNKNOWN') "
            "JOIN dbo.dim_model d "
            "  ON d.make = 'BMW' "
            " AND d.model = ISNULL(NULLIF(s.[Model],''),'UNKNOWN') "
            " AND ISNULL(d.fuel_type,'') = ISNULL(NULLIF(s.[Fuel_Type],''),'') "
            " AND ISNULL(d.transmission,'') = ISNULL(NULLIF(s.[Transmission],''),'') "
            " AND ISNULL(d.engine_size_l,-1.0) = ISNULL(CAST(s.[Engine_Size_L] AS DECIMAL(4,2)),-1.0) "
            "JOIN dbo.dim_year y ON y.year_key = CAST(s.[Year] AS INT)"
        )
        conn.execute(text(stmt_sales))
        print("Inserted fact_sales.")

# ne pas ex√©cuter automatiquement
# appel contr√¥l√© depuis l'orchestration


In [194]:
# Orchestration (contr√¥l√©e)
print("Starting ETL orchestration (controlled).")
print("CSV CARS:", CSV_CARS)
print("CSV BMW :", CSV_BMW)

# Pour ex√©cuter le pipeline complet, d√©finis RUN_ETL = True ci-dessous et r√©ex√©cute cette cellule.
RUN_ETL = True  # <-- change to True after vous √™tes s√ªr(e) que la connexion et les fichiers sont OK

if RUN_ETL:
    print("RUN_ETL=True -> executing ETL steps")
    # Attention : ces fonctions effectuent des op√©rations destructrices (TRUNCATE/INSERT)
    truncate_staging()
    load_csv_to_staging()
    upsert_dimensions()
    insert_facts()
    print("ETL completed successfully ‚úÖ")
else:
    print("RUN_ETL is False. The ETL was NOT executed. Set RUN_ETL = True to run the full pipeline.")


Starting ETL orchestration (controlled).
CSV CARS: C:\Users\tchom\Desktop\project-web-data\bmw-uk-market-analysis\data\cars_dataset.csv
CSV BMW : C:\Users\tchom\Desktop\project-web-data\bmw-uk-market-analysis\data\BMW sales data (2010-2024).csv
RUN_ETL=True -> executing ETL steps
Staging tables truncated (TRUNCATE applied).
Loading cars_dataset.csv -> stg_cars_dataset ...
Inserted 72435 rows into stg_cars_dataset.
Loading BMW sales CSV -> stg_bmw_sales ...
Inserted 50000 rows into stg_bmw_sales.
Dimensions upserted.
Inserted fact_listings.
Inserted fact_sales.
ETL completed successfully ‚úÖ
