# Importações

In [None]:
import os
import pandas as pd
from google.cloud import storage

# Ambiente de Execução: Colab X VSCode

In [None]:
# # Usar no Google Colab
# from google.colab import auth
# auth.authenticate_user()
# path_folder_silver = '/content/'

In [None]:
# Usar no VSCode
path_folder_silver = os.path.abspath('../data/temp/silver')
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "../gcp_key.json"

In [None]:
def extract_year_month(date): # Função para extrair o ano e o mês de uma data
    return date.strftime('%y%m')

year_month = extract_year_month(pd.to_datetime('2017-06-01'))  # Exemplo de uso da função, como se fosse o dia 01 de junho de 2017
print(f'Year-Month: {year_month}')

# Leitura dos Dados Limpos

In [None]:
def read_csv_cleaned(path_folder_silver, file_name_silver): # Função para ler o arquivo CSV limpo
    df = pd.read_csv(os.path.join(path_folder_silver, file_name_silver))
    
    print(f'Arquivo "{file_name_silver}" lido.')
    return df

file_name_silver = f'mta_{year_month}_cleaned.csv'

df = read_csv_cleaned(path_folder_silver, file_name_silver)
display(df.head(15))
display(df.info())

# Ajustes iniciais

In [None]:
def change_data_types(df): # Função para alterar os tipos de dados das colunas do DataFrame
    df = df.astype({'PublishedLineName': 'string', 'VehicleRef': 'string'}) # Converte as colunas PublishedLineName e VehicleRef para string
    df['RecordedAtDate'] = pd.to_datetime(df['RecordedAtDate'])  # Converte a coluna RecordedAtDate para datetime
    df['RecordedAtTime'] = pd.to_timedelta(df['RecordedAtTime'])  # Converte a coluna RecordedAtTime para timedelta
    df['ScheduledArrivalTime'] = pd.to_timedelta(df['ScheduledArrivalTime'])  # Converte a coluna ScheduledArrivalTime para timedelta
    
    print("Tipos de dados das colunas alterados.")
    return df

df = change_data_types(df)
display(df.info())

# Criação / atualização de arquivos dimensão

In [None]:
path_folder_gold_dim = os.path.abspath(f'../data/temp/gold/dim/') # Caminho para a pasta de saída dos arquivos dimensionais

file_name_dim_PublishedLine = f'dim_mta_PublishedLine.csv' # Nome do arquivo para a dimensão PublishedLine
file_name_dim_VehicleRef = f'dim_mta_VehicleRef.csv' # Nome do arquivo para a dimensão VehicleRef

In [None]:
def read_dim(path_folder_gold_dim, file_name_dim): # Função para ler o arquivo CSV da dimensão PublishedLine já existente
    try:
        df = pd.read_csv(os.path.join(path_folder_gold_dim, file_name_dim))  # Tenta ler o arquivo CSV da dimensão
        print(f'Arquivo "{file_name_dim}" lido.')
        return df
    except:
        print(f'Arquivo "{file_name_dim}" não encontrado.')
        return pd.DataFrame()  # se falhar, cria um DataFrame vazio


df_dim_PublishedLineName = read_dim(path_folder_gold_dim, file_name_dim_PublishedLine)
df_dim_VehicleRef = read_dim(path_folder_gold_dim, file_name_dim_VehicleRef)

In [None]:
def update_dim_PublishedLineName(df_new, df_dim): # Função para criar ou atualizar uma Dimensão com a coluna PublishedLineName
    df_new = df_new[['PublishedLineName']].drop_duplicates() # O duplo colchete é necessário para manter o nome da coluna original
    df_dim = pd.concat([df_new, df_dim], ignore_index=True).drop_duplicates()  # Concatena o DataFrame novo com o Dimensão existente
    
    print("Dimensão 'PublishedLineName' criado/atualizado.")
    return df_dim

df_dim_PublishedLineName = update_dim_PublishedLineName(df, df_dim_PublishedLineName)
display(df_dim_PublishedLineName.head(15))
display(df_dim_PublishedLineName.info())

In [None]:
def update_dim_VehicleRef(df_new, df_dim): # Função para criar um Dimensão com a coluna VehicleRef
    df_new = df_new[['VehicleRef']].drop_duplicates() # O duplo colchete é necessário para manter o nome da coluna original
    df_dim = pd.concat([df_new, df_dim], ignore_index=True).drop_duplicates()  # Concatena o DataFrame novo com o Dimensão existente
    
    print("Dimensão 'VehicleRef' criado/atualizado.")
    return df_dim

df_dim_VehicleRef = update_dim_VehicleRef(df, df_dim_VehicleRef) # Usa um DataFrame vazio como Dimensão inicial
display(df_dim_VehicleRef.head(15))
display(df_dim_VehicleRef.info())

# Exportação para CSV e envio para Cloud

In [None]:
path_folder_gold_fact = os.path.abspath(f'../data/temp/gold/fact/') # Caminho para a pasta de saída dos arquivos factuais

file_name_fact = f'fact_mta_{year_month}.csv' # Nome do arquivo para a tabela factual

In [None]:
def save_to_csv_gold(path_folder_gold, file_name_gold, df): # Função para salvar o DataFrame em um arquivo CSV na pasta Gold
    os.makedirs(path_folder_gold, exist_ok=True)  # Cria a pasta Gold se ela não existir

    df.to_csv(os.path.join(path_folder_gold, file_name_gold), index=False) # Salva o DataFrame em um arquivo CSV na pasta Gold
    
    print(f'Arquivo "{file_name_gold}" salvo na pasta "{path_folder_gold}".')

