In [3]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
import os

import pandas as pd
pd.set_option('display.max_columns', 30)
import requests

In [4]:
"""
1. Get the data from S3
2. Weather data transformation
3. Taxi trips data transformation
4. Update payment_type_master
5. Update company_master
6. Update taxi_trips with company and payment_type ids (replace the string values with ids)
7. Upload weather data to s3
8. Upload taxi data to s3
9. Upload the newest company_master and payment_type master

"""

'\n1. Get the data from S3\n2. Weather data transformation\n3. Taxi trips data transformation\n4. Update payment_type_master\n5. Update company_master\n6. Update taxi_trips with company and payment_type ids (replace the string values with ids)\n7. Upload weather data to s3\n8. Upload taxi data to s3\n9. Upload the newest company_master and payment_type master\n\n'

### Taxi trips transformation code

In [5]:
# Query a single day two months before today (presumably to allow for the
# portal's publication lag -- confirm).
# NOTE: `months=2` (plural) subtracts two months. The singular `month=2` is an
# absolute setter that would pin the month to February regardless of today's date.
current_datetime = datetime.now() - relativedelta(months=2)

formatted_datetime = current_datetime.strftime('%Y-%m-%d')

# SoQL query: every trip that started on the selected day, capped at 30000 rows.
url = f"https://data.cityofchicago.org/resource/ajtu-isnz.json?$where=trip_start_timestamp >= '{formatted_datetime}T00:00:00' AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'&$limit=30000"

# Only send the app token header when the environment variable is actually set.
api_token = os.environ.get("CHICAGO_API_TOKEN")
headers = {"X-App-Token": api_token} if api_token else {}

# Pass the headers (previously built but never sent) and bound the wait time.
response = requests.get(url, headers=headers, timeout=60)

# Fail loudly on HTTP errors instead of parsing an error payload as trip data.
response.raise_for_status()

data = response.json()

In [6]:
# Load the parsed JSON response (`data`) into a DataFrame for the transformations below.
taxi_trips = pd.DataFrame(data)

In [7]:
# Columns that are removed from the trips data before further processing.
unused_columns = [
    'pickup_census_tract', 'dropoff_census_tract',
    'pickup_centroid_location', 'dropoff_centroid_location',
]
taxi_trips.drop(columns=unused_columns, inplace=True)

# Discard every row that still has a missing value.
taxi_trips.dropna(inplace=True)

# Mark the community area columns as id columns.
community_area_renames = {
    'pickup_community_area': 'pickup_community_area_id',
    'dropoff_community_area': 'dropoff_community_area_id',
}
taxi_trips.rename(columns=community_area_renames, inplace=True)

# Trip start rounded down to the full hour.
trip_start = pd.to_datetime(taxi_trips['trip_start_timestamp'])
taxi_trips['datetime_for_weather'] = trip_start.dt.floor('h')

In [8]:
# Preview the first rows of the transformed trips.
taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area_id,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
1,f6b3515a3b49af20b9fd274ced7a55c1c81491d9,f760c301740afe577d6c0e2f6de2cfecccf6ede158b8a3...,2024-02-09T23:45:00.000,2024-02-10T00:00:00.000,1249,9.29,8,23.76,2.0,0,0,25.76,Mobile,Sun Taxi,41.899602111,-87.633308037,1,42.009622881,-87.670166857,2024-02-09 23:00:00
2,f33f1275c1ca28649f6f71d0444a93cfee30680c,e3a458804fe9298cd1ab49287e64a8a8354d9a0766f062...,2024-02-09T23:45:00.000,2024-02-10T00:00:00.000,1200,0.0,22,45.0,9.05,0,0,54.05,Credit Card,Taxi Affiliation Services,41.92276062,-87.699155343,77,41.9867118,-87.663416405,2024-02-09 23:00:00
3,f03f8999cc583750a821fffc043d895b2787eca9,2981edd199f55bdc8b5ed188de69e9f5d8116ae8d7a434...,2024-02-09T23:45:00.000,2024-02-10T00:00:00.000,960,11.5,38,29.75,0.0,0,0,29.75,Unknown,Taxi Affiliation Services,41.812948939,-87.617859676,53,41.673819904,-87.635739777,2024-02-09 23:00:00
4,e8b5fa8ecbcf76132eb1e4038e4307769d254b7f,a38d173f3d3a362a9622b11153393dced7724f53b9c20c...,2024-02-09T23:45:00.000,2024-02-10T00:00:00.000,293,0.65,8,5.5,2.0,0,1,9.0,Credit Card,Flash Cab,41.900221297,-87.629105186,8,41.900221297,-87.629105186,2024-02-09 23:00:00
5,e8786818c28b1a8bc0de935fdfd9faef9826acb9,d86670e36b21209fc6a376b3767b88caa2219ffceec14c...,2024-02-09T23:45:00.000,2024-02-10T00:00:00.000,1348,17.79,76,43.5,9.6,0,4,57.6,Mobile,Chicago Independents,41.97907082,-87.903039661,8,41.892507781,-87.626214906,2024-02-09 23:00:00


