## **Objective**

#### **objective**: `data.parquet`

- Historical dataset covering the period from 2018-07-01 to 2026-02-08  
- Source format: CSV files (mostly one-year batches)  
- Each batch: ~1.8M rows × 39 columns  
- Goal:
  - Apply only minimal and necessary preprocessing  
  - Stack all yearly batches into a single unified dataset  
  - Export the consolidated result as `data.parquet`

## **Loading and Settings**

In [4]:
# Imports & Configuration
from pathlib import Path
import pandas as pd
import duckdb as db

# Folder that holds the raw yearly CSV batches.
DATA_DIR = Path('data/raw')
# Intermediate stacked CSV written by the consolidation step.
OUTPUT_FILE = Path('data/processed/stack.csv')

# DuckDB connection (no database file given) shared by every cell below.
con = db.connect()

# Echo the effective configuration so it is recorded in the run log.
print(f'[CONFIG] DATA_DIR={DATA_DIR} | OUTPUT_FILE={OUTPUT_FILE}')

[CONFIG] DATA_DIR=data\raw | OUTPUT_FILE=data\processed\stack.csv


In [5]:
# Filesystem preparation

def ensure_output_dir(path: Path) -> None:
    """Create the directory that will contain *path*, if it is missing.

    Missing intermediate directories are created as well, and an already
    existing directory is not an error. The file at *path* itself is not
    created.

    Args:
        path: Location of the file whose parent directory must exist.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)


def list_csv_files(data_dir: Path) -> list[Path]:
    """Return the ``*.csv`` entries directly inside *data_dir*, sorted.

    Args:
        data_dir: Directory to search (sub-directories are not searched).

    Returns:
        The matching paths in ascending sort order.

    Raises:
        AssertionError: If no entry matches ``*.csv``.
    """
    csv_paths = list(data_dir.glob('*.csv'))
    csv_paths.sort()
    assert csv_paths, 'No CSV files found'
    return csv_paths


# The destination folder must exist before anything is exported into it.
ensure_output_dir(OUTPUT_FILE)

# Collect the raw CSV batches (sorted by path).
files = list_csv_files(DATA_DIR)

# List every input so the run log shows exactly which batches were stacked.
for file in files:
    print(file)

n_files = len(files)
print(f'Total: {n_files} CSV files found')

data\raw\20 and under 2004970 rows.csv
data\raw\21-19 1940164 rows.csv
data\raw\21-20 2018176 rows.csv
data\raw\22-21 1821488 rows.csv
data\raw\23-22 1772092 rows.csv
data\raw\24-23 1818726 rows.csv
data\raw\25-24 1884950 rows.csv
data\raw\26-25 1980801 rows.csv
Total: 8 CSV files found


In [6]:
# Scan each file schema
def scan_file(con: db.DuckDBPyConnection, file: Path):
    """Return the column names and the row count of one CSV batch.

    Both values are computed inside DuckDB, so the batch is never
    materialised as a pandas DataFrame just to be measured.

    Args:
        con: Open DuckDB connection used to run the queries.
        file: Path of the CSV file to inspect.

    Returns:
        A ``(columns, n_rows)`` tuple: the set of column names reported by
        ``read_csv_auto`` and the number of rows it reads from the file.
    """
    # Double any single quote so the path stays a valid SQL string literal.
    source = str(file).replace("'", "''")
    reader = f"read_csv_auto('{source}', sample_size=-1)"

    # DESCRIBE yields one row per column; its first field is the column name.
    described = con.execute(f"DESCRIBE SELECT * FROM {reader}").fetchall()
    columns = {row[0] for row in described}

    n_rows = con.execute(f"SELECT COUNT(*) FROM {reader}").fetchone()[0]
    return columns, n_rows

# Per-file scan results, filled in input order:
#   schemas    -> one set of column names per file
#   row_counts -> file name -> number of rows
schemas = []
row_counts = {}

n_files = len(files)
for idx, file in enumerate(files, start=1):
    columns, count = scan_file(con, file)
    schemas.append(columns)
    row_counts[file.name] = count

    progress = f'[SCAN {idx}/{n_files}]{file.name} | rows={count} | cols={len(columns)}'
    print(progress)

print(f'[SCAN] Completed. Files scanned: {len(schemas)}')

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 1/8]20 and under 2004970 rows.csv | rows=2004970 | cols=39


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 2/8]21-19 1940164 rows.csv | rows=1940164 | cols=39


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 3/8]21-20 2018176 rows.csv | rows=2018176 | cols=39


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 4/8]22-21 1821488 rows.csv | rows=1821488 | cols=39


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 5/8]23-22 1772092 rows.csv | rows=1772092 | cols=39


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 6/8]24-23 1818726 rows.csv | rows=1818726 | cols=39


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 7/8]25-24 1884950 rows.csv | rows=1884950 | cols=39


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[SCAN 8/8]26-25 1980801 rows.csv | rows=1980801 | cols=39
[SCAN] Completed. Files scanned: 8


In [7]:
# Consolidate CSV files into a single stack.csv using common columns

def _sql_literal(value) -> str:
    """Render *value* as a single-quoted SQL string literal."""
    return "'" + str(value).replace("'", "''") + "'"


def _sql_identifier(name: str) -> str:
    """Render *name* as a double-quoted SQL identifier."""
    return '"' + name.replace('"', '""') + '"'


def consolidate_csvs(files: list[Path], schemas: list[set], con, output: Path, row_counts: dict):
    """Stack all CSV batches into one CSV restricted to their shared columns.

    Only columns present in every schema are exported, in sorted name order.
    Each batch contributes one ``SELECT`` and the selects are combined with
    ``UNION ALL``, so rows are appended in the order of *files*.

    Args:
        files: CSV batches to stack.
        schemas: One set of column names per batch.
        con: DuckDB connection; only its ``execute`` method is used.
        output: Destination CSV path (its directory must already exist).
        row_counts: Mapping whose values are the per-batch row counts.

    Raises:
        AssertionError: If the batches have no column in common.
    """
    # determine common columns
    common_fields = sorted(set.intersection(*schemas))
    assert common_fields, "No common columns across files"

    # Quote every column name: names containing spaces, punctuation or
    # reserved words would otherwise break the generated SELECT.
    select_list = ", ".join(_sql_identifier(col) for col in common_fields)

    # build UNION ALL query (file paths go in as proper string literals)
    union_query = " UNION ALL ".join(
        f"SELECT {select_list} "
        f"FROM read_csv_auto({_sql_literal(f)}, SAMPLE_SIZE=-1)"
        for f in files
    )

    # export consolidated file
    con.execute(f"""
        COPY (
            {union_query}
        )
        TO {_sql_literal(output)}
        WITH (HEADER, DELIMITER ',');
    """)

    # Expected total taken from the earlier per-file scan; the exported file
    # itself is not re-counted here.
    total_rows = sum(row_counts.values())

    print(f"[EXPORT] file={output}")
    print(f"[EXPORT] rows={total_rows} | cols={len(common_fields)}")


# The first stack is written as CSV on purpose: it is read back later with
# read_csv_auto so that column types are detected automatically.
consolidate_csvs(
    files=files,
    schemas=schemas,
    con=con,
    output=OUTPUT_FILE,
    row_counts=row_counts,
)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[EXPORT] file=data\processed\stack.csv
[EXPORT] rows=15241367 | cols=39


## **Preprocessing**

In [8]:
# Columns with a high share of missing values are pinned to VARCHAR so that
# read_csv_auto's automatic type detection cannot coerce them to another type.
varchar_cols = {
    column: 'VARCHAR'
    for column in (
        'LEGACY_SR_NUMBER',
        'SANITATION_DIVISION_DAYS',
        'PARENT_SR_NUMBER',
        'ELECTRICAL_DISTRICT',
        'CREATED_DEPARTMENT',
        'CITY',
        'STATE',
        'ELECTRICITY_GRID',
        'ZIP_CODE',
    )
}

# Render the overrides as the {'name': 'TYPE', ...} literal passed to the
# `types` option of read_csv_auto.
types_sql = '{' + ', '.join(
    f"'{column}': '{dtype}'" for column, dtype in varchar_cols.items()
) + '}'

# Read the whole stacked CSV into pandas, with types inferred over every row
# (sample_size=-1) except for the overrides above.
df = con.execute(f"""
    SELECT *
    FROM read_csv_auto(
        '{OUTPUT_FILE}',
        sample_size=-1,
        timestampformat='%Y-%m-%d %H:%M:%S',
        types={types_sql})
""").df()


# Optional missingness report, kept disabled:
# (df.isna().mean().sort_values(ascending=False).head(25) * 100).round(decimals=2)

# Expose the DataFrame to DuckDB under the name used by the export step.
con.register("df_view", df)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<_duckdb.DuckDBPyConnection at 0x24a42117130>

In [9]:
# Persist the registered view as the final consolidated Parquet file.
export_sql = """
    COPY df_view
    TO 'data/processed/data.parquet'
    (FORMAT PARQUET)
"""
con.execute(export_sql)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<_duckdb.DuckDBPyConnection at 0x24a42117130>

In [18]:
# Sanity check: read the exported Parquet back and report the span of
# CREATED_DATE (latest date first, then earliest).
date_range_sql = """
    SELECT
        MAX(CREATED_DATE) AS max_date,
        MIN(CREATED_DATE) AS min_date
    FROM read_parquet('data/processed/data.parquet')
"""
max_min = con.execute(date_range_sql).fetchone()

latest, earliest = max_min
print(latest.strftime("%Y-%m-%d"))
print(earliest.strftime("%Y-%m-%d"))

2026-02-08
2018-07-01
