In [1]:
import pandas as pd
import numpy as np
import json
from joblib import Parallel, delayed
from glob import glob
from datetime import datetime
import pickle
# Override the module-level constant so any code that reads
# pickle.HIGHEST_PROTOCOL from here on sees protocol 4.
# NOTE(review): presumably done so pickled output stays loadable on
# Python < 3.8 (which lacks protocol 5) -- confirm this is still needed.
pickle.HIGHEST_PROTOCOL = 4

In [2]:
# Load the station list from the JSON feed and keep only id, name and
# coordinates, stored with compact dtypes and the id column named "stationid".
with open("raw_data_201306_201709/station_information.json", "r") as jsonfile:
    station_records = json.load(jsonfile)["data"]["stations"]

station_info = (
    pd.json_normalize(station_records)[["station_id", "name", "lat", "lon"]]
    .astype({"station_id": np.int16, "lat": np.float32, "lon": np.float32})
    .rename(columns={"station_id": "stationid"})
)

In [3]:
def read_citibike(path):
    """Read one trip CSV and return its cleaned rows.

    The file's own header row is discarded and replaced by normalized column
    names. Cleaning steps, in order:

    1. drop rows with a missing start or end station id,
    2. cast both station ids to int16,
    3. drop rows duplicating (bikeid, start/end station id, start/stop time),
    4. drop the ``bikeid`` and ``birthyear`` columns,
    5. keep only rows whose ``starttime`` is strictly before ``stoptime``.

    Parameters
    ----------
    path : str or path-like
        Location of the CSV file.

    Returns
    -------
    pandas.DataFrame
        The cleaned trips; the original row labels of surviving rows are kept.
    """
    column_names = [
        "tripduration",
        "starttime",
        "stoptime",
        "startstationid",
        "startstationname",
        "startstationlatitude",
        "startstationlongitude",
        "endstationid",
        "endstationname",
        "endstationlatitude",
        "endstationlongitude",
        "bikeid",
        "usertype",
        "birthyear",
        "gender",
    ]
    column_dtypes = {
        "tripduration": np.int32,
        "startstationlatitude": np.float32,
        "startstationlongitude": np.float32,
        "endstationlatitude": np.float32,
        "endstationlongitude": np.float32,
        "bikeid": np.int32,
        "gender": np.int8,
    }
    station_id_columns = ["startstationid", "endstationid"]

    trips = pd.read_csv(
        path,
        header=0,
        names=column_names,
        dtype=column_dtypes,
        engine="c",
        parse_dates=["starttime", "stoptime"],
    )

    # Station ids are cast only after the NaN rows are gone, since an
    # integer dtype cannot hold missing values.
    return (
        trips.dropna(subset=station_id_columns)
        .astype({column: np.int16 for column in station_id_columns})
        .drop_duplicates(
            subset=["bikeid", "startstationid", "endstationid", "starttime", "stoptime"]
        )
        .drop(columns=["bikeid", "birthyear"])
        .query("starttime < stoptime")
    )

In [4]:
# Parse every trip CSV in parallel and stack the results into one frame.
# glob() returns paths in arbitrary, filesystem-dependent order, so the paths
# are sorted to make the row order of df_whole reproducible; the station
# de-duplication further down uses keep="last" and therefore depends on it.
trip_paths = sorted(glob("raw_data_201306_201709/*trip*.csv"))
trip_frames = Parallel(n_jobs=-1, verbose=50, backend="loky")(
    delayed(read_citibike)(path) for path in trip_paths
)
df_whole = pd.concat(trip_frames)

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 32 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    3.9s
[Parallel(n_jobs=-1)]: Done   2 out of  51 | elapsed:    4.3s remaining:  1.7min
[Parallel(n_jobs=-1)]: Done   4 out of  51 | elapsed:    6.0s remaining:  1.2min
[Parallel(n_jobs=-1)]: Done   6 out of  51 | elapsed:    6.7s remaining:   50.4s
[Parallel(n_jobs=-1)]: Done   8 out of  51 | elapsed:    7.3s remaining:   39.2s
[Parallel(n_jobs=-1)]: Done  10 out of  51 | elapsed:    8.3s remaining:   33.9s
[Parallel(n_jobs=-1)]: Done  12 out of  51 | elapsed:    9.4s remaining:   30.5s
[Parallel(n_jobs=-1)]: Done  14 out of  51 | elapsed:   10.0s remaining:   26.5s
[Parallel(n_jobs=-1)]: Done  16 out of  51 | elapsed:   11.2s remaining:   24.5s
[Parallel(n_jobs=-1)]: Done  18 out of  51 | elapsed:   12.4s remaining:   22.8s
[Parallel(n_jobs=-1)]: Done  20 out of  51 | elapsed:   13.8s remaining:   21.4s
[Parallel(n_jobs=-1)]: Done  22 out of  51 | elapse

