In [5]:
# Authenticate the Colab user with their Google account before any Google Cloud
# client is created; prints a confirmation once authenticate_user() returns.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


In [7]:
# Load the google.colab.data_table extension used for displaying tables in Colab
%load_ext google.colab.data_table 

In [14]:
# Chunk processing: GCP has an upper limit on CPU usage, so I process the dataset in chunks by date.
# The function below takes a start date and an end date and produces a table with the data between these two dates.

# start:  2013-08-29T12:00:00
# end : 2015-08-31T23:00:00


from google.cloud import bigquery
# BigQuery client for the 'sfbikeshare' project. createChunk() reads this
# module-level name directly, so this cell must run before createChunk is called.
client = bigquery.Client(project='sfbikeshare')


def createChunk(start_date, end_date, table_id):
    """Write one date-bounded chunk of the station-pair join to a BigQuery table.

    Rows of ``sfbikeshare.statusJoinStationHrGeom`` whose ``DTHR`` lies between
    ``start_date`` and ``end_date`` are self-joined so that each row is paired
    with the rows of every *other* station that fall on the same date and in
    the same hour. The query result is written to ``table_id``.

    Args:
        start_date: Lower bound for ``DTHR``, passed to the query as a STRING
            parameter (the caller in this notebook passes ``isoformat()`` text).
        end_date: Upper bound for ``DTHR``, passed the same way.
        table_id: Destination table for the query result.

    Returns:
        None. The function's effects are the destination table and a
        confirmation line printed to stdout.

    Notes:
        * ``BETWEEN`` is inclusive on both ends, so two calls that share a
          boundary timestamp will both contain the rows at that timestamp.
        * Uses the module-level ``client``.
        * NOTE(review): the ORDER BY is probably unnecessary when the result
          goes to a destination table - confirm and consider dropping it to
          reduce query cost.
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("start_date", "STRING", start_date),
            bigquery.ScalarQueryParameter("end_date", "STRING", end_date),
        ],
        destination=table_id,
    )

    query_job = client.query('''
        WITH T AS
        (SELECT * FROM sfbikeshare.statusJoinStationHrGeom WHERE DTHR BETWEEN @start_date AND @end_date)
        SELECT T1.DTHR as time1, T1.station_id AS ID1, T2.station_id AS ID2, T2.DTHR as time2, 
              T1.bikes_available, T1.docks_available, 	T1.dock_count AS capacity, T1.lat AS lat1, 
              T1.long AS long1, T2.lat AS lat2, T2.long AS long2, T2.dock_count AS others_capacity 
        FROM T T1, T T2
        WHERE (T1.station_id <> T2.station_id)  AND (TIME_DIFF(TIME(T1.DTHR), TIME(T2.DTHR), HOUR) =0) AND ((date_diff(DATE(T1.DTHR), DATE(T2.DTHR), DAY) =0))
        ORDER BY time1, id1, id2
    ''', job_config=job_config)

    # Wait for the job to finish (result() raises if the job failed). The rows
    # are written to the destination table by BigQuery itself, so there is no
    # need to download the whole chunk into notebook memory with to_dataframe().
    query_job.result()

    print("Created table: " + table_id)

  

In [None]:
# Loop through the entire date range in increments of n days per chunk, and save each chunk to a BigQuery table.


import datetime
# Walk the range [start_date, last_date] in windows of `delta`, creating one
# BigQuery table per window via createChunk().
start_date = datetime.datetime(2013, 8, 29, 0, 0, 0)
delta = datetime.timedelta(days=100)

# NOTE(review): the header comment of the chunking cell gives the data end as
# 2015-08-31T23:00:00; this bound stops at midnight of that day - confirm
# whether the remaining hours of 2015-08-31 should be included.
last_date = datetime.datetime(2015, 8, 31, 0, 0, 0)

while start_date < last_date:
    # Clamp the final window to last_date. Stepping only while a full `delta`
    # fits would silently drop the tail of the range (2015-07-30 onwards).
    end_date = min(start_date + delta, last_date)

    table_id = (
        "sfbikeshare.sfbikeshare."
        + "status_f_"
        + start_date.date().strftime('%Y%m%d')
        + "_"
        + end_date.date().strftime('%Y%m%d')
    )

    # createChunk filters with an inclusive BETWEEN. Consecutive windows share
    # their boundary timestamp, so every window except the last uses an upper
    # bound just below end_date; otherwise boundary rows land in two tables.
    if end_date < last_date:
        upper_bound = end_date - datetime.timedelta(microseconds=1)
    else:
        upper_bound = end_date

    start_date_in = start_date.isoformat()
    end_date_in = upper_bound.isoformat()

    createChunk(start_date_in, end_date_in, table_id)

    start_date = end_date

Created table: sfbikeshare.sfbikeshare.status_f_20130829_20131207
Created table: sfbikeshare.sfbikeshare.status_f_20131207_20140317
Created table: sfbikeshare.sfbikeshare.status_f_20140317_20140625


In [None]:
# Union Chunk Tables
# Combine the per-chunk tables matched by the `status_*` wildcard into one
# destination table and load the result into `df`.
import datetime  # imported here so the cell does not depend on an earlier cell

from google.cloud import bigquery

client = bigquery.Client(project='sfbikeshare')

range_start = datetime.datetime(2013, 8, 29, 0, 0, 0)
range_end = datetime.datetime(2015, 8, 31, 0, 0, 0)

# Same module-level values as before (ISO strings).
start_date = range_start.isoformat()
last_date = range_end.isoformat()

# _TABLE_SUFFIX is the part of the table name after "status_", e.g.
# "20130829_20131207". It is compared as a string, so the bounds must use the
# same YYYYMMDD form. Against ISO text such as "2015-08-31T00:00:00", any
# suffix starting "20150..." sorts above the upper bound ('-' < '0') and every
# chunk that begins in 2015 is left out of the union.
suffix_start = range_start.strftime('%Y%m%d')
suffix_end = range_end.strftime('%Y%m%d')

table_id = "sfbikeshare.sfbikeshare.master_table_test"
table_to_insert = 'sfbikeshare.status_20130829_20131207'  # not used by this cell

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("start_date", "STRING", suffix_start),
        bigquery.ScalarQueryParameter("last_date", "STRING", suffix_end),
    ],
    destination=table_id,
)

# NOTE(review): the chunk loop in this notebook names its tables
# "status_f_<start>_<end>", whose suffix under this wildcard starts with "f_"
# and therefore falls outside any date range - confirm which table family this
# union is meant to read.
df = client.query('''
    SELECT * FROM `sfbikeshare.sfbikeshare.status_*`
    WHERE _TABLE_SUFFIX BETWEEN @start_date AND @last_date
    
  ''', job_config=job_config).to_dataframe()

df.head()



Unnamed: 0,time1,ID1,ID2,time2,bikes_available,docks_available,capacity,lat1,long1,lat2,long2,others_capacity
0,2013-08-29 17:00:00,3,2,2013-08-29 17:00:00,10.0,5.0,15,37.330698,-121.888979,37.329732,-121.901782,27
1,2013-09-02 05:00:00,3,2,2013-09-02 05:00:00,10.0,5.0,15,37.330698,-121.888979,37.329732,-121.901782,27
2,2013-08-30 19:00:00,3,2,2013-08-30 19:00:00,8.0,7.0,15,37.330698,-121.888979,37.329732,-121.901782,27
3,2013-08-29 19:00:00,3,2,2013-08-29 19:00:00,8.436364,6.563636,15,37.330698,-121.888979,37.329732,-121.901782,27
4,2013-08-29 16:00:00,3,2,2013-08-29 16:00:00,9.381818,5.618182,15,37.330698,-121.888979,37.329732,-121.901782,27


In [None]:
# (rows, columns) of the DataFrame returned by the union query
df.shape

(508032, 12)