In [70]:
import numpy as np
import pandas as pd 
import geopandas as gpd
import os
from shapely.geometry import Point
from shapely.geometry import MultiPolygon, Polygon
import time
import psycopg2
from io import StringIO

In [112]:
def add_bike(dataframe, taxiZone):
    #time process to estimate the total time across all data sources
    start_time = time.time()
    
    dataframe['mode'] = 'bike'
    dataframe['passenger_count'] = 1

    #defining tables for pickup
    bikePickup = dataframe[['passenger_count', 'started_at_unix', 'start_lng', 'start_lat', 'mode']].copy()
    bikePickup.loc[:, 'geometry'] = bikePickup.apply(lambda row: Point(row['start_lng'], row['start_lat']), axis=1)
    bikePickup.loc[:, 'passenger_count'] = bikePickup['passenger_count'] * -1

    geoPickup = gpd.GeoDataFrame(bikePickup, geometry='geometry', crs="EPSG:4326")
    pickupJoined = gpd.sjoin(geoPickup, taxiZone, how="left", predicate="within")
    pickupJoined = pickupJoined.drop(columns=['geometry', 'start_lng', 'start_lat', 'index_right'])

    #defining tables for dropoff
    bikeDropoff = bike[['passenger_count', 'ended_at_unix', 'end_lng', 'end_lat', 'mode']].copy()
    bikeDropoff.loc[:, 'geometry'] = bikeDropoff.apply(lambda row: Point(row['end_lng'], row['end_lat']), axis=1)
    
    geoDropoff = gpd.GeoDataFrame(bikeDropoff, geometry='geometry', crs="EPSG:4326")
    dropoffJoined = gpd.sjoin(geoDropoff, taxiZone, how="left", predicate="within")
    dropoffJoined = dropoffJoined.drop(columns=['geometry', 'end_lng', 'end_lat', 'index_right'])
    
    
    # Rename columns in pickupJoined and dropoffJoined
    pickupJoined = pickupJoined.rename(columns={
        'passenger_count': 'busynessscore',
        'started_at_unix': 'timestamp',
        'location_id':'zonenumber'
    })
    
    dropoffJoined = dropoffJoined.rename(columns={
        'passenger_count': 'busynessscore',
        'ended_at_unix': 'timestamp',
        'location_id':'zonenumber'
    })
    
    # Concatenate pickupJoined and dropoffJoined into a single DataFrame
    bikeOutput = pd.concat([pickupJoined, dropoffJoined], ignore_index=True)
    
    bikeOutput['busynessscore'] = bikeOutput['busynessscore'].astype(int)
    
    print(f'add_bike took {time.time() - start_time} seconds to complete')
    
    return bikeOutput

In [134]:
def add_taxi(dataframe, taxiZone):
    #time process to estimate the total time across all data sources
    start_time = time.time()
    
    dataframe['mode'] = 'taxi'

    #defining tables for pickup
    taxiPickup = dataframe[['passenger_count', 'started_at_unix', 'start_lng', 'start_lat', 'mode']].copy()
    taxiPickup.loc[:, 'geometry'] = taxiPickup.apply(lambda row: Point(row['start_lng'], row['start_lat']), axis=1)
    taxiPickup.loc[:, 'passenger_count'] = taxiPickup['passenger_count'] * -1

    geoPickup = gpd.GeoDataFrame(taxiPickup, geometry='geometry', crs="EPSG:4326")
    pickupJoined = gpd.sjoin(geoPickup, taxiZone, how="left", predicate="within")
    pickupJoined = pickupJoined.drop(columns=['geometry', 'start_lng', 'start_lat', 'index_right'])

    #defining tables for dropoff
    taxiDropoff = taxi[['passenger_count', 'ended_at_unix', 'end_lng', 'end_lat', 'mode']].copy()
    taxiDropoff.loc[:, 'geometry'] = taxiDropoff.apply(lambda row: Point(row['end_lng'], row['end_lat']), axis=1)
    
    geoDropoff = gpd.GeoDataFrame(taxiDropoff, geometry='geometry', crs="EPSG:4326")
    dropoffJoined = gpd.sjoin(geoDropoff, taxiZone, how="left", predicate="within")
    dropoffJoined = dropoffJoined.drop(columns=['geometry', 'end_lng', 'end_lat', 'index_right'])
    
    
    # Rename columns in pickupJoined and dropoffJoined
    pickupJoined = pickupJoined.rename(columns={
        'passenger_count': 'busynessscore',
        'started_at_unix': 'timestamp',
        'location_id':'zonenumber'
    })
    
    dropoffJoined = dropoffJoined.rename(columns={
        'passenger_count': 'busynessscore',
        'ended_at_unix': 'timestamp',
        'location_id':'zonenumber'
    })
    
    # Concatenate pickupJoined and dropoffJoined into a single DataFrame
    taxiOutput = pd.concat([pickupJoined, dropoffJoined], ignore_index=True)
    
    taxiOutput['busynessscore'] = taxiOutput['busynessscore'].astype(int)
    
    print(f'add_taxi took {time.time() - start_time} seconds to complete')
    taxiOutput = taxiOutput[taxiOutput['zonenumber'].isna()==False]
    return taxiOutput

