## Import packages & dependencies

In [1]:
#!usr/bin/env python3
import os
import shutil
from zipfile import ZipFile 
import pandas as pd
import numpy as np
import dateutil #https://www.shanelynn.ie/summarising-aggregation-and-grouping-data-in-python-pandas/
from datetime import datetime
import glob
import time
import gc  #garbage collection to free up memory

# Hide warning messages in notebook
import warnings
warnings.filterwarnings('ignore')


In [2]:
#set some static parameters
debug_mode = 'y'
csv_header_ind = 'True'
cur_dir = os.getcwd()


## Load Data

In [3]:
for i in range(2013,2021):
    data_year = i
    print(data_year)

2013
2014
2015
2016
2017
2018
2019
2020


In [4]:
data_year = 9999

In [5]:
# print start timestamp 
execStartDateTime = datetime.now()
print(execStartDateTime)

2020-06-19 21:02:04.351021


In [6]:
raw_dir = os.path.join(cur_dir,'citibike_files','raw', str(data_year))

In [7]:
path = raw_dir

all_files = sorted(glob.glob(os.path.join(path, "*citibike*")))

all_df = []

#parse_dates=['starttime','stoptime'],

for f in all_files:
    try: 
        print('try:' + f)
        df = pd.read_csv(f, sep=',',header = 1,skiprows = 1,\
        names=['tripduration','starttime','stoptime','start station id','start station name'\
        ,'start station latitude','start station longitude','end station id','end station name'\
        ,'end station latitude','end station longitude','bikeid','usertype','birth year','gender'])
        print(len(df.index))
        df['a_file'] = f.split('/')[-1]    
        all_df.append(df)
        citibike_df = pd.concat(all_df, ignore_index=True, sort=True)
    except:
         print('except:' + f)

try:/Users/Werd/boot_camp/gitlib/tableau-challenge/citibike_files/raw/2013/201306-citibike-tripdata.csv
577701
try:/Users/Werd/boot_camp/gitlib/tableau-challenge/citibike_files/raw/2013/201307-citibike-tripdata.zip
843414
try:/Users/Werd/boot_camp/gitlib/tableau-challenge/citibike_files/raw/2013/201308-citibike-tripdata.zip
1001956
try:/Users/Werd/boot_camp/gitlib/tableau-challenge/citibike_files/raw/2013/201309-citibike-tripdata.zip
1034357
try:/Users/Werd/boot_camp/gitlib/tableau-challenge/citibike_files/raw/2013/201310-citibike-tripdata.zip
1037710
try:/Users/Werd/boot_camp/gitlib/tableau-challenge/citibike_files/raw/2013/201311-citibike-tripdata.zip
675772
try:/Users/Werd/boot_camp/gitlib/tableau-challenge/citibike_files/raw/2013/201312-citibike-tripdata.zip
443964


## Pre-processing: Preview data & datatype inspection

In [8]:
list(citibike_df.columns) 

['a_file',
 'bikeid',
 'birth year',
 'end station id',
 'end station latitude',
 'end station longitude',
 'end station name',
 'gender',
 'start station id',
 'start station latitude',
 'start station longitude',
 'start station name',
 'starttime',
 'stoptime',
 'tripduration',
 'usertype']

In [9]:
citibike_df.dtypes

a_file                      object
bikeid                       int64
birth year                  object
end station id             float64
end station latitude       float64
end station longitude      float64
end station name            object
gender                       int64
start station id             int64
start station latitude     float64
start station longitude    float64
start station name          object
starttime                   object
stoptime                    object
tripduration                 int64
usertype                    object
dtype: object

In [10]:
# set text columns as categories
for col in ['gender', 'usertype', 'start station name', 'end station name']:
    citibike_df[col] = citibike_df[col].astype('category')

In [13]:
# set datatypes for numeric columns
citibike_df['start station id'] = citibike_df['start station id'].astype(str).astype(float).astype(int)
citibike_df['start station latitude'] = citibike_df['start station latitude'].astype(float)
citibike_df['start station latitude'] = citibike_df['start station latitude'].round(decimals=3)
citibike_df['start station longitude'] = citibike_df['start station longitude'].astype(float)
citibike_df['start station longitude'] = citibike_df['start station longitude'].round(decimals=3)
citibike_df = citibike_df.dropna(subset=['end station id'])
citibike_df['end station id'] = citibike_df['end station id'].astype(str).astype(float).astype(int)
citibike_df['end station latitude'] = citibike_df['end station latitude'].astype(float)
citibike_df['end station latitude'] = citibike_df['end station latitude'].round(decimals=3)
citibike_df['end station longitude'] = citibike_df['end station longitude'].astype(float)
citibike_df['end station longitude'] = citibike_df['end station longitude'].round(decimals=3)

