In [2]:
import json, os, requests
from dateutil.relativedelta import relativedelta 
from datetime import datetime
import pandas as pd
pd.set_option('display.max_columns', 30)

In [3]:
'''
1. get the data from S3
2. weather_data tranformations
3. taxi_trips transformations
4. update payment_type_master
5. update payment_type_master
6. update taxi_trips with payment_type and payment_type ids (replace the string values with ids from the latest master tables)
7. upload weather_data to S3
8. upload taxi data to S3
9. upload the newest payment_type_master and payment_type_master
'''

'\n1. get the data from S3\n2. weather_data tranformations\n3. taxi_trips transformations\n4. update payment_type_master\n5. update company_master\n6. update taxi_trips with company and payment_type ids (replace the string values with ids from the latest master tables)\n7. upload weather_data to S3\n8. upload taxi data to S3\n9. upload the newest payment_type_master and company_master\n'

### taxi_trips transformation codes

In [4]:
current_datetime = datetime.now() - relativedelta(months=2)

formatted_datetime = current_datetime.strftime('%Y-%m-%d')


url = f"https://data.cityofchicago.org/resource/ajtu-isnz.json?$where=trip_start_timestamp >= '{formatted_datetime}T00:00:00.000' AND trip_start_timestamp <= '{formatted_datetime}T23:59:59.000'&$limit=30000"
headers = {'X-App-Token': os.environ.get('CHICAGO_API_TOKEN')}

response = requests.get(url, headers=headers, verify=False)

data = response.json()



In [5]:
taxi_trips = pd.DataFrame(data)

In [6]:
taxi_trips.drop(['pickup_census_tract','dropoff_census_tract', 'pickup_centroid_location', 'dropoff_centroid_location'], axis=1, inplace=True) 

In [7]:
taxi_trips.dropna(inplace=True)

In [8]:
taxi_trips.rename(columns={
    'pickup_community_area': 'pickup_community_area_id',
    'dropoff_community_area': 'dropoff_community_area_id'},
    inplace=True
)

In [9]:
taxi_trips['datetime_for_weather'] = pd.to_datetime(taxi_trips['trip_start_timestamp']).dt.floor('h')

In [10]:
taxi_trips["datetime_for_weather"] = taxi_trips['trip_start_timestamp'].dt.floor('h')

AttributeError: Can only use .dt accessor with datetimelike values

#### taxi_trips transformation function