In [5]:
# Drop the records the original author labelled as test trips: keep only rows
# in which all four start/end coordinates are non-zero, and report the number
# of distinct station ids before and after.
coordinate_columns = (
    "startstationlatitude",
    "startstationlongitude",
    "endstationlatitude",
    "endstationlongitude",
)
nonzero_coordinates = " & ".join(f"{column} != 0" for column in coordinate_columns)

startstationid = df_whole["startstationid"].unique()
endstationid = df_whole["endstationid"].unique()
print("---------before---------")
for label, ids in (("startstationid", startstationid), ("endstationid", endstationid)):
    print(f"Total {label}: {len(ids)}")

df_whole.query(nonzero_coordinates, inplace=True)

startstationid = df_whole["startstationid"].unique()
endstationid = df_whole["endstationid"].unique()
print("---------after---------")
for label, ids in (("startstationid", startstationid), ("endstationid", endstationid)):
    print(f"Total {label}: {len(ids)}")

---------before---------
Total startstationid: 767
Total endstationid: 804
---------after---------
Total startstationid: 764
Total endstationid: 799


In [6]:
# Drop trips touching stations that only ever appear as a destination and
# never as an origin (per the original note: stations at NJC or test stations),
# and report the number of distinct station ids before and after.
startstationid = df_whole["startstationid"].unique()
endstationid = df_whole["endstationid"].unique()
print("---------before---------")
for label, ids in (("startstationid", startstationid), ("endstationid", endstationid)):
    print(f"Total {label}: {len(ids)}")

# End-station ids with no matching start-station id.
pop_id = endstationid[np.isin(endstationid, startstationid, invert=True)]
df_whole.query(
    "startstationid not in @pop_id & endstationid not in @pop_id",
    inplace=True,
)

startstationid = df_whole["startstationid"].unique()
endstationid = df_whole["endstationid"].unique()
print("---------after---------")
for label, ids in (("startstationid", startstationid), ("endstationid", endstationid)):
    print(f"Total {label}: {len(ids)}")

---------before---------
Total startstationid: 764
Total endstationid: 799
---------after---------
Total startstationid: 764
Total endstationid: 764


In [7]:
# Collect the stations that appear in the trips but are missing from
# station_info. For each station id the last row seen supplies name and
# coordinates; when an id occurs on both sides, the end-station row wins
# because it is concatenated last.
endpoint_stations = []
for endpoint in ("start", "end"):
    renames = {
        f"{endpoint}stationid": "stationid",
        f"{endpoint}stationname": "name",
        f"{endpoint}stationlatitude": "lat",
        f"{endpoint}stationlongitude": "lon",
    }
    endpoint_stations.append(
        df_whole[list(renames)]
        .drop_duplicates(subset=f"{endpoint}stationid", keep="last")
        .rename(columns=renames)
    )

lost_station = (
    pd.concat(endpoint_stations)
    .drop_duplicates(subset='stationid', keep="last")
    .query('stationid not in @station_info.stationid')
)
lost_station.head(5)

Unnamed: 0,stationid,name,lat,lon
1374071,3488,8D QC Station 01,45.506363,-73.569466
1254423,3266,Kiosk in a box Deployment,40.708611,-73.928505
1348812,3485,NYCBS Depot - RIS,40.725208,-73.974724
1811965,3567,11 St & 35 Ave,40.762745,-73.939117
1866243,3556,24 St & 41 Ave,40.752708,-73.939743