In [14]:
citibike_df.dtypes

a_file                       object
bikeid                        int64
birth year                   object
end station id                int64
end station latitude        float64
end station longitude       float64
end station name           category
gender                     category
start station id              int64
start station latitude      float64
start station longitude     float64
start station name         category
starttime                    object
stoptime                     object
tripduration                  int64
usertype                   category
dtype: object

In [None]:
citibike_df.head()

In [None]:
citibike_df['birth year'].value_counts()

In [None]:
# Using try block here since data files were not consistent over time
try:
    if pd.api.types.is_string_dtype:
        citibike_df['birth year'] = citibike_df['birth year'].replace({"\\N":2020})
except:
    print("skip")

In [None]:
citibike_df['birth year'].fillna(2020,inplace=True)

In [None]:
# Now that all fields are prepped drop nans in dataframe.  This is slow.
citibike_df.dropna(inplace=True)

In [None]:
# Set birth year datatype the nans dropped
citibike_df['birth year'] = citibike_df['birth year'].astype(str).astype(float).astype(int)

In [None]:
citibike_df.isnull().sum(axis=0)

In [None]:
#stamp the output files yearmonth to track the source of the data
citibike_df['yearmonth'] =  citibike_df['a_file'].str[:6].astype(int)

## Analyze by date and starthour

In [None]:
citibike_df.dtypes

In [None]:
citibike_df[['begindate','begintime']] = citibike_df.starttime.str.split(expand=True) 

In [None]:
# Possible optimzation:  https://stackoverflow.com/questions/50744369/how-to-speed-up-pandas-string-function
# %timeit [x.split('~', 1)[0] for x in df['facility']]
# def splittime(x):
#     test = [x.split(' ', 1)[0] for x in citibike_df['starttime']]
#     return x.map(test)
# citibike_df['test2'] = splittime(citibike_df['starttime'])
# TypeError: list indices must be integers or slices, not str   

In [None]:
# https://github.com/pandas-dev/pandas/issues/11665
def lookup(s):
    """
    This is an extremely fast approach to datetime parsing.
    For large data, the same dates are often repeated. Rather than
    re-parse these, we store all unique dates, parse them, and
    use a lookup to convert all dates.
    """
    dates = {date:pd.to_datetime(date) for date in s.unique()}
    return s.map(dates)

In [None]:
citibike_df['startdate'] = lookup(citibike_df['begindate'])

In [None]:
citibike_df.head()

In [None]:
citibike_df['starthour'] = citibike_df['begintime'].str.slice(0, 2)

In [None]:
daily_df = citibike_df.groupby(['startdate']).tripduration.agg(['count','sum']).reset_index().set_index(['startdate'])
daily_df

In [None]:
citibike_daily_bike_csv = os.path.join(cur_dir,'citibike_files','cleansed','citibike_trips_daily.csv')

In [None]:
if debug_mode == 'n':
    if not os.path.isfile(citibike_daily_bike_csv):
       daily_df.to_csv(citibike_daily_bike_csv, header='column_names')
    else: # else it exists so append without writing the header
       daily_df.to_csv(citibike_daily_bike_csv, mode='a', header=False)

In [None]:
# Extend analysis tostart hour
hourly_df = citibike_df.groupby(['startdate','starthour']).tripduration.agg(['count','sum']).reset_index()
hourly_df.set_index('startdate', inplace = True) 
hourly_df.head()

In [None]:
citibike_hourly_csv = os.path.join(cur_dir,'citibike_files','cleansed','citibike_trips_hourly.csv')

In [None]:
if debug_mode == 'n':
    if not os.path.isfile(citibike_hourly_csv):
       hourly_df.to_csv(citibike_hourly_csv, header='column_names')
    else: # else it exists so append without writing the header
       hourly_df.to_csv(citibike_hourly_csv, mode='a', header=False)

## Analyze customer data

In [None]:
citibike_df['gender'].value_counts()

In [None]:
citibike_df['birth year'].value_counts()

In [None]:
currentYear = datetime.now().year

In [None]:
citibike_df['rider age'] = currentYear - citibike_df['birth year']
citibike_df

In [None]:
bins = [-1,1,18,25,45,65,100,1000]
citibike_df['age bracket'] = pd.cut(citibike_df['rider age'],bins)
citibike_df

In [None]:
customer_df = citibike_df.groupby(['startdate','gender','age bracket','usertype']).tripduration.agg(['count']).reset_index()
customer_df

