In [2]:
import pandas as pd
import numpy as np
import requests
import geopandas as gpd
import zipfile
import matplotlib.pyplot as plt
import re
import os
import math
import warnings
from bs4 import BeautifulSoup
from sqlalchemy import create_engine
from keplergl import KeplerGl
warnings.filterwarnings('ignore')

ModuleNotFoundError: No module named 'requests'

In [2]:
# Constants
# Local directory where downloaded archives, temporary per-month files and
# the final taxi CSV are written.
DATA_PATH = './data'
# Web page that is scraped for the links to the monthly yellow taxi files.
YELLOW_TAXI_URL = 'https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page'
# Zip archive with the taxi zones shapefile used to map location IDs to coordinates.
ID_LOOKUP_URL = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zones.zip'

### Part 1: Data Preprocessing
#### Yellow Taxi Data

In [1]:
def get_id_lookup():
    """Download the Taxi Zones shapefile and build a lookup for location IDs.

    The archive at ID_LOOKUP_URL is saved under DATA_PATH, extracted into
    DATA_PATH/taxi_zones and read with geopandas. Geometries are reprojected
    to EPSG:4326 and every zone is reduced to the centroid of its geometry.

    Returns:
        A dataframe with the columns 'LocationID', 'Lon' and 'Lat', holding
        one row per distinct location ID.

    Raises:
        requests.HTTPError: If the download answers with an error status.
    """
    zip_path = os.path.join(DATA_PATH, 'taxi_zones.zip')
    extract_dir = os.path.join(DATA_PATH, 'taxi_zones')
    # The data directory may not exist yet on a fresh checkout.
    os.makedirs(extract_dir, exist_ok=True)

    rsp = requests.get(ID_LOOKUP_URL)
    # Fail loudly instead of saving an HTML error page as a "zip" file.
    rsp.raise_for_status()
    with open(zip_path, 'wb') as f:
        f.write(rsp.content)

    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(zip_path, 'r') as zip_data:
        zip_data.extractall(extract_dir)

    id_lookup = gpd.read_file(os.path.join(extract_dir, 'taxi_zones.shp'))
    id_lookup = id_lookup.to_crs(4326)  # EPSG:4326
    id_lookup['Lon'] = id_lookup['geometry'].map(lambda x: x.centroid.x)
    id_lookup['Lat'] = id_lookup['geometry'].map(lambda x: x.centroid.y)
    id_lookup = id_lookup[['LocationID', 'Lon', 'Lat']]
    # A lookup must be unique per key: a repeated LocationID would duplicate
    # trips when the table is merged onto pickup/dropoff IDs.
    # NOTE(review): the shapefile appears to repeat a few IDs - confirm.
    id_lookup = id_lookup.drop_duplicates('LocationID').reset_index(drop=True)
    return id_lookup

def get_taxi_urls():
    """Get urls of yellow taxi parquet data from 2009-01 to 2015-06.

    Scrapes the page at YELLOW_TAXI_URL: inside every <div> whose id matches
    'faq' followed by four digits, the links titled 'Yellow Taxi Trip Records'
    are collected. A link is kept when it ends in 'YYYY-MM.parquet' and that
    month lies in the requested range.

    Returns:
        A list of urls, stripped of surrounding whitespace, in page order.

    Raises:
        requests.HTTPError: If the page request answers with an error status.
    """
    rsp = requests.get(YELLOW_TAXI_URL)
    rsp.raise_for_status()
    page_content = BeautifulSoup(rsp.content, 'lxml')
    # Raw strings: '\d' in a plain string literal is an invalid escape sequence.
    year_id_pattern = re.compile(r'faq\d{4}')
    month_pattern = re.compile(r'(\d{4}-\d{2})\.parquet$')
    urls = []
    for year_tab in page_content.find_all('div', {'id': year_id_pattern}):
        for month_tab in year_tab.find_all('a', {'title': 'Yellow Taxi Trip Records'}):
            href = month_tab.get('href')
            if not href:
                # Anchor without a target: nothing to download.
                continue
            # Stray whitespace around the href would shift any fixed-offset
            # slicing of the month, so normalise before matching.
            url = href.strip()
            month = month_pattern.search(url)
            # 'YYYY-MM' strings order chronologically under string comparison.
            if month and '2009-01' <= month.group(1) <= '2015-06':
                urls.append(url)
    return urls