#### Taxi trips transformation function

In [9]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform the daily taxi trips data.

    The input frame is left untouched: every step produces a new DataFrame, so the
    function can be called repeatedly on the same raw data (e.g. when a notebook
    cell is re-run) without raising a KeyError for already-dropped columns.

    Steps:
        1. Drop the census tract and centroid location columns.
        2. Drop every row that contains a missing value.
        3. Rename the community area columns to ``*_id``.
        4. Add ``datetime_for_weather``: the trip start floored to the hour.

    Parameters:
        taxi_trips (pd.DataFrame): The DataFrame holding the daily taxi trips.

    Returns:
        pd.DataFrame: The cleaned, transformed DataFrame holding the daily taxi trips.

    Raises:
        TypeError: If ``taxi_trips`` is not a pandas DataFrame.
        KeyError: If one of the columns to drop, or ``trip_start_timestamp``, is missing.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('taxi_trips is not a valid pandas DataFrame')

    columns_to_drop = [
        'pickup_census_tract', 'dropoff_census_tract',
        'pickup_centroid_location', 'dropoff_centroid_location',
    ]
    column_renames = {
        'pickup_community_area': 'pickup_community_area_id',
        'dropoff_community_area': 'dropoff_community_area_id',
    }

    # Non-inplace chain: each call returns a new frame, so the caller's data is not mutated.
    cleaned_trips = (
        taxi_trips
        .drop(columns=columns_to_drop)
        .dropna()
        .rename(columns=column_renames)
    )

    # Floor the trip start to the hour.
    cleaned_trips['datetime_for_weather'] = (
        pd.to_datetime(cleaned_trips['trip_start_timestamp']).dt.floor('h')
    )

    return cleaned_trips

### Company update code

In [10]:
# Distinct company names in order of first appearance, with a fresh 0-based index.
company_names = taxi_trips['company'].drop_duplicates().reset_index(drop=True)

# Initial company master: consecutive ids starting at 1, one per distinct name.
company_map_table = pd.DataFrame({
    'company_id': range(1, company_names.size + 1),
    'company': company_names,
})
company_map_table.tail()

Unnamed: 0,company_id,company
27,28,Petani Cab Corp
28,29,Setare Inc
29,30,5167 - 71969 5167 Taxi Inc
30,31,3556 - 36214 RC Andrews Cab
31,32,2733 - 74600 Benny Jona


In [11]:
# Sample input for trying out the master update logic: three company names as records.
new_company_data = [{'company': name} for name in ('Setare Inc', 'X', 'Y')]

new_company_mapping = pd.DataFrame(new_company_data)
new_company_mapping

Unnamed: 0,company
0,Setare Inc
1,X
2,Y


In [12]:
# Highest company id currently in the map table; ids for new companies continue after it.
company_max_id = company_map_table['company_id'].max()

In [13]:

# Collect the candidate names that are not present in the company map table yet.
known_companies = company_map_table['company'].values
new_companies_list = []
for candidate in new_company_mapping['company'].values:
    if candidate not in known_companies:
        new_companies_list.append(candidate)
new_companies_list

['X', 'Y']

In [14]:
# Number the new companies consecutively, continuing right after the current maximum id.
first_new_id = company_max_id + 1
new_company_ids = range(first_new_id, first_new_id + len(new_companies_list))

new_companies_df = pd.DataFrame({'company_id': new_company_ids, 'company': new_companies_list})
new_companies_df

Unnamed: 0,company_id,company
0,33,X
1,34,Y


