# Trips and Stations
* Create yearly trip parquet files
* Create bike dock stations parquet file

In [None]:
import pandas as pd
import numpy as np
import os
import dask.dataframe as dd
import pyarrow as pa
import logging
import requests, json
import urllib
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

In [None]:
DATA_DIR = "data/"
# CSV_DIR = DATA_DIR + "tripdata_csv/"
PARQUET_DIR = DATA_DIR + "tripdata_parquet/"
NY_DIR = PARQUET_DIR + "NY/"
NJ_DIR = PARQUET_DIR + "NJ/"
STATIONS_DIR = DATA_DIR + "stations/"
PARQUET_EXTENSION = ".parquet"
# Station Information GBFS json url
STATION_INFO_URL = "https://gbfs.citibikenyc.com/gbfs/en/station_information.json"
# USGS Elevation Point Query Service url
USGS_ELEVATION_POINT_SERVICE_URL = r"https://nationalmap.gov/epqs/pqs.php?"

logging.basicConfig(level=logging.WARNING)


def _count_entries(directory: str) -> int:
    """Return the number of entries in directory, or 0 when it does not exist yet."""
    return len(os.listdir(directory)) if os.path.isdir(directory) else 0


# The trip directories may not exist yet at this point, so count defensively
# instead of letting os.listdir raise FileNotFoundError.
logging.info(
    "%d Jersey City files and %d New York City files",
    _count_entries(NJ_DIR),
    _count_entries(NY_DIR),
)

# target dtype for each trip data column
TRIPDATA_COLUMN_DTYPES = {
    "tripduration": "int32",
    "starttime": "datetime64",
    "stoptime": "datetime64",
    "startstationid": "category",
    "startstationname": "category",
    "startstationlatitude": "category",
    "startstationlongitude": "category",
    "endstationid": "category",
    "endstationname": "category",
    "endstationlatitude": "category",
    "endstationlongitude": "category",
    "bikeid": "category",
    "usertype": "category",
    "birthyear": "category",
    "gender": "category",
}

# Create Trips Parquets

In [None]:
# create the NY and NJ trip parquet directories when they are missing
for trip_dir in (NY_DIR, NJ_DIR):
    if os.path.exists(trip_dir):
        continue
    # the constants end in "/", so dirname() just strips that trailing separator
    os.makedirs(os.path.dirname(trip_dir))

In [None]:
def merge_monthly_trips(year, directory: str) -> None:
    """
    Creates a merged parquet file from parquet files in a directory
    :param year: the year (int) to merge monthly data for. if None, then merge all files in directory
    :param directory: a directory path (ending in a separator, it is concatenated with file names)
        containing parquet files with identical schema (column names) across files
    :return: None. The merged data is written to "<year>.parquet", or "alltrips.parquet"
        when year is None, inside directory. Nothing is written when no input files are found.
    """
    output_name = ("alltrips" if year is None else str(year)) + PARQUET_EXTENSION

    entries = os.listdir(directory)
    if year is not None:
        # monthly files are picked by name: anything sorting between
        # "<year>-01" and "<year>-13" (string comparison)
        range_start = str(year) + "-01"
        range_end = str(year) + "-13"
        selected = [f for f in entries if range_start <= f <= range_end]
    else:
        selected = [f for f in entries if f.endswith(PARQUET_EXTENSION)]

    # never feed the output of a previous run of this same merge back in as input
    month_files = sorted(directory + f for f in selected if f != output_name)

    if not month_files:
        # dd.concat raises on an empty list; skip with a visible message instead
        logging.warning(
            "no parquet files to merge for %s in %s",
            "all trips" if year is None else year,
            directory,
        )
        return

    parquet_ddfs: list[dd.DataFrame] = []
    for month_file in month_files:
        ddf = dd.read_parquet(month_file)
        # NOTE(review): the previous code called ddf.astype(TRIPDATA_COLUMN_DTYPES) here
        # but discarded the result, so that cast never took effect. It is left out rather
        # than applied, because applying it would change the dtypes written to the merged
        # files - confirm which behaviour is wanted.
        # birthyear is written as a string (see the schema passed to to_parquet below)
        ddf["birthyear"] = ddf["birthyear"].astype("str")
        parquet_ddfs.append(ddf)

    all_trips = dd.concat(parquet_ddfs)
    all_trips.to_parquet(
        directory + output_name,
        schema={"birthyear": pa.string()},
        engine="pyarrow",
    )

