# Read in and Preprocess Data

In [4]:
import pandas as pd
import numpy as np
import pickle
from datetime import datetime
import os
import json
from geopy import distance

raw_path = '/nfs/Projects/citibike/'
data_path = './data/'

## Read in the Data

In [8]:
# check if the processed file already exists
if os.path.exists('./citibike_original.h5'):
    print('Found hdf file! Un-marshalling...')
    
    # just load it instead of reading from CSV again
    df_whole = pd.read_hdf('./citibike_original.h5', key='df_whole')
    
else:
    # read from .csv files
    dtypes = {
            'Unnamed: 0': np.int32, 
            'bikeid': np.int32,
            'endstationlatitude': np.float64,
            'endstationlongitude': np.float64,
            'endstationname': 'str',
            'gender': np.int16,
            'startstationlatitude': np.float64,
            'startstationlongitude': np.float64,
            'startstationname': 'str',
            'starttime': 'str',
            'stoptime': 'str',
            'tripduration': np.int32,
            'usertype': 'str'
        }
    parse_dates = ['starttime', 'stoptime']

    # loading the whole thing into memory
    df_whole = pd.read_csv(raw_path + 'citibike.csv', dtype=dtypes, parse_dates=parse_dates)
    df_whole.drop(columns=['Unnamed: 0'])

    # clean the data
    # there are NaN in the startstationid and endstationid (the most important ones)
    df_whole.dropna(subset=['startstationid', 'endstationid'], inplace=True)

    df_whole = df_whole.astype({'startstationid': 'int32', 'endstationid':'int32'})
    df_whole.drop_duplicates(subset=['bikeid', 'startstationid', 'endstationid', 'starttime', 'stoptime'], inplace=True)

    # drop entries where starttime = endtime
    df_whole.drop(df_whole[df_whole.starttime == df_whole.stoptime].index, inplace=True)

    # Drop stations that have (lat, long) = (0, 0)
    df_whole = df_whole[(df_whole['startstationlatitude'] != 0) & (df_whole['endstationlatitude'] != 0)]

    # save the data to hdf5
    df_whole.to_hdf('citibike_original.h5', key='df_whole', mode='w')

### We calculcate a dictionary to hold the key attributes of stations

    stations_dict = {
        stationid: {
            index: (int)               // the index in tensors that each station id maps to
            is_alive: (bool)           // whether the station is still alive
                                          (dead stations will be excluded from prediction, but not from training)
            earliest: (datetime)       // the time of the earliest entry involving stationid
            latest: (datetime)         // the time of the latest entry involving stationid
            alive_time: (int)          // the duration this station is alive (in hours)
            
            lat: (float)               // latitude
            long: (float)              // longitude
        }
    }
    
    
<br/>

In [21]:
# get a list of the active stations from the official list
# this does not include downloading from the web
import json
import os

if os.path.exists(data_path + 'stations_dict.pickle'):
    # load from disk
    with (open(data_path + 'stations_dict.pickle', "rb")) as openfile:
        stations_dict = pickle.load(openfile)

