# Week 5 — Data Quality & Data Pipeline (Subte Turnstile Data)

This notebook consolidates and cleans the Buenos Aires subway (Subte) turnstile
(`molinetes`) data.

**Objectives**

- Load all raw turnstile CSV files from `data/raw/molinetes/`
- Concatenate them into a single unified dataset
- Apply reusable data quality checks from `utils/data_quality.py`
- Standardize column names and run basic cleaning
- Validate schema, row count and basic constraints
- Save a clean version of the dataset to `data/processed/`
- Produce a short, human-readable data quality summary


In [10]:
import sys
from pathlib import Path

import pandas as pd

# Ensure project root is on sys.path so we can import `utils`
PROJECT_ROOT = Path("..").resolve()
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from utils.data_quality import (
    DataQualityConfig,
    run_data_quality_pipeline,
    standardize_column_names,
)

DATA_RAW_DIR = PROJECT_ROOT / "data" / "raw"
DATA_PROCESSED_DIR = PROJECT_ROOT / "data" / "processed"

DATA_RAW_DIR, DATA_PROCESSED_DIR


(WindowsPath('E:/Proyectos/Proyectos GitHub/urban-intelligence-lab/data/raw'),
 WindowsPath('E:/Proyectos/Proyectos GitHub/urban-intelligence-lab/data/processed'))

In [11]:
MOLINETES_DIR = DATA_RAW_DIR / "molinetes"

print("PROJECT_ROOT:", PROJECT_ROOT)
print("MOLINETES_DIR exists?:", MOLINETES_DIR.exists())

if MOLINETES_DIR.exists():
    files = sorted(MOLINETES_DIR.glob("*.csv"))
    print(f"Found {len(files)} CSV files in data/raw/molinetes:")
    for f in files:
        print(" -", f.name)
else:
    files = []
    print("⚠️ molinetes directory does not exist")


PROJECT_ROOT: E:\Proyectos\Proyectos GitHub\urban-intelligence-lab
MOLINETES_DIR exists?: True
Found 24 CSV files in data/raw/molinetes:
 - 202401_PAX15min-ABC.csv
 - 202401_PAX15min-DEH.csv
 - 202402_PAX15min-ABC.csv
 - 202402_PAX15min-DEH.csv
 - 202403_PAX15min-ABC.csv
 - 202403_PAX15min-DEH.csv
 - 202404_PAX15min-ABC.csv
 - 202404_PAX15min-DEH.csv
 - 202405_PAX15min-ABC.csv
 - 202405_PAX15min-DEH.csv
 - 202406_PAX15min-ABC.csv
 - 202406_PAX15min-DEH.csv
 - 202407_PAX15min-ABC.csv
 - 202407_PAX15min-DEH.csv
 - 202408_PAX15min-ABC.csv
 - 202408_PAX15min-DEH.csv
 - 202409_PAX15min-ABC.csv
 - 202409_PAX15min-DEH.csv
 - 202410_PAX15min-ABC.csv
 - 202410_PAX15min-DEH.csv
 - 202411_PAX15min-ABC.csv
 - 202411_PAX15min-DEH.csv
 - 202412_PAX15min-ABC-INCLUYEOTROMODOSDEPAGO.csv
 - 202412_PAX15min-DEH-INCLUYEOTROMODOSDEPAGO.csv


In [13]:
if not files:
    raise RuntimeError("No CSV files found in data/raw/molinetes/. Please check the data folder.")

dfs = []

for f in files:
    try:
        # First try UTF-8 (default)
        df = pd.read_csv(f)
        used_encoding = "utf-8"
    except UnicodeDecodeError:
        # Fallback to latin1 / cp1252 style encoding
        df = pd.read_csv(f, encoding="latin1")
        used_encoding = "latin1"

    print(f"Loaded {f.name} with encoding {used_encoding}, shape={df.shape}")
    dfs.append(df)

raw_df = pd.concat(dfs, ignore_index=True)

print("Combined shape:", raw_df.shape)
raw_df.head()


Loaded 202401_PAX15min-ABC.csv with encoding utf-8, shape=(587990, 1)
Loaded 202401_PAX15min-DEH.csv with encoding utf-8, shape=(290788, 1)
Loaded 202402_PAX15min-ABC.csv with encoding utf-8, shape=(551054, 1)
Loaded 202402_PAX15min-DEH.csv with encoding utf-8, shape=(236273, 1)
Loaded 202403_PAX15min-ABC.csv with encoding latin1, shape=(599344, 1)
Loaded 202403_PAX15min-DEH.csv with encoding latin1, shape=(413562, 1)
Loaded 202404_PAX15min-ABC.csv with encoding latin1, shape=(574669, 1)
Loaded 202404_PAX15min-DEH.csv with encoding latin1, shape=(459968, 1)
Loaded 202405_PAX15min-ABC.csv with encoding latin1, shape=(569524, 1)
Loaded 202405_PAX15min-DEH.csv with encoding latin1, shape=(449116, 1)
Loaded 202406_PAX15min-ABC.csv with encoding utf-8, shape=(549191, 1)
Loaded 202406_PAX15min-DEH.csv with encoding utf-8, shape=(426072, 1)
Loaded 202407_PAX15min-ABC.csv with encoding utf-8, shape=(567561, 1)
Loaded 202407_PAX15min-DEH.csv with encoding utf-8, shape=(437985, 1)
Loaded 202408_

Unnamed: 0,FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL,FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL;;
0,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Malab...,
1,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Trona...,
2,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Pelle...,
3,1/1/2024;07:45:00;08:00:00;LineaA;LineaA_Flore...,
4,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Dorre...,


