In [None]:
import dask.dataframe as dd
import pandas as pd

# Load your taxi data
# OUTPUT_DIR = '/d/hpc/projects/FRI/bigdata/students/in7357/out'
OUTPUT_DIR = '/home/ivan/FRI/2024-2025/sem2/bd/hw3/data/out'
from dask.diagnostics import ProgressBar

events = dd.read_csv('/d/hpc/projects/FRI/bigdata/students/in7357/nyc_event_data', assume_missing=True)

In [2]:
events.head(2)

Unnamed: 0,Event ID,Event Name,Start Date/Time,End Date/Time,Event Agency,Event Type,Event Borough,Event Location,Event Street Side,Street Closure Type,Community Board,Police Precinct
0,368421.0,Big Apple Circus,11/18/2017 07:00:00 PM,11/18/2017 08:00:00 PM,Parks Department,Special Event,Manhattan,"Damrosch Park: Damrosch Park ,Damrosch Park: T...",,,7,20
1,330050.0,Mt. Eden Farmer's Market,11/16/2017 08:00:00 AM,11/16/2017 04:00:00 PM,Parks Department,Special Event,Bronx,Mount Eden Malls: Mount Eden Malls,,,4,44


In [3]:
events['Start Date/Time'] = dd.to_datetime(events['Start Date/Time'], format='%m/%d/%Y %I:%M:%S %p')
events['End Date/Time'] = dd.to_datetime(events['End Date/Time'], format='%m/%d/%Y %I:%M:%S %p')

In [4]:
selected_iterval = (events['Start Date/Time'] > '2020-01-01') & (events['Start Date/Time'] < '2024-03-01')
events = events[selected_iterval]

In [None]:
important_event_types = [
    "Marathon", 
    "BID Multi-Block", 
    "Rally", 
    # "Parade", 
    # "Plaza Event", 
    "Festival", 
    "Health Fair", 
    # "Parade"
]
# Filter out rows based on Event Name
events = events[events['Event Type'].isin(important_event_types)]

In [None]:
events['Event Type'].value_counts().compute()

Event Type
Parade             19438
Plaza Event        13152
Health Fair         1040
Marathon               7
BID Multi-Block       40
Rally                164
Name: count, dtype: int64[pyarrow]



In [12]:
events_df = events.compute()

In [13]:
len(events_df)

33841

In [None]:
# events_df.to_csv(f'{OUTPUT_DIR}/events_filtered.csv', index=False)

In [3]:
important_event_types = [
    "Marathon", 
    "BID Multi-Block", 
    "Rally", 
    # "Parade", 
    # "Plaza Event", 
    "Festival", 
    "Health Fair", 
    # "Parade"
]

In [4]:
events_df = pd.read_csv(f'{OUTPUT_DIR}/events_filtered.csv')
events_df = events_df[events_df['Event Type'].isin(important_event_types)]
len(events_df)

1251

In [5]:
events_df['Event ID'] = events_df['Event ID'].astype(int)
events_df['Event Location'].value_counts()

events_df['Event Location'] = events_df['Event Location'].str.split('between').str[0]


In [6]:
events_df['Event Location'].value_counts()

Event Location
MORNINGSIDE AVENUE     34
WEST  119 STREET       26
WEST  123 STREET       25
FURMAN AVENUE          24
MONTEREY AVENUE        22
                       ..
EAST GUN HILL ROAD      1
WEST  145 STREET        1
HARMAN STREET           1
ROCKAWAY PARKWAY        1
116 AVENUE              1
Name: count, Length: 182, dtype: int64

In [7]:
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from geopy.exc import GeocoderTimedOut, GeocoderUnavailable

# Initialize Nominatim geocoder
geolocator = Nominatim(user_agent="event_location_geocoder", timeout=10)
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)

def preprocess_location(location):
    # Clean and format the location string
    location = (
        location.replace("between", "&")
        .replace(" and ", " & ")
        .replace("STREET", "St")
        .replace("BOULEVARD", "Blvd")
        .replace("BROADWAY", "Broadway")
        + ", New York, NY"  # Add city/state for better accuracy
    )
    return location

def get_coordinates(location):
    try:
        location = preprocess_location(location)
        result = geocode(location)
        if result:
            return (result.latitude, result.longitude)
        else:
            return (None, None)
    except (GeocoderTimedOut, GeocoderUnavailable):
        return (None, None)