In [15]:
# Append the new companies below the existing map table; ignore_index=True gives the
# combined frame a fresh consecutive row index.
updated_company_map_table = pd.concat([company_map_table, new_companies_df], ignore_index=True)
updated_company_map_table.tail()

Unnamed: 0,company_id,company
29,30,5167 - 71969 5167 Taxi Inc
30,31,3556 - 36214 RC Andrews Cab
31,32,2733 - 74600 Benny Jona
32,33,X
33,34,Y


In [16]:
def update_company_master(taxi_trips: pd.DataFrame, company_map_table: pd.DataFrame) -> pd.DataFrame:
    """Extend the company map table with companies that appear in the trips but not in the master.

    Each new company is added exactly once, in order of first appearance in
    ``taxi_trips``, and receives the next free id after the current maximum.

    Args:
        taxi_trips (pd.DataFrame): DataFrame holding the daily taxi trips (needs a ``company`` column).
        company_map_table (pd.DataFrame): DataFrame holding the company master data
            (needs ``company_id`` and ``company`` columns).

    Returns:
        pd.DataFrame: The updated company master data, with a fresh 0-based row index.
            The input frames are not modified.
    """

    # Deduplicate first: the trips frame has one row per trip, so without this a new
    # company would be appended once per trip, each time under a different id.
    trip_companies = taxi_trips['company'].drop_duplicates()
    already_known = trip_companies.isin(company_map_table['company'])
    new_companies_list = trip_companies[~already_known].tolist()

    # Nothing to add: return an independent copy, indexed like the concat result would be.
    if not new_companies_list:
        return company_map_table.reset_index(drop=True)

    # An empty master has no maximum (max() would be NaN), so numbering starts at 1.
    company_max_id = 0 if company_map_table.empty else int(company_map_table['company_id'].max())

    new_companies_df = pd.DataFrame({
        'company_id': range(company_max_id + 1, company_max_id + len(new_companies_list) + 1),
        'company': new_companies_list,
    })

    updated_company_map_table = pd.concat([company_map_table, new_companies_df], ignore_index=True)

    return updated_company_map_table

### Payment type master code

In [17]:
# Distinct payment types in order of first appearance, with a fresh 0-based index.
payment_types = taxi_trips['payment_type'].drop_duplicates().reset_index(drop=True)

# Initial payment type master: consecutive ids starting at 1.
payment_type_map_table = pd.DataFrame({
    'payment_type_id': range(1, payment_types.size + 1),
    'payment_type': payment_types,
})

# Small hand-made frame with three payment types, used to try out the update logic.
sample_payment_types = ['Credit Card', 'X', 'Y']
taxi_trips_payment_type_only = pd.DataFrame({
    'payment_type_id': list(range(1, 4)),
    'payment_type': sample_payment_types,
})
taxi_trips_payment_type_only

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,X
2,3,Y