In [None]:
%%time
# create parquet file from all trip data (NY)
# NOTE: run this cell BEFORE the yearly-merge cell below if you want this large file.
# With year=None every entry in NY_DIR ending in ".parquet" is merged, so yearly files
# already written into NY_DIR would be merged in as well.
merge_monthly_trips(year=None, directory=NY_DIR)

In [None]:
%%time
# create yearly trip data parquet files (2013 through 2021; the range end is exclusive)
for year in range(2013, 2022):
    merge_monthly_trips(year, NY_DIR)

In [None]:
# example: load one yearly parquet file (2019) into pandas

# only the columns listed here are requested from the parquet file
trip_columns = [
    "tripduration",
    "starttime",
    "stoptime",
    "startstationid",
    "endstationid",
    "bikeid",
    "usertype",
    "birthyear",
    "gender",
]
test = pd.read_parquet(NY_DIR + "2019.parquet", columns=trip_columns, engine="pyarrow")
test = test.reset_index()
# after reset_index() the first column is the former (dask) index; discard it
test = test.drop(columns=test.columns[0])
test

# Create Stations Parquets

In [None]:
# create the stations output directory when it is missing
# (STATIONS_DIR ends in "/", so dirname() just strips that trailing separator)
if not os.path.exists(STATIONS_DIR):
    os.makedirs(os.path.dirname(STATIONS_DIR))

In [None]:
def create_stations(year, directory):
    """
    Creates station table for year, saves to parquet file

    Stations are taken from the start-station columns of the trip data; the first
    row seen for each stationid is kept. The result is written to
    STATIONS_DIR + "<year>.parquet".

    :param year: year to create stations for using trip data for that year
    :param directory: directory with the trip data parquet file (path ending in a separator)
    :return: None
    """
    # trip column name -> station table column name
    col_rename = {
        "startstationid": "stationid",
        "startstationname": "stationname",
        "startstationlatitude": "latitude",
        "startstationlongitude": "longitude",
    }

    trip_filepath = directory + str(year) + PARQUET_EXTENSION
    # read only the four station columns instead of a whole year of trip data
    trips = pd.read_parquet(
        trip_filepath, engine="pyarrow", columns=list(col_rename)
    )

    stations = (
        trips.reset_index(drop=True)  # discard the dask index
        .rename(columns=col_rename)
        .drop_duplicates(subset=["stationid"])
    )

    stations_filepath = STATIONS_DIR + str(year) + PARQUET_EXTENSION
    stations.to_parquet(stations_filepath, engine="pyarrow")

In [None]:
%%time
# build one station table per year (2013 through 2021) from that year's NY trip file
for year in range(2013, 2022):
    create_stations(year, NY_DIR)

In [None]:
def merge_stations() -> pd.DataFrame:
    """
    Return merged yearly station files

    Reads every parquet file in STATIONS_DIR whose name does not start with
    "stations" (that prefix is used for the combined output) and concatenates them,
    keeping the first row seen for each stationid.

    :return: dataframe with one row per stationid
    :raises ValueError: if STATIONS_DIR contains no yearly station files
    """
    # sorted so that the row kept for a duplicated stationid does not depend on the
    # arbitrary order os.listdir returns; the parquet filter skips stray files
    stations_files = sorted(
        f
        for f in os.listdir(STATIONS_DIR)
        if f.endswith(PARQUET_EXTENSION) and not f.startswith("stations")
    )
    if not stations_files:
        raise ValueError(f"no yearly station files found in {STATIONS_DIR}")

    stations_dfs = [pd.read_parquet(STATIONS_DIR + f) for f in stations_files]
    all_stations = pd.concat(stations_dfs)

    return all_stations.drop_duplicates(subset=["stationid"])

In [None]:
def add_station_capacity(stations: pd.DataFrame) -> pd.DataFrame:
    """
    Adds station capacity info from Citibike GBFS feed
    :param stations: dataframe with a "stationname" column
    :return: stations with capacity info (a new dataframe with a "capacity" column;
        stations whose name is not in the feed get a missing capacity)
    :raises requests.RequestException: if the feed cannot be fetched
    """
    # get station info; fail fast on a hung connection or an HTTP error status
    response = requests.get(STATION_INFO_URL, timeout=30)
    response.raise_for_status()
    station_details = pd.DataFrame.from_dict(response.json()["data"]["stations"])

    # extract capacity, keyed by station name
    station_details = (
        station_details[["name", "capacity"]]
        .rename(columns={"name": "stationname"})
        # one row per name, so the left merge below cannot multiply station rows
        .drop_duplicates(subset="stationname")
    )

    return stations.merge(station_details, how="left", on="stationname")

