In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
import os
import zipfile
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, year, month
from sqlalchemy import create_engine
import urllib.parse
from dotenv import load_dotenv

Baixar arquivos

In [None]:
def baixar_arquivos():
    DOWNLOAD_URL = "https://web3.antaq.gov.br/ea/txt/"
    anos = ["2021", "2022", "2023"]
    arquivos_desejados = ["Atracacao", "Carga", "CargaConteinerizada"]
    os.makedirs("dados_brutos", exist_ok=True)
    for ano in anos:
        for arquivo in arquivos_desejados:
            caminho = f"dados_brutos/{arquivo}_{ano}.zip"
            if not os.path.exists(caminho):
                url = f"{DOWNLOAD_URL}{ano}{arquivo}.zip"
                with open(caminho, "wb") as f:
                    f.write(requests.get(url).content)

Descompactar arquivos

In [None]:
def descompactar_arquivos():
    os.makedirs("dados_extraidos", exist_ok=True)
    for arquivo in os.listdir("dados_brutos"):
        if arquivo.endswith(".zip"):
            caminho_zip = os.path.join("dados_brutos", arquivo)
            destino = os.path.join("dados_extraidos", arquivo.replace(".zip", ""))
            os.makedirs(destino, exist_ok=True)
            with zipfile.ZipFile(caminho_zip, 'r') as zip_ref:
                zip_ref.extractall(destino)


Processar dados

In [None]:
def processar_dados():
    spark = SparkSession.builder.appName("ConsolidarDadosANTAQ").getOrCreate()

    input_dir = "dados_extraidos"

    arquivos = ["Atracacao", "Carga", "CargaConteinerizada"]
    anos = ["2021", "2022", "2023"]

    atracacao = None
    carga = None
    carga_cont = None

    for arquivo in arquivos:
        input_files = [f"{input_dir}/{arquivo}_{ano}" for ano in anos]
        
        existing_files = []
        for folder in input_files:
            if os.path.exists(folder):
                files = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".txt")]
                existing_files.extend(files)
        
        if not existing_files:
            print(f"{arquivo} não encontrado. Indo para o próximo...")
            continue
        
        df = spark.read.option("header", True).option("delimiter", ";").csv(existing_files)

        if arquivo == "Atracacao":
            atracacao = df
        elif arquivo == "Carga":
            carga = df
        elif arquivo == "CargaConteinerizada":
            carga_cont = df

Carregar dados

In [None]:
def carregar_dados():
    load_dotenv("arquivo.env")

    server = os.getenv("SERVER")
    database = os.getenv("DATABASE")
    username = os.getenv("USERNAME")
    password = os.getenv("PASSWORD")

    password_encoded = urllib.parse.quote_plus(password)

    connection_string = f"mssql+pyodbc://{username}:{password_encoded}@{server}/{database}?driver=SQL+Server"

    engine = create_engine(connection_string, fast_executemany=True)

    atracacao_fato_pd = atracacao_fato.toPandas()
    carga_fato_pd = carga_fato.toPandas()

    def importar_dados_para_sql(df, tabela):
        try:
            df.to_sql(tabela, engine, if_exists="append", index=False, chunksize=1000)
            print("Dados importados com sucesso para a tabela {tabela}.")
        except Exception as e:
            print("Erro ao importar os dados para a tabela {tabela}: {e}")

    importar_dados_para_sql(atracacao_fato_pd, "atracacao_fato")
    importar_dados_para_sql(carga_fato_pd, "carga_fato")

    engine.dispose()


Verificar dados

In [None]:
def verificar_dados():
    return os.path.exists("dados_extraidos")

Notificação

In [None]:
def notificar_sucesso():
    print("ETL finalizado")

def notificar_falha():
    print("ETL falhou")

Definir DAG

In [None]:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'antaq_etl',
    default_args=default_args,
    description='DAG para ETL da base ANTAQ',
    schedule_interval='@daily',
)

task_baixar = PythonOperator(
    task_id='baixar_arquivos',
    python_callable=baixar_arquivos,
    dag=dag,
)

task_descompactar = PythonOperator(
    task_id='descompactar_arquivos',
    python_callable=descompactar_arquivos,
    dag=dag,
)

task_processar = PythonOperator(
    task_id='processar_dados',
    python_callable=processar_dados,
    dag=dag,
)

task_carregar = PythonOperator(
    task_id='carregar_dados',
    python_callable=carregar_dados,
    dag=dag,
)

task_verificar = PythonOperator(
    task_id='verificar_dados',
    python_callable=verificar_dados,
    dag=dag,
)

task_sucesso = EmailOperator(
    task_id='notificar_sucesso',
    to='email@example.com',
    subject='ETL Finalizado com Sucesso',
    html_content='O processo ETL foi concluído com sucesso.',
    dag=dag,
)

task_falha = EmailOperator(
    task_id='notificar_falha',
    to='email@example.com',
    subject='ETL Falhou',
    html_content='O processo ETL falhou.',
    dag=dag,
)

task_baixar >> task_descompactar >> task_processar >> task_carregar >> task_verificar

task_verificar >> task_sucesso

task_verificar >> task_falha
