## Part 0: Setup
In this part we set up the environment and define the variables needed for this project.

In [1]:
import os
import re
import bs4
import math
import requests
import warnings
import numpy as np
import pandas as pd
import keplergl as kg
from scipy import stats
import sqlalchemy as db
import geopandas as gpd
import matplotlib.pyplot as plt

warnings.filterwarnings("ignore")

In [2]:
# Project-wide constants

# Page that lists the monthly TLC trip-record parquet files
TAXI_URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"
UBER_CSV = "uber_rides_sample.csv"
# One weather CSV per year, 2009 through 2015
WEATHER_CSV = [f"weather-{year}.csv" for year in range(2009, 2016)]
ZONE_PATH = "taxi_zones.shp"

# Bounding box used to keep trips inside New York:
# ((min latitude, min longitude), (max latitude, max longitude))
NY_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))

DATABASE = "sqlite:///project.db"
SCHEMA_FILE = "schema.sql"

In [3]:
# Load the taxi-zone polygons and attach each zone's centroid as
# longitude/latitude columns (degrees, EPSG:4326).
zones_native = gpd.read_file(ZONE_PATH)
if zones_native.crs is not None and zones_native.crs.is_projected:
    # A centroid is a planar computation: take it in the file's own projected
    # CRS and only then convert the resulting points to EPSG:4326.
    zone_centroids = zones_native.geometry.centroid.to_crs(4326)
    Taxi_zone = zones_native.to_crs(4326)
else:
    # The file is already geographic (or has no CRS, in which case to_crs
    # raises exactly as before): fall back to the centroid of the
    # reprojected polygons.
    Taxi_zone = zones_native.to_crs(4326)
    zone_centroids = Taxi_zone.geometry.centroid
Taxi_zone['longitude'] = zone_centroids.x
Taxi_zone['latitude'] = zone_centroids.y

## Part 1: Data Preprocessing
### Yellow Taxi trip data: Downloading, Cleaning, Sampling

In [4]:
def get_taxi_html() -> bytes:
    """Download the page at TAXI_URL and return its raw body.

    Returns:
        The response body as bytes.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if no response arrives within 30 seconds.
    """
    response = requests.get(TAXI_URL, timeout=30)
    # Fail loudly here instead of handing an error page to the HTML parser,
    # which would silently yield an empty list of links.
    response.raise_for_status()
    return response.content

In [5]:
def find_taxi_parquet_links() -> list:
    """Return the hrefs on the taxi page that point at the wanted months.

    A link is kept when its href contains a Yellow Taxi file name for
    2009, 2010-2014, or January-June 2015. Links are returned in page order.
    """
    wanted_months = re.compile(
        r"yellow_tripdata_2009|yellow_tripdata_201[0-4]|yellow_tripdata_2015-0[1-6]"
    )
    page = bs4.BeautifulSoup(get_taxi_html(),'html.parser')
    return [
        anchor.get("href")
        for anchor in page.find_all("a", href=True)
        if wanted_months.search(anchor.get("href"))
    ]