def haversine(theta):
    """Evaluate the haversine function, hav(theta) = sin(theta / 2) ** 2.

    Args:
        theta: An angle in radians.

    Returns:
        Value of the haversine function at theta.
    """
    half_angle = theta / 2.
    sine_of_half = math.sin(half_angle)
    return sine_of_half ** 2

def get_distance(lon1, lat1, lon2, lat2, radius=6371.0):
    """Get the great-circle distance between two longitude/latitude points.

    Uses the haversine formula on a sphere.

    Args:
        lon1: Longitude of the first coordinate, in degrees.
        lat1: Latitude of the first coordinate, in degrees.
        lon2: Longitude of the second coordinate, in degrees.
        lat2: Latitude of the second coordinate, in degrees.
        radius: Radius of the sphere; the result has the same unit. The
            default 6371.0 is the mean Earth radius in kilometres, so the
            default result is in kilometres.

    Returns:
        Distance between the two coordinates. NaN if any input is NaN.
    """
    lon1, lat1, lon2, lat2 = (math.radians(v) for v in (lon1, lat1, lon2, lat2))
    hs = haversine(lat2 - lat1) + math.cos(lat1) * math.cos(lat2) * haversine(lon2 - lon1)
    # Mathematically hs <= 1, but rounding can push it marginally above for
    # near-antipodal points, which would make asin raise a domain error.
    # A NaN fails this comparison and is therefore still propagated.
    if hs > 1.0:
        hs = 1.0
    return 2 * radius * math.asin(hs ** 0.5)