else:
    os.system('rm ./stations.json')
    os.system('wget https://feeds.citibikenyc.com/stations/stations.json')

    with open('stations.json', 'r') as jsonfile:
        stations = json.load(jsonfile)['stationBeanList'] # stations contains the info for the stations

    alive_station_set = set()
    count = 0
    for i, station in enumerate(sorted(stations, key = lambda i: i['id'])):
        alive_station_set.add(station['id'])

    # find the earliest and the lastest entries of each station
    start_stations = df_whole.startstationid.unique()
    end_stations = df_whole.endstationid.unique()
    all_stations = np.union1d(start_stations, end_stations)

    df_grouped_start = df_whole.sort_values('starttime').groupby('startstationid')
    df_grouped_stop = df_whole.sort_values('stoptime').groupby('endstationid')

    start_first = df_grouped_start.first()['starttime']
    start_last = df_grouped_start.last()['starttime']
    stop_first = df_grouped_stop.first()['stoptime']
    stop_last = df_grouped_stop.last()['stoptime']

    start_pos = df_grouped_start.last()[['startstationlatitude', 'startstationlongitude']].to_dict()
    end_pos = df_grouped_stop.last()[['endstationlatitude', 'endstationlongitude']].to_dict()


    # build a dictionary of the station data
    stations_dict = {}
    for i, stationid in enumerate(sorted(all_stations)):
        info = {}

        info['index'] = i
        info['is_alive'] = stationid in alive_station_set

        # time info
        earliest_start = datetime.max
        earliest_stop = datetime.max
        latest_start = datetime.min
        latest_stop = datetime.min

        if stationid in start_first:
            earliest_start = start_first[stationid]
        if stationid in start_last:
            latest_start = start_last[stationid]
        if stationid in stop_first:
            earliest_stop = stop_first[stationid]
        if stationid in stop_last:
            latest_stop = stop_last[stationid]

        info['earliest'] = min(earliest_start, earliest_stop)
        info['latest'] = max(latest_start, latest_stop)
        info['alive_time'] = (info['latest'] - info['earliest']).total_seconds() / 3600
        if info['alive_time'] <= 0:
            info['alive_time'] = 0


        # geo info
        if stationid in start_pos['startstationlatitude']:
            info['lat'] = start_pos['startstationlatitude'][stationid]
            info['long'] = start_pos['startstationlongitude'][stationid]
        elif stationid in end_pos['endstationlatitude']:
            info['lat'] = end_pos['endstationlatitude'][stationid]
            info['long'] = end_pos['endstationlongitude'][stationid]

        stations_dict[stationid] = info

    # save to disk
    with open(data_path + 'stations_dict.pickle', 'wb') as handle:
        pickle.dump(stations_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

## Station-Station Distance

In [9]:
station_distance = {} # pair wise distance using geopy.distance calculations
for startid in stations_dict.keys():
    
    origin = (stations_dict[startid]['lat'], stations_dict[startid]['long'])
    
    dist_temp = {}
    for endid in stations_dict.keys():
        
        destination = (stations_dict[endid]['lat'], stations_dict[endid]['long'])
        dist_temp[endid] = distance.distance(origin, destination).m
        
    station_distance[startid] = dist_temp
    
with open(data_path + 'station_distance.pickle', 'wb') as handle:
    pickle.dump(station_distance, handle, protocol=pickle.HIGHEST_PROTOCOL)

## Station-Station Average Ride Count (per day)

ride_count is a dictionary of dictionary that holds the average ride count for all stations

    ride_count = {
       startstationid: {
           'endstationid_1': count
           'endstationid_2': count
        }
    }


In [12]:
df_count = df_whole.groupby(['startstationid', 'endstationid']).count()['bikeid']

ride_count = {}
ride_accumulate = {}
for startid in stations_dict.keys():
    dct = {}
    for endid in stations_dict.keys(): 
        # count
        try:
            count = df_count[startid, endid]
        except:
            count = 0
            
        # interval
        if count == 0:
            dct[endid] = 0
        else:
            # note that we are using source station as the time reference
            if stations_dict[startid]['alive_time'] == 0:
                dct[endid] = 0
            else:
                dct[endid] = count / stations_dict[startid]['alive_time']
                
    ride_count[startid] = dct
    
with open(data_path + 'ride_count.pickle', 'wb') as handle:
    pickle.dump(ride_count, handle, protocol=pickle.HIGHEST_PROTOCOL)

## Station-Station Per-Hour Inbound/Outbound Correlation

there are two dictionaries for this

    in_hist = {
        'endstationid': {
            * influx from 00:00 to 01:00 * (if exists), 
            * influx from 01:00 to 02:00 *,
            ...
            * influx from 23:00 to 24:00 *
        }
     }
 
    out_hist = {
        'startstationid': {
            * inflow from 00:00 to 01:00 *,
            * inflow from 01:00 to 02:00 *,
            ...
            * inflow from 23:00 to 24:00 *
        }
    }

The values are the count of all the data from 2013-2019

In [14]:
df_whole_endindex = df_whole.set_index('stoptime')
df_group_end = df_whole_endindex.groupby(['endstationid', df_whole_endindex.index.hour]).count()['bikeid']

in_hist_count = {int(stationid): df_group_end.xs(stationid).to_dict() for stationid in df_group_end.index.levels[0]}

# divide the count by alive_time to get the average per hour
in_hist = {}
for endid in stations_dict.keys():
    if endid in in_hist_count:
        alive_time = stations_dict[endid]['alive_time']
        
        if alive_time != 0:
            in_hist[endid] = {v / alive_time for k, v in in_hist_count[endid].items()}
            
            
df_whole_startindex = df_whole.set_index('starttime')
df_group_start = df_whole_startindex.groupby(['startstationid', df_whole_startindex.index.hour]).count()['bikeid']

out_hist_count = {int(stationid): df_group_start.xs(stationid).to_dict() for stationid in df_group_start.index.levels[0]}

out_hist = {}
for startid in stations_dict.keys():
    if startid in out_hist_count:
        alive_time = stations_dict[startid]['alive_time']
        
        if alive_time != 0:
            out_hist[endid] = {v / alive_time for k, v in out_hist_count[startid].items()}

with open(data_path + 'in_hist.pickle', 'wb') as handle:
    pickle.dump(in_hist, handle, protocol=pickle.HIGHEST_PROTOCOL)
            
with open(data_path + 'out_hist.pickle', 'wb') as handle:
    pickle.dump(out_hist, handle, protocol=pickle.HIGHEST_PROTOCOL)

CPU times: user 27.6 s, sys: 7.65 s, total: 35.3 s
Wall time: 35.3 s


## Train-Test Data Split

In [None]:
# outflow
df_whole_startindex = df_whole.set_index('starttime')
df_out_sample_h = df_whole_startindex.groupby('startstationid').resample('H').count()['bikeid']

df_whole_endindex = df_whole.set_index('stoptime')
df_in_sample_h = df_whole_endindex.groupby('endstationid').resample('H').count()['bikeid']

time_range = pd.date_range(start='2013-06-01', end='2019-03-01', freq='H')[:-1]

# data is the dataframe that hold all the to-be training data
frame = {'timestamp': time_range, 'inflow': None, 'outflow': None}
data = pd.DataFrame(frame).set_index('timestamp')

num_stations = len(stations_dict)
for time_stamp in time_range:
    df_inflows = df_in_sample_h[:, time_stamp]
    df_outflows = df_out_sample_h[:, time_stamp]
    
    inflow = np.zeros(num_stations)
    outflow = np.zeros(num_stations)

    # populate inflow
    for i, val in df_inflows.items():
        inflow[stations_dict[i]['index']] = val
    
    for i, val in df_outflows.items():
        outflow[stations_dict[i]['index']] = val
        
    data.at[time_stamp] = inflow, outflow
    
with open(data_path + 'data.pickle', 'wb') as handle:
    pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Read data into huge `Data` lists.
if os.path.exists(data_path + 'train_data_list.pickle'):
    train_data_list = pickle.load(openfile)
    print(len(train_data_list))
    
else:
    # load in preprocessed inflow-outflow data
    with open(data_path + 'data.pickle', 'rb') as openfile:
        df_data = pickle.load(openfile)

    # read in the weather data
    with open(data_path + 'weather.pickle', 'rb') as openfile:
        df_weather = pickle.load(openfile)
        
    df_weather.drop(columns=['index'], inplace=True)
    
    # get the time stamp of the split
    break_point = df_data.index[-1] - pd.Timedelta(days=40)

    df_train_data = df_data[df_data.index < break_point]
    df_test_data = df_data[df_data.index >= break_point]
    
    # Train data
    train_data_list = []
    for i in range(len(df_train_data) - 1):
        feat = np.array([df_train_data.iloc[i]['inflow'], df_train_data.iloc[i]['outflow']]).T
        target = np.array([df_train_data.iloc[i + 1]['inflow'], df_train_data.iloc[i + 1]['outflow']]).T

        # this gets the time difference between the data[i] and the first entry in df_weather
        # this can be used as an index to retrieve the weather data for the prediction target
        index_in_weather = (df_train_data.index[i + 1] - df_train_data.index[0]).days
        weather_data = np.array(df_weather.iloc[index_in_weather].values)

        x = torch.FloatTensor(feat)
        y = torch.FloatTensor(target)
        target_weather = torch.FloatTensor(weather_data)

        train_data_list.append(Data(x=x, y=y, edge_index=edge_index, edge_weight=edge_weight, target_weather=target_weather))

    with open(data_path + 'train_data_list.pickle', 'wb') as handle:
        pickle.dump(train_data_list, handle, protocol=pickle.HIGHEST_PROTOCOL)


    # Test data
    test_data_list = []
    for i in range(len(df_test_data) - 1):
        feat = np.array([df_test_data.iloc[i]['inflow'], df_test_data.iloc[i]['outflow']]).T
        target = np.array([df_test_data.iloc[i + 1]['inflow'], df_test_data.iloc[i + 1]['outflow']]).T

        index_in_weather = (df_test_data.index[i + 1] - df_test_data.index[0]).days
        weather_data = np.array(df_weather.iloc[index_in_weather].values)

        x = torch.FloatTensor(feat)
        y = torch.FloatTensor(target)
        target_weather = torch.FloatTensor(weather_data)

        test_data_list.append(Data(x=x, y=y, edge_index=edge_index, edge_weight=edge_weight, target_weather=target_weather))

    with open(data_path + 'test_data_list.pickle', 'wb') as handle:
        pickle.dump(test_data_list, handle, protocol=pickle.HIGHEST_PROTOCOL)

## Weather Data

In [214]:
df_weather = pd.read_csv('weather.csv')
df_weather = df_weather.loc[df_weather['REPORT_TYPE'] == 'SOD  ']

keep_column_list = [
    'DATE',
    'DailyAverageDryBulbTemperature',
    'DailyAverageRelativeHumidity',
    'DailyAverageStationPressure',
    'DailyCoolingDegreeDays',
    'DailyDepartureFromNormalAverageTemperature',
    'DailyHeatingDegreeDays',
    'DailyMaximumDryBulbTemperature',
    'DailyMinimumDryBulbTemperature',
    'DailyPrecipitation',
    'DailySnowDepth',
    'DailySnowfall'
]
df_weather = df_weather[keep_column_list]
df_weather = df_weather.replace({'T': 0.0})
df_weather = df_weather.astype({'DATE': 'str', 'DailyPrecipitation': 'float64', 'DailySnowDepth': 'float64', 'DailySnowfall': 'float64'})

In [215]:
norm_columns = [
    'DailyAverageDryBulbTemperature',
    'DailyAverageRelativeHumidity',
    'DailyAverageStationPressure',
    'DailyCoolingDegreeDays',
    'DailyDepartureFromNormalAverageTemperature',
    'DailyHeatingDegreeDays',
    'DailyMaximumDryBulbTemperature',
    'DailyMinimumDryBulbTemperature',
    'DailyPrecipitation',
    'DailySnowDepth',
    'DailySnowfall'
]

normalized_df_weather = df_weather

# normalizing
normalized_df_weather[norm_columns] = (df_weather[norm_columns] - df_weather[norm_columns].mean()) / df_weather[norm_columns].std()

# There are missing dates in the weather data (2013-12-31)
# We get the dummy data from interpolating
normalized_df_weather = normalized_df_weather.reset_index(drop=True)
normalized_df_weather['DATE'] = pd.to_datetime(normalized_df_weather['DATE'])
normalized_df_weather.set_index('DATE', inplace=True)

counter = 0
for i in normalized_df_weather.index:
    if counter != (i - pd.Timestamp('2013-06-01 00:00:00')).days:
        # insert the missing date with NaN values (for future interpolation)
        ts = pd.Timestamp('2013-06-01 23:59:00') + pd.Timedelta(days=counter)
        
        print('missing date: ', ts)
        
        new_row = pd.DataFrame(index=[ts]) 
        normalized_df_weather = pd.concat([normalized_df_weather, new_row], ignore_index=False, sort=False)
        
        counter = (i - pd.Timestamp('2013-06-01 00:00:00')).days
        
    counter += 1
    
normalized_df_weather.sort_index(inplace=True)
normalized_df_weather = normalized_df_weather.interpolate()

# turn day of week into one-hot encoding
normalized_df_weather = normalized_df_weather.reset_index()
day_name = normalized_df_weather['index'].dt.day_name()
df_dayname_dummies = pd.get_dummies(pd.Categorical(day_name), prefix='DoW_')
normalized_df_weather = pd.concat([normalized_df_weather, df_dayname_dummies], axis=1)

normalized_df_weather = normalized_df_weather.fillna(0)

with open(data_path + 'weather.pickle', 'wb') as handle:
    pickle.dump(normalized_df_weather, handle, protocol=pickle.HIGHEST_PROTOCOL)

missing date:  2013-12-31 23:59:00
missing date:  2014-12-31 23:59:00


## Important Stations

In [10]:
df_tops = df_whole.groupby(['startstationid', 'endstationid']).count().sort_values(['bikeid'], ascending=False).head(10)

In [18]:
top_stations = set()

for index in df_tops.index:
    top_stations.add(index[0])
    top_stations.add(index[1])
    
    if len(top_stations) >= 10:
        break

top_station_index = []
# convert stationid into node index
for stationid in top_stations:
    top_station_index.append()

10

In [20]:
with open(data_path + 'stations_dict.pickle', 'rb') as openfile:
    stations_dict = pickle.load(openfile)

EOFError: Ran out of input