In [6]:
def monthly_taxi_data_download_clean_sample(
    url: str, sample_size: int = 2564, random_state=None
) -> pd.core.frame.DataFrame:
    """Download one month of Yellow Taxi trips, clean it, and return a random sample.

    Args:
        url: Link to a monthly ``yellow_tripdata_YYYY-MM.parquet`` file.
        sample_size: Maximum number of trips to return. The default of 2564
            comes from spreading the 200000 Uber rides over 78 months.
        random_state: Optional seed forwarded to ``DataFrame.sample`` so the
            draw can be reproduced; ``None`` keeps it random.

    Returns:
        A frame with the columns pickup_datetime, pickup_latitude,
        pickup_longitude, dropoff_latitude, dropoff_longitude and tip_amount,
        holding at most ``sample_size`` rows.

    Raises:
        requests.HTTPError: if the parquet download fails.
    """
    parquet_name = url.split("/")[-1]
    # Characters 16-22 of "yellow_tripdata_YYYY-MM.parquet" are "YYYY-MM".
    month_label = parquet_name[16:23]

    # download if it doesn't exist
    if not os.path.exists(parquet_name):
        print(f"Downloading parquet for {month_label}.")
        response = requests.get(url)
        # Do not write an HTTP error page to disk as if it were a parquet file.
        response.raise_for_status()
        with open(parquet_name, "wb") as f:
            f.write(response.content)

    # load data from parquet file
    data = pd.read_parquet(parquet_name)
    print(f"Cleaning data for {month_label}.")

    # To limit disk usage, each parquet is deleted right after it is read.
    # TODO: remove this deletion before the final submission (it defeats the
    # "download if it doesn't exist" cache above).
    os.remove(parquet_name)
    print(f"Parquet for {month_label} is removed.")

    coord_cols = ["pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"]

    # Some months only give location IDs for pickups and dropoffs; look up the
    # zone centroid for those. IDs without a matching zone become NaN.
    if "PULocationID" in data.columns:
        # Key the lookup by the zone's ID column rather than by the frame's row
        # index, which need not coincide with the ID.
        # NOTE(review): assumes the shapefile names that column "LocationID" —
        # confirm; without it the original index-based lookup is used.
        if "LocationID" in Taxi_zone.columns:
            # Series.map needs a unique index, so keep one row per ID.
            zones = (
                Taxi_zone[["LocationID", "latitude", "longitude"]]
                .drop_duplicates(subset="LocationID")
                .set_index("LocationID")
            )
        else:
            zones = Taxi_zone
        data["pickup_latitude"] = data["PULocationID"].map(zones["latitude"], na_action="ignore")
        data["pickup_longitude"] = data["PULocationID"].map(zones["longitude"], na_action="ignore")
        data["dropoff_latitude"] = data["DOLocationID"].map(zones["latitude"], na_action="ignore")
        data["dropoff_longitude"] = data["DOLocationID"].map(zones["longitude"], na_action="ignore")

    # normalize column names across the different yearly layouts
    rename_dict = {
        "VendorID" : "vendor_id",
        "tpep_pickup_datetime" : "pickup_datetime",
        "tpep_dropoff_datetime" : "dropoff_datetime",
        "RatecodeID" : "rate_code",
        "Trip_Pickup_DateTime" : "pickup_datetime",
        "Trip_Dropoff_DateTime" : "dropoff_datetime",
        "Start_Lon" : "pickup_longitude",
        "Start_Lat" : "pickup_latitude",
        "End_Lon" : "dropoff_longitude",
        "End_Lat" : "dropoff_latitude",
        "Fare_Amt" : "fare_amount",
        "Tip_Amt" : "tip_amount",
        "Tolls_Amt" : "tolls_amount",
        "Total_Amt" : "total_amount"
    }
    data = data.rename(columns=rename_dict)

    # remove trips whose coordinates are missing (e.g. invalid location IDs)
    data = data.dropna(subset=coord_cols)

    # remove invalid data points and keep only the columns needed
    data = data.loc[data["total_amount"] > 0, ["pickup_datetime", *coord_cols, "tip_amount"]]

    # use appropriate column types for the respective data
    data = data.astype({**{col: "float64" for col in coord_cols}, "tip_amount": "float64"})
    data["pickup_datetime"] = pd.to_datetime(data["pickup_datetime"])

    # remove trips that start and/or end outside of NY (bounds are inclusive)
    (min_lat, min_lon), (max_lat, max_lon) = NY_COORDS
    inside_ny = (
        data["pickup_latitude"].between(min_lat, max_lat)
        & data["pickup_longitude"].between(min_lon, max_lon)
        & data["dropoff_latitude"].between(min_lat, max_lat)
        & data["dropoff_longitude"].between(min_lon, max_lon)
    )
    data = data[inside_ny]

    # Sampling: never ask for more rows than survived the cleaning, which
    # would make DataFrame.sample raise.
    n_rows = min(sample_size, len(data))
    return data.sample(n=n_rows, random_state=random_state)


### Yellow Taxi trip data: Filling (Distance)

We calculate the distance between pickup location and dropoff location using the Haversine Formula:

