# Upload data to DB

In [33]:

# --- CONFIG ---
#############################################
EXCEL_PATH = "data/fz10_2024_12_cleaned.xlsx"  # <--- Excel workbook loaded by the next cell
#############################################
# --- DB_CONNECTOR ---
import os
from urllib.parse import quote_plus  # retained: other cells may still rely on this name

from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

TABLE = "kba_registrations"  # Database table name
DB_HOST = "mysql-lab-innsbruck.mysql.database.azure.com"
DB_NAME = "kba_lab"
DB_USER = "sqladmin"

# The password comes from the environment so it is never stored in the notebook.
pwd = os.environ.get("DB_PASSWORD")

if not pwd:
    raise ValueError("Set DB_PASSWORD env var")

# URL.create() takes the raw, unescaped components, so special characters in
# the user name or password cannot corrupt the connection string and no
# manual percent-encoding is needed.
engine = create_engine(
    URL.create(
        drivername="mysql+mysqlconnector",
        username=DB_USER,
        password=pwd,
        host=DB_HOST,
        database=DB_NAME,
    )
)

# Smoke test: open one connection and report which schema we landed in.
with engine.connect() as conn:
    print("Connected to:", conn.execute(text("SELECT DATABASE()")).scalar())


Connected to: kba_lab


In [34]:
import pandas as pd, re


def norm(s: str) -> str:
    """Turn an arbitrary column header into a lowercase snake_case identifier.

    Steps, in order: stringify and trim the input, turn each whitespace run
    into ``_``, spell ``%`` as ``pct``, map ``/`` and ``-`` to ``_``, drop
    every character outside ``[0-9a-zA-Z_]``, squeeze repeated underscores,
    then lowercase and trim leading/trailing underscores.

    Args:
        s: The raw header; any value is accepted and passed through ``str()``.

    Returns:
        The normalized name (may be empty if nothing survives the cleanup).
    """
    # Single-character substitutions applied in one pass after whitespace handling.
    substitutions = str.maketrans({"%": "pct", "/": "_", "-": "_"})

    trimmed = str(s).strip()
    underscored = re.sub(r"\s+", "_", trimmed).translate(substitutions)
    ascii_only = re.sub(r"[^0-9a-zA-Z_]", "", underscored)
    squeezed = re.sub(r"_+", "_", ascii_only)
    return squeezed.lower().strip("_")


# Column names kept from the workbook (after normalization), in the order
# they are passed on to the database upload.
expected = [
    "brand", "model", "date", "total", "diesel_engine",
    "hybrid_incl_plugin", "petrol_hybrid_incl_plugin", "diesel_hybrid_incl_plugin",
    "hybrid_excl_plugin", "petrol_hybrid_excl_plugin", "diesel_hybrid_excl_plugin",
    "plugin_hybrid", "petrol_plugin_hybrid", "diesel_plugin_hybrid",
    "electric_bev", "all_wheel_drive", "convertibles"
]

df = pd.read_excel(EXCEL_PATH)
df.columns = [norm(c) for c in df.columns]

# Report schema drift between the workbook and the expected layout.
missing = [c for c in expected if c not in df.columns]
extra = [c for c in df.columns if c not in expected]
print("Missing:", missing)
print("Extra:", extra)

# brand/model/date are required by the steps below (and serve as the upsert
# key later in the notebook), so stop here with a clear message instead of
# failing further down with a bare KeyError or a database error.
missing_keys = [c for c in ("brand", "model", "date") if c in missing]
if missing_keys:
    raise ValueError(f"Required key column(s) missing from {EXCEL_PATH}: {missing_keys}")

# Keep only the expected columns, in the expected order; .copy() gives an
# independent frame so the column assignments below are unambiguous.
df = df[[c for c in expected if c in df.columns]].copy()

# date -> datetime.date (unparseable values are coerced to missing),
# then missing values -> None
df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.date
df = df.where(pd.notnull(df), None)

# Replace null models with brand
df["model"] = df["model"].fillna(df["brand"])

print("Rows to process:", len(df))

df.head(10)

Missing: []
Extra: []
Rows to process: 416


