LIBRARIES / BIBLIOTECAS

In [None]:
# Libraries necessaries for this code
# Bibliotecas necessárias para este código
import io
import pandas as pd
import requests
import pandas as pd
from io import StringIO
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

EXTRACT / EXTRAÇÃO

In [None]:
# Import the data_loader decorator if not already imported
# Importa o decorador data_loader se ainda não estiver importado
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

# Import the test decorator if not already imported
# Importa o decorador test se ainda não estiver importado
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data_from_api(*args, **kwargs):

    # Define the URL of the data file
    # Define a URL do arquivo de dados
    url = 'https://storage.googleapis.com/trip-bucket/trip_data.parquet'
    # Make an HTTP GET request to the URL
    # Faz uma requisição HTTP GET para a URL
    response = requests.get(url)

    # Read the response content as a byte stream
    # Lê o conteúdo da resposta como um fluxo de bytes
    data = io.BytesIO(response.content)

    # Load the data into a pandas DataFrame from the Parquet file
    # Carrega os dados em um DataFrame do pandas a partir do arquivo Parquet
    df_trip = pd.read_parquet(data)

    # Return the loaded DataFrame
    # Retorna o DataFrame carregado
    return df_trip

@test
def test_output(output, *args) -> None:
    
    # Check if the output is not None
    # Verifica se a saída não é indefinida
    assert output is not None, 'The output is undefined'
    # Check if the output is not None 
    # Verifica se a saída não é indefinida 
    assert output is not None, 'A saída está indefinida'

TRANSFORM / TRANSFORMAÇÃO

In [None]:
# Import the transformer decorator if not already imported
# Importa o decorador transformer se ainda não estiver importado
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

