## This script accesses the large data file containing individual journey information, queries it with SQL via duckdb and saves aggregated dataframes to be used in the further visualisations

In [1]:
import duckdb

In [None]:
# filepath to be referenced in queries
bigfile = 'C:/Data/Citibike_NY_2022/merged/df_weather_duration.parquet'

In [19]:
duckdb.query(f"""
    SELECT *
    FROM '{bigfile}'
    LIMIT 5
""")

┌──────────────────┬───────────────┬─────────────────────────┬─────────────────────────┬────────────────────────┬─────────────────────────┬─────────────┬──────────────┬─────────────┬──────────────┬───────────────┬─────────────────────┬───────┬───────┬───────┬────────────────────┐
│     ride_id      │ rideable_type │       started_at        │        ended_at         │   start_station_name   │    end_station_name     │  start_lat  │  start_lng   │   end_lat   │   end_lng    │ member_casual │        date         │ AWND  │ PRCP  │ TAVG  │   trip_duration    │
│     varchar      │    varchar    │      timestamp_ns       │      timestamp_ns       │        varchar         │         varchar         │   double    │    double    │   double    │    double    │    varchar    │    timestamp_ns     │ int64 │ int64 │ int64 │       double       │
├──────────────────┼───────────────┼─────────────────────────┼─────────────────────────┼────────────────────────┼─────────────────────────┼─────────────┼────

In [None]:
# Creating table with top 20 stations by number of routes beginning there
duckdb.query(f"""
    COPY (
        SELECT 
            start_station_name,
            COUNT(*) AS num_trips
        FROM '{bigfile}'
        GROUP BY start_station_name
        ORDER BY num_trips DESC
        LIMIT 20
    ) TO 'C:/Data/Citibike_NY_2022/merged/top_20.csv' (FORMAT CSV, HEADER TRUE);
""")

In [5]:
# Check how many unique coordinates each start station has
duckdb.query(f"""
    SELECT 
        start_station_name,
        COUNT(DISTINCT start_lat || ',' || start_lng) AS coord_versions
    FROM '{bigfile}'
    GROUP BY start_station_name
    ORDER BY coord_versions DESC
""").to_df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,start_station_name,coord_versions
0,1 Ave & E 68 St,16597
1,Broadway & W 58 St,15936
2,W 21 St & 6 Ave,12876
3,Broadway & W 41 St,12705
4,E 40 St & Park Ave,12003
...,...,...
1756,Schermerhorn St and Court St,1
1757,Folin St & E 181 St,1
1758,Murray St\t& West St,1
1759,Broadway\t& W 48 St,1


In [22]:
# Count how many versions of coordinates there are for each station and also collect 
# these variants to have a look at them
duckdb.query(f"""
    SELECT 
        start_station_name,
        ARRAY_AGG(DISTINCT start_lat || ',' || start_lng) AS coord_variants,
        COUNT(DISTINCT start_lat || ',' || start_lng) AS variant_count
    FROM '{bigfile}'
    GROUP BY start_station_name
    HAVING variant_count > 1
    ORDER BY variant_count DESC
""").to_df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,start_station_name,coord_variants,variant_count
0,1 Ave & E 68 St,"[40.765034318,-73.958014488, 40.764937162,-73....",16633
1,Broadway & W 58 St,"[40.766798735,-73.981944203, 40.766861439,-73....",16074
2,W 21 St & 6 Ave,"[40.741863489,-73.994309425, 40.741694808,-73....",12901
3,Broadway & W 41 St,"[40.755205631,-73.986714959, 40.755051017,-73....",12898
4,W 31 St & 7 Ave,"[40.749011874,-73.991641998, 40.749185681,-73....",12035
...,...,...,...
1744,Fulton St & Pearl St,"[40.707722,-74.004386, 40.70772198491691,-74.0...",2
1745,E 110 St & Madison Ave,"[40.796154,-73.947821, 40.7961535,-73.94782145]",2
1746,E 20 St & FDR Drive,"[40.73314259,-73.97573881, 40.733143,-73.975739]",2
1747,Broad St & Water St,"[40.702938,-74.011494, 40.70293782409166,-74.0...",2


In [40]:
# Determining what are the most common coordinates for each station and then joining them onto the main file
    # Save as view 
duckdb.query(f"""
CREATE OR REPLACE TEMP VIEW cleaned_trips AS
-- CTE to get the common coordinates per start station
WITH most_common_coords_s AS (
    SELECT start_station_name,
        start_lat,
        start_lng,
        ROW_NUMBER() OVER (
            PARTITION BY start_station_name
            ORDER BY COUNT(*) DESC
        ) AS rn
    FROM '{bigfile}'
    GROUP BY start_station_name,
        start_lat,
        start_lng
),

-- CTE to get the common coordinates per end station
most_common_coords_e AS (
    SELECT end_station_name,
        end_lat,
        end_lng,
        ROW_NUMBER() OVER (
            PARTITION BY end_station_name
            ORDER BY COUNT(*) DESC
        ) AS rn
    FROM '{bigfile}'
    GROUP BY end_station_name,
        end_lat,
        end_lng
)

-- joining most common coordinates to main table
SELECT
    s.start_lat AS start_lat,
    s.start_lng AS start_lng,
    e.end_lat AS end_lat,
    e.end_lng AS end_lng,
    t.*
    EXCLUDE(start_lat, start_lng, end_lat, end_lng)
FROM '{bigfile}' t
JOIN most_common_coords_s s
    ON t.start_station_name = s.start_station_name AND s.rn = 1
JOIN most_common_coords_e e
    ON t.end_station_name = e.end_station_name AND e.rn = 1
""")