In [75]:
taxiZone = gpd.read_file('NYCTaxiZones.geojson')
taxiZone = taxiZone.drop(columns=['shape_area','zone','borough','objectid','shape_leng'])
taxiZone.set_geometry("geometry", inplace=True)

In [61]:
taxi = pd.read_csv(taxiArray[0])

In [129]:
taxiOutput = add_taxi(taxi, taxiZone)

add_taxi took 40.38041400909424 seconds to complete


In [133]:
# taxiOutput['zonenumber']==null
null_zonenumber_rows = taxiOutput[taxiOutput['zonenumber'].isna()]

# Print the filtered DataFrame
# print(null_zonenumber_rows)

filtered_taxi_2 = taxiOutput[taxiOutput['zonenumber'].isna()==False]
filtered_taxi_2

Unnamed: 0,busynessscore,timestamp,mode,zonenumber
0,-1,1704070675,taxi,186
1,-1,1704067380,taxi,140
2,-1,1704068226,taxi,236
3,-1,1704069398,taxi,79
4,-1,1704070011,taxi,211
...,...,...,...,...
5457571,1,1706743773,taxi,243
5457572,1,1706743110,taxi,129
5457573,1,1706743055,taxi,261
5457574,3,1706743500,taxi,249


In [136]:
def connect_to_postgres(dataframe ,postgres_host, postgres_port, database_name, db_user, db_password, table_name):
    
    start_time = time.time()
    
    conn = psycopg2.connect(
        database = database_name,
        user = db_user,
        password = db_password,
        host=postgres_host,
        port=postgres_port
    )
    print("PostgreSQL connection opened")

    # Example: Execute SQL query
    cursor = conn.cursor()
        
# Create a temporary table with DOUBLE PRECISION for lat/lng and BIGINT for Unix timestamps
        
    cursor.execute('''
    CREATE TEMP TABLE temp (
        busynessscore INTEGER,
        timestamp DOUBLE PRECISION,
        mode VARCHAR(50),
        zonenumber INTEGER 
    );
    ''')

        # Using StringIO to perform bulk insert into the temporary table
        buffer = StringIO()
        dataframe.to_csv(buffer, index=False, header=False)
        buffer.seek(0)

        cursor.copy_expert(f"""
        COPY temp (busynessscore, timestamp, mode, zonenumber)
        FROM STDIN WITH CSV
        """, buffer)

        # Insert data from the temporary table to the target table with conflict handling
        cursor.execute(f"""
        INSERT INTO {table_name} (busynessscore, timestamp, mode, zonenumber)
        SELECT busynessscore, timestamp, mode, zonenumber
        FROM temp
        ON CONFLICT DO NOTHING;
        """)

        # Commit the transaction
        try conn.commit():
            print("Data inserted successfully")
            
        conn.commit()
        print("Data inserted successfully")

        # Close the PostgreSQL connection
        cursor.close()
        conn.close()
        print("PostgreSQL connection closed")

    print(f'connect_to_postgres took {time.time() - start_time} seconds to complete')

IndentationError: unexpected indent (367535078.py, line 29)

In [82]:
bikeOutput = add_bike(bike, taxiZone)

add_taxi took 46.61847400665283 seconds to complete


In [83]:
bikeOutput

