In [None]:
import pandas as pd
import numpy as np
import os
from zipfile import ZipFile
from pathlib import Path
import re
import io


In [None]:
# Raw headers written as "tripduration" / "start station id" / "birth year".
_TRIPDURATION_STYLE_HEADERS = {
    "tripduration": "duration",
    "starttime": "start_time",
    "stoptime": "stop_time",
    "start station id": "start_station_id",
    "start station name": "start_station_name",
    "start station latitude": "start_station_latitude",
    "start station longitude": "start_station_longitude",
    "end station id": "end_station_id",
    "end station name": "end_station_name",
    "end station latitude": "end_station_latitude",
    "end station longitude": "end_station_longitude",
    "bikeid": "bike_id",
    "usertype": "user_type",
    "birth year": "birth_year",
}

# Raw headers written as "started_at" / "start_lat" / "member_casual".
_STARTED_AT_STYLE_HEADERS = {
    "started_at": "start_time",
    "ended_at": "stop_time",
    "start_lat": "start_station_latitude",
    "start_lng": "start_station_longitude",
    "end_lat": "end_station_latitude",
    "end_lng": "end_station_longitude",
    "member_casual": "user_type",
}

# Raw CSV header -> canonical snake_case column name.
# Both header families point at the same canonical names, so frames read from
# either kind of file end up with one shared schema.
mapping_columns = {**_TRIPDURATION_STYLE_HEADERS, **_STARTED_AT_STYLE_HEADERS}



In [None]:

def _normalize(col: str) -> str:
    return (
        col.strip()
           .lower()
           .replace("-", " ")
           .replace("_", " ")
           .replace("  ", " ")
           .strip()
    )

def _normalized_mapping(raw_map: dict) -> dict:
    """Return a new dict whose keys are the ``_normalize``-d keys of ``raw_map``.

    Values are carried over unchanged. Normalizing the keys lets headers that
    differ only in case, spacing, underscores or hyphens hit the same entry.
    If two raw keys normalize to the same string, the later one wins.
    """
    normalized = {}
    for raw_key, canonical_name in raw_map.items():
        normalized[_normalize(raw_key)] = canonical_name
    return normalized

def _rename_and_coalesce(df: pd.DataFrame, norm_map: dict) -> pd.DataFrame:
    """Rename columns to their canonical names and merge any resulting duplicates.

    Parameters
    ----------
    df : pd.DataFrame
        Frame with raw column headers. It is not modified.
    norm_map : dict
        Mapping from a ``_normalize``-d header to its canonical column name.

    Returns
    -------
    pd.DataFrame
        A new frame in which
        - every column whose normalized header is a key of ``norm_map`` has
          been renamed to the mapped value (other columns keep their name);
        - columns that share a name after renaming have been coalesced into a
          single column holding, per row, the first non-null value from left
          to right. The merged column sits where the first duplicate was.
    """
    # 1) Rename. DataFrame.rename returns a new object, so the caller's frame
    #    is left untouched.
    rename_plan = {
        orig: norm_map.get(_normalize(orig), orig) for orig in df.columns
    }
    df = df.rename(columns=rename_plan)

    # 2) Coalesce duplicates created by the rename (e.g. two source columns
    #    that both became 'start_station_latitude').
    dup_mask = df.columns.duplicated(keep=False)
    if not dup_mask.any():
        return df

    coalesced = {}
    for tgt in df.columns[dup_mask].unique():
        # Select the duplicates positionally through a boolean mask; with a
        # list of repeated labels pandas would return each column many times.
        same_named = df.loc[:, df.columns == tgt]
        # Back-filling along the row pulls the first non-null value into the
        # left-most column.
        coalesced[tgt] = same_named.bfill(axis=1).iloc[:, 0]

    # Keep only the first occurrence of every name. This has to be positional:
    # dropping by label removes EVERY column carrying that label, which would
    # throw away the coalesced column together with the extras.
    df = df.loc[:, ~df.columns.duplicated(keep="first")].copy()
    for tgt, merged in coalesced.items():
        df[tgt] = merged

    return df



In [None]:

def parse_year_month(filename: str):
    """Extract ``(year, month)`` from a file name.

    Two strategies are tried in order:

    1. A four-digit year (19xx/20xx) directly followed by a two-digit month
       (01-12), optionally separated by one of ``-``, ``_``, ``.`` or a space,
       e.g. ``"201501-hubway-tripdata.csv"`` or ``"2015-01-trips.csv"``.
    2. Fallback: the first 19xx/20xx run anywhere in the name as the year, and
       the first one- or two-digit number that has no digit on either side as
       the month (discarded if outside 1..12), e.g. ``"trips_7_2016.csv"``.

    Returns
    -------
    tuple
        ``(year, month)`` where each element is an ``int`` or ``None`` when it
        could not be determined.
    """
    # Strategy 1: year and month next to each other. The look-arounds make
    # sure the digits are not part of a longer number, so "201513" (invalid
    # month) or "20150115" are not misread.
    adjacent = re.search(
        r"(?<!\d)((?:19|20)\d{2})[-_. ]?(0[1-9]|1[0-2])(?!\d)", filename
    )
    if adjacent:
        return int(adjacent.group(1)), int(adjacent.group(2))

    # Strategy 2: independent year / month detection.
    year_match = re.search(r"(19|20)\d{2}", filename)
    year = int(year_match.group()) if year_match else None

    month_match = re.search(r"(?:^|[^0-9])([01]?\d)(?:[^0-9]|$)", filename)
    month = int(month_match.group(1)) if month_match else None
    if month is not None and not (1 <= month <= 12):
        month = None

    return year, month