In [41]:
# Preview first 10 rows of view
duckdb.query("""
    SELECT *
    FROM cleaned_trips 
    LIMIT 10""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌───────────────────┬────────────────────┬─────────────┬────────────────┬──────────────────┬───────────────┬─────────────────────────┬─────────────────────────┬────────────────────────┬─────────────────────────┬───────────────┬─────────────────────┬───────┬───────┬───────┬────────────────────┐
│     start_lat     │     start_lng      │   end_lat   │    end_lng     │     ride_id      │ rideable_type │       started_at        │        ended_at         │   start_station_name   │    end_station_name     │ member_casual │        date         │ AWND  │ PRCP  │ TAVG  │   trip_duration    │
│      double       │       double       │   double    │     double     │     varchar      │    varchar    │      timestamp_ns       │      timestamp_ns       │        varchar         │         varchar         │    varchar    │    timestamp_ns     │ int64 │ int64 │ int64 │       double       │
├───────────────────┼────────────────────┼─────────────┼────────────────┼──────────────────┼───────────────┼───────

In [42]:
# Check that each start station only has one set of coords in view
duckdb.query(f"""
SELECT 
    start_station_name,
    COUNT(DISTINCT start_lat::text || ',' || start_lng::text) AS coord_versions
FROM cleaned_trips
GROUP BY start_station_name
ORDER BY coord_versions DESC
""").to_df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,start_station_name,coord_versions
0,W 54 St & 11 Ave,1
1,27 Ave & 3 St,1
2,7 Ave & 40 St,1
3,Longwood Ave & Southern Blvd,1
4,Grand Concourse & E 144 St,1
...,...,...
1756,21 Ave & Shore Blvd,1
1757,Westchester Ave & Longwood Ave,1
1758,46 St & Queens Blvd,1
1759,Church St & Thomas St,1


In [43]:
# Now for end station
duckdb.query(f"""
SELECT 
    end_station_name,
    COUNT(DISTINCT end_lat::text || ',' || end_lng::text) AS coord_versions
FROM cleaned_trips
GROUP BY end_station_name
ORDER BY coord_versions DESC
""").to_df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,end_station_name,coord_versions
0,E 97 St & Madison Ave,1
1,Lafayette Park,1
2,Suydam St & Broadway,1
3,Willis Ave & E 143 St,1
4,Greenwich St & Hubert St,1
...,...,...
1836,Bergen St & 4 Ave,1
1837,River Ave & McClellan St,1
1838,W 170 St & University Ave,1
1839,59 St & 5 Ave,1


In [44]:
# Creating df at the route level showing number of trips between each route
# Including coordinates so it can be used for maps
routes = duckdb.query( """
    SELECT 
        CONCAT(start_station_name, '-', end_station_name) AS route,
        COUNT(*) AS num_trips,
        start_station_name, 
        end_station_name,
        ANY_VALUE(start_lng) AS start_lng,
        ANY_VALUE(start_lat) AS start_lat,
        ANY_VALUE(end_lng) AS end_lng,
        ANY_VALUE(end_lat) AS end_lat
    FROM cleaned_trips
    GROUP BY 
        start_station_name, 
        end_station_name
    ORDER BY num_trips DESC;
""").to_df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [45]:
routes.head()

Unnamed: 0,route,num_trips,start_station_name,end_station_name,start_lng,start_lat,end_lng,end_lat
0,Central Park S & 6 Ave-Central Park S & 6 Ave,12041,Central Park S & 6 Ave,Central Park S & 6 Ave,-73.976342,40.765909,-73.976342,40.765909
1,7 Ave & Central Park South-7 Ave & Central Par...,8541,7 Ave & Central Park South,7 Ave & Central Park South,-73.979069,40.766741,-73.979069,40.766741
2,Roosevelt Island Tramway-Roosevelt Island Tramway,8213,Roosevelt Island Tramway,Roosevelt Island Tramway,-73.9536,40.757284,-73.9536,40.757284
3,Grand Army Plaza & Central Park S-Grand Army P...,7287,Grand Army Plaza & Central Park S,Grand Army Plaza & Central Park S,-73.973715,40.764397,-73.973715,40.764397
4,Soissons Landing-Soissons Landing,7275,Soissons Landing,Soissons Landing,-74.014866,40.692317,-74.014866,40.692317


In [46]:
routes.shape

(1013379, 8)

In [47]:
# Saving object as df
routes.to_csv("C:/Data/Citibike_NY_2022/merged/routes.csv", index=False)

In [48]:
# Checking format of temperature
duckdb.query(f"""
    SELECT
        MIN(TAVG),
        MAX(TAVG)
    FROM '{bigfile}'
""")

┌───────────┬───────────┐
│ min(TAVG) │ max(TAVG) │
│   int64   │   int64   │
├───────────┼───────────┤
│      -117 │       313 │
└───────────┴───────────┘

In [49]:
# Query for making a df with each day as a row - showing number of trips and weather variables
df_weather = duckdb.query("""
    SELECT 
        date,
        ANY_VALUE(TAVG) / 10 AS temperature, --- dividing by 10 so more intuitive
        ANY_VALUE(PRCP) AS precipitation,
        ANY_VALUE(AWND) AS wind,
        COUNT(*) AS trip_count
    FROM cleaned_trips
    GROUP BY date                       
""").to_df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [50]:
df_weather.head()

Unnamed: 0,date,temperature,precipitation,wind,trip_count
0,2022-04-05,9.7,15,34,72590
1,2022-04-17,8.3,0,70,54053
2,2022-03-19,13.1,18,33,77738
3,2022-03-31,9.9,13,48,72571
4,2022-09-21,22.0,0,33,133363


In [51]:
# Saving object as df
df_weather.to_csv("C:/Data/Citibike_NY_2022/merged/df_weather.csv", index=False)