# data collection and processing

## goals
- collect data from the data sources of interest
    - capital bikeshare: https://s3.amazonaws.com/capitalbikeshare-data/index.html
    - station capacity: https://maps2.dcgis.dc.gov/dcgis/rest/services/DCGIS_DATA/Transportation_Bikes_Trails_WebMercator/MapServer/5/query?outFields=*&where=1%3D1&f=geojson
    - weather: https://www.ncei.noaa.gov/access/services/data/v1
- process data to calculate bike availability
- combine data sources and engineer features

## Capital bikeshare

Collect data from 2018-2019, as well as the end of 2017 to get a starting point for bike capacity at each station.

In [2]:
# packages
from selenium import webdriver
from selenium.webdriver.common.by import By
import zipfile
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar as calendar
import numpy as np
import time
import os
import re
import requests

In [24]:
## string constants
# index page that is opened in the browser to discover the download links
bike_share_url = 'https://s3.amazonaws.com/capitalbikeshare-data/index.html'
# XPath of the element whose <a> descendants are collected as download links
header_xpath = '//*[@id="tbody-content"]'
# file names that start with four digits and end in .csv (used when extracting archives)
pattern = r'^\d{4}.*\.csv$'
# file names that start with six digits and end in .csv (used to pick the main trip files)
new_pattern = r'^\d{6}.*\.csv$'
# file names that start with four digits, one letter and one digit, and end in .csv
# (used to pick the file that provides the starting point)
old_pattern= r'^\d{4}[A-Za-z]\d.*\.csv$'

### collecting the raw data

The code is flexible enough to account for different years that the user can customize. Note that the fundamental structure of the data changes from March 2020 to May 2020; while the data collection code is unaffected by this, the processing code would require adjustments to manipulate this data into the expected format for the analysis.

In [5]:
# Absolute path of the download/extraction folder. The browser download
# preference, the extraction cell, the loading cell and the cleanup cell all
# read this name, so it has to exist before they run.
raw_data_path = os.path.join(os.getcwd(), 'raw_data')
# exist_ok=True keeps the cell re-runnable when the folder is already there
os.makedirs(raw_data_path, exist_ok=True)

In [6]:
# Configure Chrome so that archives are saved into raw_data_path without a prompt.
chrome_options = webdriver.ChromeOptions()
prefs = {
    "download.default_directory": raw_data_path,  # Set download directory
    "download.prompt_for_download": False,  # Disable download prompt
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True,  # Keep safe browsing enabled
}
chrome_options.add_experimental_option("prefs", prefs)

driver = webdriver.Chrome(options=chrome_options)
try:
    driver.get(bike_share_url)

    # wait for the listing to load before looking for links
    time.sleep(5)

    zip_links = driver.find_element(By.XPATH, header_xpath)
    urls = [a.get_attribute('href') for a in zip_links.find_elements(By.TAG_NAME, 'a')]

    for z in urls:
        if not z:
            # anchor without an href: nothing to download
            continue
        try:
            # the first four characters of the file name are expected to be the year
            result = int(z.split('/')[-1][:4])
        except ValueError:
            # link whose name does not start with a year: not a trip archive
            continue
        if 2017 <= result < 2020:
            try:
                driver.get(z)
            except Exception as exc:
                # stay best-effort like the original, but report the failure
                print(f'Could not download {z}: {exc}')
                continue
            time.sleep(5)

    # leave time for the last downloads to finish before closing the browser
    time.sleep(30)
finally:
    # always release the browser, even if the scrape fails half-way
    driver.quit()
time.sleep(10)

In [None]:

# Unpack every downloaded archive into raw_data_path and delete the archive.
for file_name in os.listdir(raw_data_path):
    zip_path = os.path.join(raw_data_path, file_name)
    # skip anything that is not a zip archive (partial downloads, CSVs that were
    # already extracted by an earlier run) instead of failing with BadZipFile
    if not zipfile.is_zipfile(zip_path):
        continue
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        for file in zip_ref.namelist():
            # only extract members whose name matches the CSV pattern
            if re.match(pattern, file):
                zip_ref.extract(file, raw_data_path)
        print(f'Extracted files from {zip_path}')
    os.remove(zip_path)

