## Load file from Google Cloud Storage

In [None]:
import io
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Download the Uber rides CSV over HTTP and return it as a DataFrame.

    Args:
        args: Unused.
        kwargs: Unused.

    Returns:
        pandas.DataFrame parsed from the comma-separated response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server stops responding.
    """
    url = 'https://storage.googleapis.com/uber-data-set/uber.csv'
    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.get(url, timeout=60)
    # Fail loudly on an error status instead of parsing an error page as CSV.
    response.raise_for_status()

    return pd.read_csv(io.StringIO(response.text), sep=',')


@test
def test_output(output, *args) -> None:
    """
    Check that the block produced an output.

    Args:
        output: The output of the block being tested.
        args: Additional positional arguments; not used here.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'


## Transform data

In [None]:
import pandas as pd
import haversine as hs
from haversine import Unit
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(df, *args, **kwargs):
    """
    Clean the raw ride data and split it into dimension and fact tables.

    All cleaning (duplicates, out-of-range coordinates, bad passenger counts,
    zero-distance rides, zero-fare rides) is applied to the ride frame first.
    Each surviving ride then receives a single surrogate id (1..n) that is
    used as the primary key of every dimension table and as the matching
    foreign key in the fact table, so the tables always stay in step.

    Args:
        df: The output from the upstream parent block. The code reads the
            columns 'tt', 'key', 'pickup_datetime', 'pickup_longitude',
            'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
            'passenger_count' and 'fare_amount'.
        args: The output from any additional upstream blocks (unused).
        kwargs: Unused.

    Returns:
        dict mapping each table name ('Dim_time', 'Dim_Loc_Pickup',
        'Dim_Loc_dropoff', 'Dim_Distance', 'Dim_RateRide', 'Fact_Fare') to
        that table converted with DataFrame.to_dict(orient="dict").

    Raises:
        KeyError: If one of the columns listed above is missing from `df`.
    """
    # Fare-per-kilometre limits separating the Low / Avg / High fare classes.
    # NOTE(review): these look like the 25% and 75% quantiles of one earlier
    # run of this dataset - confirm before reusing on other data.
    low_rate_limit = 2.988024
    high_rate_limit = 5.209104
    rate_code_names = {1: 'Low Fare', 2: 'Avg Fare', 3: 'High Fare'}
    coordinate_columns = [
        'pickup_latitude',
        'pickup_longitude',
        'dropoff_latitude',
        'dropoff_longitude',
    ]

    # --- Basic cleanup ---
    # drop() returns a new frame, so the caller's DataFrame is not modified.
    df = df.drop(columns=['tt', 'key'])
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df = df.drop_duplicates().reset_index(drop=True)

    # Coerce coordinates before the range checks: unparsable values become
    # NaN, and between() is False for NaN, so those rows are dropped below.
    for column in coordinate_columns:
        df[column] = pd.to_numeric(df[column], errors='coerce')

    # Keep rows with in-range coordinates and a usable passenger count.
    # NOTE(review): 208 is presumably a known bad value in this dataset.
    is_valid = (
        df['pickup_latitude'].between(-90, 90)
        & df['pickup_longitude'].between(-180, 180)
        & df['dropoff_latitude'].between(-90, 90)
        & df['dropoff_longitude'].between(-180, 180)
        & (df['passenger_count'] != 208)
        & (df['passenger_count'] != 0)
    )
    # Explicit copy so the column assignment below does not write to a slice.
    df = df[is_valid].copy()

    # --- Ride distance (haversine, kilometres) ---
    # Built from a list so an empty frame also yields a well-formed column.
    df['distance'] = pd.Series(
        [
            hs.haversine(
                (pickup_lat, pickup_lon),
                (dropoff_lat, dropoff_lon),
                unit=Unit.KILOMETERS,
            )
            for pickup_lat, pickup_lon, dropoff_lat, dropoff_lon in zip(
                df['pickup_latitude'],
                df['pickup_longitude'],
                df['dropoff_latitude'],
                df['dropoff_longitude'],
            )
        ],
        index=df.index,
        dtype=float,
    )

    # --- Remove invalid rides BEFORE any table is built ---
    # Dropping zero-distance and zero-fare rides here (rather than from each
    # table afterwards) keeps every table on the same set of rides and avoids
    # dividing by a zero distance below.
    df = df[(df['distance'] != 0) & (df['fare_amount'] != 0)]
    df = df.reset_index(drop=True)

    # One surrogate id per ride, shared by all tables.
    ride_ids = pd.Series(range(1, len(df) + 1), index=df.index)

    # --- Dim_time ---
    pickup = df['pickup_datetime']
    dim_time = pd.DataFrame({
        'Time_key': ride_ids,
        'pickup_datetime': pickup,
        'Pickup_weekday': pickup.dt.day_name(),
        'Pickup_time': pickup.dt.time,
        'Pickup_hour': pickup.dt.hour,
        'Pickup_day': pickup.dt.day,
        'Pickup_month': pickup.dt.month,
        'Pickup_Year': pickup.dt.year,
    })

    # --- Dim_Loc_Pickup / Dim_Loc_dropoff ---
    dim_loc_pickup = pd.DataFrame({
        'LocPick_id': ride_ids,
        'pickup_longitude': df['pickup_longitude'],
        'pickup_latitude': df['pickup_latitude'],
    })
    dim_loc_dropoff = pd.DataFrame({
        'LocDrop_id': ride_ids,
        'dropoff_longitude': df['dropoff_longitude'],
        'dropoff_latitude': df['dropoff_latitude'],
    })

    # --- Dim_Distance ---
    dim_distance = pd.DataFrame({
        'Distance_id': ride_ids,
        'Ride_distance': df['distance'],
    })

    # --- Dim_RateRide ---
    # Classify from the untouched fare-per-km values so that one assignment
    # can never influence the mask of the next. A NaN rate stays NaN.
    fare_per_km = df['fare_amount'] / df['distance']
    rate_code = fare_per_km.copy()
    rate_code[fare_per_km < low_rate_limit] = 1
    rate_code[fare_per_km.between(low_rate_limit, high_rate_limit)] = 2
    rate_code[fare_per_km > high_rate_limit] = 3

    dim_rate_ride = pd.DataFrame({
        'RateRide_id': ride_ids,
        'RateRide_code': rate_code,
        'RateRide_codeName': rate_code.map(rate_code_names),
    })

    # --- Fact_Fare ---
    # Every foreign key is the shared ride id, so it always matches the
    # primary key of the corresponding dimension row.
    fact_fare = pd.DataFrame({
        'VendorID': ride_ids,
        'Time_key': ride_ids,
        'LocPick_id': ride_ids,
        'LocDrop_id': ride_ids,
        'Distance_id': ride_ids,
        'RateRide_id': ride_ids,
        'fare_amount': df['fare_amount'],
        'passenger_count': df['passenger_count'],
    })

    return {
        "Dim_time": dim_time.to_dict(orient="dict"),
        "Dim_Loc_Pickup": dim_loc_pickup.to_dict(orient="dict"),
        "Dim_Loc_dropoff": dim_loc_dropoff.to_dict(orient="dict"),
        "Dim_Distance": dim_distance.to_dict(orient="dict"),
        "Dim_RateRide": dim_rate_ride.to_dict(orient="dict"),
        "Fact_Fare": fact_fare.to_dict(orient="dict"),
    }


@test
def test_output(output, *args) -> None:
    """
    Check that the block produced an output.

    Args:
        output: The output of the block being tested.
        args: Additional positional arguments; not used here.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'


## Load data into Google BigQuery

In [None]:
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(data, **kwargs) -> None:
    """
    Export every table in `data` to its own BigQuery table.

    Each key of `data` becomes the table name under 'uber-de.uber_de_pro',
    and an existing table of that name is replaced.

    Args:
        data: Mapping of table name to table contents in a form accepted by
            pandas.DataFrame (e.g. a column-oriented dict).
        kwargs: Unused.
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    # Read the config file and build the client once; both are the same for
    # every table, so there is no need to recreate them on each iteration.
    bigquery = BigQuery.with_config(ConfigFileLoader(config_path, config_profile))

    for table_name, table in data.items():
        table_id = 'uber-de.uber_de_pro.{}'.format(table_name)
        bigquery.export(
            DataFrame(table),
            table_id,
            if_exists='replace',
        )