# Toronto Bike Stations
#### Jessica Liu

## Pipeline design and Code

In consideration of the future run as daily, the pipeline was designed as follows. 
Imagine the entire process of processing Toronto Bike Station's data as an object, named as __TorontoBikeStation__. Based on this object, other pipeline components are designed,  including:
- __Extract:__ used to extract the data from two APIs
- __Transforming:__ used to merge and clean the data from Extract. Format it including rename,re-order, and re-type, based on the 'toronto_bike_stations' table.
- __Loading:__ used to connect to the MS SQL Server and save the cleaned and formatted data to the database directly. (Specially designed for minutely status updates)
- __TrigerFrequency:__ used to as a Timer. Start time, End time, interval, two URLs are as parameters. Used for cases like minutely status updates or daily updates

#### Import all packages 

In [12]:
#Extract packages
from urllib import request
import json
import pandas as pd

#Loading packages

import pymssql 
from sqlalchemy import create_engine


#frequency packages
from threading import Thread, Event
import datetime    

### Extract
getAllColumns function is used to determine all attributes from each station. In station_status API, some records contain post_code, cross_street, or obcn, but some did not. To ensure we can get all features, I developed this function.

In [13]:
class Extract:
    def __init__(self, url):
        self.url = url
        
        #consider the edge case (HTTP error, timeout, null value)
        try:
            req = request.Request(self.url)
            with request.urlopen(req) as f:
                data = json.loads(f.read().decode('utf-8'))
        except:
            print("HTTP Error or Timeout")
        else:
            try:
                len(data)<1
            except:
                print("empty json file")
            else:
                                       
                self.data = data
                self.dic_list = data['data']['stations']
                self.last_update=self.data['last_updated']
    

    def getAllColumns(self):
        all_keys = []
        for dic in self.dic_list:
            keys = [str(key) for key in dic.keys()]
            union_key = list(set(all_keys).union(set(keys)))
            all_keys = union_key
        return sorted(all_keys)

    def getDataFrame(self):
        # get the python data frame
        dic_value = []
        all_values = []
        for dic in self.dic_list:
            dic_value = [dic.get(key) for key in self.getAllColumns()]
            all_values.append(dic_value)

        result = pd.DataFrame(all_values, columns=self.getAllColumns())
        result['last_updated'] = self.last_update
        return result
 

### Transforming


In [14]:
class Transforming:
    def __init__(self,info_data,status_data):
        self.info_data=info_data
        self.status_data=pd.concat([status_data, status_data['num_bikes_available_types'].apply(pd.Series)], axis = 1).drop('num_bikes_available_types', axis = 1)
        data=self.info_data.merge(self.status_data, how='outer', on='station_id') 
        self.data=data
        


    def cleanedData(self):
        #rename to keep the same label as the database table columns name
        data=self.data.rename(columns={"lat":"latitude","lon":"longitude",
                                              "name":"station_name",
                                              "mechanical":"mechanical_bikes_available",
                                              "ebike":"electric_bikes_available",
                                              "last_updated_x":"last_update",
                                              "last_updated_y":"last_update_info"})
        #keeo the same order as the database table columns order
        columnOrder=["last_update","station_id","station_name","physical_configuration",
             "latitude","longitude","altitude","address","capacity","rental_methods",
             "groups","obcn","nearby_distance","num_bikes_available",
             "mechanical_bikes_available","electric_bikes_available","num_bikes_disabled",
             "num_docks_available", "num_docks_disabled","is_installed","is_renting",
             "is_returning", "last_reported","is_charging_station", "status",
             "cross_street","post_code","last_update_info"]
        data=data[columnOrder]
        
        
        #save all information 
        #station_result.to_csv('station_result.csv',header=True,index=False)
          
        
        #drop out of service station and unuseful columns
        data=data.drop(data[data['status']!='IN_SERVICE'].index)
        cleaned_data=data.drop(columns=["cross_street","post_code","last_update_info"])
        
        
        #formate the data type 
        cleaned_data['last_update'] =  pd.to_datetime(cleaned_data['last_update'], unit='s')
        cleaned_data['last_reported'] =  pd.to_datetime(cleaned_data['last_reported'], unit='s')
        
        cleaned_data[["station_id","capacity"]]=cleaned_data[["station_id","capacity"]].astype(int) 
        cleaned_data[["rental_methods","groups"]]=cleaned_data[["rental_methods","groups"]].astype(str)
        cleaned_data["is_charging_station"]=cleaned_data["is_charging_station"].astype(int)
        #data[["is_installed","is_renting","is_returning"]]=data[["is_installed","is_renting","is_returning"]].astype(bool)

       #remove special characters 
        cleaned_data["rental_methods"]=cleaned_data["rental_methods"].str.strip('[]')
        cleaned_data["groups"]=cleaned_data["groups"].str.strip('[]')
        cleaned_data["groups"]=cleaned_data["groups"].str.replace("\'","")
        

        return cleaned_data