In [25]:
def read_bike_data(path):
    """Load one trip CSV, keeping only the columns used downstream.

    'Start date' and 'End date' are parsed as datetimes; every other column
    in the file is ignored.
    """
    timestamp_columns = ['Start date', 'End date']
    wanted_columns = [
        'Bike number',
        'Member type',
        'Start date',
        'End date',
        'Start station number',
        'End station number',
        'Start station',
        'End station',
    ]
    return pd.read_csv(path, usecols=wanted_columns, parse_dates=timestamp_columns)

In [28]:
# starting-point data: of the files matching old_pattern, take the one whose name sorts last
old_files = sorted((f for f in os.listdir(raw_data_path) if re.match(old_pattern, f)), reverse=True)
old_raw_data = read_bike_data(os.path.join('raw_data', old_files[0])).dropna(ignore_index=True)
old_raw_data.columns = [col.replace(" ", "_").lower() for col in old_raw_data.columns]

# main data: every file matching new_pattern, stacked into one frame
new_files = [f for f in os.listdir(raw_data_path) if re.match(new_pattern, f)]
raw_data = pd.concat(
    [read_bike_data(os.path.join('raw_data', f)) for f in new_files],
    axis=0,
).dropna(ignore_index=True)
raw_data.columns = [col.replace(" ", "_").lower() for col in raw_data.columns]

In [29]:
# start_* and end_* columns are renamed to the same pair of names so they can be stacked
column_rename_dict = {
    'start_station_number' : 'station',
    'end_station_number' : 'station',
    'start_station' : 'name',
    'end_station' : 'name'
}
# one (station, name) row per distinct pair seen at either end of a trip
stations = pd.concat(
    [
        raw_data[[f'{side}_station_number', f'{side}_station']].rename(columns=column_rename_dict)
        for side in ('start', 'end')
    ],
    axis=0
).drop_duplicates()
# station number -> station name lookup
stations_map = dict(zip(stations['station'], stations['name']))

### reshuffling

Idea: bikes also move from one station to another without a user taking a trip; contractors 'reshuffle' bikes by van from one location to the next. To account for this, we add rows that represent these moves, which gives a more accurate picture of where each bike is and when it was relocated, so that availability can be calculated.

