# Understanding Hired Rides in NYC

_[Project prompt](https://docs.google.com/document/d/1uAUJGEUzfNj6OsWNAimnYCw7eKaHhMUfU1MTj9YwYw4/edit?usp=sharing), [grading rubric](https://docs.google.com/document/d/1hKuRWqFcIdhOkow3Nljcm7PXzIkoa9c_aHkMKZDxWa0/edit?usp=sharing)_

_This scaffolding notebook may be used to help setup your final project. It's **totally optional** whether you make use of this or not._

_If you do use this notebook, everything provided is optional as well - you may remove or add prose and code as you wish._

_**All code below should be consider "pseudo-code" - not functional by itself, and only an outline to help you with your own approach.**_

## Group 10 
### Yixuan (Sharon) Qian - yq2348
### Michelle Jingyi Zhou - jz3508

## Project Setup

In [89]:
# all import statements needed for the project

import math
import os

import bs4 
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
import pandas as pd
import requests
import sqlalchemy as db
import numpy as np
import re
import os.path
import glob
import geopandas as gpd

import warnings
warnings.filterwarnings("ignore")

In [90]:
# any constants we might need

TAXI_URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

TAXI_ZONES_DIR = "data/taxi_zones"
TAXI_ZONES_SHAPEFILE = f"{TAXI_ZONES_DIR}/taxi_zones.shp"
UBER_CSV = "uber_rides_sample.csv"
WEATHER_CSV_FILES = ["2009_weather.csv", "2010_weather.csv", "2011_weather.csv", "2012_weather.csv",
                    "2013_weather.csv", "2014_weather.csv", "2015_weather.csv"]

EARTH_RADIUS = 6378.137
CRS = 4326  # coordinate reference system

# (lat, lon)
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))
LGA_BOX_COORDS = ((40.763589, -73.891745), (40.778865, -73.854838))
JFK_BOX_COORDS = ((40.639263, -73.795642), (40.651376, -73.766264))
EWR_BOX_COORDS = ((40.686794, -74.194028), (40.699680, -74.165205))

DATABASE_URL = "sqlite:///project.db"
DATABASE_SCHEMA_FILE = "schema.sql"
QUERY_DIRECTORY = "queries"

In [91]:
# Make sure the QUERY_DIRECTORY exists
try:
    os.mkdir(QUERY_DIRECTORY)
except Exception as e:
    if e.errno == 17:
        # the directory already exists
        pass
    else:
        raise

## Part 1: Data Preprocessing

Overview: For Part 1, we downloaded the Parquet files, cleaned and filtered for the relevant data, filling in missing data, and generating samples of these datasets.

### Calculate distance

1.rad(d) function converts numeric degrees to radians

2.distance calculation function
calculate_distance_with_coords(from_coord, to_coord) calculates the distance btween coordinates

3.add_distance_column(dataframe)

Add a column call_distance to the dataframe

In [92]:
# This function converts numeric degrees to radians
# d is diameter
def rad(d):
    return d * math.pi / 180.0

In [93]:
def calculate_distance_with_coords(from_coord, to_coord):
    rad_lat1 = rad(from_coord['pickup_latitude'])
    rad_lon1 = rad(from_coord['pickup_longitude'])
    rad_lat2 = rad(to_coord['dropoff_longitude'])
    rad_lon2 = rad(to_coord['dropoff_latitude'])
    a = rad_lat1 - rad_lat2
    b = rad_lon1 - rad_lon2
    distance_radius = 2 * math.asin(
        math.sqrt(math.pow(math.sin(a / 2), 2)+ math.cos(rad_lat1) * math.cos(rad_lat2) * math.pow(math.sin(b / 2), 2)))
    distance = distance_radius * EARTH_RADIUS
    return distance