In [11]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Perform transformations on a DataFrame containing taxi trip data.

    Parameters:
    - taxi_trips (pd.DataFrame): A pandas DataFrame containing taxi trip data.

    Returns:
    - pd.DataFrame: A DataFrame with the following transformations applied:
        - Columns 'pickup_census_tract', 'dropoff_census_tract', 'pickup_centroid_location',
          and 'dropoff_centroid_location' are dropped.
        - Rows with missing values are dropped.
        - Columns 'pickup_community_area' and 'dropoff_community_area' are renamed to
          'pickup_community_area_id' and 'dropoff_community_area_id' respectively.
        - A new column 'datetime_for_weather' is created, containing the hourly timestamp
          of the 'trip_start_timestamp' column.
    """

    # Error handling:
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('taxi_trips is not a valid pandas DataFrame')
    
    columns_to_drop = ['pickup_census_tract', 'dropoff_census_tract', 'pickup_centroid_location', 'dropoff_centroid_location']
    existing_columns = set(taxi_trips.columns)
    for column in columns_to_drop:
        if column not in existing_columns:
            print(f"Warning: Column '{column}' not found in DataFrame.")

    try:
        taxi_trips['datetime_for_weather'] = pd.to_datetime(taxi_trips['trip_start_timestamp']).dt.floor('h')
    except ValueError as e:
        print(f"Error converting timestamps: {e}")

    try:
        taxi_trips.drop(['pickup_census_tract', 'dropoff_census_tract', 'pickup_centroid_location', 'dropoff_centroid_location'], axis=1, inplace=True)
        taxi_trips.dropna(inplace=True)
        taxi_trips.rename(columns={'pickup_community_area': 'pickup_community_area_id', 'dropoff_community_area': 'dropoff_community_area_id'}, inplace=True)
        taxi_trips['datetime_for_weather'] = pd.to_datetime(taxi_trips['trip_start_timestamp']).dt.floor('h')
    except Exception as e:
        print(f"An error occurred during data transformation: {e}")
    
    
    # def:
    taxi_trips.drop(['pickup_census_tract','dropoff_census_tract', 'pickup_centroid_location', 'dropoff_centroid_location'], axis=1, inplace=True)
    
    taxi_trips.dropna(inplace=True)
    
    taxi_trips.rename(columns={
        'pickup_community_area': 'pickup_community_area_id',
        'dropoff_community_area': 'dropoff_community_area_id'},
        inplace=True
        )
    
    taxi_trips['datetime_for_weather'] = pd.to_datetime(taxi_trips['trip_start_timestamp']).dt.floor('h')

    return taxi_trips


#### company update codes

In [14]:
company_master = taxi_trips['company'].drop_duplicates().reset_index(drop=True)

company_master = pd.DataFrame(
    {
        'company_id': range(1, len(company_master) + 1),
        'company': company_master
    }
)

company_master



Unnamed: 0,company_id,company
0,1,Flash Cab
1,2,Taxicab Insurance Agency Llc
2,3,City Service
3,4,Blue Ribbon Taxi Association
4,5,Sun Taxi
5,6,"Taxicab Insurance Agency, LLC"
6,7,5 Star Taxi
7,8,Taxi Affiliation Services
8,9,Tac - Checker Cab Dispatch
9,10,Globe Taxi


In [13]:
new_company_data = [
        {'company': 'Setare Inc'},
        {'company': 'x'},
        {'company': 'y'}
]

new_company_mapping = pd.DataFrame(new_company_data)

new_company_mapping

Unnamed: 0,company
0,Setare Inc
1,x
2,y


In [15]:
company_max_id = company_master['company_id'].max()

company_max_id

30

In [19]:
new_company_list = []

for company in new_company_mapping['company'].values:
    if company not in company_master['company'].values:
        new_company_list.append(company)

# one line 
new_companies_list_one_line = [company for company in new_company_mapping['company'].values if company not in company_master['company'].values]


new_companies_list_one_line

['x', 'y']

In [21]:
new_companies_df = pd.DataFrame({
    'company_id': range(company_max_id +1, company_max_id + len(new_company_list) +1),
    'company' : new_company_list
}
)

new_companies_df

Unnamed: 0,company_id,company
0,31,x
1,32,y


In [23]:
updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

updated_company_master

Unnamed: 0,company_id,company
0,1,Flash Cab
1,2,Taxicab Insurance Agency Llc
2,3,City Service
3,4,Blue Ribbon Taxi Association
4,5,Sun Taxi
5,6,"Taxicab Insurance Agency, LLC"
6,7,5 Star Taxi
7,8,Taxi Affiliation Services
8,9,Tac - Checker Cab Dispatch
9,10,Globe Taxi


In [24]:
def updated_company_master(company_master: pd.DataFrame, taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Update the company master DataFrame with new companies from the taxi trips data.

    Parameters:
    - company_master (pd.DataFrame): The existing company master DataFrame containing company information.
    - taxi_trips (pd.DataFrame): DataFrame containing taxi trip data, including company information.

    Returns:
    - pd.DataFrame: Updated company master DataFrame with new companies added from the taxi trips data.

    Algorithm:
    1. Find the maximum company ID in the existing company master DataFrame.
    2. Iterate through the 'company' column of the taxi trips DataFrame.
    3. Check if each company in the taxi trips data is already in the company master DataFrame.
    4. If a company is not found in the company master DataFrame, add it to the list of new companies.
    5. Create a new DataFrame for the new companies with IDs starting from the maximum company ID + 1.
    6. Concatenate the existing company master DataFrame with the new companies DataFrame.
    7. Return the updated company master DataFrame.
    """

    # Error handling:
    if not isinstance(company_master, pd.DataFrame) or not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('company_master and taxi_trips must be pandas DataFrames')
    
    required_columns = ['company_id', 'company']
    if not all(col in company_master.columns for col in required_columns):
        raise ValueError("company_master DataFrame is missing required columns")
    if 'company' not in taxi_trips.columns:
        raise ValueError("taxi_trips DataFrame is missing 'company' column")
    
    if company_master.empty:
        raise ValueError("company_master DataFrame is empty")
    if taxi_trips.empty:
        raise ValueError("taxi_trips DataFrame is empty")

    try:
        company_max_id = company_master['company_id'].max()
        # Other DataFrame operations
    except KeyError as e:
        raise KeyError(f"Missing key in DataFrame: {e}")
    except ValueError as e:
        raise ValueError(f"Value error: {e}")
    
    if len(company_master['company_id'].unique()) != len(company_master):
        raise ValueError("company_id column contains duplicate values")
    if not company_master['company_id'].is_monotonic_increasing:
        raise ValueError("company_id column is not sorted in increasing order")
    
    #def:

    company_max_id = company_master['company_id'].max()

    new_company_list = []
    for company in taxi_trips['company'].values:
        if company not in company_master['company'].values:
            new_company_list.append(company)

    new_companies_df = pd.DataFrame({
    'company_id': range(company_max_id +1, company_max_id + len(new_company_list) +1),
    'company' : new_company_list
    }
    )

    updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

    return updated_company_master