location = "GOUVERNEUR STREET"
print(get_coordinates(location)) 

(40.71225982427319, -73.98374241026366)


In [None]:
unique_locations = events_df['Event Location'].unique()

# Create a DataFrame to store coordinates
locations_df = pd.DataFrame(unique_locations, columns=['Event Location'])
locations_df[['lat', 'lon']] = locations_df['Event Location'].apply(
    lambda x: pd.Series(get_coordinates(x)))

In [10]:
# locations_df.to_csv(f'{OUTPUT_DIR}/locations_coordinates.csv', index=False)
locations_df = pd.read_csv(f'{OUTPUT_DIR}/locations_coordinates.csv')
locations_df = locations_df.dropna()
locations_df = locations_df.drop_duplicates(subset=['Event Location'])
locations_df

Unnamed: 0,Event Location,lat,lon
4,PITKIN AVENUE,40.673353,-73.886029
7,OLD BROADWAY,40.985151,-73.879895
8,MORNINGSIDE AVENUE,40.811032,-73.954259
11,BENEDICT AVENUE,40.622899,-74.126122
14,EAST 114 STREET,43.149887,-77.603218
...,...,...,...
177,LIBERTY STREET,40.709994,-74.012325
178,FRANKLIN AVENUE,40.670681,-73.957973
179,HULL STREET,40.679362,-73.912410
180,EAST 38 STREET,40.746797,-73.972865


In [None]:
import pandas as pd
import dask.dataframe as dd
import geopandas as gpd
import zipfile
import os


zip_path = "/home/ivan/FRI/2024-2025/sem2/bd/hw3/marko/taxi_zones.zip"  
extract_dir = "/home/ivan/FRI/2024-2025/sem2/bd/hw3/marko/taxi_zones_shapefile"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

shp_file = [f for f in os.listdir(extract_dir) if f.endswith(".shp")][0]
gdf = gpd.read_file(os.path.join(extract_dir, shp_file)).to_crs("EPSG:4326")

# Compute centroids
gdf["latitude"] = gdf.centroid.y
gdf["longitude"] = gdf.centroid.x
zone_coords = gdf[["LocationID", "latitude", "longitude"]].copy()
pickup_zones = zone_coords.rename(columns={
    "LocationID": "pulocationid",
    "latitude": "pickup_latitude",
    "longitude": "pickup_longitude"
})
dropoff_zones = zone_coords.rename(columns={
    "LocationID": "dolocationid",
    "latitude": "dropoff_latitude",
    "longitude": "dropoff_longitude"
})


taxi_df = dd.read_parquet(f"{OUTPUT_DIR}/optimized_parquet")
taxi_df['pulocationid'] = taxi_df['pulocationid'].astype('int32')
taxi_df['dolocationid'] = taxi_df['dolocationid'].astype('int32')
# Merge pickup and dropoff coordinates
taxi_df = taxi_df.merge(pickup_zones, on="pulocationid", how="left")
taxi_df = taxi_df.merge(dropoff_zones, on="dolocationid", how="left")




  gdf["latitude"] = gdf.centroid.y

  gdf["longitude"] = gdf.centroid.x


In [14]:
# merge events with the locations

events_merged = events_df.merge(locations_df, on='Event Location', how='left')
events_merged = events_merged.dropna(subset=['lat', 'lon'])

In [15]:
events_merged_gdf = gpd.GeoDataFrame(
    events_merged,
    geometry=gpd.points_from_xy(events_merged['lon'], events_merged['lat']),
    crs="EPSG:4326"
).to_crs("EPSG:32618")

In [16]:
taxi_df = dd.read_csv(f"{OUTPUT_DIR}/sample_taxi_data.csv")

