Load the Taxi Zones

In [None]:
def load_taxi_zones(shapefile:str='./taxi_zones.shp') -> gpd.GeoDataFrame :
    """
    Load the taxi zone shapefile and reduce every zone to its centroid

    The centroids are computed in the CRS the shapefile is stored in and
    only the resulting points are reprojected to EPSG:4326. Computing a
    centroid directly on latitude/longitude geometries treats degrees as
    planar units, which GeoPandas warns about.
    NOTE(review): this helps only if the shapefile uses a projected CRS;
    for a file that is already geographic the result is as before.

    Keyword Arguments:
    shapefile {str} -- the path to the shapefile (default: {'./taxi_zones.shp'})

    Returns:
    gpd.GeoDataFrame -- one row per zone with the columns 'LocationID',
    'latitude' and 'longitude' (the geometry column is not included)
    """
    zones = gpd.read_file(shapefile)
    # centroid first, reprojection second: see the docstring for the reason
    centroids = zones.geometry.centroid.to_crs(4326)
    zones['latitude'] = centroids.y
    zones['longitude'] = centroids.x
    return zones[['LocationID', 'latitude', 'longitude']]

In [None]:
# Zone lookup table; convert_location_id_to_lat_lon reads it as a global.
gdf = load_taxi_zones()

Calculate Sample Size

In [None]:
def calculate_sample_size(population,z=1.96, e=0.05) -> int:
    """
    calculate the sample size needed for a given population

    Uses n0 = z^2 * p * (1 - p) / e^2 with p = 0.5, then applies the
    finite population correction n = n0 / (1 + (n0 - 1) / population).
    The result is truncated towards zero.

    :param population: the size of the population
    :param z: the parameter of confidence level
    :param e: the margin of error

    :return: the sample size needed; 0 when the population is empty
    """
    # an empty population would otherwise divide by zero below
    if population <= 0:
        return 0
    n0 = z**2 * 0.5 * 0.5 / e**2
    n = n0 / (1 + (n0 - 1) / population)
    return int(n)

Common Functions

In [None]:
def convert_location_id_to_lat_lon(df:pd.DataFrame) -> pd.DataFrame:
    """
    attach zone coordinates for the pickup and dropoff location ids

    For each of 'PULocationID' and 'DOLocationID' the frame is left-joined
    with the global zone table gdf on 'LocationID'. The joined coordinates
    are stored as '<prefix>Latitude' / '<prefix>Longitude', and rows whose
    id has no coordinates are dropped.

    Keyword arguments:
    df -- the dataframe to be converted

    Returns:
    pd.DataFrame -- the dataframe with PU/DO latitude and longitude columns
    """
    # pickup first, then dropoff
    for prefix in ('PU', 'DO'):
        lat_col = f'{prefix}Latitude'
        lon_col = f'{prefix}Longitude'
        joined = df.merge(gdf, left_on=f'{prefix}LocationID', right_on='LocationID', how='left')
        joined = joined.rename(columns={'latitude': lat_col, 'longitude': lon_col})
        joined = joined.drop(columns='LocationID')
        df = joined.dropna(subset=[lat_col, lon_col])
    return df

In [None]:
def filter_taxi_urls(all_urls:List[str]) -> List[str]:
    """
    keep the urls whose file name is a yellow taxi trip file

    Only the part after the last "/" is tested, and the pattern has to
    match from the start of that file name.

    Keyword arguments:
    all_urls -- the list of urls to be filtered

    Returns:
    List[str] -- the matching urls, in their original order
    """
    yellow_re = re.compile(r'(yellow)_tripdata_.*?(202[0-4])-([0-1][0-9])')
    return [
        link
        for link in all_urls
        if yellow_re.match(link.rsplit("/", 1)[-1]) is not None
    ]


