In [40]:
import os
import requests
import pandas as pd
import psycopg2
import subprocess
from datetime import datetime, timedelta
from prefect import task, Flow, unmapped
from time import sleep
from sqlalchemy import create_engine
import dbt
from dbt.adapters.factory import get_adapter
from dbt.config import RuntimeConfig


# Função para fazer a requisição à API e obter os dados de GPS
@task
def get_gps_data():
    url = "https://dados.mobilidade.rio/gps/brt"
    response = requests.get(url)
    data = response.json()
    return data

# Função para estruturar e transformar os dados da API
@task
def process_gps_data(raw_data):
    df = pd.DataFrame(raw_data['veiculos'])
    df['timestamp'] = pd.to_datetime(df['dataHora'], unit='ms')
    return df[["codigo","placa","latitude","longitude","velocidade","timestamp"]]

# Função para salvar os dados em um arquivo CSV
@task
def save_to_csv(data, output_path):
    data.to_csv(output_path, index=False)
    
# Função para carregar o CSV na tabela do PostgreSQL
@task
def load_to_postgres(csv_file):
    engine = create_engine('postgresql://postgres:postgres@localhost:5432/postgres')
    data = pd.read_csv(csv_file)
    data.to_sql('gps_data', engine, if_exists='replace', index=False)
    
# Função para executar a modelagem dbt
@task
def run_dbt_model():
    current_dir = os.getcwd()
    parent_folder = os.path.dirname(current_dir)
    dbt_profile = 'desafio'  # Nome do perfil definido no arquivo profiles.yml

    #os.chdir(parent_folder)
    subprocess.run(['dbt', 'run', '--profiles-dir', os.getcwd(), '--profile', dbt_profile, '--models', 'gps_data_sql'])

# Definição da pipeline
with Flow("GPS_BRT_Pipeline") as flow:    
    # Obtenção dos dados de GPS
    gps_data = get_gps_data()
    
    # Processamento e transformação dos dados
    processed_data = process_gps_data(gps_data)
    
    # Salvando os dados em um arquivo CSV
    output_path = "gps_data.csv"
    save_to_csv(processed_data, output_path)
    
    # Carregando o CSV na tabela do PostgreSQL
    load_to_postgres(output_path)
    
    # Rodando o modelo
    run_dbt_model()

# Agendando a execução da pipeline a cada minuto por 10 minutos
for i in range(10):
    flow.run()
    sleep(60)  # Espera 1 minuto entre cada execução

[2023-07-18 14:08:47-0300] INFO - prefect.FlowRunner | Beginning Flow run for 'GPS_BRT_Pipeline'
[2023-07-18 14:08:47-0300] INFO - prefect.TaskRunner | Task 'get_gps_data': Starting task run...
[2023-07-18 14:08:47-0300] INFO - prefect.TaskRunner | Task 'get_gps_data': Finished task run for task with final state: 'Success'
[2023-07-18 14:08:47-0300] INFO - prefect.TaskRunner | Task 'run_dbt_model': Starting task run...
[2023-07-18 14:08:50-0300] INFO - prefect.TaskRunner | Task 'run_dbt_model': Finished task run for task with final state: 'Success'
[2023-07-18 14:08:50-0300] INFO - prefect.TaskRunner | Task 'load_to_postgres': Starting task run...
[2023-07-18 14:08:50-0300] INFO - prefect.TaskRunner | Task 'load_to_postgres': Finished task run for task with final state: 'Success'
[2023-07-18 14:08:50-0300] INFO - prefect.TaskRunner | Task 'process_gps_data': Starting task run...
[2023-07-18 14:08:50-0300] INFO - prefect.TaskRunner | Task 'process_gps_data': Finished task run for task w

[2023-07-18 14:15:08-0300] INFO - prefect.TaskRunner | Task 'get_gps_data': Starting task run...
[2023-07-18 14:15:09-0300] INFO - prefect.TaskRunner | Task 'get_gps_data': Finished task run for task with final state: 'Success'
[2023-07-18 14:15:09-0300] INFO - prefect.TaskRunner | Task 'run_dbt_model': Starting task run...
[2023-07-18 14:15:11-0300] INFO - prefect.TaskRunner | Task 'run_dbt_model': Finished task run for task with final state: 'Success'
[2023-07-18 14:15:11-0300] INFO - prefect.TaskRunner | Task 'load_to_postgres': Starting task run...
[2023-07-18 14:15:11-0300] INFO - prefect.TaskRunner | Task 'load_to_postgres': Finished task run for task with final state: 'Success'
[2023-07-18 14:15:11-0300] INFO - prefect.TaskRunner | Task 'process_gps_data': Starting task run...
[2023-07-18 14:15:11-0300] INFO - prefect.TaskRunner | Task 'process_gps_data': Finished task run for task with final state: 'Success'
[2023-07-18 14:15:11-0300] INFO - prefect.TaskRunner | Task 'save_to_c