In [None]:
citibike_customer_csv = os.path.join(cur_dir,'citibike_files','cleansed','citibike_customer.csv')

In [None]:
if debug_mode == 'n':
    #customer_df.to_csv(citibike_customer_csv, header=csv_header_ind, mode = 'a')
    #https://stackoverflow.com/questions/30991541/pandas-write-csv-append-vs-write
    if not os.path.isfile(citibike_customer_csv):
       customer_df.to_csv(citibike_customer_csv, header='column_names')
    else: # else it exists so append without writing the header
       customer_df.to_csv(citibike_customer_csv, mode='a', header=False)


## Analyze bike stations

In [None]:
start_stations_df = citibike_df.drop_duplicates(subset=["start station id", "start station latitude","start station longitude","start station name"])
start_stations_df = start_stations_df[["start station id", "start station latitude","start station longitude","start station name"]]
start_stations_df = pd.DataFrame(start_stations_df)
start_stations_df.columns = ["station id", "station latitude","station longitude","station name"]

In [None]:
end_stations_df = citibike_df.drop_duplicates(subset=["end station id", "end station latitude","end station longitude","end station name"])
end_stations_df = end_stations_df[["end station id", "end station latitude","end station longitude","end station name"]]
end_stations_df = pd.DataFrame(end_stations_df)
end_stations_df.columns = ["station id", "station latitude","station longitude","station name"]

In [None]:
distinct_stations_df = start_stations_df.append(end_stations_df)
distinct_stations_df = distinct_stations_df.drop_duplicates(subset=["station id", "station latitude","station longitude","station name"])
distinct_stations_df = distinct_stations_df.set_index('station id', inplace = True)

In [None]:
citibike_distinct_station_csv = os.path.join(cur_dir,'citibike_files','cleansed','citibike_distinct_station_csv.csv')

In [None]:
if debug_mode == 'n':
    if not os.path.isfile(citibike_distinct_station_csv):
       start_stations_df.to_csv(citibike_distinct_station_csv, header='column_names')
    else: # else it exists so append without writing the header
       start_stations_df.to_csv(citibike_distinct_station_csv, mode='a', header=False)

In [None]:
start_station_trips_df = citibike_df.groupby(['startdate','start station id']).tripduration.agg(['count']).reset_index()
start_station_trips_df = start_station_trips_df.set_index(['startdate'])

In [None]:
citibike_start_station_csv = os.path.join(cur_dir,'citibike_files','cleansed','citibike_start_station.csv')

In [None]:
if debug_mode == 'n':
    if not os.path.isfile(citibike_start_station_csv):
       start_stations_df.to_csv(citibike_start_station_csv, header='column_names')
    else: # else it exists so append without writing the header
       start_stations_df.to_csv(citibike_start_station_csv, mode='a', header=False)

## Analyze bike equipment

In [None]:
bike_equipment_df = citibike_df.groupby(['bikeid']).tripduration.agg(['count','sum']).reset_index()
bike_equipment_df = bike_equipment_df.set_index('bikeid')
bike_equipment_df = pd.DataFrame(bike_equipment_df)

In [None]:
bike_date_df = citibike_df.groupby(['bikeid']).startdate.agg(['min','max']).reset_index()
bike_date_df = bike_date_df.set_index(['bikeid'])
bike_date_df = pd.DataFrame(bike_date_df)

In [None]:
bike_merged_df = pd.merge(bike_date_df, bike_equipment_df, left_index=True, right_index=True)

In [None]:
citibike_bike_equipment_csv = os.path.join(cur_dir,'citibike_files','cleansed','citibike_bike_date.csv')

In [None]:
if debug_mode == 'n':
    if not os.path.isfile(citibike_bike_equipment_csv):
       bike_merged_df.to_csv(citibike_bike_equipment_csv, header='column_names')
    else: # else it exists so append without writing the header
       bike_merged_df.to_csv(citibike_bike_equipment_csv, mode='a', header=False)

## Cleanup memory for next run

In [None]:
del [[citibike_df,customer_df, distinct_stations_df,start_stations_df,end_stations_df]]
del [[bike_equipment_df, bike_date_df, bike_merged_df]]
gc.collect()
citibike_df = []
customer_df = []
distinct_stations_df = []
start_stations_df = []
end_stations_df = []
bike_equipment_df = []
bike_date_df = []
bike_merged_df = []

In [None]:
# print end 
print(data_year)
execEndDateTime = datetime.now()
print(execStartDateTime)
print(execEndDateTime)