def filter_urls(all_urls:List[str],re_pattern :str= '(fhvhv)_tripdata_.*?(202[0-4])-([0-1][0-9])') -> List[str]:
    """
    keep the urls whose file name matches the given pattern

    Only the part after the last "/" is tested, and the pattern has to
    match from the start of that file name.

    Keyword arguments:
    all_urls -- the list of urls to be filtered
    re_pattern -- the regular expression to match the file name against

    Returns:
    List[str] -- the matching urls, in their original order
    """
    name_re = re.compile(re_pattern)
    return [
        link
        for link in all_urls
        if name_re.match(link.rsplit("/", 1)[-1]) is not None
    ]


Process Taxi Data

In [None]:
def remove_invalid_record_of_taxi_data(df:pd.DataFrame) -> pd.DataFrame:
    """
    keep only the trips that pass every validity check

    A row is kept when
    - its pickup and dropoff coordinates lie inside NEW_YORK_BOX_COORDS
    - its pickup and dropoff timestamps are both present
    - its trip_distance is greater than 0

    Keyword arguments:
    df -- the trips, with PU/DO latitude and longitude columns

    Returns:
    pd.DataFrame -- the rows of df that pass all checks
    """
    # element 0 of each corner bounds the latitude, element 1 the longitude
    first_corner = NEW_YORK_BOX_COORDS[0]
    second_corner = NEW_YORK_BOX_COORDS[1]
    lat_from, lat_to = first_corner[0], second_corner[0]
    lon_from, lon_to = first_corner[1], second_corner[1]

    inside_box = (
        df['PULatitude'].between(lat_from, lat_to)
        & df['PULongitude'].between(lon_from, lon_to)
        & df['DOLatitude'].between(lat_from, lat_to)
        & df['DOLongitude'].between(lon_from, lon_to)
    )
    has_timestamps = (
        df['tpep_pickup_datetime'].notna()
        & df['tpep_dropoff_datetime'].notna()
    )
    has_distance = df['trip_distance'] > 0

    return df[inside_box & has_timestamps & has_distance]

def get_and_clean_month(url:str)->pd.DataFrame:
    """
    download and clean the data from the given url

    The file is cached in PARQUET_DIR under the last path component of the
    url and is only downloaded when it is not there yet. A download is
    written to a temporary '.part' file and moved into place once it is
    complete, so an interrupted download is never mistaken for a cached
    file on the next run.

    After loading, a random sample is drawn (size from
    calculate_sample_size, not seeded), coordinates are attached and
    invalid records are removed.

    Keyword arguments:
    url -- the url to download the data

    Returns:
    pd.DataFrame -- the cleaned dataframe

    Raises:
    requests.HTTPError -- if the download returns an error status
    """
    parquet_file = url.split('/')[-1].strip()
    file_path = os.path.join(PARQUET_DIR, parquet_file)

    if not os.path.exists(file_path):
        partial_path = f"{file_path}.part"
        try:
            # the context manager releases the connection even on failure
            with requests.get(url.strip(), stream=True) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            # only a fully written file becomes visible at the cache path
            os.replace(partial_path, file_path)
        finally:
            # after a successful move there is nothing left to remove
            if os.path.exists(partial_path):
                os.remove(partial_path)

    df = pd.read_parquet(file_path)

    n = calculate_sample_size(df.shape[0])
    df = df.sample(n)
    df = convert_location_id_to_lat_lon(df)
    df = remove_invalid_record_of_taxi_data(df)

    return df


In [None]:
def get_and_clean_taxi_data(parquet_urls:List[str]) -> pd.DataFrame:
    """
    download and clean every yellow taxi month found in the given urls

    The urls are first reduced to the yellow taxi trip files, then each
    month is loaded and cleaned by get_and_clean_month (with a progress
    bar) and the monthly results are concatenated.

    Keyword arguments:
    parquet_urls -- the list of urls to download the data

    Returns:
    pd.DataFrame -- the cleaned data of all months in one dataframe
    """
    yellow_urls = filter_urls(parquet_urls,'(yellow)_tripdata_.*?(202[0-4])-([0-1][0-9])')
    # get_and_clean_month reuses a month that is already stored on disk
    monthly_frames = [get_and_clean_month(month_url) for month_url in tqdm(yellow_urls)]
    return pd.concat(monthly_frames)

