In [55]:
import gc
import glob
import os

import numpy as np
import pandas as pd

# Load the trip sample and the event calendar. Both tables carry a
# [start_datetime, end_datetime] window that must be parsed before comparing.
datetime_cols = ['start_datetime', 'end_datetime']

taxi_trips = pd.read_parquet('random_taxi_trips_sample.parquet')
taxi_trips[datetime_cols] = taxi_trips[datetime_cols].apply(pd.to_datetime)

spec_events = pd.read_csv('events_december_2012.csv')
spec_events[datetime_cols] = spec_events[datetime_cols].apply(pd.to_datetime)

# Placeholder event label for every trip (all None); later cells copy matched
# event types into it.
spec_event_types = [None for _ in range(len(taxi_trips))]
taxi_trips['Event Type'] = spec_event_types

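Match special events to taxi trips: an (event, trip) pair is kept when the event's datetime window and the trip's datetime window overlap (or touch), and the event's coordinates fall inside a roughly ±500 m box (±0.0045° latitude, ±0.0059° longitude) around the trip's pickup coordinates. 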

In [None]:
def process_in_batches(spec_events, taxi_trips, batch_size=1000,
                       lat_threshold=0.0045, long_threshold=0.0059,
                       output_dir='special_merged'):
    """Pair special events with taxi trips that overlap in time and space.

    Trips are processed ``batch_size`` rows at a time. Each batch is
    cross-joined with every event, so a single intermediate holds at most
    ``len(spec_events) * batch_size`` rows instead of the full Cartesian
    product.

    An (event, trip) pair is kept when both of these hold:
      * the event window [start_datetime_x, end_datetime_x] and the trip
        window [start_datetime_y, end_datetime_y] overlap or touch;
      * |Latitude - PU_lat| <= lat_threshold and
        |Longitude - PU_long| <= long_threshold, i.e. the event lies in a
        lat/long box around the pickup point.

    Columns present in both inputs get pandas' default merge suffixes:
    ``_x`` for ``spec_events`` and ``_y`` for ``taxi_trips``.

    Parameters
    ----------
    spec_events : pd.DataFrame
        Events with 'start_datetime', 'end_datetime', 'Latitude', 'Longitude'.
    taxi_trips : pd.DataFrame
        Trips with 'start_datetime', 'end_datetime', 'PU_lat', 'PU_long'.
    batch_size : int
        Number of trip rows cross-joined per iteration; must be positive.
    lat_threshold, long_threshold : float
        Maximum absolute coordinate difference, in degrees. 0.0045 deg of
        latitude is about 500 m. The longitude default looks scaled for a
        latitude near 40.7 deg (0.0045 / cos(40.7 deg) ~= 0.0059); confirm
        before reusing with trips elsewhere.
    output_dir : str or None
        Directory that receives each batch's matches as
        ``special_merged_{i}.parquet`` (created if missing). This matches
        the ``special_merged/*.parquet`` pattern read by the next cell.
        Pass None to skip writing files.

    Returns
    -------
    pd.DataFrame
        All matching pairs, including the helper 'lat_diff' and 'long_diff'
        columns. Returns an empty frame when there are no trips.

    Raises
    ------
    ValueError
        If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if output_dir is not None:
        os.makedirs(output_dir, exist_ok=True)

    results = []
    num_batches = int(np.ceil(len(taxi_trips) / batch_size))

    for i in range(num_batches):
        # Cross-join only a manageable slice of trips so the Cartesian product
        # fits in RAM. how='cross' avoids writing a dummy key column into an
        # iloc slice, which triggered SettingWithCopyWarning.
        batch = taxi_trips.iloc[i * batch_size:(i + 1) * batch_size]
        combined = pd.merge(spec_events, batch, how='cross')

        # Keep the pair if the two time intervals overlap or touch:
        # max(starts) <= min(ends), expanded into its four pairwise
        # comparisons. NaT compares False, so pairs with a missing endpoint
        # are dropped.
        overlaps = ((combined['start_datetime_x'] <= combined['end_datetime_x']) &
                    (combined['start_datetime_x'] <= combined['end_datetime_y']) &
                    (combined['start_datetime_y'] <= combined['end_datetime_x']) &
                    (combined['start_datetime_y'] <= combined['end_datetime_y']))
        combined = combined[overlaps]

        # Absolute coordinate differences between event and pickup location.
        # .assign() builds a new frame instead of writing into a filtered slice.
        combined = combined.assign(
            lat_diff=np.abs(combined['Latitude'] - combined['PU_lat']),
            long_diff=np.abs(combined['Longitude'] - combined['PU_long']),
        )

        # Keep the pair if the pickup lies inside the coordinate threshold box.
        combined = combined[(combined['lat_diff'] <= lat_threshold) &
                            (combined['long_diff'] <= long_threshold)]

        # Only rows meeting both conditions are kept and (optionally) persisted.
        results.append(combined)
        if output_dir is not None:
            combined.to_parquet(os.path.join(output_dir, f"special_merged_{i}.parquet"),
                                index=False)

        # Drop this iteration's references so the large cross-join
        # intermediates can be reclaimed before the next batch.
        # The filtered matches stay alive in `results`.
        del combined, batch, overlaps
        gc.collect()

    if not results:
        return pd.DataFrame()
    return pd.concat(results)

# Run the batched event/trip matching; each batch's matches are also written to disk.
processed_data = process_in_batches(spec_events=spec_events, taxi_trips=taxi_trips, batch_size=1000)


In [56]:
# Load every batch file written by process_in_batches. Sorting makes the row
# order of all_events deterministic (raw glob order is filesystem-dependent).
parquet_files = sorted(glob.glob('special_merged/*.parquet'))
if not parquet_files:
    raise FileNotFoundError(
        "No batch files matched 'special_merged/*.parquet'; run process_in_batches "
        "with its output written to the 'special_merged/' directory first.")
all_events = pd.concat([pd.read_parquet(file) for file in parquet_files], ignore_index=True)

# In the cross join, trip columns carry the _y suffix and the event label the
# _x suffix. Restore the plain names so all_events can be joined back onto
# taxi_trips.
all_events = all_events.rename(columns={'start_datetime_y': 'start_datetime',
                                        'end_datetime_y': 'end_datetime',
                                        'Event Type_x': 'Event Type'})

# A trip can match several events. Keep only the first match per trip key so
# the left merge yields exactly one row per taxi trip. Deduplicating here,
# rather than calling drop_duplicates on the merged frame, leaves trips that
# are genuinely duplicated in taxi_trips intact.
trip_keys = ['start_datetime', 'end_datetime', 'trip_distance', 'PU_lat', 'PU_long']
event_matches = all_events[trip_keys + ['Event Type']].drop_duplicates(subset=trip_keys)

updated_taxi_trips = pd.merge(taxi_trips, event_matches, on=trip_keys, how='left')
assert len(updated_taxi_trips) == len(taxi_trips), "event merge must not duplicate trips"

# Prefer the matched event label; otherwise fall back to the trip's existing
# 'Event Type' value. This is a vectorized form of a row-wise apply.
matched = updated_taxi_trips['Event Type_y']
updated_taxi_trips['special_events'] = matched.where(matched.notna(),
                                                     updated_taxi_trips['Event Type_x'])
updated_taxi_trips = updated_taxi_trips.drop(columns=['Event Type_x', 'Event Type_y'])

updated_taxi_trips.to_parquet('../partial_merge/special_events_merge.parquet')