In [17]:
# Merge with taxi data
def process_partition(df_partition, events_gdf):
    # Convert partition to pandas DataFrame
    df = df_partition.copy()
    
    pickup_gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df.pickup_longitude, df.pickup_latitude),
        crs="EPSG:4326"
    ).to_crs("EPSG:32618")
    
    pickup_gdf['buffer'] = pickup_gdf.geometry.buffer(1000)
    pickup_joined = gpd.sjoin(
        pickup_gdf.set_geometry('buffer'), 
        events_gdf, 
        how='inner', 
        predicate='intersects'
    )
    pickup_events = pickup_joined[
        pickup_joined.tpep_pickup_datetime.between(
            pickup_joined['Start Date/Time'], 
            pickup_joined['End Date/Time']
        )
    ].groupby(level=0).size().rename('pickup_events')
    
    # Process dropoff events
    dropoff_gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df.dropoff_longitude, df.dropoff_latitude),
        crs="EPSG:4326"
    ).to_crs("EPSG:32618")
    
    dropoff_gdf['buffer'] = dropoff_gdf.geometry.buffer(1000)
    dropoff_joined = gpd.sjoin(
        dropoff_gdf.set_geometry('buffer'), 
        events_gdf, 
        how='inner', 
        predicate='intersects'
    )
    dropoff_events = dropoff_joined[
        dropoff_joined.tpep_dropoff_datetime.between(
            dropoff_joined['Start Date/Time'], 
            dropoff_joined['End Date/Time']
        )
    ].groupby(level=0).size().rename('dropoff_events')
    
    # Merge results
    return df.join(pickup_events, how='left') \
             .join(dropoff_events, how='left') \
             .fillna({'pickup_events': 0, 'dropoff_events': 0}) \
             .assign(total_events=lambda x: x.pickup_events + x.dropoff_events)

# Apply processing to all partitions
meta = taxi_df._meta.copy()
meta['pickup_events'] = pd.Series(dtype='int64')
meta['dropoff_events'] = pd.Series(dtype='int64')
meta['total_events'] = pd.Series(dtype='int64')

taxi_enriched = taxi_df.map_partitions(
    process_partition,
    events_gdf=events_merged_gdf,  
    meta=meta)



In [18]:
taxi_enriched = taxi_enriched.compute()

In [21]:
taxi_enriched.head(10)

Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,...,congestion_surcharge,airport_fee,year,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,pickup_events,dropoff_events,total_events
0,1.0,2020-01-22 20:10:13,2020-01-22 20:28:03,1.0,2.2,1.0,N,100,113,1.0,...,2.5,,2020,40.753513,-73.988787,40.732579,-73.994305,0.0,0.0,0.0
1,2.0,2020-01-18 23:54:57,2020-01-19 00:04:55,6.0,1.96,1.0,N,264,264,1.0,...,2.5,,2020,,,,,0.0,0.0,0.0
2,1.0,2020-01-04 11:10:51,2020-01-04 11:18:37,3.0,1.3,1.0,N,161,237,2.0,...,2.5,,2020,40.758028,-73.977698,40.768615,-73.965635,0.0,0.0,0.0
3,2.0,2020-01-23 08:05:22,2020-01-23 08:17:23,5.0,1.73,1.0,N,141,162,1.0,...,2.5,,2020,40.766948,-73.959635,40.756688,-73.972356,0.0,0.0,0.0
4,2.0,2020-01-16 17:09:03,2020-01-16 17:12:50,1.0,0.93,1.0,N,263,236,1.0,...,2.5,,2020,40.778766,-73.95101,40.780436,-73.957012,0.0,0.0,0.0
5,2.0,2020-01-06 17:32:51,2020-01-06 17:42:57,1.0,1.3,1.0,N,79,232,1.0,...,2.5,,2020,40.72762,-73.985937,40.714733,-73.983025,0.0,0.0,0.0
6,2.0,2020-01-21 22:49:19,2020-01-21 22:59:49,1.0,3.44,1.0,N,48,238,1.0,...,2.5,,2020,40.762253,-73.989845,40.791705,-73.973049,0.0,0.0,0.0
7,1.0,2020-01-14 14:43:16,2020-01-14 14:57:14,1.0,1.3,1.0,N,170,90,1.0,...,2.5,,2020,40.747746,-73.978492,40.742279,-73.996971,0.0,0.0,0.0
8,2.0,2020-01-30 07:06:00,2020-01-30 07:44:00,,10.26,,,69,234,0.0,...,,,2020,40.831417,-73.915029,40.740337,-73.990458,0.0,0.0,0.0
9,1.0,2020-01-16 08:07:20,2020-01-16 08:33:05,1.0,3.2,1.0,N,238,162,1.0,...,2.5,,2020,40.791705,-73.973049,40.756688,-73.972356,0.0,0.0,0.0


In [None]:
taxi_enriched.to_parquet(f"{OUTPUT_DIR}/taxi_enriched_with_events.parquet",
                         write_index=False)