In [26]:
taxi_trips_company_only = pd.DataFrame({
    "company_id": [1,2,3],
    'company': ['Setare Inc', 'x', 'y']
})

taxi_trips_company_only

Unnamed: 0,company_id,company
0,1,Setare Inc
1,2,x
2,3,y


#### payment type master code

In [37]:
payment_type_master = taxi_trips['payment_type'].drop_duplicates().reset_index(drop=True)

payment_type_master = pd.DataFrame(
    {
        'payment_type_id': range(1, len(payment_type_master) + 1),
        'payment_type': payment_type_master
    }
)

In [38]:
payment_type_only = pd.DataFrame({
    "payment_type_id": [1,2,3],
    'payment_type': ['Credit Card', 'x', 'y']
})

payment_type_only

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,x
2,3,y


In [28]:
def updated_payment_type_master(payment_type_master: pd.DataFrame, taxi_trips: pd.DataFrame) -> pd.DataFrame:
   
    payment_type_max_id = payment_type_master['payment_type_id'].max()

    new_payment_type_list = []
    for payment_type in taxi_trips['payment_type'].values:
        if payment_type not in payment_type_master['payment_type'].values:
            new_payment_type_list.append(payment_type)

    new_companies_df = pd.DataFrame({
    'payment_type_id': range(payment_type_max_id +1, payment_type_max_id + len(new_payment_type_list) +1),
    'payment_type' : new_payment_type_list
    }
    )

    updated_payment_type_master = pd.concat([payment_type_master, new_companies_df], ignore_index=True)

    return updated_payment_type_master

In [39]:
test = updated_payment_type_master(payment_type_master=payment_type_master, taxi_trips=payment_type_only)

test

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,Credit Card
2,3,Mobile
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,x
8,9,y


In [None]:
import pandas as pd