Unnamed: 0,brand,model,date,total,diesel_engine,hybrid_incl_plugin,petrol_hybrid_incl_plugin,diesel_hybrid_incl_plugin,hybrid_excl_plugin,petrol_hybrid_excl_plugin,diesel_hybrid_excl_plugin,plugin_hybrid,petrol_plugin_hybrid,diesel_plugin_hybrid,electric_bev,all_wheel_drive,convertibles
0,AIWAYS,U5,2024-12-01,,,,,,,,,,,,,,
1,AIWAYS,SONSTIGE,2024-12-01,,,,,,,,,,,,,,
2,ALFA ROMEO,GIULIA,2024-12-01,81.0,11.0,,,,,,,,,,,54.0,
3,ALFA ROMEO,JUNIOR,2024-12-01,114.0,,106.0,106.0,,106.0,106.0,,,,,8.0,,
4,ALFA ROMEO,STELVIO,2024-12-01,127.0,39.0,,,,,,,,,,,127.0,
5,ALFA ROMEO,TONALE,2024-12-01,129.0,19.0,110.0,110.0,,92.0,92.0,,18.0,18.0,,,18.0,
6,ALPINE,A110,2024-12-01,33.0,,,,,,,,,,,,,
7,ALPINE,SONSTIGE,2024-12-01,12.0,,,,,,,,,,,,,
8,ASTON MARTIN,DBX,2024-12-01,4.0,,,,,,,,,,,,4.0,
9,ASTON MARTIN,V8,2024-12-01,,,,,,,,,,,,,,


In [35]:
from sqlalchemy import text

# --- schema guard ---
# Runs on every execution: enforce NOT NULL on the key columns and create the
# unique index that ON DUPLICATE KEY UPDATE relies on, if it does not exist yet.
# The table name comes from TABLE so this always targets the same table as the
# upsert below.
with engine.begin() as conn:
    conn.execute(text(f"""
        ALTER TABLE {TABLE}
          MODIFY brand VARCHAR(100) NOT NULL,
          MODIFY model VARCHAR(100) NOT NULL,
          MODIFY date  DATE NOT NULL
    """))
    exists = conn.execute(
        text("""
            SELECT COUNT(*) FROM information_schema.statistics
            WHERE table_schema = DATABASE()
              AND table_name = :table_name
              AND index_name = 'uniq_brand_model_date'
        """),
        {"table_name": TABLE},
    ).scalar()
    if not exists:
        conn.execute(text(f"""
            ALTER TABLE {TABLE}
              ADD UNIQUE KEY uniq_brand_model_date (brand, model, date)
        """))

# --- stage + upsert ---
# The frame is written to a scratch table, merged into TABLE with a single
# INSERT ... SELECT ... ON DUPLICATE KEY UPDATE, and the scratch table is dropped.
stage = "kba_stage"

# NOTE: in MySQL, DDL (creating/replacing and dropping the stage table) commits
# implicitly, so this block is not one atomic transaction; only the upsert
# statement itself is all-or-nothing.
with engine.begin() as conn:
    df.to_sql(stage, con=conn, if_exists="replace", index=False, method="multi", chunksize=5000)
    keys = ["brand", "model", "date"]
    nonkeys = [c for c in df.columns if c not in keys]

    cols_csv = ", ".join(f"`{c}`" for c in df.columns)
    select_csv = ", ".join(f"s.`{c}`" for c in df.columns)
    # If the frame carries only key columns there is nothing to overwrite; fall
    # back to a no-op assignment so the statement stays syntactically valid.
    update_sets = (
        ", ".join(f"`{c}` = VALUES(`{c}`)" for c in nonkeys)
        or f"`{keys[0]}` = `{keys[0]}`"
    )

    before = conn.execute(text(f"SELECT COUNT(*) FROM {TABLE}")).scalar()

    upsert_sql = f"""
    INSERT INTO {TABLE} ({cols_csv})
    SELECT {select_csv}
    FROM {stage} s
    ON DUPLICATE KEY UPDATE
    {update_sets};
    """
    conn.execute(text(upsert_sql))

    conn.execute(text(f"DROP TABLE IF EXISTS {stage}"))
    after = conn.execute(text(f"SELECT COUNT(*) FROM {TABLE}")).scalar()

# rows_after - rows_before is the number of newly inserted keys; the rest of
# the processed rows matched an existing (brand, model, date).
print(f"✅ Upsert complete. rows_before={before}, processed={len(df)}, rows_after={after}")


✅ Upsert complete. rows_before=8523, processed=416, rows_after=8939