def read_one_zip_second(zip_path: str | Path) -> pd.DataFrame:
    """Read every CSV inside one ZIP archive into a single standardized frame.

    For each CSV member (processed in sorted name order):
    - the bytes are parsed as UTF-8, falling back to latin1 if that fails;
    - ``year`` / ``month`` columns are added when ``parse_year_month`` finds
      them in the member's file name;
    - columns are renamed and de-duplicated via ``_rename_and_coalesce`` using
      the normalized ``mapping_columns``.

    The per-file frames are concatenated with a fresh 0..n-1 index. Columns
    missing from some files are filled with NaN (union of all columns, in
    first-seen order).

    Parameters
    ----------
    zip_path : str | Path
        Path of the ZIP archive to read.

    Returns
    -------
    pd.DataFrame
        All rows of all CSV members.

    Raises
    ------
    RuntimeError
        If the archive contains no CSV member.
    """
    zip_path = Path(zip_path)
    norm_map = _normalized_mapping(mapping_columns)
    frames: list[pd.DataFrame] = []

    with ZipFile(zip_path) as zf:
        csv_members = sorted(
            m
            for m in zf.namelist()
            if m.lower().endswith(".csv")
            # Archives created on macOS can carry AppleDouble metadata entries
            # ("__MACOSX/..." or "._name.csv") that end in .csv but are not
            # CSV data; skip them instead of parsing them as latin1 text.
            and "__MACOSX" not in Path(m).parts
            and not Path(m).name.startswith("._")
        )
        if not csv_members:
            raise RuntimeError(f"No CSV files found in {zip_path.name}")

        for member in csv_members:
            year, month = parse_year_month(Path(member).name)

            # Let pandas decode straight from the raw bytes: this avoids
            # holding an additional fully-decoded str plus a StringIO copy of
            # every (potentially very large) file.
            data = zf.read(member)
            try:
                df = pd.read_csv(io.BytesIO(data), encoding="utf-8", low_memory=False)
            except UnicodeDecodeError:
                # latin1 maps every byte value, so this decode cannot fail.
                df = pd.read_csv(io.BytesIO(data), encoding="latin1", low_memory=False)
            del data  # release the raw bytes before the next member is read

            # add year/month if detected from filename
            if year is not None:
                df["year"] = year
            if month is not None:
                df["month"] = month

            # standardize columns and coalesce duplicates
            frames.append(_rename_and_coalesce(df, norm_map))

    # pd.concat's default outer join already yields the union of all columns
    # (NaN where a file lacks one); sort=False keeps first-seen column order.
    return pd.concat(frames, ignore_index=True, sort=False)


In [None]:

FOLDER = Path(".")  # folder that is searched for trip-data ZIP archives
zip_paths = sorted(p for p in FOLDER.glob("*.zip") if p.is_file())

# Fail early with a clear message: pd.concat on an empty list would otherwise
# raise an unhelpful "No objects to concatenate".
if not zip_paths:
    raise FileNotFoundError(f"No .zip archives found in {FOLDER.resolve()}")

# One standardized frame per archive, read in sorted file-name order.
frames = []
for zp in zip_paths:
    print("Reading:", zp.name)
    df_year = read_one_zip_second(zp)
    frames.append(df_year)

# Stack all archives into one frame with a fresh 0..n-1 index.
all_trips = pd.concat(frames, ignore_index=True)

In [None]:
# import pandas as pd
# from utils import ZipCsvStandardizer

# reader = ZipCsvStandardizer()
# df = reader.read_parquet_dir("./bluebikes_parquet")

# print(df.shape)
# df.head()

(28741314, 16)


Unnamed: 0,duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender,year
0,542,2015-01-01 00:21:44,2015-01-01 00:30:47,115,Porter Square Station,42.387997,-71.119087,96.0,Cambridge Main Library at Broadway / Trowbridg...,42.373379,-71.111076,277,Subscriber,1984.0,1,2015
1,438,2015-01-01 00:27:03,2015-01-01 00:34:21,80,MIT Stata Center at Vassar St / Main St,42.361961,-71.092056,95.0,Cambridge St - at Columbia St / Webster Ave,42.372971,-71.094444,648,Subscriber,1985.0,1,2015
2,254,2015-01-01 00:31:31,2015-01-01 00:35:46,91,One Kendall Square at Hampshire St / Portland St,42.366276,-71.09169,68.0,Central Square at Mass Ave / Essex St,42.36507,-71.103104,555,Subscriber,1974.0,1,2015
3,432,2015-01-01 00:53:46,2015-01-01 01:00:58,115,Porter Square Station,42.387997,-71.119087,96.0,Cambridge Main Library at Broadway / Trowbridg...,42.373379,-71.111076,1307,Subscriber,1987.0,1,2015
4,735,2015-01-01 01:07:06,2015-01-01 01:19:21,105,Lower Cambridgeport at Magazine St/Riverside Rd,42.356953,-71.113686,88.0,Inman Square at Vellucci Plaza / Hampshire St,42.374035,-71.101425,177,Customer,1986.0,2,2015