In [14]:
raw_df.info()
raw_df.columns.tolist()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11440440 entries, 0 to 11440439
Data columns (total 2 columns):
 #   Column                                                                                     Dtype 
---  ------                                                                                     ----- 
 0   FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL    object
 1   FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL;;  object
dtypes: object(2)
memory usage: 174.6+ MB


['FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL',
 'FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL;;']

In [15]:
combined_raw_path = DATA_RAW_DIR / "molinetes_combined.csv"
raw_df.to_csv(combined_raw_path, index=False)

combined_raw_path


WindowsPath('E:/Proyectos/Proyectos GitHub/urban-intelligence-lab/data/raw/molinetes_combined.csv')

In [16]:
# Standardize column names the same way the pipeline will do it
df_for_config = standardize_column_names(raw_df)

standardized_columns = df_for_config.columns.tolist()
standardized_columns

# Basic heuristics to infer some rules.
# You can refine these rules after inspecting the dataset.

date_columns = [
    c for c in standardized_columns
    if "fecha" in c.lower() or "date" in c.lower()
]

numeric_columns = df_for_config.select_dtypes(include="number").columns.tolist()

non_null_columns = [
    c for c in standardized_columns
    if any(keyword in c.lower() for keyword in ["linea", "line", "station", "estacion", "fecha", "date"])
]

config = DataQualityConfig(
    name="subte_molinetes_ridership",
    expected_columns=standardized_columns,
    non_null_columns=non_null_columns,
    date_columns=date_columns,
    numeric_columns=numeric_columns,
    allowed_values={
        # You can manually add constraints here later, for example:
        # "linea": ["A", "B", "C", "D", "E", "H"],
    },
    value_ranges={
        # Example (you can refine after inspecting distributions):
        # "validations": (0, None),
    },
    unique_keys=[
        # Example of potential composite key, to be refined later:
        # ["linea", "estacion", "fecha"]
    ],
    min_rows=1000,  # expecting a reasonably large dataset
)

config


[2025-11-17 16:17:45] [INFO] utils.data_quality - Standardized column names: ['FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL', 'FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL;;'] -> ['fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total', 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;']


DataQualityConfig(name='subte_molinetes_ridership', expected_columns=['fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total', 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;'], non_null_columns=['fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total', 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;'], date_columns=['fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total', 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;'], numeric_columns=[], allowed_values={}, value_ranges={}, unique_keys=[], min_rows=1000)

In [17]:
result = run_data_quality_pipeline(
    raw_path=combined_raw_path,
    processed_dir=DATA_PROCESSED_DIR,
    config=config,
)

result.to_dict()


[2025-11-17 16:36:44] [INFO] utils.data_quality - Loading CSV file: E:\Proyectos\Proyectos GitHub\urban-intelligence-lab\data\raw\molinetes_combined.csv
  raise FileNotFoundError(f"File does not exist: {path}")
[2025-11-17 16:37:05] [INFO] utils.data_quality - Loaded 11440440 rows and 2 columns
[2025-11-17 16:37:05] [INFO] utils.data_quality - Running basic cleaning for dataset 'subte_molinetes_ridership'
[2025-11-17 16:37:06] [INFO] utils.data_quality - Standardized column names: ['FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL', 'FECHA;DESDE;HASTA;LINEA;MOLINETE;ESTACION;pax_pagos;pax_pases_pagos;pax_franq;pax_TOTAL;;'] -> ['fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total', 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;']
[2025-11-17 16:37:17] [INFO] utils.data_quality - Dropped 3 duplicate rows
[2025-11-17 16:37:24] [INFO] utils.data_quality - Saving cleaned d

{'dataset_name': 'subte_molinetes_ridership',
 'n_rows_before': 11440440,
 'n_rows_after': 11440437,
 'n_columns': 2,
 'issues': ["Column 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total' has 1672032 null values",
  "Column 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;' has 9768405 null values"],
 'anomaly_columns': [],
 'is_acceptable': False}

In [18]:
clean_csv_path = DATA_PROCESSED_DIR / "subte_molinetes_ridership_clean.csv"
clean_df = pd.read_csv(clean_csv_path)

print(clean_df.shape)
clean_df.head()


  clean_df = pd.read_csv(clean_csv_path)


(11440437, 2)


Unnamed: 0,fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total,fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;
0,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Malab...,
1,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Trona...,
2,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Pelle...,
3,1/1/2024;07:45:00;08:00:00;LineaA;LineaA_Flore...,
4,1/1/2024;07:45:00;08:00:00;LineaB;LineaB_Dorre...,


In [19]:
summary_lines = [
    f"Dataset name: {result.dataset_name}",
    f"Rows before cleaning: {result.n_rows_before}",
    f"Rows after cleaning: {result.n_rows_after}",
    f"Number of columns: {result.n_columns}",
    f"Issues found: {len(result.issues)}",
]

if result.issues:
    summary_lines.append("Main issues:")
    for issue in result.issues:
        summary_lines.append(f"- {issue}")

if result.anomaly_columns:
    summary_lines.append(
        f"Potential numeric anomalies detected in: {', '.join(result.anomaly_columns)}"
    )

print("\n".join(summary_lines))


Dataset name: subte_molinetes_ridership
Rows before cleaning: 11440440
Rows after cleaning: 11440437
Number of columns: 2
Issues found: 2
Main issues:
- Column 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total' has 1672032 null values
- Column 'fecha;desde;hasta;linea;molinete;estacion;pax_pagos;pax_pases_pagos;pax_franq;pax_total;;' has 9768405 null values