In [30]:
def generate_reshuffle_data(raw_data):
    """Turn trip records into per-bike events and add synthetic reshuffle events.

    Reads the columns start_date, end_date, bike_number, start_station_number
    and end_station_number from raw_data. Only rows with end_date > start_date
    are kept. Each remaining trip becomes a 'begin' row (start time/station)
    and an 'end' row (end time/station). Whenever a trip ends at one station
    and the same bike's next trip begins at a different one, two extra rows
    flagged reshuffle=True are added: a 'begin' at the station the bike was
    left at and an 'end' at the station it is next ridden from.

    Returns a DataFrame with the columns bike_number, event, timestamp,
    station and reshuffle, sorted by bike_number and timestamp. The input
    frame is not modified (the function works on a copy).
    """

    print('Filtering and sorting initial data...')
    data = raw_data.copy().query('end_date > start_date')
    data.sort_values(by=['bike_number', 'start_date'], inplace=True, ignore_index=True)

    ## reshaping data
    ## each trip's timestamp and station are packed into one "timestamp,station"
    ## string per event so a single melt produces one row per event
    print('Reshaping data to long format...')
    data['tmp_id'] = data.groupby('bike_number').cumcount()  # trip sequence number within each bike
    data['begin'] = data['start_date'].astype(str) + ',' + data['start_station_number'].astype(str)
    data['end'] = data['end_date'].astype(str) + ',' + data['end_station_number'].astype(str)
    data_long = data.melt(id_vars=['tmp_id','bike_number'], value_vars=['begin', 'end'], var_name='event')
    ## unpack the combined string back into a timestamp column and a station column
    tmp = data_long.pop('value').str.split(',', expand=True).rename(columns={0:'timestamp', 1:'station'})
    tmp['timestamp'] = pd.to_datetime(tmp['timestamp'])
    tmp['station'] = tmp['station'].astype(int)
    data_long = pd.concat([data_long, tmp], axis=1)

    ## preparing data for reshuffling
    ## after sorting, each row's neighbours within a bike are the adjacent events of that bike
    print('Sorting data to identify reshuffling...')
    data_long.sort_values(by=['bike_number', 'tmp_id', 'timestamp'], inplace=True, ignore_index=True)
    data_long['next_station'] = data_long.groupby('bike_number')['station'].shift(-1)
    data_long['previous_station'] = data_long.groupby('bike_number')['station'].shift(1)
    data_long['reshuffle'] = False

    ## identifying reshuffling
    ## keep 'begin' rows whose station differs from the previous event's station and
    ## 'end' rows whose station differs from the next event's station
    print('Identifying reshuffling...')
    reshuffle_tmp = data_long.copy().query('(event=="begin" and previous_station.notnull() and station!=previous_station) or (event=="end" and next_station.notnull() and station!=next_station)')
    ## flip the event: where a ride ended the reshuffle begins, where a ride began the reshuffle ends
    reshuffle_tmp['event'] = np.where(reshuffle_tmp['event'] == 'end', 'begin', 'end')
    reshuffle_tmp['reshuffle'] = True
    reshuffle_tmp.rename(columns={'timestamp' : 'temp_timestamp'}, inplace=True)

    ## generating fake timestamps for reshuffling
    ## assumption: for gap in between riding events, reshuffling occurs
        ## bike is reshuffled away from end station at 1/3 of time between end of ride and beginning of ride
        ## bike is reshuffled to begin station at 2/3 of time between end of ride and beginning of ride
    ## NOTE(review): the neighbour lookup below assumes the selected rows come in
    ## begin/end pairs per bike, so the adjacent row is the other half of the same gap - confirm
    print('Processing reshuffling...')
    reshuffle_tmp['next_timestamp'] = reshuffle_tmp.groupby('bike_number')['temp_timestamp'].shift(-1)
    reshuffle_tmp['previous_timestamp'] = reshuffle_tmp.groupby('bike_number')['temp_timestamp'].shift(1)
    ## a synthetic 'begin' is compared with the following row, a synthetic 'end' with the preceding row
    reshuffle_tmp['calc_timestamp'] = np.where(reshuffle_tmp['event'] == 'begin', reshuffle_tmp['next_timestamp'], reshuffle_tmp['previous_timestamp'])
    reshuffle_tmp['total_seconds_elapsed'] = (reshuffle_tmp['temp_timestamp'] - reshuffle_tmp['calc_timestamp']).dt.total_seconds().abs()
    ## move the 'begin' forward and the 'end' backward by a third of the gap (whole seconds)
    reshuffle_tmp['timestamp'] = np.where(
        reshuffle_tmp['event'] == 'begin', 
        reshuffle_tmp['temp_timestamp'] + pd.to_timedelta(reshuffle_tmp['total_seconds_elapsed'] // 3, unit='s'),
        reshuffle_tmp['temp_timestamp'] - pd.to_timedelta(reshuffle_tmp['total_seconds_elapsed'] // 3, unit='s')
    )
    

    ## combining raw and reshuffled data
    print('Combining raw and reshuffled data...')
    reshuffle_cols = ['bike_number', 'event', 'timestamp', 'station', 'reshuffle']
    data_long_comb = pd.concat([data_long[reshuffle_cols], reshuffle_tmp[reshuffle_cols]], axis=0)
    data_long_comb.sort_values(by=['bike_number', 'timestamp'], inplace=True, ignore_index=True)
    print('Done.')
    return data_long_comb

def generate_start_point(data, start_date='2017-12-31', start_hour=23):
    """Build one starting row per station from each bike's last known event.

    Parameters
    ----------
    data : DataFrame
        Long-format events with the columns station, bike_number, timestamp
        and event (as returned by generate_reshuffle_data).
    start_date : str or datetime-like, default '2017-12-31'
        Date stamped on every starting row.
    start_hour : int, default 23
        Hour stamped on every starting row.

    Returns
    -------
    DataFrame
        Columns station, date, hour, arrivals, departures. arrivals is the
        number of bikes whose latest event is an 'end' at that station
        (0 when there is none); departures is always 0.
    """
    start_point = (
        data[['station']]
        .drop_duplicates()
        .assign(date=pd.to_datetime(start_date), hour=start_hour)
    )
    # idxmax returns index *labels*, so the rows must be selected with .loc;
    # positional .iloc only works when the index happens to be 0..n-1
    last_event_labels = data.groupby('bike_number')['timestamp'].idxmax()
    last_arrival = (
        data.loc[last_event_labels]
        .query('event=="end"')
        .groupby('station', as_index=False)['bike_number'].count()
        .rename(columns={'bike_number' : 'arrivals'})
    )
    start_point = start_point.merge(last_arrival, on='station', how='left')
    # stations where no bike was last seen arriving get 0; only this column is filled
    start_point['arrivals'] = start_point['arrivals'].fillna(0).astype(int)
    start_point['departures'] = 0
    return start_point

def generate_aggregate_data(data, fill_na_value=0):
    """Count departures and arrivals per station, date and hour.

    Parameters
    ----------
    data : DataFrame
        Long-format events with the columns event ('begin'/'end'), timestamp,
        station and bike_number.
    fill_na_value : scalar, default 0
        Value written where a station/date/hour has events of only one kind,
        so the other count is missing after the outer merge.

    Returns
    -------
    DataFrame
        Columns station, date (datetime), hour, departures, arrivals, sorted
        by station, date and hour.
    """
    combined_columns = ['station', 'date', 'hour']
    # 'begin' events are departures from a station, 'end' events are arrivals
    count_columns = {'begin': 'departures', 'end': 'arrivals'}

    def count_events(event_name):
        # number of events of one kind per station/date/hour
        return (
            data.loc[data['event'] == event_name]
            .assign(
                date=lambda df: df['timestamp'].dt.date,
                hour=lambda df: df['timestamp'].dt.hour
            )
            .groupby(combined_columns, as_index=False)['bike_number'].count()
            .rename(columns={'bike_number' : count_columns[event_name]})
        )

    departures = count_events('begin')
    arrivals = count_events('end')
    combined = (
        pd.merge(departures, arrivals, how='outer', on=combined_columns)
        .sort_values(by=combined_columns, ignore_index=True)
        # honour the caller's fill value (previously 0 was hard-coded)
        .fillna({name: fill_na_value for name in count_columns.values()})
    )
    combined['date'] = pd.to_datetime(combined['date'])
    return combined

def process_data(data, start_point=False):
    """Add reshuffle events to the trips, then summarise them.

    With start_point=True the result is the per-station starting point;
    otherwise it is the per-station/date/hour aggregate.
    """
    reshuffled = generate_reshuffle_data(data)
    summarise = generate_start_point if start_point else generate_aggregate_data
    return summarise(reshuffled)

In [None]:
# starting point from the older file, hourly aggregates from the main trip data
start_data = process_data(data=old_raw_data, start_point=True)
agg_data = process_data(data=raw_data, start_point=False)

In [32]:
# stack the starting rows on top of the hourly aggregates and order them per station
combined_data = (
    pd.concat([start_data, agg_data], axis=0)
    .sort_values(by=['station', 'date', 'hour'], ignore_index=True)
)
# flag the rows dated in 2017
combined_data['start_ind'] = combined_data['date'].dt.year.eq(2017)

## capacity data

To get relative capacity, we need the capacity of each bike station as well as its location.

In [33]:
station_url = 'https://maps2.dcgis.dc.gov/dcgis/rest/services/DCGIS_DATA/Transportation_Bikes_Trails_WebMercator/MapServer/5/query?outFields=*&where=1%3D1&f=geojson'
# a timeout stops the cell from hanging forever on an unresponsive server
response = requests.get(station_url, timeout=60)
# fail here on an HTTP error rather than later on an unexpected payload
response.raise_for_status()
response_dict = response.json() # json.loads(response.text)

In [34]:
def extract_feature_data(d):
    """Return the name, capacity, latitude and longitude of one feature.

    The values are read from d['properties'] under the upper-case keys and
    returned under the matching lower-case keys.
    """
    props = d['properties']
    wanted = ('NAME', 'CAPACITY', 'LATITUDE', 'LONGITUDE')
    return {field.lower(): props[field] for field in wanted}

In [35]:
# one row per feature in the response; note that this rebinds the name
# `stations`, which earlier held the station number/name pairs from the trips
stations = pd.DataFrame(list(map(extract_feature_data, response_dict['features'])))

## weather data

Get weather data information for the dates of interest at the daily level (finest grain of detail available).

In [36]:
# base endpoint of the weather service; the query string is appended in create_noaa_url
noaa_url = 'https://www.ncei.noaa.gov/access/services/data/v1'

In [37]:
def get_date_range(data=None, date_format="%Y-%m-%d"):
    """Return the earliest and latest value of data['date'] as formatted strings.

    Parameters
    ----------
    data : DataFrame, optional
        Frame with a datetime 'date' column. When omitted, the module-level
        combined_data is looked up at call time, so the current frame is used
        rather than whatever it held when the function was defined.
    date_format : str, default "%Y-%m-%d"
        strftime format applied to both dates.

    Returns
    -------
    dict
        {'startDate': <earliest>, 'endDate': <latest>}

    Raises
    ------
    ValueError
        If the 'date' column contains no valid dates.
    """
    if data is None:
        data = combined_data
    bounds = data['date'].agg(['min', 'max'])
    if bounds.isna().any():
        raise ValueError("cannot derive a date range: 'date' has no valid values")
    return {
        'startDate' : bounds['min'].strftime(date_format),
        'endDate' : bounds['max'].strftime(date_format)
    }
def create_noaa_payload(
        datatypes=('TAVG', 'TMAX', 'TMIN', 'PRCP', 'SNOW'), ## types of weather data
        date_data=None, dataset='daily-summaries', ## table to pull data from
        stations='USW00013743', ## closest station to DC (Ronald Reagan Airport)
        output_format='csv' ## desired output
):
    """Assemble the query parameters for a weather data request.

    Parameters
    ----------
    datatypes : str or iterable of str
        Data type codes; upper-cased and comma-joined. A single string is
        treated as one code.
    date_data : DataFrame, optional
        Frame whose 'date' column defines startDate/endDate. When omitted,
        the module-level agg_data is looked up at call time.
    dataset : str
        Value of the 'dataset' parameter.
    stations : str or iterable of str
        Station id(s); comma-joined. A single string is treated as one id.
    output_format : str
        Value of the 'format' parameter.

    Returns
    -------
    dict
        Keys startDate, endDate, dataTypes, dataset, stations, format.
    """
    if date_data is None:
        date_data = agg_data
    # a bare string would otherwise be iterated character by character
    if isinstance(datatypes, str):
        datatypes = [datatypes]
    if isinstance(stations, str):
        stations = [stations]
    payload = {**get_date_range(date_data)}
    payload['dataTypes'] = ','.join([c.upper() for c in datatypes])
    payload['dataset'] = dataset
    payload['stations'] = ','.join(stations)
    payload['format'] = output_format
    return payload
def create_noaa_url(url=noaa_url, **kwargs):
    """Return the full request URL for the weather service.

    Keyword arguments are forwarded to create_noaa_payload; the resulting
    parameters are appended to url as a query string.
    """
    from urllib.parse import urlencode

    payload = create_noaa_payload(**kwargs)
    # urlencode escapes characters that are not valid in a query string;
    # commas are kept literal so list-valued parameters look as before
    query_string = urlencode(payload, safe=',')
    query_url = f'{url}?{query_string}'
    return query_url
def process_weather_data(raw_string):
    """Parse a CSV weather response into a DataFrame of daily measurements.

    Parameters
    ----------
    raw_string : object with a ``text`` attribute
        Despite the name, this is the response object, not a string; its
        ``text`` must hold CSV with a header row. The first column is
        dropped and one of the remaining columns must be named 'date'
        (case-insensitive).

    Returns
    -------
    DataFrame
        'date' as datetime followed by every other column converted to a
        number and divided by 10. Empty or non-numeric cells become NaN.

    Raises
    ------
    ValueError
        If the response body is empty.
    """
    import csv

    # the csv module handles quoting, so quoted values that contain commas
    # are not split
    rows = csv.reader(raw_string.text.strip().splitlines())
    header_row = next(rows, None)
    if header_row is None:
        raise ValueError('weather response is empty')
    headers = [h.strip().lower() for h in header_row[1:]]
    records = [dict(zip(headers, (v.strip() for v in row[1:]))) for row in rows]
    result_df = pd.DataFrame(records, columns=headers)
    result_df['date'] = pd.to_datetime(result_df['date'])
    # NOTE(review): every measurement is divided by 10, presumably because the
    # service reports tenths of a unit - confirm this holds for each requested
    # data type (e.g. SNOW)
    measurements = (
        result_df.drop(columns='date')
        # missing measurements arrive as empty strings; coerce them to NaN
        # instead of failing the whole conversion
        .apply(pd.to_numeric, errors='coerce')
        .div(10)
    )
    final_result = pd.concat([result_df['date'], measurements], axis=1)
    return final_result

In [None]:
# a timeout stops the cell from hanging forever on an unresponsive server
r = requests.get(create_noaa_url(), timeout=60)
# stop on an HTTP error instead of trying to parse an error page as CSV
r.raise_for_status()
weather_data = process_weather_data(r)

## holidays

Add federal holidays to see if there is special behavior on these specific dates.

In [81]:
# federal holidays that fall between the first and last date in the data
cal = calendar()
date_column = combined_data['date']
holidays = cal.holidays(start=date_column.min(), end=date_column.max())

## combining all the data

In [88]:
# look up each station number in stations_map; numbers without an entry become NaN
combined_data['name'] = combined_data['station'].map(stations_map)

In [89]:
# attach capacity/location by station name and weather by date (both inner
# joins, so unmatched rows are dropped), then keep only dates before 2020
merged_data = (
    combined_data
    .merge(stations, on='name', how='inner')
    .merge(weather_data, on='date')
    .query("date<'2020-01-01'")
)
# 1 when the date is a federal holiday, 0 otherwise
merged_data['holiday'] = merged_data['date'].isin(holidays).astype(int)
merged_data = merged_data.sort_values(['station', 'date', 'hour'], ignore_index=True)

In [96]:
# create the output folder first so the write does not fail when it is missing
os.makedirs('../data', exist_ok=True)
merged_data.to_parquet('../data/processed_data.parquet', index=False)

## removing raw data

In [None]:
print('Cleaning up raw data folder.')
# walk bottom-up so every sub-folder is already empty (and removed) by the
# time os.rmdir reaches its parent
for dirpath, subdirs, filenames in os.walk(raw_data_path, topdown=False):
    for file in filenames:
        os.remove(os.path.join(dirpath, file))
        print(f'Removed {file}')
    os.rmdir(dirpath)
    print(f'Removed {dirpath}')
print('Removed raw data folder and contents.')