def updated_payment_type_master(payment_type_master: pd.DataFrame, taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Update the payment type master DataFrame with new payment types from the taxi trips data.

    Parameters:
    - payment_type_master (pd.DataFrame): The existing payment type master DataFrame containing payment type information.
    - taxi_trips (pd.DataFrame): DataFrame containing taxi trip data, including payment type information.

    Returns:
    - pd.DataFrame: Updated payment type master DataFrame with new payment types added from the taxi trips data.

    Algorithm:
    1. Find the maximum payment type ID in the existing payment type master DataFrame.
    2. Iterate through the 'payment_type' column of the taxi trips DataFrame.
    3. Check if each payment type in the taxi trips data is already in the payment type master DataFrame.
    4. If a payment type is not found in the payment type master DataFrame, add it to the list of new payment types.
    5. Create a new DataFrame for the new payment types with IDs starting from the maximum payment type ID + 1.
    6. Concatenate the existing payment type master DataFrame with the new payment types DataFrame.
    7. Return the updated payment type master DataFrame.

    Raises:
    - TypeError: If either payment_type_master or taxi_trips is not a pandas DataFrame.
    - ValueError: If payment_type column is missing in taxi_trips DataFrame.
    """
    # Input validation
    if not isinstance(payment_type_master, pd.DataFrame) or not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('payment_type_master and taxi_trips must be pandas DataFrames')

    # Check if payment_type column exists in taxi_trips DataFrame
    if 'payment_type' not in taxi_trips.columns:
        raise ValueError("taxi_trips DataFrame is missing 'payment_type' column")

    # Check if DataFrames are empty
    if payment_type_master.empty:
        raise ValueError("payment_type_master DataFrame is empty")
    if taxi_trips.empty:
        raise ValueError("taxi_trips DataFrame is empty")

    payment_type_max_id = payment_type_master['payment_type_id'].max()

    new_payment_type_list = []
    for payment_type in taxi_trips['payment_type'].values:
        if payment_type not in payment_type_master['payment_type'].values:
            new_payment_type_list.append(payment_type)

    new_payment_type_df = pd.DataFrame({
        'payment_type_id': range(payment_type_max_id + 1, payment_type_max_id + len(new_payment_type_list) + 1),
        'payment_type': new_payment_type_list
    })

    updated_payment_type_master = pd.concat([payment_type_master, new_payment_type_df], ignore_index=True)

    return updated_payment_type_master


### Creating generic udpate master table function

In [46]:
def update_master(master: pd.DataFrame, taxi_trips: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
   
    max_id = master[id_column].max()

    new_values_list = []
    for value in taxi_trips[value_column].values:
        if value not in master[value_column].values:
            new_values_list.append(value)

    new_values_df = pd.DataFrame({
    id_column: range(max_id +1, max_id + len(new_values_list) +1),
    value_column : new_values_list
    }
    )

    updated_master = pd.concat([master, new_values_df], ignore_index=True)

    return updated_master

In [47]:
update_master(taxi_trips=taxi_trips, master=payment_type_master, id_column='payment_type_id', value_column='payment_type')

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,Credit Card
2,3,Mobile
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute


In [None]:
import pandas as pd

def update_master(master: pd.DataFrame, taxi_trips: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """
    Updates the master DataFrame with new values from the taxi_trips DataFrame.

    Parameters:
        master (pd.DataFrame): The master DataFrame to be updated.
        taxi_trips (pd.DataFrame): DataFrame containing new values.
        id_column (str): Name of the column in the master DataFrame representing unique IDs.
        value_column (str): Name of the column in the master and taxi_trips DataFrames containing values.

    Returns:
        pd.DataFrame: Updated master DataFrame with new values appended.

    Raises:
        ValueError: If id_column or value_column is not found in either master or taxi_trips DataFrame.
        TypeError: If master or taxi_trips is not a pandas DataFrame.
    """
    # Check if master and taxi_trips are DataFrames
    if not isinstance(master, pd.DataFrame) or not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("Both master and taxi_trips should be pandas DataFrames.")

    # Check if id_column and value_column exist in both DataFrames
    if id_column not in master.columns or id_column not in taxi_trips.columns:
        raise ValueError(f"{id_column} not found in both master and taxi_trips DataFrames.")
    if value_column not in master.columns or value_column not in taxi_trips.columns:
        raise ValueError(f"{value_column} not found in both master and taxi_trips DataFrames.")

    max_id = master[id_column].max()

    new_values_list = []
    for value in taxi_trips[value_column].values:
        if value not in master[value_column].values:
            new_values_list.append(value)

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values_list) + 1),
        value_column: new_values_list
    })

    updated_master = pd.concat([master, new_values_df], ignore_index=True)

    return updated_master