### Loading
May have pymssql install error on mac

In [15]:
class Loading:
    def __init__(self,data):
        self.data=data
        self.conn = pymssql.connect(host='127.0.0.1:1433',user='sa',
                       password='Mxl123ml',database='test',
                      charset="utf8")


        self.engine = create_engine('mssql+pymssql://sa:Mxl123ml@127.0.0.1/test')

    def loadData(self):
        
        return self.data.to_sql('test', self.engine, if_exists='append', index=False)


Update  2 2020-10-13 07:18:07.068215


### TorontoBikeStations

In [16]:
class TonrontoBikeStations:
    
    def __init__(self, info_url,status_url):
        self.info_url=info_url
        self.status_url=status_url
        info=Extract(self.info_url)
        status=Extract(self.status_url)
        self.info_data=info.getDataFrame()
        self.status_data=status.getDataFrame() 
        trans=Transforming(self.info_data,self.status_data)
        self.data=trans.cleanedData()
        
        
    
    def saveToSQL(self):
        load=Loading(self.data)
        
        return load.loadData()

### TrigerFrequency
A timer, used to automatically run the script

In [17]:
class TrigerFrequency(Thread):

    def __init__(self,start,stop,interval,info_url,status_url):
        Thread.__init__(self)
        self.event = Event()
        self.runningTimes=((stop-start).seconds)/interval
        self.interval=interval
        self.info_url=info_url
        self.status_url=status_url
        

    def run(self):
        i=0
        while not self.event.wait(self.interval):
            i=i+1
            toronto=TonrontoBikeStations(self.info_url,self.status_url)
            print('Update ',i, datetime.datetime.now())
            if i==self.runningTimes:
                self.event.set()



### Result

In [18]:
info_url = 'https://tor.publicbikesystem.net/ube/gbfs/v1/en/station_information'
status_url = 'https://tor.publicbikesystem.net/ube/gbfs/v1/en/station_status'

toronto=TonrontoBikeStations(info_url,status_url)

data=toronto.data

print(data.head())

data.to_csv('final_result.csv',header=True,index=False)
  

          last_update  station_id                     station_name  \
0 2020-10-12 23:17:58        7000     Fort York  Blvd / Capreol Ct   
1 2020-10-12 23:17:58        7001  Lower Jarvis St / The Esplanade   
2 2020-10-12 23:17:58        7002       St. George St / Bloor St W   
3 2020-10-12 23:17:58        7003         Madison Ave / Bloor St W   
4 2020-10-12 23:17:58        7004          University Ave / Elm St   

  physical_configuration   latitude  longitude  altitude  \
0                REGULAR  43.639832 -79.395954       0.0   
1                REGULAR  43.647992 -79.370907       0.0   
2                REGULAR  43.667333 -79.399429       0.0   
3                REGULAR  43.667158 -79.402761       NaN   
4                REGULAR  43.656518 -79.389099       NaN   

                           address  capacity  \
0     Fort York  Blvd / Capreol Ct        35   
1  Lower Jarvis St / The Esplanade        15   
2       St. George St / Bloor St W        19   
3         Madison Ave / Bl

### Triger Frequency Test
##### Assume the start time is now and stop time is 1 min later, get information every 10s

In [19]:
info_url = 'https://tor.publicbikesystem.net/ube/gbfs/v1/en/station_information'
status_url = 'https://tor.publicbikesystem.net/ube/gbfs/v1/en/station_status'


start = datetime.datetime.now()
stop = start + datetime.timedelta(seconds=60)
    
timer_test = TrigerFrequency(start,stop,10,info_url,status_url)
timer_test.start()

</br>
</br>



## QA test script

