In [1]:
import dask.dataframe as dd
import pandas as pd
from sqlalchemy import create_engine

# --- Configuration ---
POSTGRES_URL = "postgresql+psycopg2://postgres:example@db:5432/raw_data"

# --- Load Data from PostgreSQL ---
engine = create_engine(POSTGRES_URL)

# Read tables into Dask DataFrames
tickets = dd.read_sql_table('raw_tickets', POSTGRES_URL, index_col='ticket_id', npartitions=10)
# sensors = dd.read_sql_table('raw_sensors', POSTGRES_URL, index_col='measurement_id', npartitions=10)
# trips = dd.read_sql_table('trips', POSTGRES_URL, index_col='trip_id', npartitions=10)
# traffic = dd.read_sql_table('traffic', POSTGRES_URL, index_col='trip_id', npartitions=10)
# weather = dd.read_sql_table('weather', POSTGRES_URL, index_col='measurement_id', npartitions=10)
# events = dd.read_sql_table('events', POSTGRES_URL, index_col='event_id', npartitions=10)

# --- Aggregations (similar to MLmodels.ipynb) ---

# Tickets: count passengers in per trip per day
tickets['timestamp'] = dd.to_datetime(tickets['timestamp'])
tickets['day'] = tickets['timestamp'].dt.date
agg_tickets = tickets.groupby(['trip_id', 'day']).agg({
    'fare': 'count',
    'school': 'max',
    'hospital': 'max',
    'peak_hour': 'max',
    'timestamp': 'max'
}).rename(columns={'fare': 'passengers_in'}).reset_index()

# Sensors: count passengers out per trip per day
sensors['timestamp'] = dd.to_datetime(sensors['timestamp'])
sensors['day'] = sensors['timestamp'].dt.date
agg_sensors = sensors.groupby(['trip_id', 'day']).agg({
    'status': 'count',
    'timestamp': 'max'
}).rename(columns={'status': 'passengers_out'}).reset_index()

# Merge tickets and sensors
merged = dd.merge(agg_tickets, agg_sensors, on=['trip_id', 'day', 'timestamp'], how='inner')

# Merge with trips
merged = dd.merge(merged, trips.reset_index(), on='trip_id', how='left')

# Merge with traffic
traffic['timestamp'] = dd.to_datetime(traffic['timestamp'])
merged = dd.merge(merged, traffic.reset_index(), on=['trip_id', 'timestamp'], how='left')

# Merge with weather (asof merge, so convert to pandas for this step)
merged_pd = merged.compute()
weather_pd = weather.compute()
weather_pd['hour'] = pd.to_datetime(weather_pd['hour'])
merged_pd['timestamp'] = pd.to_datetime(merged_pd['timestamp'])
merged_pd = pd.merge_asof(
    merged_pd.sort_values('timestamp'),
    weather_pd[['hour', 'temperature', 'precipitation_probability', 'weather_code', 'latitude', 'longitude']].sort_values('hour'),
    left_on='timestamp', right_on='hour', direction='backward'
)

# Merge with events (by date)
events_pd = events.compute()
events_pd['day_event'] = pd.to_datetime(events_pd['day_event']).dt.date
merged_pd['day'] = pd.to_datetime(merged_pd['day']).dt.date
final = pd.merge(events_pd, merged_pd, left_on='day_event', right_on='day', how='right')
final['event_dummy'] = final['day_event'].notna().astype(int)

# --- Save to PostgreSQL ---
final_dd = dd.from_pandas(final, npartitions=10)
final_dd.to_sql('feature_table', POSTGRES_URL, if_exists='replace', index=False)

print("Feature table created and saved to PostgreSQL.")

TypeError: Provided index column is of type "object".  If divisions is not provided the index column type must be numeric or datetime.