In [None]:
def get_all_urls_from_taxi_page(url:str) -> List[str]:
    """
    get all urls from the page of the given url

    The href of every <a> tag that has one is returned as written in the
    page; relative links are not resolved.

    Keyword arguments:
    url -- the url to get all urls from

    Returns:
    List[str] -- the list of all urls

    Raises:
    requests.HTTPError -- if the page request returns an error status
    """
    response = requests.get(url)
    # fail loudly instead of scraping the links of an error page
    response.raise_for_status()
    soup = bs4.BeautifulSoup(response.content, 'html.parser')
    return [a['href'] for a in soup.find_all('a', href=True)]


def find_all_parquet_urls(urls:List[str]) -> List[str]:
    """
    keep the urls that contain the text 'parquet'

    The check is a plain substring test anywhere in the url, not a test
    of the file extension.

    Keyword arguments:
    urls -- the list of urls to be filtered

    Returns:
    List[str] -- the urls containing 'parquet', in their original order
    """
    return [link for link in urls if 'parquet' in link]

In [None]:
def get_taxi_data() ->pd.DataFrame:
    """ 
    get the taxi data from the TLC website

    Makes sure the cache directory PARQUET_DIR exists, collects the
    parquet links from the page at TLC_URL, and downloads and cleans the
    yellow taxi months among them.
    
    Returns:
    pd.DataFrame -- the taxi data
    """
    # creates missing parent directories and tolerates an existing directory
    os.makedirs(PARQUET_DIR, exist_ok=True)
    all_urls = get_all_urls_from_taxi_page(TLC_URL)
    all_parquet_urls = find_all_parquet_urls(all_urls)
    taxi_data = get_and_clean_taxi_data(all_parquet_urls)
    return taxi_data

In [None]:
# Runs the whole download / sample / clean pipeline. Despite the name, the
# rows returned here are already filtered; only the column names are still
# those of the source files.
taxi_data_uncleaned = get_taxi_data()

Normalize Columns

In [None]:
# Map the source columns onto the normalized schema. rename() and assign()
# each return a new frame, so nothing is written onto a slice of
# taxi_data_uncleaned (which triggers pandas' SettingWithCopyWarning).
taxi_data_cleaned = (
    taxi_data_uncleaned[['tpep_pickup_datetime', 'tpep_dropoff_datetime',
                         'trip_distance', 'fare_amount', 'extra', 'mta_tax',
                         'improvement_surcharge', 'tolls_amount',
                         'PULatitude', 'PULongitude',
                         'DOLatitude', 'DOLongitude']]
    .rename(columns={'tpep_pickup_datetime': 'trip_pickup_datetime',
                     'tpep_dropoff_datetime': 'trip_dropoff_datetime',
                     'trip_distance': 'trip_miles',
                     'fare_amount': 'base_fare',
                     'tolls_amount': 'tolls',
                     'improvement_surcharge': 'surcharge',
                     'PULatitude': 'pickup_latitude',
                     'PULongitude': 'pickup_longitude',
                     'DOLatitude': 'dropoff_latitude',
                     'DOLongitude': 'dropoff_longitude'})
    # tax combines the MTA tax with the 'extra' charges
    .assign(tax=lambda trips: trips['mta_tax'] + trips['extra'])
)
# fix the column order and leave out the helper columns 'mta_tax' and 'extra'
taxi_data_cleaned = taxi_data_cleaned[['trip_pickup_datetime', 'trip_dropoff_datetime', 'trip_miles',
                                       'base_fare', 'tax', 'tolls', 'surcharge',
                                       'pickup_latitude', 'pickup_longitude',
                                       'dropoff_latitude', 'dropoff_longitude']]
taxi_data = taxi_data_cleaned

In [None]:
# Preview the first rows of the normalized table.
taxi_data.head()

In [None]:
# Print the frame summary to check the columns before further use.
taxi_data.info()

In [None]:
# Summary statistics as a quick sanity check of the value ranges.
taxi_data.describe()