In [21]:
import pandas as pd
import numpy as np
import requests
import math
import datetime

from google.cloud import bigquery

# Function get_time_distance and get_nearest_taxi_stand

In [2]:
# only for 3 days, need to generate again
ONEMAP_API_TOKEN = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOjgyNjgsInVzZXJfaWQiOjgyNjgsImVtYWlsIjoianByYW1vbm8wMUBnbWFpbC5jb20iLCJmb3JldmVyIjpmYWxzZSwiaXNzIjoiaHR0cDpcL1wvb20yLmRmZS5vbmVtYXAuc2dcL2FwaVwvdjJcL3VzZXJcL3Nlc3Npb24iLCJpYXQiOjE2Mzk3MzA4OTAsImV4cCI6MTY0MDE2Mjg5MCwibmJmIjoxNjM5NzMwODkwLCJqdGkiOiJiMmNiZjk2ZTE0NmNmMjZjYzdmMGMyMTIwYWUyYWM2NCJ9.X3kCImlS9jyAz2d0G_I2QK9Z5P0QTRnDoLoWcrtBLcA"
LAT_1KM = 5/110
LON_1KM = 5/111.320*math.cos(1.3)

In [9]:
def get_time_distance(start_lat, start_lon, end_lat, end_lon):
    start = str(start_lat)+','+str(start_lon)
    end = str(end_lat)+','+str(end_lon)
    url = "https://developers.onemap.sg/privateapi/routingsvc/route"
    params = {"start" : start,
             "end": end,
             "routeType" : "drive",
             "token": ONEMAP_API_TOKEN}
    r = requests.get(url=url, params=params)
    total_distance = round(r.json()["route_summary"]["total_distance"]/1000,2) # in km
    total_time = round(r.json()["route_summary"]["total_time"]/60,2) # in mins
    return (total_time, total_distance)

In [10]:
get_time_distance(1.319728,103.8421,1.326762,103.8559)

(7.47, 3.06)

In [27]:
start_lat = 1.319728
start_lon = 103.8421

In [48]:
end = {"lat" : [(1.326762 + i*0.0091) for i in range(5)],
       "lon" : [(103.8559 + i*0.0024) for i in range(5)]}
end_df = pd.DataFrame(end)
end_df

Unnamed: 0,lat,lon
0,1.326762,103.8559
1,1.335862,103.8583
2,1.344962,103.8607
3,1.354062,103.8631
4,1.363162,103.8655


In [25]:
def get_nearest_taxi_stand(taxi_lat, taxi_lon, stand_df, top=5):
    stand_df["total_distance"] = stand_df.apply(lambda x : get_time_distance(start_lat, start_lon, 
                                                                             x["lat"], x["lon"])[1], axis = 1)
    stand_df = stand_df.sort_values(by="total_distance", ascending=False).reset_index(drop=True)
    return stand_df.iloc[:top]

In [49]:
get_nearest_taxi_stand(start_lat, start_lon, end_df, top=3)

Unnamed: 0,lat,lon,total_distance
0,1.363162,103.8655,8.53
1,1.354062,103.8631,7.38
2,1.344962,103.8607,5.0


# ETL for GCP

## parse taxi availability data

In [9]:
taxi_url = "https://api.data.gov.sg/v1/transport/taxi-availability"
r = requests.get(taxi_url)
coordinates = r.json()["features"][0]["geometry"]["coordinates"]
timestamp = r.json()["features"][0]["properties"]["timestamp"]
taxi_available = pd.DataFrame(np.array(coordinates), columns=["lon","lat"])
taxi_available["update_time"] = str(datetime.datetime.strptime(timestamp,"%Y-%m-%dT%H:%M:%S+08:00"))
len(taxi_available), len(taxi_available.dropna())

(2006, 2006)

In [7]:
taxi_available.columns

Index(['lon', 'lat', 'update_time'], dtype='object')

## parse rainfall data

In [24]:
def parse_rainfall_data(request):
    '''
    This function calls the rainfall API
    and parses it into a dataframe.
    Available columns are id, lat, lon, value and timestamp
    
    value represents rainfall, where 0 is no rain.
    '''
    gov = 'https://api.data.gov.sg/v1'
    weather_api = '/environment/rainfall'

    url = gov+weather_api
    response = requests.get(url).json()

    timestamp_str = response['items'][0]['timestamp']
    timestamp = str(datetime.datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S+08:00'))
    weather_list = []
    for index,value in enumerate(response['items'][0]['readings']):
        weather_list.append(
            {'station_id':response['metadata']['stations'][index]['id'], 
             'station_lat':response['metadata']['stations'][index]['location']['latitude'],
             'station_lon':response['metadata']['stations'][index]['location']['longitude'],
             'rainfall':float(response['items'][0]['readings'][index]['value']),
             'update_time': timestamp})
        
    weather_df = pd.DataFrame(weather_list)
    
    client = bigquery.Client(project='taxi-compass-lewagon')
    table_id = 'api_dataset.h_weather_rainfall'
    
    job = client.load_table_from_dataframe(
        weather_df, table_id
    )

    job.result()  # Wait for the job to complete.

    table = client.get_table(table_id)  # Make an API request.
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )
    return ("Done!", 200)