save_to_csv_gold(path_folder_gold_dim, file_name_dim_PublishedLine, df_dim_PublishedLineName)
save_to_csv_gold(path_folder_gold_dim, file_name_dim_VehicleRef, df_dim_VehicleRef)
save_to_csv_gold(path_folder_gold_fact, file_name_fact, df)

In [None]:
def upload_to_bucket_gold(path_folder_gold, file_name_gold): # Função para fazer o upload do arquivo CSV para o bucket do GCP
    client = storage.Client() # Cria o cliente para acessar o bucket
    bucket = client.bucket('etl_bus_gps') # Referência pro bucket do GCP
    
    path_folder_file_gold = os.path.join(path_folder_gold, file_name_gold) # Caminho completo do arquivo junto com o nome do arquivo
    
    folder_type = 'fact' if 'fact' in path_folder_gold else 'dim'  # Determina o tipo de pasta (fact ou dim) com base no caminho do arquivo, para que o arquivo seja enviado para a pasta correta no bucket
    
    blob = bucket.blob(f'temp/gold/{folder_type}/{file_name_gold}') # Cria o blob no bucket
    blob.upload_from_filename(path_folder_file_gold) # Faz o upload do arquivo para o bucket
    
    print(f'Arquivo "{file_name_gold}" enviado para a pasta "{blob}" no bucket "{bucket.name}"')

upload_to_bucket_gold(path_folder_gold_dim, file_name_dim_PublishedLine)
upload_to_bucket_gold(path_folder_gold_dim, file_name_dim_VehicleRef) 
upload_to_bucket_gold(path_folder_gold_fact, file_name_fact)

# Pipeline de Execução

In [None]:
def pipeline_gold(year_month): # Função para executar todas as funções de manipulação e upload dos dados
    
    path_folder_silver = os.path.abspath('../data/temp/silver') # Caminho para a pasta de entrada dos arquivos limpos
    file_name_silver = f"mta_{year_month}_cleaned.csv" # Nome do arquivo limpo a ser lido
    
    path_folder_gold_fact = os.path.abspath(f'../data/temp/gold/fact/') # Caminho para a pasta de saída dos arquivos factuais
    file_name_fact = f'fact_mta_{year_month}.csv' # Nome do arquivo para a tabela factual
    
    path_folder_gold_dim = os.path.abspath(f'../data/temp/gold/dim/') # Caminho para a pasta de saída dos arquivos dimensionais
    file_name_dim_PublishedLine = f'dim_mta_PublishedLine.csv' # Nome do arquivo para a dimensão PublishedLine
    file_name_dim_VehicleRef = f'dim_mta_VehicleRef.csv' # Nome do arquivo para a dimensão VehicleRef
    
    print('Iniciando o pipeline Gold...')
    df = read_csv_cleaned(path_folder_silver, file_name_silver) # Lê o arquivo CSV limpo
    df = change_data_types(df) # Altera os tipos de dados das colunas do DataFrame
    
    df_dim_PublishedLineName = read_dim(path_folder_gold_dim, file_name_dim_PublishedLine) # Lê o arquivo CSV da dimensão PublishedLine já existente
    df_dim_PublishedLineName = update_dim_PublishedLineName(df, df_dim_PublishedLineName) # Atualiza ou cria a Dimensão PublishedLineName
    df_dim_VehicleRef = read_dim(path_folder_gold_dim, file_name_dim_VehicleRef) # Lê o arquivo CSV da dimensão VehicleRef já existente
    df_dim_VehicleRef = update_dim_VehicleRef(df, df_dim_VehicleRef) # Atualiza ou cria a Dimensão VehicleRef
    
    data_to_save = { # Dicionário para armazenar: caminhos da pasta de saída, nomes dos arquivos e DataFrames a serem salvos
        path_folder_gold_dim: {file_name_dim_PublishedLine: df_dim_PublishedLineName, file_name_dim_VehicleRef: df_dim_VehicleRef},
        path_folder_gold_fact: {file_name_fact: df}
    }
    
    for path_folder_gold, files_names_gold in data_to_save.items(): # Itera sobre os caminhos da pasta de saída, nomes dos arquivos e DataFrames a serem salvos
        for file_name_gold, df in files_names_gold.items():
            save_to_csv_gold(path_folder_gold, file_name_gold, df) # Salva o DataFrame em um arquivo CSV na pasta Gold
            upload_to_bucket_gold(path_folder_gold, file_name_gold) # Faz o upload do arquivo CSV para o bucket do GCP
    
    print(f'Pipeline Gold concluído para o arquivo "{file_name_silver}"!!\n')
    
pipeline_gold(year_month)  # Executa o pipeline Gold

In [None]:
year_month_list = [] # Lista para armazenar os anos e meses

year_month_list.append(extract_year_month(pd.to_datetime('2017-06-01'))) # Adiciona o ano e mês de junho de 2017
year_month_list.append(extract_year_month(pd.to_datetime('2017-08-01'))) # Adiciona o ano e mês de agosto de 2017
year_month_list.append(extract_year_month(pd.to_datetime('2017-10-01'))) # Adiciona o ano e mês de outubro de 2017
year_month_list.append(extract_year_month(pd.to_datetime('2017-12-01'))) # Adiciona o ano e mês de dezembro de 2017

for year_month in year_month_list: # Itera sobre os anos e meses na lista, simulando o Airflow
    pipeline_gold(year_month)  # Executa o pipeline para o ano e mês especificados