In [18]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_map_table: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment type map table with payment types that are not in the master yet.

    Each new payment type is added exactly once, in order of first appearance in
    ``taxi_trips``, and receives the next free id after the current maximum.

    Args:
        taxi_trips (pd.DataFrame): DataFrame holding the daily taxi trips
            (needs a ``payment_type`` column).
        payment_type_map_table (pd.DataFrame): DataFrame holding the payment type master data
            (needs ``payment_type_id`` and ``payment_type`` columns).

    Returns:
        pd.DataFrame: The updated payment type master data, with a fresh 0-based row index.
            The input frames are not modified.
    """

    # Deduplicate first: the trips frame has one row per trip, so without this a new
    # payment type would be appended once per trip, each time under a different id.
    trip_payment_types = taxi_trips['payment_type'].drop_duplicates()
    already_known = trip_payment_types.isin(payment_type_map_table['payment_type'])
    new_payment_types_list = trip_payment_types[~already_known].tolist()

    # Nothing to add: return an independent copy, indexed like the concat result would be.
    if not new_payment_types_list:
        return payment_type_map_table.reset_index(drop=True)

    # The id column of this master is 'payment_type_id' (not 'company_id').
    # An empty master has no maximum (max() would be NaN), so numbering starts at 1.
    if payment_type_map_table.empty:
        payment_type_max_id = 0
    else:
        payment_type_max_id = int(payment_type_map_table['payment_type_id'].max())

    new_payment_types_df = pd.DataFrame({
        'payment_type_id': range(payment_type_max_id + 1, payment_type_max_id + len(new_payment_types_list) + 1),
        'payment_type': new_payment_types_list,
    })

    updated_payment_type_map_table = pd.concat([payment_type_map_table, new_payment_types_df], ignore_index=True)

    return updated_payment_type_map_table

### Creating a general update master table function

In [19]:
def update_master(taxi_trips: pd.DataFrame, map_table: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """Extend the map table with values that appear in the trips but not in the master.

    Each new value is added exactly once and receives the next free id after the
    current maximum. New values are numbered in order of first appearance in
    ``taxi_trips``, so the same inputs always produce the same ids (a plain set
    difference would hand out ids in arbitrary order).

    Args:
        taxi_trips (pd.DataFrame): DataFrame holding the daily taxi trips.
        map_table (pd.DataFrame): DataFrame holding the master data.
        id_column (str): The id column of map table.
        value_column (str): Name of the column containing the values; it must exist
            in both ``taxi_trips`` and ``map_table``.

    Returns:
        pd.DataFrame: The updated master data, with a fresh 0-based row index.
            The input frames are not modified.
    """

    # Distinct trip values in first-appearance order, minus those already in the master.
    trip_values = taxi_trips[value_column].drop_duplicates()
    already_known = trip_values.isin(map_table[value_column])
    new_values_list = trip_values[~already_known].tolist()

    # Nothing to add: return an independent copy, indexed like the concat result would be.
    if not new_values_list:
        return map_table.reset_index(drop=True)

    # An empty master has no maximum (max() would be NaN), so numbering starts at 1.
    max_id = 0 if map_table.empty else int(map_table[id_column].max())

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values_list) + 1),
        value_column: new_values_list,
    })

    updated_map_table = pd.concat([map_table, new_values_df], ignore_index=True)

    return updated_map_table

### Function to update taxi trips with the most recent company and payment type map tables

In [23]:
def update_taxi_trips_wit_map_table(taxi_trips: pd.DataFrame, company_map_table: pd.DataFrame, payment_type_map_table: pd.DataFrame) -> pd.DataFrame:
    """Replace the company and payment type strings in the trips with their master ids.

    Both map tables are joined onto the trips with the default (inner) merge, so
    trips whose company or payment type is missing from the respective map table
    do not appear in the result. The string columns are removed afterwards.

    Args:
        taxi_trips (pd.DataFrame): DataFrame holding the daily taxi trips.
        company_map_table (pd.DataFrame): The company map table.
        payment_type_map_table (pd.DataFrame): The payment type map table.

    Returns:
        pd.DataFrame: The taxi trips data carrying the columns of both map tables
            except ``company`` and ``payment_type`` themselves.
    """
    # Join on the company name first, then on the payment type name.
    trips_with_company = pd.merge(taxi_trips, company_map_table, on='company')
    trips_with_both = pd.merge(trips_with_company, payment_type_map_table, on='payment_type')

    # The join keys are no longer needed once the ids are attached.
    return trips_with_both.drop(columns=['company', 'payment_type'])

### Weather transformation function

In [None]:

def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Turn the daily weather API response into a DataFrame of hourly observations.

    Args:
        weather_data (dict): The parsed JSON response of the Open-Meteo API. It must
            contain an ``'hourly'`` mapping with the keys ``time``, ``temperature_2m``,
            ``wind_speed_10m``, ``rain`` and ``precipitation``.

    Returns:
        pd.DataFrame: One row per hourly entry with the columns ``datetime``
            (converted with ``pd.to_datetime``), ``temperature``, ``wind_speed``,
            ``rain`` and ``precipitation``.

    Raises:
        KeyError: If ``'hourly'`` or one of the expected hourly keys is missing.
    """
    # The parameter is annotated as `dict`: the previous `json` annotation referred
    # to a name that is never imported, so defining the function raised a NameError.

    hourly = weather_data['hourly']

    # Output column name -> key in the API's 'hourly' section.
    column_sources = {
        'datetime': 'time',
        'temperature': 'temperature_2m',
        'wind_speed': 'wind_speed_10m',
        'rain': 'rain',
        'precipitation': 'precipitation',
    }

    weather_df = pd.DataFrame({column: hourly[source] for column, source in column_sources.items()})
    weather_df['datetime'] = pd.to_datetime(weather_df['datetime'])

    return weather_df