![](https://user-images.githubusercontent.com/2789198/27240436-e9a459da-52d4-11e7-8f84-f96d0b312859.png)

where $\lambda$ and $\phi$ are the `longitude` and `latitude` of each location (in radians), respectively, and $r$ is the radius of the Earth.

In [7]:
def calculate_distance(pu_coord: pd.core.frame.DataFrame, do_coord: pd.core.frame.DataFrame) -> pd.core.series.Series:
    """Haversine distance in kilometres between pickup and dropoff points.

    Args:
        pu_coord: Frame with ``pickup_longitude`` and ``pickup_latitude``
            columns, in degrees.
        do_coord: Frame with ``dropoff_longitude`` and ``dropoff_latitude``
            columns, in degrees, sharing ``pu_coord``'s index.

    Returns:
        A float64 Series of distances aligned on the input index.
    """
    # Vectorised degree -> radian conversion (the cast lets object-typed
    # numeric columns through as well).
    pick_lon = np.radians(pu_coord["pickup_longitude"].astype("float64"))
    pick_lat = np.radians(pu_coord["pickup_latitude"].astype("float64"))
    drop_lon = np.radians(do_coord["dropoff_longitude"].astype("float64"))
    drop_lat = np.radians(do_coord["dropoff_latitude"].astype("float64"))

    delta_lat = drop_lat - pick_lat
    delta_lon = drop_lon - pick_lon

    # Take the average earth radius (km) as r
    r = 6371
    part_formula = (
        np.sin(delta_lat / 2) ** 2
        + np.cos(pick_lat) * np.cos(drop_lat) * np.sin(delta_lon / 2) ** 2
    )
    # Rounding can push the term marginally outside [0, 1], which is outside
    # the domain of sqrt/arcsin; clip it back before taking them.
    dist = 2 * r * np.arcsin(np.sqrt(part_formula.clip(0, 1)))

    return dist.astype("float64")

In [8]:
def filling_distance(data: pd.core.frame.DataFrame) -> pd.core.frame.DataFrame:
    """Add a ``distance`` column to ``data`` in place and return the same frame.

    The value is whatever ``calculate_distance`` returns for the frame's
    pickup and dropoff coordinate columns.
    """
    pickup_cols = ["pickup_longitude","pickup_latitude"]
    dropoff_cols = ["dropoff_longitude","dropoff_latitude"]
    data["distance"] = calculate_distance(data[pickup_cols], data[dropoff_cols])
    return data

In [9]:
def all_taxi_data(urls: list) -> pd.core.frame.DataFrame:
    """Process every monthly link and stack the results into one frame.

    Each url is downloaded, cleaned and sampled, then given a distance
    column; the monthly frames are concatenated in the order of ``urls``.
    """
    monthly_frames = [
        filling_distance(monthly_taxi_data_download_clean_sample(link))
        for link in urls
    ]
    return pd.concat(monthly_frames)

### Uber rides data: Reading, Cleaning and Filling

In [10]:
def uber_data_read_clean_fill() -> pd.core.frame.DataFrame:
    """Read the Uber sample CSV, clean it, and add a distance column.

    Rows with missing coordinates, a non-positive fare, or a pickup/dropoff
    outside the NY_COORDS bounding box are removed. The result keeps the
    pickup time, the four coordinate columns, and the computed distance.
    """
    coord_cols = ["pickup_latitude","pickup_longitude","dropoff_latitude","dropoff_longitude"]

    rides = pd.read_csv(UBER_CSV, low_memory = False)
    print("Cleaning data for Uber rides.")

    # drop trips with any missing coordinate
    rides = rides.dropna(subset=coord_cols)

    # parse timestamps and make sure the coordinates are floats
    rides["pickup_datetime"] = pd.to_datetime(rides["pickup_datetime"])
    rides = rides.astype({col: "float64" for col in coord_cols})

    # drop invalid fares, then keep only the columns needed
    rides = rides[rides["fare_amount"] > 0]
    rides = rides[["pickup_datetime", *coord_cols]]

    # keep trips that both start and end inside the NY bounding box (inclusive)
    (south, west), (north, east) = NY_COORDS
    in_ny = (
        rides["pickup_latitude"].between(south, north)
        & rides["pickup_longitude"].between(west, east)
        & rides["dropoff_latitude"].between(south, north)
        & rides["dropoff_longitude"].between(west, east)
    )
    rides = rides[in_ny]

    # fill in distance column
    return filling_distance(rides)

### Weather data: Reading, Cleaning and Filling

In [11]:
def hourly_weather_data_read_clean_fill(csv: str) -> pd.core.frame.DataFrame:
    """Read one weather CSV and return its cleaned hourly observations.

    Args:
        csv: Path to a CSV with DATE, HourlyWindSpeed and HourlyPrecipitation
            columns.

    Returns:
        A frame with DATE (datetime), HourlyWindSpeed (float64) and
        HourlyPrecipitation (float64). Rows without a wind speed are dropped;
        precipitation that is missing or not numeric becomes 0.
    """
    raw = pd.read_csv(csv, low_memory = False)

    # keep only the columns of use and drop rows with no wind speed reading
    data = raw[["DATE","HourlyWindSpeed","HourlyPrecipitation"]].dropna(subset=["HourlyWindSpeed"])

    # Build the cleaned columns by assignment: the chained
    # `data[col].fillna(..., inplace=True)` form does not update the frame
    # under pandas Copy-on-Write.
    data = data.assign(
        DATE=pd.to_datetime(data["DATE"]),
        HourlyPrecipitation=pd.to_numeric(data["HourlyPrecipitation"], errors = "coerce").fillna(0),
    )

    return data.astype({"HourlyWindSpeed":"float64", "HourlyPrecipitation":"float64"})

In [12]:
def daily_weather_data_read_clean_fill(csv: str) -> pd.core.frame.DataFrame:
    """Read one weather CSV and aggregate it to one row per calendar day.

    Args:
        csv: Path to a CSV with DATE, HourlyWindSpeed and HourlyPrecipitation
            columns.

    Returns:
        A frame with DATE (datetime at midnight), Precipitation (daily sum,
        float64) and WindSpeed (daily mean rounded to 2 decimals, float64).
        Precipitation that is missing or not numeric counts as 0.
    """
    raw = pd.read_csv(csv, low_memory = False)

    hourly = pd.DataFrame({
        # Only take the date into consideration from here on
        "DATE": pd.to_datetime(raw["DATE"]).dt.date,
        "HourlyPrecipitation": pd.to_numeric(raw["HourlyPrecipitation"], errors = "coerce").fillna(0),
        "HourlyWindSpeed": raw["HourlyWindSpeed"],
    })

    data_daily = (
        hourly
        .groupby(["DATE"], as_index = False)
        .agg({"HourlyPrecipitation":"sum","HourlyWindSpeed":"mean"})
        .rename(columns = {"HourlyPrecipitation" : "Precipitation", "HourlyWindSpeed" : "WindSpeed"})
    )

    # round() returns a new Series, so its result has to be stored back.
    data_daily["WindSpeed"] = data_daily["WindSpeed"].round(2)
    data_daily["DATE"] = pd.to_datetime(data_daily["DATE"])

    return data_daily.astype({"WindSpeed":"float64", "Precipitation":"float64"})

In [13]:
def daily_sun_data_read(csv: str) -> pd.core.frame.DataFrame:
    """Read one weather CSV and return each day's sunrise and sunset.

    For every calendar day the first non-missing Sunrise and Sunset values
    are taken; days lacking either are dropped. DATE is returned as a
    datetime, and Sunrise/Sunset as strings of their integer values.
    """
    raw = pd.read_csv(csv, low_memory = False)
    raw["DATE"] = pd.to_datetime(raw["DATE"]).dt.date

    per_day = (
        raw
        .groupby(["DATE"], as_index = False)
        .agg({"Sunrise":"first","Sunset":"first"})
        .dropna()
        .assign(DATE=lambda frame: pd.to_datetime(frame["DATE"]))
    )

    # go through int32 first so the strings carry no decimal part
    as_int = per_day.astype({"Sunrise":"int32", "Sunset":"int32"})
    return as_int.astype({"Sunrise":"string", "Sunset":"string"})

In [14]:
def all_weather_data() -> tuple:
    """Build the hourly, daily and sunrise/sunset frames from every weather CSV.

    Returns:
        A 3-tuple ``(hourly, daily, sun)`` of DataFrames, each the
        concatenation of the per-file results in WEATHER_CSV order.
    """
    hourly_frames, daily_frames, sun_frames = [], [], []

    for csv in WEATHER_CSV:
        frames = (
            hourly_weather_data_read_clean_fill(csv),
            daily_weather_data_read_clean_fill(csv),
            daily_sun_data_read(csv),
        )

        # for year 2015, only the first six months are needed
        if csv == "weather-2015.csv":
            frames = tuple(frame[frame["DATE"].dt.month <= 6] for frame in frames)

        hourly_frames.append(frames[0])
        daily_frames.append(frames[1])
        sun_frames.append(frames[2])

    return pd.concat(hourly_frames), pd.concat(daily_frames), pd.concat(sun_frames)

### Process all datasets

In [15]:
# Build the hourly, daily and sunrise/sunset weather frames from the yearly CSVs
hourly_weather_data, daily_weather_data, daily_sun_data = all_weather_data()

In [16]:
# Read, clean and add distances to the Uber rides sample
uber_data = uber_data_read_clean_fill()

Cleaning data for Uber rides.


In [17]:
# Collect the monthly parquet links, then download, clean, sample and combine them
links = find_taxi_parquet_links()
taxi_data = all_taxi_data(links) # NOTE(review): original note read "added a range", but no range is applied in this call — confirm

Downloading parquet for 2015-01.
Cleaning data for 2015-01.
Parquet for 2015-01 is removed.
Downloading parquet for 2015-02.
Cleaning data for 2015-02.
Parquet for 2015-02 is removed.
Downloading parquet for 2015-03.
Cleaning data for 2015-03.
Parquet for 2015-03 is removed.
Downloading parquet for 2015-04.
Cleaning data for 2015-04.
Parquet for 2015-04 is removed.
Downloading parquet for 2015-05.
Cleaning data for 2015-05.
Parquet for 2015-05 is removed.
Downloading parquet for 2015-06.
Cleaning data for 2015-06.
Parquet for 2015-06 is removed.
Downloading parquet for 2014-01.
Cleaning data for 2014-01.
Parquet for 2014-01 is removed.
Downloading parquet for 2014-02.
Cleaning data for 2014-02.
Parquet for 2014-02 is removed.
Downloading parquet for 2014-03.
Cleaning data for 2014-03.
Parquet for 2014-03 is removed.
Downloading parquet for 2014-04.
Cleaning data for 2014-04.
Parquet for 2014-04 is removed.
Downloading parquet for 2014-05.
Cleaning data for 2014-05.
Parquet for 2014-05 

## Part 2: Storing Data
### Create database and tables

In [18]:
# SQLAlchemy engine for the database URL held in DATABASE
engine = db.create_engine(DATABASE)

In [19]:
# Define statements to create tables.
# Each table has an integer primary key `id` plus the columns of the
# corresponding cleaned DataFrame.

# Sampled Yellow Taxi trips
Taxi_STMT = """
CREATE TABLE IF NOT EXISTS taxi_trip
(
    id INTEGER PRIMARY KEY,
    pickup_datetime DATE,
    pickup_latitude FLOAT64,
    pickup_longitude FLOAT64,
    dropoff_latitude FLOAT64,
    dropoff_longitude FLOAT64,
    tip_amount FLOAT64,
    distance FLOAT64
);
"""

# Uber rides
Uber_STMT = """
CREATE TABLE IF NOT EXISTS uber_trip
(
    id INTEGER PRIMARY KEY,
    pickup_datetime DATE,
    pickup_latitude FLOAT64,
    pickup_longitude FLOAT64,
    dropoff_latitude FLOAT64,
    dropoff_longitude FLOAT64,
    distance FLOAT64
);
"""

# Hourly weather observations
Hourly_Weather_STMT = """
CREATE TABLE IF NOT EXISTS hourly_weather
(
    id INTEGER PRIMARY KEY,
    DATE DATE,
    HourlyWindSpeed FLOAT64,
    HourlyPrecipitation FLOAT64
);
"""

# Weather aggregated per day
Daily_Weather_STMT = """
CREATE TABLE IF NOT EXISTS daily_weather
(
    id INTEGER PRIMARY KEY,
    DATE DATE,
    Precipitation FLOAT64,
    WindSpeed FLOAT64
);
"""

# Sunrise/sunset per day. The two time columns are declared TEXT: SQLite
# gives a column declared STRING numeric affinity, which would turn
# digit-only text such as '0720' into the integer 720.
Sun_STMT = """
CREATE TABLE IF NOT EXISTS daily_sun
(
    id INTEGER PRIMARY KEY,
    DATE DATE,
    Sunrise TEXT,
    Sunset TEXT
);
"""

In [20]:
# Create the five tables.
# engine.begin() runs the block in a transaction that is committed on exit,
# and db.text() wraps each raw SQL string in an executable construct, which
# SQLAlchemy 2.x requires (plain strings are rejected by Connection.execute).
with engine.begin() as connection:
    for create_statement in (Taxi_STMT, Uber_STMT, Hourly_Weather_STMT, Daily_Weather_STMT, Sun_STMT):
        connection.execute(db.text(create_statement))

### Add cleaned data to database

In [21]:
# Write each cleaned DataFrame to its SQL table.
# With if_exists="replace", pandas drops an existing table of that name
# before inserting, so the table is rebuilt from the frame's columns.
tables_to_load = [
    (taxi_data, "taxi_trip", "Taxi_data added."),
    (uber_data, "uber_trip", "Uber_data added."),
    (hourly_weather_data, "hourly_weather", "Hourly_weather_data added."),
    (daily_weather_data, "daily_weather", "Daily_weather_data added."),
    (daily_sun_data, "daily_sun", "Sun_data added."),
]
for frame, table_name, done_message in tables_to_load:
    frame.to_sql(table_name, engine, if_exists = "replace", index = False)
    print(done_message)

Taxi_data added.
Uber_data added.
Hourly_weather_data added.
Daily_weather_data added.
Sun_data added.


### Create a schema

In [22]:
# Save the five CREATE TABLE statements, in order, to the schema file
with open(SCHEMA_FILE, "w") as schema_out:
    schema_out.writelines(
        (Taxi_STMT, Uber_STMT, Hourly_Weather_STMT, Daily_Weather_STMT, Sun_STMT)
    )

## Part 3: Understanding Data
First, for convenience, we define a helper function that saves a query to a file.

In [23]:
def query_to_file(query: str, file_name: str) -> None:
    """Write ``query`` to ``file_name``, replacing any existing content."""
    with open(file_name, "w") as sql_out:
        sql_out.write(query)

### Question 1
For 01-2009 through 06-2015, what hour of the day was the most popular to take a Yellow Taxi? The result should have 24 bins.

In [24]:
# File that will hold the SQL text for Question 1
query_file_1 = "popular_hour_for_taxi.sql"

In [25]:
# Count taxi trips per pickup hour ('00'-'23'), most popular hour first.
# The strftime format is a single-quoted SQL string literal; double quotes
# denote identifiers in SQL and are only accepted as strings by SQLite as a
# legacy fallback.
QUERY_STMT_1 = """
SELECT strftime ('%H',pickup_datetime) AS hour, COUNT(*) AS counts
FROM taxi_trip
GROUP BY hour
ORDER BY counts DESC;
"""

In [26]:
# Execute the query on an explicit connection (Engine.execute was removed in
# SQLAlchemy 2.x) and show the rows as the cell's output.
with engine.connect() as connection:
    popular_hours = connection.execute(db.text(QUERY_STMT_1)).fetchall()
popular_hours

[('19', 12490),
 ('18', 12179),
 ('20', 11799),
 ('21', 11507),
 ('22', 11070),
 ('23', 10022),
 ('14', 9948),
 ('17', 9881),
 ('12', 9777),
 ('15', 9604),
 ('13', 9604),
 ('09', 9339),
 ('11', 9237),
 ('10', 9105),
 ('08', 9053),
 ('16', 8267),
 ('00', 7921),
 ('07', 7305),
 ('01', 5919),
 ('02', 4455),
 ('06', 4047),
 ('03', 3223),
 ('04', 2326),
 ('05', 1914)]

In [27]:
# Save the Question 1 SQL text to its query file
query_to_file(QUERY_STMT_1, query_file_1)