In [None]:
def add_station_geodata(stations: pd.DataFrame) -> pd.DataFrame:
    """
    Adds station geodata info

    Reverse geocodes each station's coordinates with Nominatim (rate limited to one
    request per second) and merges the neighbourhood, boro and zipcode back on stationid.

    :param stations: dataframe with "stationid", "latitude" and "longitude" columns
    :return: stations df with geodata info ("neighbourhood", "boro", "zipcode" columns);
        stations that could not be geocoded get missing values
    """
    logging.debug("reverse geocoding boro and neighbourhood, wait 15-20 mins...")
    geolocator = Nominatim(user_agent="bikegeocode")
    reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)

    locations_lst = []
    for lat, lon in zip(stations["latitude"], stations["longitude"]):
        location = reverse("{}, {}".format(lat, lon))
        if location is None:
            # no result for this coordinate: keep an empty record so the rows stay
            # aligned with stations.stationid instead of losing the whole run
            logging.warning("no reverse geocoding result for (%s, %s)", lat, lon)
            locations_lst.append({})
        else:
            locations_lst.append(location.raw.get("address", {}))
    logging.debug("geocode complete, merging...")

    locations = pd.DataFrame(locations_lst, index=stations.stationid).reset_index()
    # reindex (rather than plain selection) so an address key that never appeared in
    # any result becomes an all-missing column instead of a KeyError
    locations = locations.reindex(
        columns=["stationid", "neighbourhood", "suburb", "postcode"]
    )
    locations = locations.rename(columns={"suburb": "boro", "postcode": "zipcode"})
    locations = locations.astype("category")

    return stations.merge(locations, how="left", on="stationid")

In [None]:
def add_elevations(df: pd.DataFrame, lat_column="latitude", lon_column="longitude"):
    """Queries USGS Elevation Point Service to get elevation values

    One request is sent per row to USGS_ELEVATION_POINT_SERVICE_URL.

    :param df: dataframe with latitude and longitude
    :param lat_column: name of the latitude column
    :param lon_column: name of the longitude column
    :return: original df with new elevation column ("elevation_ft"); df is modified in place
    :raises requests.RequestException: if a query fails or returns an HTTP error status
    """
    # NOTE(review): USGS appears to have moved this service to a new endpoint with a
    # different response layout - confirm the URL and the JSON keys below still apply.
    elevations = []
    for i, (lat, lon) in enumerate(zip(df[lat_column], df[lon_column]), start=1):
        logging.debug("Getting elevation %d for (%s, %s)", i, lat, lon)
        # define rest query params
        params = {"output": "json", "x": lon, "y": lat, "units": "Feet"}

        # format query string and return query value
        # (the service URL constant is used here; the previous code referenced an
        # undefined name `url`)
        result = requests.get(
            USGS_ELEVATION_POINT_SERVICE_URL + urllib.parse.urlencode(params),
            timeout=30,
        )
        result.raise_for_status()
        elevations.append(
            result.json()["USGS_Elevation_Point_Query_Service"]["Elevation_Query"][
                "Elevation"
            ]
        )

    df["elevation_ft"] = elevations
    return df

In [None]:
# merge yearly stations data; the cells below add capacity, geodata and elevation,
# then save the result
stations = merge_stations()

In [None]:
# add the "capacity" column from the Citibike GBFS station feed (matched on station name)
stations = add_station_capacity(stations)

In [None]:
%%time
# reverse geocode every station (rate limited to one request per second, so this is slow)
stations = add_station_geodata(stations)

In [None]:
%%time
# add the "elevation_ft" column (one USGS query per station)
stations = add_elevations(stations)

In [None]:
# save the combined stations table as csv (the dataframe index is written as the first column)
stations.to_csv(STATIONS_DIR + "stations" + ".csv")

In [None]:
# store elevation as strings before writing parquet
# NOTE(review): presumably the values returned by USGS are not uniformly typed - confirm
stations["elevation_ft"] = stations["elevation_ft"].astype("str")
stations.to_parquet(STATIONS_DIR + "stations" + PARQUET_EXTENSION, engine="pyarrow")

