In [1]:
from pathlib import Path
import polars as pl

def processing_time_series_data(csv_filename):
    """Aggregate one trip-data CSV into hourly unique-ride counts and save it as parquet.

    The CSV is read from ``../data/unzipped/`` with every column forced to a
    string dtype. ``started_at`` and ``ended_at`` are parsed as datetimes, the
    rows are grouped by the date and hour of ``started_at``, and the number of
    distinct ``ride_id`` values per group is written to
    ``../data/processed/<csv name with a .parquet extension>`` using gzip
    compression.

    Parameters
    ----------
    csv_filename : str or pathlib.Path
        Name of the CSV file, relative to ``../data/unzipped/``.

    Returns
    -------
    None
        The result is only written to disk.
    """
    # Paths
    csv_file_base = Path("../data/unzipped/")
    parquet_file_base = Path("../data/processed/")
    # Replace only the final extension: splitting on the first "." would
    # truncate names that contain additional dots (e.g. "a.b.csv" -> "a").
    parquet_path = parquet_file_base / Path(csv_filename).with_suffix(".parquet")

    # Enforce schema for consistent results
    schema = {
        "ride_id": pl.Utf8,
        "rideable_type": pl.Utf8,
        "started_at": pl.Utf8,
        "ended_at": pl.Utf8,
        "start_station_name": pl.Utf8,
        "start_station_id": pl.Utf8,
        "end_station_name": pl.Utf8,
        "end_station_id": pl.Utf8,
        "start_lat": pl.Utf8,
        "start_lng": pl.Utf8,
        "end_lat": pl.Utf8,
        "end_lng": pl.Utf8,
        "member_casual": pl.Utf8,
    }

    # The selected columns are exactly the schema keys (dicts keep insertion
    # order), so derive the list instead of maintaining a second copy.
    columns = list(schema)

    timestamp_format = "%Y-%m-%d %H:%M:%S%.f"

    # Process the data
    df = (
        pl.read_csv(csv_file_base / csv_filename, columns=columns, schema=schema, truncate_ragged_lines=True)
        .with_columns(
            pl.col("started_at").str.strptime(pl.Datetime, timestamp_format),
            pl.col("ended_at").str.strptime(pl.Datetime, timestamp_format),
        )
        .with_columns(
            pl.col("started_at").dt.hour().alias("hour"),
            pl.col("started_at").dt.date().alias("ride_date"),
        )
        .group_by(["ride_date", "hour"])
        .agg(
            pl.col("ride_id").n_unique().alias("unique_rides")
        )
        .sort(["ride_date", "hour"])
    )

    # Export to parquet; create the destination directory on first use so a
    # fresh checkout does not fail at the write step.
    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(parquet_path, compression='gzip')

In [2]:
# Get all CSV filenames as strings.
# Only "*.csv" entries are kept so stray files in the directory (hidden OS
# files, archives, ...) are never handed to the CSV reader, and the list is
# sorted because iterdir() yields entries in arbitrary order.
filenames = sorted(
    f.name
    for f in Path("../data/unzipped/").iterdir()
    if f.is_file() and f.suffix.lower() == ".csv"
)

# Process each CSV file
for filename in filenames:
    print(f"Processing {filename}")
    processing_time_series_data(filename)
    print("\tDONE")

Processing 202301-citibike-tripdata_202301-citibike-tripdata_1.csv
	DONE
Processing 202301-citibike-tripdata_202301-citibike-tripdata_2.csv
	DONE
Processing 202302-citibike-tripdata_202302-citibike-tripdata_1.csv
	DONE
Processing 202302-citibike-tripdata_202302-citibike-tripdata_2.csv
	DONE
Processing 202303-citibike-tripdata_202303-citibike-tripdata_1.csv
	DONE
Processing 202303-citibike-tripdata_202303-citibike-tripdata_2.csv
	DONE
Processing 202303-citibike-tripdata_202303-citibike-tripdata_3.csv
	DONE
Processing 202304-citibike-tripdata_202304-citibike-tripdata_1.csv
	DONE
Processing 202304-citibike-tripdata_202304-citibike-tripdata_2.csv
	DONE
Processing 202304-citibike-tripdata_202304-citibike-tripdata_3.csv
	DONE
Processing 202305-citibike-tripdata_202305-citibike-tripdata_1.csv
	DONE
Processing 202305-citibike-tripdata_202305-citibike-tripdata_2.csv
	DONE
Processing 202305-citibike-tripdata_202305-citibike-tripdata_3.csv
	DONE
Processing 202305-citibike-tripdata_202305-citibike