Unnamed: 0,busynessscore,timestamp,mode,zonenumber
0,-1,1696119404,bike,168
1,-1,1696119804,bike,168
2,-1,1696119678,bike,161
3,-2,1696119375,bike,151
4,-1,1696120394,bike,238
...,...,...,...,...
6462239,2,1698796674,bike,160
6462240,1,1698793763,bike,143
6462241,2,1698794566,bike,236
6462242,3,1698798019,bike,61


In [103]:
taxiDir = '/Users/brianmcmahon/Downloads/yellow_taxi_clean'

taxiArray = []
for subdir, dirs, files in os.walk(taxiDir):
    for file in files:
        x = os.path.join(subdir, file)
        if x[-3:] == 'csv':
            taxiArray.append(x)
# print(taxiArray)

In [104]:
bikeDir = '/Users/brianmcmahon/Downloads/CitiBike/filtered'

bikeArray = []
for subdir, dirs, files in os.walk(bikeDir):
    for file in files:
        x = os.path.join(subdir, file)
        if x[-3:] == 'csv':
            bikeArray.append(x)
# print(bikeArray)

In [105]:
subDir = '/Users/brianmcmahon/Desktop/Subway_Filtered'

subArray = []
for subdir, dirs, files in os.walk(subDir):
    for file in files:
        x = os.path.join(subdir, file)
        if x[-3:] == 'csv':
            subArray.append(x)
# print(subArray)

In [74]:
postgres_host = 'localhost'
postgres_port = 5432 
database_name = 'busyness'
db_user = 'crafty'
db_password = 'winner'
table_name = 'public.busyness_data'

# Call the function to establish SSH tunnel and connect to PostgreSQL
if __name__ == '__main__':
    taxiOutput = add_taxi(taxi, taxiZone)
    connect_to_postgres(taxiOutput ,postgres_host, postgres_port, database_name, db_user, db_password, table_name)

add_taxi took 45.88121581077576 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 12.824935913085938 seconds to complete


In [135]:
postgres_host = 'localhost'
postgres_port = 5432 
database_name = 'busyness'
db_user = 'crafty'
db_password = 'winner'
table_name = 'public.busyness_data_2'
taxiArray = ['/Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-01.csv',
            '/Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-02.csv',
            '/Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-03.csv',
            '/Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-04.csv']
# taxiArray
for i,taxi_name in enumerate(taxiArray):
    print(i+1,"/",len(taxiArray),taxi_name)
    taxi = pd.read_csv(taxi_name)
    taxiOutput = add_taxi(taxi, taxiZone)
    connect_to_postgres(taxiOutput ,postgres_host, postgres_port, database_name, db_user, db_password, table_name)
# taxi = pd.read_csv('/Users/brianmcmahon/Downloads/yellow_taxi_clean/-06.csv')
# taxiOutput = add_taxi(taxi, taxiZone)
# connect_to_postgres(taxiOutput ,postgres_host, postgres_port, database_name, db_user, db_password, table_name)

1 / 4 /Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-01.csv
add_taxi took 40.27506399154663 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 41.79831004142761 seconds to complete
2 / 4 /Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-02.csv
add_taxi took 39.309959173202515 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 40.08965611457825 seconds to complete
3 / 4 /Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-03.csv
add_taxi took 44.2085440158844 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 47.2042920589447 seconds to complete
4 / 4 /Users/brianmcmahon/Documents/Research Practicum/Data/taxi/yellow_tripdata_clean_2024-04.csv
add_t

In [116]:
postgres_host = 'localhost'
postgres_port = 5432 
database_name = 'busyness'
db_user = 'crafty'
db_password = 'winner'
table_name = 'public.busyness_data'

for i,bike_name in enumerate(bikeArray):
    print(i+1,"/",len(bikeArray),bike_name)
    bike = pd.read_csv(bike_name)
    bikeOutput = add_bike(bike, taxiZone)
    connect_to_postgres(bikeOutput ,postgres_host, postgres_port, database_name, db_user, db_password, table_name)

1 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_47.csv
add_bike took 15.62569808959961 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 6.055856704711914 seconds to complete
2 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_53.csv
add_bike took 11.123969793319702 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 4.8388121128082275 seconds to complete
3 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_52.csv
add_bike took 14.747516870498657 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 5.932225227355957 seconds to complete
4 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_46.csv
add_bike took 10.411691665649414 seconds to complete
PostgreSQL connection op