def preprocess_taxi_data(url, id_lookup, sample_size=200000 // 78, random_state=None):
    """Preprocess yellow taxi data in a month.

    Three raw column layouts are handled:
      * files with 'PULocationID'/'DOLocationID': the IDs are translated to
        coordinates by merging with id_lookup;
      * files with 'pickup_longitude' etc.: columns are used as they are;
      * anything else is expected to carry 'Trip_Pickup_DateTime', 'Tip_Amt',
        'Start_Lon', 'Start_Lat', 'End_Lon' and 'End_Lat'.

    Args:
        url: Url for the data.
        id_lookup: A dataframe with the columns 'LocationID', 'Lon' and 'Lat'.
        sample_size: Maximum number of trips to keep. The default follows the
            original "len(uber) / months" rule (200000 // 78).
        random_state: Optional seed forwarded to DataFrame.sample to make the
            sample reproducible.

    Returns:
        A yellow taxi trip dataframe after filtering and cleaning, with the
        columns 'pickup_datetime', 'tip_amount', 'pickup_longitude',
        'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude' and
        'straight_distance'.
    """
    # Bounding box used to filter trips (inclusive on both ends).
    lat_min, lat_max = 40.560445, 40.908524
    lon_min, lon_max = -74.242330, -73.717047

    df = pd.read_parquet(url, engine='fastparquet')
    # keep necessary columns
    if 'PULocationID' in df.columns:
        df = pd.merge(df, id_lookup, left_on='PULocationID', right_on='LocationID')
        df = pd.merge(df, id_lookup, left_on='DOLocationID', right_on='LocationID', suffixes=('', '_DO'))
        df = df[['tpep_pickup_datetime', 'tip_amount', 'Lon', 'Lat', 'Lon_DO', 'Lat_DO']]
    elif 'pickup_longitude' in df.columns:
        df = df[['pickup_datetime', 'tip_amount', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']]
    else:
        df = df[['Trip_Pickup_DateTime', 'Tip_Amt', 'Start_Lon', 'Start_Lat', 'End_Lon', 'End_Lat']]
    # unify the column names across the layouts
    df.columns = ['pickup_datetime', 'tip_amount', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']
    # Re-bind instead of mutating in place: df may be a slice of the raw frame.
    df = df.dropna()
    # filter geographical area
    in_area = (
        df.pickup_latitude.between(lat_min, lat_max)
        & df.dropoff_latitude.between(lat_min, lat_max)
        & df.pickup_longitude.between(lon_min, lon_max)
        & df.dropoff_longitude.between(lon_min, lon_max)
    )
    df = df[in_area]
    # Sampling more rows than exist without replacement raises ValueError,
    # so a month with few valid trips is kept whole instead.
    if len(df) > sample_size:
        df = df.sample(n=sample_size, replace=False, random_state=random_state)
    else:
        df = df.copy()
    # compute distance between two coordinates
    df['straight_distance'] = list(map(get_distance, df['pickup_longitude'], df['pickup_latitude'], df['dropoff_longitude'], df['dropoff_latitude']))
    return df.reset_index(drop=True)

def get_and_preprocess_taxi_data(months=None):
    """Download and preprocess all the yellow taxi data.

    Every url returned by get_taxi_urls is preprocessed; the per-month
    result is written to DATA_PATH/tmp/<month>.csv and the combined,
    time-sorted result to DATA_PATH/taxi_df.csv.

    Args:
        months: Optional collection of 'YYYY-MM' strings. When given, only
            these months are processed (useful for re-running a single
            month). None, the default, processes every month.

    Returns:
        Full yellow taxi trip dataframe after filtering and cleaning.

    Raises:
        ValueError: If no url is left to process.
    """
    id_lookup = get_id_lookup()
    urls = get_taxi_urls()
    df_list = []
    # save tmp files in case of crashing
    tmp_dir = os.path.join(DATA_PATH, 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)
    for url in urls:
        # urls end in 'YYYY-MM.parquet'
        month = url[-15:-8]
        # The former hard-coded `month == '2009-12'` check limited the run to
        # one month; the restriction is now opt-in through `months`.
        if months is not None and month not in months:
            continue
        df = preprocess_taxi_data(url, id_lookup)
        df.to_csv(os.path.join(tmp_dir, month + '.csv'), index=False)
        df_list.append(df)
        print(month + ' done.')
    if not df_list:
        # pd.concat would raise an unspecific "No objects to concatenate".
        raise ValueError('No yellow taxi data to process: no url matched.')
    full_df = pd.concat(df_list)
    full_df['pickup_datetime'] = pd.to_datetime(full_df['pickup_datetime'])
    full_df = full_df.sort_values(by='pickup_datetime').reset_index(drop=True)
    full_df.to_csv(os.path.join(DATA_PATH, 'taxi_df.csv'), index=False)
    return full_df

#### Uber Data

In [None]:
def get_and_preprocess_uber_data():
    """Load and preprocess the uber data.

    Reads DATA_PATH/uber_rides_sample.csv, keeps rides with 1 to 9
    passengers, drops rows with missing values, restricts pickups and
    dropoffs to the bounding box and adds the straight-line distance.

    Returns:
        Uber dataframe after filtering and cleaning, sorted by pickup time,
        with the columns 'pickup_datetime', 'pickup_longitude',
        'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude' and
        'straight_distance'.
    """
    # Bounding box used to filter trips (inclusive on both ends).
    lat_min, lat_max = 40.560445, 40.908524
    lon_min, lon_max = -74.242330, -73.717047

    df = pd.read_csv(os.path.join(DATA_PATH, 'uber_rides_sample.csv'))
    # filter abnormal rows
    df = df[(df['passenger_count'] > 0) & (df['passenger_count'] < 10)]
    df = df[['pickup_datetime', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']]
    df = df.dropna()
    # filter geographical area
    in_area = (
        df.pickup_latitude.between(lat_min, lat_max)
        & df.dropoff_latitude.between(lat_min, lat_max)
        & df.pickup_longitude.between(lon_min, lon_max)
        & df.dropoff_longitude.between(lon_min, lon_max)
    )
    # copy() so the new column is added to an independent frame, not a slice
    df = df[in_area].copy()
    # The distance is computed only for the rows that survive the filters:
    # the kept rows get the same values as before, while missing or
    # out-of-area coordinates never reach get_distance.
    df['straight_distance'] = list(map(get_distance, df['pickup_longitude'], df['pickup_latitude'], df['dropoff_longitude'], df['dropoff_latitude']))
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df = df.sort_values(by='pickup_datetime').reset_index(drop=True)
    return df

#### Weather Data

In [3]:
def get_hourly_weather_data(df):
    """Reduce a raw weather dataframe to one record per hour.

    Timestamps are floored to the hour, precipitation and wind speed are
    converted to numbers (unparseable values become NaN) and only the first
    record of every hour is kept. The input dataframe is not modified.

    Args:
        df: Raw weather dataframe with the columns 'DATE',
            'HourlyPrecipitation' and 'HourlyWindSpeed'.

    Returns:
        Hourly weather dataframe with the columns 'date',
        'hourly_precipitation' and 'hourly_wind_speed'.
    """
    hourly = (
        df[['DATE', 'HourlyPrecipitation', 'HourlyWindSpeed']]
        .assign(
            DATE=lambda raw: pd.to_datetime(raw['DATE']).dt.floor('h'),
            HourlyPrecipitation=lambda raw: pd.to_numeric(raw['HourlyPrecipitation'], errors='coerce'),
            HourlyWindSpeed=lambda raw: pd.to_numeric(raw['HourlyWindSpeed'], errors='coerce'),
        )
        .drop_duplicates('DATE')
        .reset_index(drop=True)
    )
    hourly.columns = ['date', 'hourly_precipitation', 'hourly_wind_speed']
    return hourly

def get_daily_weather_data(df):
    """Reduce a raw weather dataframe to one record per day.

    Timestamps are floored to the day and the daily average wind speed is
    converted to a number (unparseable values become NaN). Rows with a
    missing value are dropped, then only the first remaining record of
    every day is kept. The input dataframe is not modified.

    Args:
        df: Raw weather dataframe with the columns 'DATE' and
            'DailyAverageWindSpeed'.

    Returns:
        Daily weather dataframe with the columns 'date' and
        'daily_average_wind_speed'.
    """
    daily = (
        df[['DATE', 'DailyAverageWindSpeed']]
        .assign(
            DATE=lambda raw: pd.to_datetime(raw['DATE']).dt.floor('d'),
            DailyAverageWindSpeed=lambda raw: pd.to_numeric(raw['DailyAverageWindSpeed'], errors='coerce'),
        )
        .dropna()
        .drop_duplicates('DATE')
        .reset_index(drop=True)
    )
    daily.columns = ['date', 'daily_average_wind_speed']
    return daily

def get_and_preprocess_weather_data(weather_dir=None):
    """Load and preprocess the weather data.

    Every CSV file in the weather directory is read and reduced to hourly
    and daily records; the pieces are concatenated and sorted by date.

    Args:
        weather_dir: Directory holding the raw weather CSV files. Defaults
            to DATA_PATH/weather.

    Returns:
        Hourly weather dataframe and daily weather dataframe.
    """
    if weather_dir is None:
        weather_dir = os.path.join(DATA_PATH, 'weather')
    hourly = []
    daily = []
    # sorted() makes the processing order independent of the file system.
    for f in sorted(os.listdir(weather_dir)):
        # Skip anything that is not a CSV file (e.g. hidden files or
        # checkpoint folders), which read_csv would choke on.
        if not f.lower().endswith('.csv'):
            continue
        df = pd.read_csv(os.path.join(weather_dir, f))
        hourly.append(get_hourly_weather_data(df))
        daily.append(get_daily_weather_data(df))
    hourly_weather_df = pd.concat(hourly).sort_values(by='date').reset_index(drop=True)
    daily_weather_df = pd.concat(daily).sort_values(by='date').reset_index(drop=True)
    return hourly_weather_df, daily_weather_df