# Step 2 â€“ Ingestion (Bronze) into DuckDB

Goal:
- Load the raw CSV into DuckDB as a `raw_registrations` table
- Keep data as-is (no business transformations yet)
- Create a persistent DuckDB database file for the whole project


2) Imports + paths

In [10]:
import pandas as pd
import duckdb
from pathlib import Path

PROJECT_ROOT = Path("..")
RAW_PATH = PROJECT_ROOT / "data" / "raw"
DB_PATH = PROJECT_ROOT / "data" / "duckdb" / "motorcycle.db"

csv_files = list(RAW_PATH.glob("*.csv"))
CSV_FILE = csv_files[0]

CSV_FILE, DB_PATH


(PosixPath('../data/raw/SP_Hersteller_Handelsnamen_Krad_f73ec_-2904598668132282746.csv'),
 PosixPath('../data/duckdb/motorcycle.db'))

3) Quick check: inspect Report_date format

In [11]:
df_sample = pd.read_csv(CSV_FILE, nrows=20)
df_sample["Report_date"].head(10).tolist()


['01.01.2023',
 '01.01.2023',
 '01.01.2023',
 '01.01.2023',
 '01.01.2023',
 '01.01.2023',
 '01.01.2023',
 '01.01.2023',
 '01.01.2023',
 '01.01.2023']

4) Create DuckDB DB + load raw table

In [12]:
con = duckdb.connect(str(DB_PATH))

# Drop & recreate for repeatability during development
con.execute("DROP TABLE IF EXISTS raw_registrations")

con.execute(f"""
CREATE TABLE raw_registrations AS
SELECT * FROM read_csv_auto('{CSV_FILE.as_posix()}');
""")

con.execute("SELECT COUNT(*) FROM raw_registrations").fetchone()


(128719,)

5) Confirm schema & preview

In [13]:
con.execute("DESCRIBE raw_registrations").fetchdf()


Unnamed: 0,column_name,column_type,null,key,default,extra
0,Report_date,DATE,YES,,,
1,Manufacturer,VARCHAR,YES,,,
2,Trade_name,VARCHAR,YES,,,
3,Type_key,VARCHAR,YES,,,
4,State,VARCHAR,YES,,,
5,Count,BIGINT,YES,,,
6,ZS Anzahl,VARCHAR,YES,,,
7,Object_Id,BIGINT,YES,,,


In [14]:
con.execute("SELECT * FROM raw_registrations LIMIT 5").fetchdf()


Unnamed: 0,Report_date,Manufacturer,Trade_name,Type_key,State,Count,ZS Anzahl,Object_Id
0,2023-01-01,AEON MOTOR (RC),,AAB,Schleswig-Holstein,7,,1
1,2023-01-01,AEON MOTOR (RC),,AAB,Niedersachsen,23,,2
2,2023-01-01,AEON MOTOR (RC),,AAB,Bremen,1,,3
3,2023-01-01,AEON MOTOR (RC),,AAB,Nordrhein-Westfalen,6,,4
4,2023-01-01,AEON MOTOR (RC),,AAB,Hessen,12,,5


6) Basic quality checks

In [15]:
# Null counts per column (quick)
con.execute("""
SELECT
  SUM(CASE WHEN Report_date IS NULL THEN 1 ELSE 0 END) AS null_report_date,
  SUM(CASE WHEN Manufacturer IS NULL THEN 1 ELSE 0 END) AS null_manufacturer,
  SUM(CASE WHEN State IS NULL THEN 1 ELSE 0 END) AS null_state,
  SUM(CASE WHEN Count IS NULL THEN 1 ELSE 0 END) AS null_count
FROM raw_registrations
""").fetchdf()


Unnamed: 0,null_report_date,null_manufacturer,null_state,null_count
0,0.0,0.0,0.0,0.0


In [16]:
# Top rows by Count to spot weird values
con.execute("""
SELECT Report_date, State, Manufacturer, Trade_name, Type_key, Count, "ZS Anzahl", Object_Id
FROM raw_registrations
ORDER BY Count DESC
LIMIT 10
""").fetchdf()


Unnamed: 0,Report_date,State,Manufacturer,Trade_name,Type_key,Count,ZS Anzahl,Object_Id
0,2025-01-01,Bayern,SONSTIGE HERSTELLER,,,56232,,116809
1,2024-01-01,Bayern,SONSTIGE HERSTELLER,,,55688,,73851
2,2023-01-01,Bayern,SONSTIGE HERSTELLER,,,54292,,32626
3,2025-01-01,Nordrhein-Westfalen,SONSTIGE HERSTELLER,,,41712,,116805
4,2024-01-01,Nordrhein-Westfalen,SONSTIGE HERSTELLER,,,41559,,73835
5,2025-01-01,Bayern,PIAGGIO (I),SONSTIGE/NICHT GETYPT,,40229,,111285
6,2023-01-01,Nordrhein-Westfalen,YAMAHA (J),SONSTIGE/NICHT GETYPT,,40171,,36867
7,2023-01-01,Nordrhein-Westfalen,SONSTIGE HERSTELLER,,,40053,,32614
8,2024-01-01,Bayern,PIAGGIO (I),SONSTIGE/NICHT GETYPT,,39950,,71444
9,2023-01-01,Bayern,PIAGGIO (I),SONSTIGE/NICHT GETYPT,,39524,,28778


In [17]:
# Check if Object_Id is unique (useful for dedupe)
con.execute("""
SELECT
  COUNT(*) AS rows,
  COUNT(DISTINCT Object_Id) AS distinct_object_id
FROM raw_registrations
""").fetchdf()


Unnamed: 0,rows,distinct_object_id
0,128719,128719


In [9]:
con.close()

In [19]:
con.execute("SHOW TABLES").fetchdf()


Unnamed: 0,name
0,dim_date
1,dim_date_keyed
2,dim_manufacturer
3,dim_manufacturer_keyed
4,dim_model
5,dim_model_keyed
6,dim_state
7,dim_state_keyed
8,fct_registrations
9,mart_market_share_monthly


In [21]:
con.execute("""
SELECT Report_date, COUNT(*) AS rows
FROM silver_registrations_clean
GROUP BY Report_date
ORDER BY Report_date
""").fetchdf()


CatalogException: Catalog Error: Table with name silver_registrations_clean does not exist!
Did you mean "stg_registrations_clean"?

LINE 3: FROM silver_registrations_clean
             ^