# Import the test decorator if not already imported
# Importa o decorador test se ainda não estiver importado
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@transformer
def transform(df_trip, *args, **kwargs):

    # Dictionary for storing dataframes
    # Dicionário para armazenamento dos dataframes
    dataframes = {}

    # Create a new global index and reset index
    # Criação de novo índice global e redefinição do índice
    df_trip = df_trip.drop_duplicates().reset_index(drop=True)
    df_trip['trip_id'] = df_trip.index

    # Create and configure datetime dimension table
    # Criação e configuração da tabela de dimensão datetime
    df_trip['lpep_pickup_datetime'] = pd.to_datetime(df_trip['lpep_pickup_datetime'])
    df_trip['lpep_dropoff_datetime'] = pd.to_datetime(df_trip['lpep_dropoff_datetime'])
    datetime_dim = df_trip[['lpep_pickup_datetime', 'lpep_dropoff_datetime']].reset_index()
    datetime_dim['pick_hour'] = datetime_dim['lpep_pickup_datetime'].dt.hour
    datetime_dim['pick_day'] = datetime_dim['lpep_pickup_datetime'].dt.day
    datetime_dim['pick_month'] = datetime_dim['lpep_pickup_datetime'].dt.month
    datetime_dim['pick_year'] = datetime_dim['lpep_pickup_datetime'].dt.year
    datetime_dim['pick_weekday'] = datetime_dim['lpep_pickup_datetime'].dt.weekday
    datetime_dim['drop_hour'] = datetime_dim['lpep_dropoff_datetime'].dt.hour
    datetime_dim['drop_day'] = datetime_dim['lpep_dropoff_datetime'].dt.day
    datetime_dim['drop_month'] = datetime_dim['lpep_dropoff_datetime'].dt.month
    datetime_dim['drop_year'] = datetime_dim['lpep_dropoff_datetime'].dt.year
    datetime_dim['drop_weekday'] = datetime_dim['lpep_dropoff_datetime'].dt.weekday
    datetime_dim['datetime_id'] = datetime_dim.index
    datetime_dim = datetime_dim[['datetime_id', 'lpep_pickup_datetime', 'pick_hour', 'pick_day', 'pick_month', 'pick_year', 'pick_weekday', 'lpep_dropoff_datetime', 'drop_hour', 'drop_day', 'drop_month', 'drop_year', 'drop_weekday']]
    dataframes['datetime_dim'] = datetime_dim

    # Create the distance dimension
    # Criação da dimensão distância
    distance_dim = df_trip[['trip_distance']].reset_index(drop=True)
    distance_dim['distance_id'] = distance_dim.index
    distance_dim = distance_dim[['distance_id', 'trip_distance']]
    dataframes['distance_dim'] = distance_dim

    # Create the rate code dimension
    # Criação da dimensão rate_code
    rate_code_name = {
        1: "Standard rate",
        2: "JFK",
        3: "Newark",
        4: "Nessau or Westchester",
        5: "Negotiated fare",
        6: "Group ride"
    }
    rate_code_dim = df_trip[['RatecodeID']].reset_index(drop=True)
    rate_code_dim['rate_code_id'] = rate_code_dim.index
    rate_code_dim['rate_code_name'] = rate_code_dim['RatecodeID'].map(rate_code_name)
    rate_code_dim['rate_code_name'] = rate_code_dim['rate_code_name'].fillna('Unknown')
    rate_code_dim = rate_code_dim[['rate_code_id', 'RatecodeID', 'rate_code_name']]
    dataframes['rate_code_dim'] = rate_code_dim

    # Create the trip type dimension
    # Criação da dimensão tipo de viagem
    trip_type_name = {
        1: "Street-hail",
        2: "Dispatch"
    }
    trip_type_dim = df_trip[['trip_type']].reset_index(drop=True)
    trip_type_dim['trip_type_id'] = trip_type_dim.index
    trip_type_dim['trip_type_name'] = trip_type_dim['trip_type'].map(trip_type_name)
    trip_type_dim['trip_type_name'] = trip_type_dim['trip_type_name'].fillna('Unknown')
    trip_type_dim = trip_type_dim[['trip_type_id', 'trip_type', 'trip_type_name']]
    dataframes['trip_type_dim'] = trip_type_dim

    # Create the pick-up location dimension
    # Criação da dimensão local de coleta
    pick_up_location_dim = df_trip[['PULocationID']].reset_index(drop=True)
    pick_up_location_dim['pick_up_id'] = pick_up_location_dim.index
    pick_up_location_dim = pick_up_location_dim[['pick_up_id', 'PULocationID']]
    dataframes['pick_up_location_dim'] = pick_up_location_dim

    # Create the drop-off location dimension
    # Criação da dimensão local de entrega
    drop_off_location_dim = df_trip[['DOLocationID']].reset_index(drop=True)
    drop_off_location_dim['drop_off_id'] = drop_off_location_dim.index
    drop_off_location_dim = drop_off_location_dim[['drop_off_id', 'DOLocationID']]
    dataframes['drop_off_location_dim'] = drop_off_location_dim

    # Create the payment type dimension
    # Criação da dimensão tipo de pagamento
    payment_type_name = {
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip"
    }
    payment_type_dim = df_trip[['payment_type']].reset_index(drop=True)
    payment_type_dim['payment_type_id'] = payment_type_dim.index
    payment_type_dim['payment_type_name'] = payment_type_dim['payment_type'].map(payment_type_name)
    payment_type_dim['payment_type_name'] = payment_type_dim['payment_type_name'].fillna('Unknown')
    payment_type_dim = payment_type_dim[['payment_type_id', 'payment_type', 'payment_type_name']]
    dataframes['payment_type_dim'] = payment_type_dim

    # Create the fact table
    # Criação da tabela fato
    fact_table = df_trip.merge(datetime_dim, left_on='trip_id', right_on='datetime_id') \
                  .merge(distance_dim, left_on='trip_id', right_on='distance_id') \
                  .merge(rate_code_dim, left_on='trip_id', right_on='rate_code_id') \
                  .merge(trip_type_dim, left_on='trip_id', right_on='trip_type_id')  \
                  .merge(pick_up_location_dim, left_on='trip_id', right_on='pick_up_id') \
                  .merge(drop_off_location_dim, left_on='trip_id', right_on='drop_off_id') \
                  .merge(payment_type_dim, left_on='trip_id', right_on='payment_type_id') \
                  [['trip_id', 'VendorID', 'datetime_id', 'trip_type_id', 'rate_code_id', 'distance_id', 'pick_up_id', 'drop_off_id', 'payment_type_id', 'passenger_count', 'total_amount', 'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'congestion_surcharge', 'extra', 'store_and_fwd_flag']]
    dataframes['fact_table'] = fact_table

    # Return all dataframes as dictionaries
    # Retorna todos os dataframes como dicionários
    return {
        "datetime_dim": datetime_dim.to_dict(orient="records"),
        "distance_dim": distance_dim.to_dict(orient="records"),
        "rate_code_dim": rate_code_dim.to_dict(orient="records"),
        "trip_type_dim": trip_type_dim.to_dict(orient="records"),
        "pick_up_location_dim": pick_up_location_dim.to_dict(orient="records"),
        "drop_off_location_dim": drop_off_location_dim.to_dict(orient="records"),
        "payment_type_dim": payment_type_dim.to_dict(orient="records"),
        "fact_table": fact_table.to_dict(orient="records")
    }

@test
def test_output(output, *args) -> None:

    assert output is not None, 'The output is undefined'
    # Verify if the output it's not undefined
    # Verifica se a saída não é indefinida
    assert output is not None, 'A saída está indefinida'

LOAD / CARREGAMENTO

In [None]:
# Import the data_exporter decorator if not already imported
# Importa o decorador data_exporter se ainda não estiver importado
if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_data_to_big_query(data, **kwargs) -> None:

    # Define the path to the configuration file
    # Define o caminho para o arquivo de configuração
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'  # Define the configuration profile to use
    # Define o perfil de configuração a ser usado

    # Iterate through each key-value pair in the data dictionary
    # Itera por cada par chave-valor no dicionário de dados
    for key, values in data.items():
        # Construct the BigQuery table ID using the key
        # Construa o ID da tabela BigQuery usando a chave
        table_id = f'trip-project-432523.trip_project_dataset.{key}'
        
        # Export the data to BigQuery using the specified configuration
        # Exporte os dados para o BigQuery usando a configuração especificada
        BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
            DataFrame(values),
            table_id,
            if_exists='replace',  # Specify resolution policy if table name already exists
            # Especificar política de resolução se o nome da tabela já existir
        )