In [117]:
def add_distance_column(df):
    """
    Add a column with name 'cal_distance' to the dataframe
    
    """
    # Filter rows with valid coordinates within the NY box
    valid_rows = (
        (df["pickup_latitude"] > NEW_YORK_BOX_COORDS[0][0])
        & (df["pickup_latitude"] < NEW_YORK_BOX_COORDS[1][0])
        & (df["dropoff_latitude"] > NEW_YORK_BOX_COORDS[0][0])
        & (df["dropoff_latitude"] < NEW_YORK_BOX_COORDS[1][0])
        & (df["pickup_longitude"] > NEW_YORK_BOX_COORDS[0][1])
        & (df["pickup_longitude"] < NEW_YORK_BOX_COORDS[1][1])
        & (df["dropoff_longitude"] > NEW_YORK_BOX_COORDS[0][1])
        & (df["dropoff_longitude"] < NEW_YORK_BOX_COORDS[1][1])
    )
    # Create new dataframe with valid rows we filtered
    valid_df = df[valid_rows].copy()
    
    # Get pickup & dropoff coordinates in valid rows
    from_coord = valid_df[['pickup_latitude', 'pickup_longitude']]
    to_coord = valid_df[['dropoff_latitude', 'dropoff_longitude']]
    
    # Calculate distance for valid rows
    valid_df['cal_distance'] = calculate_distance_with_coords(from_coord, to_coord)
    
    # Merge valid_df back into the original dataframe
    final_df = df.merge(valid_df[['cal_distance']], left_index=True, right_index=True, how='left')

    return final_df

### Use Taxi Zones Shapefile to Convert to Coordinates

1. get_latlon_from_locationID():

We load taxi zones shapefile

2. convert_id_to_latlon(sample_tables)

we convert area ID column into  coordinates

In [95]:
#Load taxi zones from a shapefile and add new columns.

def get_latlon_from_locationID():
    # Read the taxi zone shapefile and convert it to CRS
    gdf = geopandas.read_file("taxi_zones.shp")
    gdf = gdf.to_crs(CRS)
    
    # Get the lon and lat of the centroid of each  zone
    lon = gdf.centroid.x
    lat = gdf.centroid.y
    
    # Add new columns to the dataframe to store the lon and lat
    gdf["lon"] = lon
    gdf["lat"] = lat
    return gdf

In [121]:
def convert_id_to_latlon(sample_tables):
    """
    Convert area ID column into two coordinates
    """
    gdf = get_latlon_from_locationID()

    def get_coords(location_id):
        if location_id < 264 and location_id in gdf["LocationID"].values:
            row = gdf[gdf["LocationID"] == location_id][0]
            lon, lat = row["lon"], row["lat"]

            if (
                NEW_YORK_BOX_COORDS[0][0] < lat < NEW_YORK_BOX_COORDS[1][0]
                and NEW_YORK_BOX_COORDS[0][1] < lon < NEW_YORK_BOX_COORDS[1][1]
            ):
                return lon, lat

        return None, None

    start_coords = samples_df["PULocationID"].apply(get_coords)
    end_coords = samples_df["DOLocationID"].apply(get_coords)

    samples_df["pickup_longitude"] = [coord[0] for coord in start_coords]
    samples_df["pickup_latitude"] = [coord[1] for coord in start_coords]
    samples_df["dropoff_longitude"] = [coord[0] for coord in end_coords]
    samples_df["dropoff_latitude"] = [coord[1] for coord in end_coords]

### Process Taxi Data

1. this function programmatically downloads the Yellow Taxi Parquet files for a specific date range 2009-01 and 2015-06 from the website. it returns a list that contains all taxi data url in TAXI_URL ""https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

2.


3. we added latitude and logitude from taxi_zones. We also added pickup_latitude, dropoff_latitude, dropoff_latitude and dropoff_longitude as columns to the dataframe for convenient calculation

4. Download Parquet files, get some sample from these files, Clean the dataframe according to existing location IDs, Write data into .csv and return dataframe

5. get_and_clean_taxi_data
Get taxi data. If taxi.csv exists, read-only. Otherwise, download data and generate taxi.csv file

In [97]:
def get_taxi_parquet_urls():

    parquet_urls = []
    response = requests.get(url=TAXI_URL)
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'html.parser')
        links = soup.find_all('a', href=True)
        for link in links:
            url = link['href']
            if 'yellow_tripdata' in url and '.parquet' in url:
                date_str = url.split('/')[-1].split('_')[-1].split('.')[0]
                year = int(date_str[:4])
                month = int(date_str[4:6])
                if year < 2015 or (year == 2015 and month <= 6):
                    parquet_urls.append(url)
    return parquet_urls

