# OBSERVATÓRIO DA INDÚSTRIA - Prova prática para vaga de Especialista de Dados

## Imports

In [1]:
from imports import *
from sql_params import *

## Configurações globais

In [2]:
BASE_URL = "https://web3.antaq.gov.br/ea/txt/"
RAW_DATA_DIR = r"C:\data\antaq\raw"
JDBC_DRIVER_PATH = r"C:\Users\Fluency Academy\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\jars\mssql-jdbc-12.8.1.jre11.jar"

## Tarefas

### Tarefa: Gerenciamento do banco de dados

In [3]:
@task
def create_database(connection_string_master: str):
    """Cria o banco de dados se não existir"""
    try:
        conn = pyodbc.connect(connection_string_master)
        cursor = conn.cursor()
        cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = 'antaq_db')
            BEGIN
                CREATE DATABASE antaq_db;
            END
        """)
        conn.commit()
        print("[Database] Banco antaq_db verificado/criado com sucesso")
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"[Database] Erro na criação do banco: {str(e)}")
        raise

@task
def create_sql_tables(connection_string: str):
    """Cria as tabelas se não existirem"""
    table_definitions = {
        "atracacao_fato": """
            IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='atracacao_fato' AND xtype='U')
            CREATE TABLE dbo.atracacao_fato (
                idatracacao INT PRIMARY KEY,
                cdtup VARCHAR(50),
                idberco VARCHAR(50),
                berço VARCHAR(50),
                porto_atracação VARCHAR(100),
                coordenadas VARCHAR(100),
                apelido_instalação_portuária VARCHAR(100),
                complexo_portuário VARCHAR(100),
                tipo_da_autoridade_portuária VARCHAR(100),
                data_atracação DATE,
                data_chegada DATE,
                data_desatracação DATE,
                data_início_operação DATE,
                data_término_operação DATE,
                ano INT,
                mes VARCHAR(50),
                tipo_de_operação VARCHAR(100),
                tipo_de_navegação_da_atracação VARCHAR(100),
                nacionalidade_do_armador INT,
                flagmcoperacaoatracacao INT,
                terminal VARCHAR(50),
                município VARCHAR(50),
                uf VARCHAR(50),
                sguf VARCHAR(50),
                região_geográfica VARCHAR(100),
                região_hidrográfica VARCHAR(100),
                instalação_portuária_em_rio VARCHAR(100),
                nº_da_capitania VARCHAR(50),
                nº_do_imo INT
            )""",
        "carga_fato": """
            IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='carga_fato' AND xtype='U')
            CREATE TABLE dbo.carga_fato (
                idcarga INT PRIMARY KEY,
                idatracacao INT,
                origem VARCHAR(100),
                destino VARCHAR(100),
                cdmercadoria VARCHAR(50),
                tipo_operação_da_carga VARCHAR(100),
                carga_geral_acondicionamento VARCHAR(100),
                conteinerestado VARCHAR(50),
                tipo_navegação VARCHAR(100),
                flagautorizacao INT,
                flagcabotagem INT,
                flagcabotagemmovimentacao INT,
                flagconteinertamanho INT,
                flaglongocurso INT,
                flagmcoperacaocarga INT,
                flagoffshore INT,
                flagtransporteviainterioir INT,
                percurso_transporte_em_vias_interiores INT,
                percurso_transporte_interiores INT,
                stnaturezacarga VARCHAR(100),
                stsh2 VARCHAR(50),
                stsh4 VARCHAR(50),
                natureza_da_carga VARCHAR(100),
                sentido VARCHAR(50),
                teu INT,
                qtcarga INT,
                vlpesocargabruta FLOAT
            )"""
    }
    try:
        conn = pyodbc.connect(connection_string)
        cursor = conn.cursor()
        
        for table_name, ddl in table_definitions.items():
            cursor.execute(f"""
                IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='{table_name}' AND xtype='U')
                BEGIN
                    {ddl}
                END
            """)
            print(f"[Database] Tabela {table_name} verificada/criada")
        
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"[Database] Erro na criação das tabelas: {str(e)}")
        raise

### Tarefa: Coleta de dados

In [4]:
@task
def download_and_process_month(base_url: str, year: int, data_type: str):
    """Baixa e extrai dados para um ano e tipo específico"""
    try:
        url = f"{base_url}{year}{data_type}.zip"
        output_dir = os.path.join(RAW_DATA_DIR, str(year), data_type.lower())
        
        if not os.path.exists(output_dir):
            os.makedirs(output_dir, exist_ok=True)
            print(f"[Raw Data] Criado diretório: {output_dir}")

        # Verifica conteúdo antes do download
        existing_files = os.listdir(output_dir)
        if len(existing_files) > 0:
            print(f"[Raw Data] Arquivos existentes ({len(existing_files)}):")
            for f in existing_files:
                file_path = os.path.join(output_dir, f)
                print(f" - {f} ({os.path.getsize(file_path)/1024:.2f} KB)")
            
        if len(os.listdir(output_dir)) == 0:
            print(f"[Download] Iniciando download de {url}")
            response = requests.get(url)
            response.raise_for_status()
            
            with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                # Verifica conteúdo do ZIP antes de extrair
                zip_info = z.infolist()
                print(f"[Download] Arquivos no ZIP ({len(zip_info)}):")
                for info in zip_info:
                    print(f" - {info.filename} ({info.file_size/1024:.2f} KB)")
                
                z.extractall(output_dir)
                print(f"[Download] Dados extraídos em: {output_dir}")

                # Verifica arquivos extraídos
                extracted_files = os.listdir(output_dir)
                print(f"[Raw Data] Arquivos extraídos ({len(extracted_files)}):")
                for f in extracted_files:
                    file_path = os.path.join(output_dir, f)
                    print(f" - {f} ({os.path.getsize(file_path)/1024:.2f} KB)")
        else:
            print(f"[Raw Data] Dados já existem em: {output_dir}")
            
        return output_dir
    except Exception as e:
        print(f"[Download] Falha no processamento de {data_type} {year}: {str(e)}")
        raise

### Tarefa: Processamento Spark

In [5]:
@task
def process_and_load_data(atracacao_dir: str, carga_dir: str, connection_properties: dict):
    """Processa e carrega dados no SQL Server"""
    try:
        # Iniciar a sessão Spark
        spark = SparkSession.builder \
            .appName("ETL_ANTAQ") \
            .config("spark.sql.warehouse.dir", "file:///C:/temp") \
            .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
            .config("spark.hadoop.io.nativeio.NativeIO", "false") \
            .config("spark.hadoop.io.nativeio.NativeIO.disable", "true") \
            .getOrCreate()

        # ========== Processamento Atracacao ==========
        print(f"\n[Spark] Verificando arquivos em: {atracacao_dir}")
        atracacao_files = [f for f in os.listdir(atracacao_dir) if f.endswith('.txt')]
        if not atracacao_files:
            print(f"[Spark] Nenhum arquivo TXT encontrado em: {atracacao_dir}")
            return

        atracacao_path = os.path.join(atracacao_dir, atracacao_files[0])
        print(f"[Spark] Processando arquivo de atracacao: {atracacao_path}")

        df_atracacao = spark.read \
            .option("header", True) \
            .option("delimiter", ";") \
            .csv(atracacao_path)
        
        print(f"[Spark] Total de registros brutos de atracacao: {df_atracacao.count():,}")
    
        # Dicionário com as colunas a serem utilizadas para atracação
        atracacao_columns = [
            'IDAtracacao', 'CDTUP', 'IDBerco', 'Berço', 'Porto Atracação', 'Coordenadas',
            'Apelido Instalação Portuária', 'Complexo Portuário', 'Tipo da Autoridade Portuária',
            'Data Atracação', 'Data Chegada', 'Data Desatracação', 'Data Início Operação',
            'Data Término Operação', 'Ano', 'Mes', 'Tipo de Operação', 'Tipo de Navegação da Atracação',
            'Nacionalidade do Armador', 'FlagMCOperacaoAtracacao', 'Terminal', 'Município', 'UF',
            'SGUF', 'Região Geográfica', 'Região Hidrográfica', 'Instalação Portuária em Rio',
            'Nº da Capitania', 'Nº do IMO'
        ]
    
        # Normalizando nomes de colunas, substituindo espaços por _
        atracacao_mapping = {name: name.lower().replace(" ", "_") for name in atracacao_columns}
        
        df_atracacao_fato = df_atracacao.select([col(c).alias(atracacao_mapping[c]) for c in atracacao_mapping]) \
                                    .dropDuplicates([atracacao_mapping.get("idatracacao", "IDatracacao")])

        # Escrevendo no Data Lake (Parquet)
        atracacao_output = "/data/lake/antaq/atracacao_fato/"
        df_atracacao_fato.write.mode("overwrite").parquet(atracacao_output)
        print(f"[Data Lake] Atracação salva em: {atracacao_output}")
    
        # ==================== Processamento de dados de Carga ====================
        df_carga_raw = spark.read.option("header", "true") \
                                 .option("inferSchema", "true") \
                                 .option("delimiter", ";") \
                                 .csv(carga_dir)
        print(f"Total de registros de carga_raw: {df_carga_raw.count()}")
    
        df_atracacao_join = df_atracacao_fato.select(col(atracacao_mapping.get("idatracacao", "idatracacao")).alias("join_id")).distinct()

        # Escreve no SQL Server
        print("\n[Spark] Escrevendo dados de atracação no banco...")
        df_atracacao_fato.write \
            .format("jdbc") \
            .option("url", "jdbc:sqlserver://localhost:1433;databaseName=antaq_db;trustServerCertificate=true") \
            .option("dbtable", "dbo.atracacao_fato") \
            .option("user", connection_properties["user"]) \
            .option("password", connection_properties["password"]) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .mode("append") \
            .save()

        print(f"[SQL Server] Atracação concluída - Esperados {df_atracacao_fato.count():,} registros")
        
        # Usando join com a tabela Atracação para filtrar apenas os dados que constam na primeira tabela
        df_carga = df_carga_raw.join(
            df_atracacao_join,
            df_carga_raw["idatracacao"] == df_atracacao_join["join_id"],
            "inner"
        ).drop("join_id")
        
        carga_columns = [
            'IDCarga', 'IDAtracacao', 'Origem', 'Destino', 'CDMercadoria', 'Tipo Operação da Carga',
            'Carga Geral Acondicionamento', 'ConteinerEstado', 'Tipo Navegação', 'FlagAutorizacao',
            'FlagCabotagem', 'FlagCabotagemMovimentacao', 'FlagConteinerTamanho', 'FlagLongoCurso',
            'FlagMCOperacaoCarga', 'FlagOffshore', 'FlagTransporteViaInterioir', 'Percurso Transporte em vias Interiores',
            'Percurso Transporte Interiores', 'STNaturezaCarga', 'STSH2', 'STSH4', 'Natureza da Carga', 'Sentido',
            'TEU', 'QTCarga', 'VLPesoCargaBruta'
        ]
    
        # Normalizando nomes de colunas, substituindo espaços por _
        carga_mapping = {name: name.lower().replace(" ", "_") for name in carga_columns if name in df_carga.columns}
    
        df_carga_fato = df_carga.select([col(c).alias(carga_mapping[c]) for c in carga_mapping]) \
                                .dropDuplicates([carga_mapping.get("idcarga", "idcarga")])

        # Escrevendo no Data Lake (Parquet)
        carga_output = "/data/lake/antaq/carga_fato/"
        df_carga_fato.write.mode("overwrite").parquet(carga_output)
        print(f"[Data Lake] Carga salva em: {carga_output}")

        # Escreve no SQL Server
        print("\n[Spark] Escrevendo dados de carga no banco...")
        df_carga_fato.write \
            .format("jdbc") \
            .option("url", "jdbc:sqlserver://localhost:1433;databaseName=antaq_db;trustServerCertificate=true") \
            .option("dbtable", "dbo.carga_fato") \
            .option("user", connection_properties["user"]) \
            .option("password", connection_properties["password"]) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .mode("append") \
            .save()
        
        print(f"[SQL Server] Carga concluída - Esperados {df_carga_fato.count():,} registros")

        spark.stop()
        
        # Verificação final
        print("\n[Verificação] Verificando registros no banco...")
        try:
            conn = pyodbc.connect(connection_properties["connection_string"])
            cursor = conn.cursor()
            
            cursor.execute("SELECT COUNT(*) FROM atracacao_fato")
            atracacao_count = cursor.fetchone()[0]
            print(f"[SQL] Total de registros em atracacao_fato: {atracacao_count:,}")
            
            cursor.execute("SELECT COUNT(*) FROM carga_fato")
            carga_count = cursor.fetchone()[0]
            print(f"[SQL] Total de registros em carga_fato: {carga_count:,}")
            
            cursor.close()
            conn.close()
            
            if atracacao_count < df_atracacao_fato.count():
                print(f"[AVISO] Possível perda de dados: Esperados {df_atracacao_fato.count():,} vs Inseridos {atracacao_count:,}")
                
            if carga_count < df_carga_fato.count():
                print(f"[AVISO] Possível perda de dados: Esperados {df_carga_fato.count():,} vs Inseridos {carga_count:,}")
                
        except Exception as e:
            print(f"[ERRO] Falha na verificação do banco: {str(e)}")
        
        return True
    except Exception as e:
        print(f"[Spark] Erro no processamento: {str(e)}")
        raise

## Fluxo Principal

In [6]:
@flow
def antaq_etl_pipeline():
    """Orquestração principal do pipeline ETL"""
    try:
        conn_master = f"DRIVER={{{driver}}};SERVER={server};DATABASE=master;UID={user};PWD={password};TrustServerCertificate=yes"
        conn_antaq = f"DRIVER={{{driver}}};SERVER={server};DATABASE=antaq_db;UID={user};PWD={password};TrustServerCertificate=yes"
        
        create_database(conn_master)
        create_sql_tables(conn_antaq)
        
        # Processar cada ano especificado
        for current_year in [2021, 2022, 2023]:
            print(f"\n{'='*50}")
            print(f"Processando ano: {year}")
            print(f"{'='*50}")
            
            atracacao_dir = download_and_process_month(BASE_URL, current_year, "Atracacao")
            carga_dir = download_and_process_month(BASE_URL, current_year, "Carga")
            
            process_and_load_data(
                atracacao_dir,
                carga_dir,
                {
                    "user": user,
                    "password": password,
                    "driver": driver
                }
            )
        data_lake_path = "data/lake/antaq"

        df_atracacao_fato.write.mode("overwrite").parquet(f"{data_lake_path}/atracacao_fato/")
        df_carga_fato.write.mode("overwrite").parquet(f"{data_lake_path}/carga_fato/")

        print("[Pipeline] Processamento concluído para todos os anos")
    except Exception as e:
        print(f"[Pipeline] Erro fatal no pipeline: {str(e)}")
        raise

## Execução e início de orquestração com Prefect

In [7]:
if __name__ == "__main__":
    # Construir connection strings usando parâmetros importados
    conn_master = f"DRIVER={{{driver}}};SERVER={server};DATABASE=master;UID={user};PWD={password};TrustServerCertificate=yes"
    conn_antaq = f"DRIVER={{{driver}}};SERVER={server};DATABASE=antaq_db;UID={user};PWD={password};TrustServerCertificate=yes"
    
    # Executar fora do Prefect (chamada direta das funções)
    create_database(conn_master)
    create_sql_tables(conn_antaq)
    
    for year in [2021, 2022, 2023]:
        print(f"\n{'='*30}")
        print(f"PROCESSANDO ANO {year}")
        print(f"{'='*30}")
        
        # Baixar dados
        atracacao_dir = download_and_process_month(BASE_URL, year, "Atracacao")
        carga_dir = download_and_process_month(BASE_URL, year, "Carga")
        
        # Processar e carregar
        process_and_load_data(
            atracacao_dir,
            carga_dir,
            {
                "user": user,
                "password": password,
                "driver": driver
            }
        )
    
    print("\nPROCESSAMENTO CONCLUÍDO")

[Database] Banco antaq_db verificado/criado com sucesso


[Database] Tabela atracacao_fato verificada/criada
[Database] Tabela carga_fato verificada/criada



PROCESSANDO ANO 2021
[Raw Data] Arquivos existentes (1):
 - 2021Atracacao.txt (26464.83 KB)
[Raw Data] Dados já existem em: C:\data\antaq\raw\2021\atracacao


[Raw Data] Arquivos existentes (1):
 - 2021Carga.txt (428676.96 KB)
[Raw Data] Dados já existem em: C:\data\antaq\raw\2021\carga



[Spark] Verificando arquivos em: C:\data\antaq\raw\2021\atracacao
[Spark] Processando arquivo de atracacao: C:\data\antaq\raw\2021\atracacao\2021Atracacao.txt
[Spark] Total de registros brutos de atracacao: 79,238
[Data Lake] Atracação salva em: /data/lake/antaq/atracacao_fato/
Total de registros de carga_raw: 2348365

[Spark] Escrevendo dados de atracação no banco...
[SQL Server] Atracação concluída - Esperados 79,238 registros
[Data Lake] Carga salva em: /data/lake/antaq/carga_fato/

[Spark] Escrevendo dados de carga no banco...
[SQL Server] Carga concluída - Esperados 2,348,365 registros

[Verificação] Verificando registros no banco...
[ERRO] Falha na verificação do banco: 'connection_string'



PROCESSANDO ANO 2022
[Raw Data] Arquivos existentes (1):
 - 2022Atracacao.txt (28470.70 KB)
[Raw Data] Dados já existem em: C:\data\antaq\raw\2022\atracacao


[Raw Data] Arquivos existentes (1):
 - 2022Carga.txt (414565.53 KB)
[Raw Data] Dados já existem em: C:\data\antaq\raw\2022\carga



[Spark] Verificando arquivos em: C:\data\antaq\raw\2022\atracacao
[Spark] Processando arquivo de atracacao: C:\data\antaq\raw\2022\atracacao\2022Atracacao.txt
[Spark] Total de registros brutos de atracacao: 85,092
[Data Lake] Atracação salva em: /data/lake/antaq/atracacao_fato/
Total de registros de carga_raw: 2280018

[Spark] Escrevendo dados de atracação no banco...
[SQL Server] Atracação concluída - Esperados 85,092 registros
[Data Lake] Carga salva em: /data/lake/antaq/carga_fato/

[Spark] Escrevendo dados de carga no banco...
[SQL Server] Carga concluída - Esperados 2,280,018 registros

[Verificação] Verificando registros no banco...
[ERRO] Falha na verificação do banco: 'connection_string'



PROCESSANDO ANO 2023
[Raw Data] Arquivos existentes (1):
 - 2023Atracacao.txt (31590.45 KB)
[Raw Data] Dados já existem em: C:\data\antaq\raw\2023\atracacao


[Raw Data] Arquivos existentes (1):
 - 2023Carga.txt (395917.38 KB)
[Raw Data] Dados já existem em: C:\data\antaq\raw\2023\carga



[Spark] Verificando arquivos em: C:\data\antaq\raw\2023\atracacao
[Spark] Processando arquivo de atracacao: C:\data\antaq\raw\2023\atracacao\2023Atracacao.txt
[Spark] Total de registros brutos de atracacao: 93,908
[Data Lake] Atracação salva em: /data/lake/antaq/atracacao_fato/
Total de registros de carga_raw: 2200634

[Spark] Escrevendo dados de atracação no banco...
[SQL Server] Atracação concluída - Esperados 93,908 registros
[Data Lake] Carga salva em: /data/lake/antaq/carga_fato/

[Spark] Escrevendo dados de carga no banco...
[SQL Server] Carga concluída - Esperados 2,200,634 registros

[Verificação] Verificando registros no banco...
[ERRO] Falha na verificação do banco: 'connection_string'



PROCESSAMENTO CONCLUÍDO