In [8]:
# Extend station_info with the stations recovered from the trips, keep only
# stations that still occur in the filtered trips, and order them by id.
station_info = (
    pd.concat([station_info, lost_station])
    # Makes the cell safe to re-run: without this, a second execution appends
    # lost_station again and leaves duplicate station rows. On a first run
    # lost_station holds only ids absent from station_info, so nothing is dropped.
    .drop_duplicates(subset="stationid", keep="first")
    .query('stationid in @startstationid | stationid in @endstationid')
    .sort_values("stationid")
    .reset_index(drop=True)
)
len(station_info)

764

In [9]:
# Generate station info: the first and last calendar day on which each station
# was used, as either the start or the end of a trip.
#
# The per-station results are matched to station_info by station id rather
# than pasted in by position, so the outcome no longer relies on the start-id
# set, the end-id set and the rows of station_info all being identical and
# identically sorted. A station with no trips gets NaT instead of shifting
# every following row or raising a length mismatch.
departures = df_whole.groupby("startstationid")["starttime"]
arrivals = df_whole.groupby("endstationid")["stoptime"]

# Stack the start-side and end-side extremes (both indexed by station id),
# then reduce once more per id to get the overall extreme for each station.
earliest_by_station = (
    pd.concat([departures.min(), arrivals.min()])
    .groupby(level=0)
    .min()
    .dt.floor("D")
)
latest_by_station = (
    pd.concat([departures.max(), arrivals.max()])
    .groupby(level=0)
    .max()
    .dt.floor("D")
)

station_info["earliest"] = station_info["stationid"].map(earliest_by_station)
station_info["latest"] = station_info["stationid"].map(latest_by_station)

# Weather

In [10]:
# Load the weather file, reading only the timestamp and the four hourly
# measurements; DATE is parsed as a datetime.
hourly_measurements = [
    "HourlyDryBulbTemperature",
    "HourlyPrecipitation",
    "HourlyRelativeHumidity",
    "HourlyWindSpeed",
]
usecols = ["DATE", *hourly_measurements]

weather = pd.read_csv(
    "raw_data_201306_201709/weather.csv",
    usecols=usecols,
    parse_dates=["DATE"],
)

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


In [11]:
# Select the needed columns and fill in missing values.
#
# Each timestamp is rounded up to the next full hour, every measurement is
# coerced to a number (unparseable entries become NaN), readings sharing an
# hour are averaged, and remaining gaps are forward-filled.
#
# HourlyWindSpeed is coerced like the other three measurements. It was loaded
# but previously left as read, so any non-numeric entry in it would either
# drop the column from the hourly mean or make the mean fail, depending on
# the pandas version.
hourly_measurements = [
    "HourlyDryBulbTemperature",
    "HourlyPrecipitation",
    "HourlyRelativeHumidity",
    "HourlyWindSpeed",
]
weather = (
    weather.assign(
        DATE=weather["DATE"].dt.ceil("H"),
        **{
            column: pd.to_numeric(weather[column], errors="coerce", downcast="float")
            for column in hourly_measurements
        },
    )
    .groupby("DATE", as_index=False)
    .mean()
    # Same result as fillna(method="ffill"), which newer pandas deprecates.
    .ffill()
    .rename(columns={"DATE": "time"})
)

In [12]:
# Persist the three tables into one HDF5 file. The first write uses mode "w"
# to start a fresh file; the following writes use "r+" to add their keys to it.
hdf_path = "process_data_201306_201709/citibike_raw.h5"
for hdf_key, frame, file_mode in (
    ("raw", df_whole, "w"),
    ("info", station_info, "r+"),
    ("weather", weather, "r+"),
):
    frame.to_hdf(hdf_path, key=hdf_key, mode=file_mode)

your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block5_values] [items->Index(['startstationname', 'endstationname', 'usertype'], dtype='object')]

  pytables.to_hdf(