add_bike took 13.289894819259644 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 6.3248209953308105 seconds to complete
31 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_32.csv
add_bike took 14.400392055511475 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 7.5448620319366455 seconds to complete
32 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_7.csv
add_bike took 13.592310905456543 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 6.279547214508057 seconds to complete
33 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_3.csv
add_bike took 13.488721132278442 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postg

Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 3.0746140480041504 seconds to complete
60 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_72.csv
add_bike took 9.62047815322876 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 4.116938829421997 seconds to complete
61 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_73.csv
add_bike took 13.734704732894897 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 5.9048850536346436 seconds to complete
62 / 78 /Users/brianmcmahon/Downloads/CitiBike/filtered/filtered_citibike_67.csv
add_bike took 13.02605414390564 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 5.510311841964722 seconds to complete
63 / 78 /Users/brianmcmahon/Download

In [122]:
postgres_host = 'localhost'
postgres_port = 5432 
database_name = 'busyness'
db_user = 'crafty'
db_password = 'winner'
table_name = 'public.busyness_data'

for i,sub_name in enumerate(subArray):
    print(i+1,"/",len(subArray),sub_name)
    sub = pd.read_csv(sub_name)
    subOutput = add_sub(sub, taxiZone)
    connect_to_postgres(subOutput ,postgres_host, postgres_port, database_name, db_user, db_password, table_name)

1 / 6 /Users/brianmcmahon/Desktop/Subway_Filtered/combined_output_0.csv
add_sub took 79.59619784355164 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 27.96381902694702 seconds to complete
2 / 6 /Users/brianmcmahon/Desktop/Subway_Filtered/combined_output_1.csv
add_sub took 57.988381147384644 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 25.43887996673584 seconds to complete
3 / 6 /Users/brianmcmahon/Desktop/Subway_Filtered/combined_output_3.csv
add_sub took 67.09413409233093 seconds to complete
PostgreSQL connection opened
Data inserted successfully
PostgreSQL connection closed
connect_to_postgres took 26.313267946243286 seconds to complete
4 / 6 /Users/brianmcmahon/Desktop/Subway_Filtered/combined_output_2.csv
add_sub took 65.77857518196106 seconds to complete
PostgreSQL connection opened
Data inserted successfully
Postgr

In [119]:
sub = pd.read_csv(subArray[0])
subOutput = add_sub(sub, taxiZone)

add_sub took 74.16766691207886 seconds to complete


In [121]:
def add_sub(dataframe, taxiZone):
    #time process to estimate the total time across all data sources
    start_time = time.time()
    
    dataframe['mode'] = 'subway'

    #defining tables for pickup
    dataframe.loc[:, 'geometry'] = dataframe.apply(lambda row: Point(row['longitude'], row['latitude']), axis=1)
    dataframe.loc[:, 'ridership'] = dataframe['ridership'] * -1

    geoPickup = gpd.GeoDataFrame(dataframe, geometry='geometry', crs="EPSG:4326")
    subOutput = gpd.sjoin(geoPickup, taxiZone, how="left", predicate="within")
    subOutput = subOutput.drop(columns=['geometry', 'longitude', 'latitude', 'index_right'])
    
    
    # Rename columns in pickupJoined and dropoffJoined
    subOutput = subOutput.rename(columns={
        'ridership': 'busynessscore',
        'transit_timestamp': 'timestamp',
        'location_id':'zonenumber'
    })
        
    
    subOutput['busynessscore'] = subOutput['busynessscore'].astype(int)
    
    print(f'add_sub took {time.time() - start_time} seconds to complete')
    
    return subOutput

In [120]:
subOutput

Unnamed: 0,timestamp,busynessscore,mode,passenger_count,zonenumber
0,1711375200,-230,subway,1,186
1,1711375200,-47,subway,1,231
2,1711378800,-15,subway,1,69
3,1711382400,-10,subway,1,76
4,1711389600,-7,subway,1,94
...,...,...,...,...,...
8989101,1702220400,-15,subway,1,43
8989102,1702220400,-5,subway,1,177
8989103,1702220400,-1,subway,1,216
8989104,1702220400,-1,subway,1,231