In [20]:
class QATest():
    
    def __init__(self,info_url,status_url):
        self.info_url=info_url
        self.status_url=status_url
        self.info=Extract(self.info_url)
        self.status=Extract(self.status_url)
        trans=Transforming(self.info.getDataFrame(),self.status.getDataFrame())
        self.cleaned_data=trans.cleanedData()
        
        
      
    
    #test Extract class:
    #if the API's json structure are still the same
    def jsonStructure(self):
           # ensure loaded json is dictonary 
        if type(self.info.data) != dict:
            print("wrong info json format")
        if type(self.status.data) != dict:
            print("wrong status json format")
            
        # ensure it contains 'last_updated' field                             
        if 'last_updated'not in self.info.data:
            print("no info updated date")
        if 'last_updated'not in self.status.data:
            print("no info status date")
            
        # ensure 'data' is exist   
        if 'data'not in self.info.data:
            print("no data")
        if 'data'not in self.status.data:
            print("no data")
            
        # ensure 'data' contains records                
        if len(self.info.dic_list)==0:            
            print("info API is empty")
        if len(self.status.dic_list)==0:
            print("info API is empty")
            
            
            

    #test Transforming class:
    def formattedData(self):
        tras=Transforming(self.info.getDataFrame(),self.status.getDataFrame())
        
        
        # ensure no dupilicate in the key attribute 'station_id' 
        dic={}.fromkeys(tras.data['station_id'])
        if len(dic)!=len(tras.data['station_id']):
            print('There is dupilicate station_id')
            
        # ensure get all feature
        if tras.data.shape[1]!=28:
            print('Did not extract all features')
                    
        # ensure cleaned DataFrame contains all requried attributes
        if tras.cleanedData().shape[1]!=25:
            print("Attibutes did not consistent with 'toronto_bike_stations' attributes")
            
            
            
    
    #test if connect to the database
    def connectDatabase(self):
        connect=Loading(self.cleaned_data)
        
        conn=connect.conn
        cursor = conn.cursor()
        sql = 'select top 2 * from sql_test'
        cursor.execute(sql)
        rs = cursor.fetchall()
        print(rs)
       

### Result

In [23]:
#test code
info_url = 'https://tor.publicbikesystem.net/ube/gbfs/v1/en/station_information'
status_url = 'https://tor.publicbikesystem.net/ube/gbfs/v1/en/station_status'
   
test=QATest(info_url,status_url)
test.jsonStructure()
test.formattedData()
test.connectDatabase()

[('2020-10-12 22:43:44.0000000', 7000, 'Fort York  Blvd / Capreol Ct', 'REGULAR', 43.639832, -79.395954, 0.0, 'Fort York  Blvd / Capreol Ct', 35, "'KEY', 'CREDITCARD', 'TRANSITCARD', 'PHONE'", None, '647-643-9607', 500.0, '21', '20', '1', '0', '14', '0', '1', '1', '1', '2020-10-12 22:39:43.0000000', '0', 'IN_SERVICE'), ('2020-10-12 22:43:44.0000000', 7001, 'Lower Jarvis St / The Esplanade', 'REGULAR', 43.647992, -79.370907, 0.0, 'Lower Jarvis St / The Esplanade', 15, "'KEY', 'CREDITCARD', 'TRANSITCARD', 'PHONE'", None, '416-617-9576', 500.0, '7', '7', '0', '1', '7', '0', '1', '1', '1', '2020-10-12 22:39:33.0000000', '0', 'IN_SERVICE')]


</br>
</br>

## Determine the closest five bike stations

In [24]:
from math import sin, asin, cos, radians, fabs, sqrt

def distance(lon1, lat1, lon2, lat2): 
    # Convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
 
    # distance
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # The average radius of the earth, in kilometers
    return c * r * 1000



dis_data=data.loc[:,['station_id','station_name','address','latitude','longitude']]

dis=[]
for i in range(len(dis_data)):
    dis.append(distance(-79.396160,43.661896,dis_data['longitude'][i],dis_data['latitude'][i]))    

dis_data['dis']=dis
#ordered data by distance
ordered_dis=dis_data.sort_values(by=['dis'])

print(ordered_dis[['station_id','station_name','dis']].head(5))

     station_id                                 station_name         dis
66         7066                 Willcocks St / St. George St  128.426129
326        7358  Gailbraith Rd / King’s College Cr. (U of T)  179.106283
229        7250           St. George St / Russell St - SMART  193.535201
544        7600                Russell St / Huron St - SMART  235.838132
177        7190                   St. George St / Hoskin Ave  255.980827