In [None]:
# example: read stations (all stations seen across all years)
# this reloads the combined file written by the previous cell
stations = pd.read_parquet(
    STATIONS_DIR + "stations" + PARQUET_EXTENSION, engine="pyarrow"
)
stations

# Fixing Boros

In [None]:
# start from the combined stations file
stations = pd.read_parquet(
    STATIONS_DIR + "stations" + PARQUET_EXTENSION, engine="pyarrow"
)

# remove stations that have neither a boro nor a zipcode
stations = stations.drop(
    index=stations.loc[stations.boro.isna() & stations.zipcode.isna()].index
)

# remove the station with a Jersey City zipcode (it has no boro)
stations = stations.drop(index=stations.loc[stations.zipcode == "07311"].index)

# zipcode -> boro lookup used to fill in missing boros
zip_boro_dic = {
    **dict.fromkeys(
        [
            "11207",
            "11217",
            "11201",
            "11231",
            "11238",
            "11213",
            "11221",
            "11201-1832",
            "11237",
            "11205",
            "11251",
            "11227",
            "11222",
            "11216",
            "11220",
            "11215",
            "11209",
            "112321",  # possibly a typo, determined by other rows with same zip
            "11232",
        ],
        "Brooklyn",
    ),
    **dict.fromkeys(["10459", "10456", "10451", "10457"], "The Bronx"),
}
# boro must be a string column before new string values can be filled in
stations = stations.astype({"boro": "string"})
stations["boro"] = stations["boro"].fillna(stations["zipcode"].map(zip_boro_dic))

# fold "Queens County" into "Queens"
stations.loc[stations["boro"] == "Queens County", "boro"] = "Queens"

# write the cleaned table back over the combined stations file
stations.to_parquet(STATIONS_DIR + "stations" + PARQUET_EXTENSION, engine="pyarrow")

stations

Unnamed: 0,stationid,stationname,latitude,longitude,capacity,neighbourhood,boro,zipcode,elevation_ft
0,455.0,1 Ave & E 44 St,40.750020,-73.969053,59.0,Turtle Bay,Manhattan,10017-6927,46.8
1,434.0,9 Ave & W 18 St,40.743174,-74.003664,60.0,Chelsea District,Manhattan,10019,15.9
2,491.0,E 24 St & Park Ave S,40.740964,-73.986022,,Manhattan Community Board 5,Manhattan,10010,34.87
3,384.0,Fulton St & Waverly Ave,40.683178,-73.965964,31.0,,Brooklyn,11238,78.1
4,474.0,5 Ave & E 29 St,40.745168,-73.986831,56.0,Midtown South,Manhattan,10035,41.55
...,...,...,...,...,...,...,...,...,...
1425,3685.0,Prospect Park - 5 Year Anniversary Celebration,40.660652,-73.964590,,,Brooklyn,11225,85.71
1426,3695.0,E 5 St & 2 Ave,40.726870,-73.989190,,East Village,Manhattan,10003,36.09
1427,3700.0,E 87 St & 3 Ave,40.779406,-73.953336,,Carnegie Hill,Manhattan,10028,79.35
1428,3805.0,E 80 St & Park Ave,40.776173,-73.959757,,Manhattan Community Board 8,Manhattan,10075,79.28


In [None]:
# confirm changes: reload the stations file that was just overwritten
stations = pd.read_parquet(
    STATIONS_DIR + "stations" + PARQUET_EXTENSION, engine="pyarrow"
)

In [None]:
# column dtypes and non-null counts of the reloaded table
stations.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1423 entries, 0 to 1429
Data columns (total 9 columns):
 #   Column         Non-Null Count  Dtype   
---  ------         --------------  -----   
 0   stationid      1423 non-null   float64 
 1   stationname    1423 non-null   object  
 2   latitude       1423 non-null   float64 
 3   longitude      1423 non-null   float64 
 4   capacity       1210 non-null   float64 
 5   neighbourhood  862 non-null    category
 6   boro           1423 non-null   string  
 7   zipcode        1419 non-null   category
 8   elevation_ft   1423 non-null   object  
dtypes: category(2), float64(4), object(2), string(1)
memory usage: 100.9+ KB


In [None]:
# number of stations per boro after the clean-up
stations.boro.value_counts()

Manhattan      664
Brooklyn       422
The Bronx      176
Queens         159
Ville-Marie      2
Name: boro, dtype: Int64