In [98]:
def process_datetime(df):
    """
    Change column type for date column and add columns for specific time data

    Arguments:
    df -- a dataframe with time data column

    """
    if "tpep_pickup_datetime" in df.columns:
        samples_df['tpep_pickup_datetime'] = pd.to_datetime(samples_df['tpep_pickup_datetime'])
        samples_df['tpep_dropoff_datetime'] = pd.to_datetime(samples_df['tpep_dropoff_datetime'])

        datetime_columns = {
            'DATE': samples_df['tpep_pickup_datetime'],
            'YEAR': samples_df['tpep_pickup_datetime'].dt.year.astype(int),
            'MONTH': samples_df['tpep_pickup_datetime'].dt.month.astype(int),
            'DAY': samples_df['tpep_pickup_datetime'].dt.day.astype(int),
            'HOUR': samples_df['tpep_pickup_datetime'].dt.hour.astype(int),
            'WEEK': samples_df['tpep_pickup_datetime'].dt.dayofweek + 1  # 0-6 to 1-7
        }
    else:
        datetime_columns = {
            'tpep_pickup_datetime': None,
            'YEAR': None,
            'MONTH': None,
            'DAY': None,
            'HOUR': None,
            'WEEK': None
        }

    samples_df = samples_df.assign(**datetime_columns) 

In [99]:
def get_and_clean_month_taxi_data(url):
    """
    Download, read, sample, and clean one month of taxi data

    Arguments:
    url -- a url for downloading a specific month's taxi parquet file

    Returns:
    cleaned_data -- a DataFrame containing cleaned taxi data for one month
    """
    file_name = url.split("/")[-1]

    if not os.path.exists(file_name):
        response = requests.get(url, stream=True)

        with open(file_name, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024):
                f.write(chunk)

    columns_by_year = {
        '2011_2015': ["tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "pickup_longitude",
                  "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "tip_amount"],
        '2010': ["pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude",
               "dropoff_longitude", "dropoff_latitude", "tip_amount"],
        '2009': ["Trip_Pickup_DateTime", "Trip_Dropoff_DateTime", "Passenger_Count", "Trip_Distance", "Start_Lon", "Start_Lat",
               "End_Lon", "End_Lat", "Tip_Amt"]
    }

    raw_data = pd.read_parquet(file_name)
    raw_data = raw_data.sample(20000)
    raw_data.reset_index(inplace=True)

    year_key = '2011_2015' if not re.search(r"2009|2010", file_name) else '2010' if re.search(r"2010", file_name) else '2009'

    if year_key == '2011_2015':
        convert_id_to_latlon(raw_data)

    cleaned_data = raw_data[columns_by_year[year_key]]
    unified_column_names = {columns_by_year[year_key][i]: columns_by_year['2011_2015'][i] for i in range(len(columns_by_year[year_key]))}
    cleaned_data.rename(columns=unified_column_names, inplace=True)

    cleaned_data[columns_by_year['2011_2015'][:8]] = cleaned_data[columns_by_year['2011_2015'][:8]].replace(0.0, None)
    cleaned_data.dropna(inplace=True)
    
    #generate a sampling of Taxi data that's roughly equal to the Uber dataset
    #Uber, 200,000/78=2564. 
    cleaned_data = cleaned_data.sample(2564)
    cleaned_data.reset_index(inplace=True)

    process_datetime(cleaned_data)
    add_distance_column(cleaned_data)
    cleaned_data.drop(["index"], axis=1, inplace=True)

    return cleaned_data


In [100]:
def get_and_clean_taxi_data():
    """
    Get and clean taxi data from 2009-01 to 2015-06
    
    Returns:
    taxi_data -- a dataframe contains all taxi data from 2009-01 to 2015-06
    
    """
    all_taxi_dataframes = []
    
    all_parquet_urls = find_taxi_parquet_urls()
    for parquet_url in all_parquet_urls:

        dataframe = get_and_clean_month_taxi_data(parquet_url)
        
        all_taxi_dataframes.append(dataframe)

    # create one gigantic dataframe with data from every month needed
    taxi_data = pd.concat(all_taxi_dataframes)
    return taxi_data

In [101]:
def add_lat_log_column(dataframe):
    
    print('add_lat_log_column')
    dftaxi = gpd.read_file('taxi_zones.shp')
    dftaxi = dftaxi.to_crs(CRS)
    
    lat1=[]
    lon1=[]
    lat2=[]
    lon2=[]
    for LocationID in dataframe["PULocationID"]:
        lat=dftaxi[dftaxi["LocationID"]==LocationID].geometry.centroid.x
        lon=dftaxi[dftaxi["LocationID"]==LocationID].geometry.centroid.y
        if lat.empty:
            lat1.append(0)
        else:
            lat1.append(lat[0])
        if lon.empty:
            lon1.append(0)
        else:
            lon1.append(log[0])
    
    for LocationID in dataframe["DOLocationID"]:
        lat=dftaxi[dftaxi["LocationID"]==LocationID].geometry.centroid.x
        lon=dftaxi[dftaxi["LocationID"]==LocationID].geometry.centroid.y
        if lat.empty:
            lat2.append(0)
        else:
            lat2.append(lat[0])
        if log.empty:
            lon2.append(0)
        else:
            lon2.append(lon[0])
    dataframe['pickup_latitude']=lat1
    dataframe['pickup_longitude']=lon1
    dataframe['dropoff_latitude']=lat2
    dataframe['dropoff_longitude']=lon2
    dataframe.to_csv("2.csv")

In [102]:
def get_and_clean_month(url):
    
    reponse = requests.get(url)

    filename=url.split('/')[-1]
    with open(filename, "wb") as f:
        f.write(reponse.content)
    
    df = pd.read_parquet(filename)
    print(filename)
    print(df.columns)
    df = df.sample(n=sample_size,ignore_index=True)
    try:
        if "PULocationID" in df.columns:
            add_latlog_column(df)
        df = df[columns]
    except:
        try:
            df = df[columns2]
        except:
            try:
                df = df[columns3]
            except:
                add_latlog_column(df)
                df = df[columns2]
    df.columns = columns2
    df = df.sample(n=sample_size,random_state = 1,ignore_index=True)
    return df

In [103]:
def get_and_clean_taxi_data():
    all_taxi_dataframes = []
    
    all_parquet_urls = get_taxi_parquet_urls()
    
    for parquet_url in all_parquet_urls:
        
        dataframe = get_and_clean_month(parquet_url)
        
        # add a new column to calculate the distance of the taxi ride using latitude and longitude information
        add_distance_column(dataframe)
        
        all_taxi_dataframes.append(dataframe)
    
    # create one gigantic dataframe with data from every month needed
    taxi_data = pd.concat(all_taxi_dataframes)
    return taxi_data


In [104]:
taxi_data = get_and_clean_taxi_data()

ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.

### Processing Uber Data
1. load_and_clean_uber_data

this function load data from Uber csv file, and add columns to the pd df 
and Load data from input file and add date columns to it

2. 

In [118]:
def load_and_clean_uber_data(csv_file):
    """
    Load data from input file and add date columns to it  
    
    Arguments:
    csv_file -- file name containing the data
    
    Returns:
    pd_data -- a dataframe that contains cleaned data from input file
    
    """
    # Load data from Uber CSV file
    pd_df = pd.read_csv(csv_file)
    
    # Convert pickup datetime column to pd datetime format
    pd_df['pickup_datetime'] = pd.to_datetime(pd_df['pickup_datetime'])
    
    # get year, month, week, day, hour from pickup datetime column
    pd_df['YEAR'] = pd_df['pickup_datetime'].dt.year.astype(int)
    pd_df['MONTH'] = pd_df['pickup_datetime'].dt.month.astype(int)
    pd_df['WEEK'] = pd_df['pickup_datetime'].dt.dayofweek + 1
    pd_df['DAY'] = pd_df['pickup_datetime'].dt.day.astype(int)
    pd_df['HOUR'] = pd_df['pickup_datetime'].dt.hour.astype(int)
    
    pd_df = pd_df.reset_index(drop=True)
    
    return pd_df

In [119]:
def get_uber_data():
    """
    Load and clean Uber data, and add a column for distance in miles
    
    Returns:
    uber_dataframe -- a dataframe containing cleaned and processed Uber data
    
    """
    uber_dataframe = load_and_clean_uber_data(UBER_CSV)
    
    valid_columns = ["fare_amount", "pickup_datetime", "pickup_longitude", "pickup_latitude",
                     "dropoff_longitude", "dropoff_latitude"]
    
    # put 0.0 values in valid columns with NaN
    uber_dataframe[valid_columns] = uber_dataframe[valid_columns].replace(0.0, np.nan)
    
    # Drop rows with NaN values in valid columns
    uber_dataframe.dropna(subset=valid_columns, inplace=True)
    uber_dataframe = uber_dataframe.reset_index(drop=True)
    
    # Add a new column for the distance traveled during each Uber ride
    add_distance_column(uber_dataframe)
    
    valid_columns.append("cal_distance")
    valid_columns.remove("fare_amount")
    
    # Drop rows with NaN values
    uber_dataframe.dropna(subset=valid_columns, inplace=True)
    uber_dataframe = uber_dataframe.drop(["key", "fare_amount"], axis=1)

    return uber_dataframe

In [122]:
uber_data = get_uber_data()

TypeError: cannot convert the series to <class 'float'>

In [None]:
uber_data.head()

### Processing Weather Data

In [None]:
def get_all_weather_csvs(directory):
    raise NotImplementedError()

In [None]:
def clean_month_weather_data_hourly(csv_file):
    raise NotImplementedError()

In [None]:
def clean_month_weather_data_daily(csv_file):
    raise NotImplementedError()

In [None]:
def load_and_clean_weather_data():
    weather_csv_files = get_all_weather_csvs(WEATHER_CSV_DIR)
    
    hourly_dataframes = []
    daily_dataframes = []
        
    for csv_file in weather_csv_files:
        hourly_dataframe = clean_month_weather_data_hourly(csv_file)
        daily_dataframe = clean_month_weather_data_daily(csv_file)
        hourly_dataframes.append(hourly_dataframe)
        daily_dataframes.append(daily_dataframe)
        
    # create two dataframes with hourly & daily data from every month
    hourly_data = pd.concat(hourly_dataframes)
    daily_data = pd.concat(daily_dataframes)
    
    return hourly_data, daily_data

In [None]:
hourly_weather_data, daily_weather_data = load_and_clean_weather_data()

In [None]:
hourly_weather_data.head()

In [None]:
daily_weather_data.head()

## Part 2: Storing Cleaned Data

In [None]:
engine = db.create_engine(DATABASE_URL)

In [None]:
# if using SQL (as opposed to SQLAlchemy), define the commands 
# to create your 4 tables/dataframes
HOURLY_WEATHER_SCHEMA = """
TODO
"""

DAILY_WEATHER_SCHEMA = """
TODO
"""

TAXI_TRIPS_SCHEMA = """
TODO
"""

UBER_TRIPS_SCHEMA = """
TODO
"""

In [None]:
# create that required schema.sql file
with open(DATABASE_SCHEMA_FILE, "w") as f:
    f.write(HOURLY_WEATHER_SCHEMA)
    f.write(DAILY_WEATHER_SCHEMA)
    f.write(TAXI_TRIPS_SCHEMA)
    f.write(UBER_TRIPS_SCHEMA)

In [None]:
# create the tables with the schema files
with engine.connect() as connection:
    pass

### Add Data to Database

In [None]:
def write_dataframes_to_table(table_to_df_dict):
    raise NotImplemented()

In [None]:
map_table_name_to_dataframe = {
    "taxi_trips": taxi_data,
    "uber_trips": uber_data,
    "hourly_weather": hourly_data,
    "daily_weather": daily_data,
}

In [None]:
write_dataframes_to_table(map_table_name_to_dataframe)

## Part 3: Understanding the Data

In [None]:
# Helper function to write the queries to file
def write_query_to_file(query, outfile):
    raise NotImplementedError()

### Query 1

In [None]:
QUERY_1_FILENAME = ""

QUERY_1 = """
TODO
"""

In [None]:
engine.execute(QUERY_1).fetchall()

In [None]:
write_query_to_file(QUERY_1, QUERY_1_FILENAME)

## Part 4: Visualizing the Data

### Visualization 1

In [None]:
# use a more descriptive name for your function
def plot_visual_1(dataframe):
    figure, axes = plt.subplots(figsize=(20, 10))
    
    values = "..."  # use the dataframe to pull out values needed to plot
    
    # you may want to use matplotlib to plot your visualizations;
    # there are also many other plot types (other 
    # than axes.plot) you can use
    axes.plot(values, "...")
    # there are other methods to use to label your axes, to style 
    # and set up axes labels, etc
    axes.set_title("Some Descriptive Title")
    
    plt.show()

In [None]:
def get_data_for_visual_1():
    # Query SQL database for the data needed.
    # You can put the data queried into a pandas dataframe, if you wish
    raise NotImplementedError()

In [None]:
some_dataframe = get_data_for_visual_1()
plot_visual_1(some_dataframe)