In [1]:
import os
import requests
import pandas as pd
from sqlalchemy import create_engine, inspect, Table, Column, MetaData
from sqlalchemy.types import Text
from sqlalchemy import schema

In [21]:
# Database connection parameters
db_config = {
    'dbname': 'brazil_vra',
    'user': 'postgres',
    'password': 'postgrespass',
    'host': 'localhost',
    'port': '5432'
}

In [3]:
# URL for the CSV file (January 2024)
CSV_URL = "https://siros.anac.gov.br/siros/registros/diversos/vra/2024/VRA_2024_01.csv"

# Directory to temporarily store downloaded CSV files
TEMP_DIR = "temp_csv_files"
os.makedirs(TEMP_DIR, exist_ok=True)

# Batch size (number of rows to process at a time)
BATCH_SIZE = 10000

In [8]:
!pip install psycopg2


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [11]:
import psycopg2

In [24]:
# Function to create the database
def create_database():
    try:
        # Connect to the default 'postgres' database
        conn = psycopg2.connect(
            dbname='postgres',  # Connect to the default 'postgres' database
            user=db_config['user'],
            password=db_config['password'],
            host=db_config['host'],
            port=db_config['port']
        )
        conn.autocommit = True  # Enable autocommit for database creation
        cursor = conn.cursor()

        # Check if the database already exists
        cursor.execute(f"SELECT 1 FROM pg_database WHERE datname = '{db_config['dbname']}';")
        if not cursor.fetchone():
            # Create the database
            cursor.execute(f"CREATE DATABASE {db_config['dbname']};")
            print(f"Database '{db_config['dbname']}' created successfully.")
        else:
            print(f"Database '{db_config['dbname']}' already exists.")

        # Close the connection
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"Error creating database: {e}")

In [25]:
create_database()

Database 'brazil_vra' already exists.


In [26]:
# Function to download the CSV file
def download_csv() -> str:
    response = requests.get(CSV_URL)
    if response.status_code == 200:
        file_path = os.path.join(TEMP_DIR, "vra_2024_01.csv")
        with open(file_path, "wb") as file:
            file.write(response.content)
        print("Downloaded CSV file successfully.")
        return file_path
    else:
        print("Failed to download CSV file.")
        return None

In [27]:
download_csv()

Downloaded CSV file successfully.


'temp_csv_files/vra_2024_01.csv'

In [28]:
# Function to ingest CSV data into PostgreSQL in batches
def ingest_csv_to_postgres(file_path: str):
    try:
        # Create a SQLAlchemy engine
        engine = create_engine(
            f"postgresql+psycopg2://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
        )
        
        # Define table name
        table_name = "vra_2024_01"
        
        # Read CSV in chunks using pandas
        for chunk in pd.read_csv(
            file_path,
            chunksize=BATCH_SIZE,
            sep=';',              # Use semicolon as the delimiter
            on_bad_lines='skip',  # Skip problematic rows
            quotechar='"',        # Specify the quote character
            engine='python'       # Use the Python engine for flexible parsing
        ):
            # Ingest each chunk into PostgreSQL
            chunk.to_sql(table_name, engine, if_exists="append", index=False)
            print(f"Ingested batch of {len(chunk)} rows into table {table_name}")
        
        print(f"Finished ingesting data into table {table_name}")
    except exc.SQLAlchemyError as e:
        print(f"Error ingesting data: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

In [31]:
file_path = "/workspaces/data-engineering-project/temp_csv_files/vra_2024_01.csv"
ingest_csv_to_postgres(file_path)

Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 10000 rows into table vra_2024_01
Ingested batch of 6617 rows into table vra_2024_01
Finished ingesting data into table vra_2024_01


In [33]:
df = pd.read_csv("/workspaces/data-engineering-project/temp_csv_files/vra_2024_01.csv", sep=';')
df.head()

Unnamed: 0,Sigla ICAO Empresa Aérea,Empresa Aérea,Número Voo,Código DI,Código Tipo Linha,Modelo Equipamento,Número de Assentos,Sigla ICAO Aeroporto Origem,Descrição Aeroporto Origem,Partida Prevista,Partida Real,Sigla ICAO Aeroporto Destino,Descrição Aeroporto Destino,Chegada Prevista,Chegada Real,Situação Voo,Justificativa,Referência,Situação Partida,Situação Chegada
0,AAL,"AMERICAN AIRLINES, INC.",904,0,I,B772,288,SBGL,AEROPORTO INTERNACIONAL DO RIO DE JANEIRO (GAL...,01/01/2024 23:55,01/01/2024 23:47,KMIA,"MIAMI INTERNATIONAL AIRPORT - MIAMI, FLORIDA -...",02/01/2024 07:45,02/01/2024 08:19,REALIZADO,,2024-01-01,Antecipado,Atraso 30-60
1,AAL,"AMERICAN AIRLINES, INC.",905,0,I,B772,288,KMIA,"MIAMI INTERNATIONAL AIRPORT - MIAMI, FLORIDA -...",01/01/2024 23:55,01/01/2024 01:29,SBGL,AEROPORTO INTERNACIONAL DO RIO DE JANEIRO (GAL...,02/01/2024 09:25,01/01/2024 09:35,REALIZADO,,2024-01-01,Antecipado,Antecipado
2,AAL,"AMERICAN AIRLINES, INC.",906,0,I,B77W,318,SBGR,GUARULHOS - GOVERNADOR ANDRÉ FRANCO MONTORO - ...,01/01/2024 00:55,01/01/2024 00:46,KMIA,"MIAMI INTERNATIONAL AIRPORT - MIAMI, FLORIDA -...",01/01/2024 08:35,01/01/2024 08:45,REALIZADO,,2024-01-01,Antecipado,Pontual
3,AAL,"AMERICAN AIRLINES, INC.",925,0,I,B772,288,KMIA,"MIAMI INTERNATIONAL AIRPORT - MIAMI, FLORIDA -...",01/01/2024 21:20,01/01/2024 23:17,SBGR,GUARULHOS - GOVERNADOR ANDRÉ FRANCO MONTORO - ...,02/01/2024 07:50,02/01/2024 07:47,REALIZADO,,2024-01-01,Atraso 60-120,Antecipado
4,AAL,"AMERICAN AIRLINES, INC.",929,0,I,B77W,318,KMIA,"MIAMI INTERNATIONAL AIRPORT - MIAMI, FLORIDA -...",01/01/2024 20:50,01/01/2024 21:51,SBGR,GUARULHOS - GOVERNADOR ANDRÉ FRANCO MONTORO - ...,02/01/2024 06:20,02/01/2024 06:13,REALIZADO,,2024-01-01,Atraso 60-120,Antecipado


In [34]:
df.dtypes

Sigla ICAO Empresa Aérea         object
Empresa Aérea                    object
Número Voo                       object
Código DI                        object
Código Tipo Linha                object
Modelo Equipamento               object
Número de Assentos                int64
Sigla ICAO Aeroporto Origem      object
Descrição Aeroporto Origem       object
Partida Prevista                 object
Partida Real                     object
Sigla ICAO Aeroporto Destino     object
Descrição Aeroporto Destino      object
Chegada Prevista                 object
Chegada Real                     object
Situação Voo                     object
Justificativa                   float64
Referência                       object
Situação Partida                 object
Situação Chegada                 object
dtype: object

In [35]:
engine = create_engine(
            f"postgresql+psycopg2://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
        )

# Create a MetaData object
metadata = MetaData()

In [38]:
df.to_sql('brazil_vra', engine)

617

In [39]:
from sqlalchemy import inspect

inspector = inspect(engine)
columns = inspector.get_columns('brazil_vra')
for column in columns:
    print(column['name'], column['type'])

index BIGINT
Sigla ICAO Empresa Aérea TEXT
Empresa Aérea TEXT
Número Voo TEXT
Código DI TEXT
Código Tipo Linha TEXT
Modelo Equipamento TEXT
Número de Assentos BIGINT
Sigla ICAO Aeroporto Origem TEXT
Descrição Aeroporto Origem TEXT
Partida Prevista TEXT
Partida Real TEXT
Sigla ICAO Aeroporto Destino TEXT
Descrição Aeroporto Destino TEXT
Chegada Prevista TEXT
Chegada Real TEXT
Situação Voo TEXT
Justificativa DOUBLE PRECISION
Referência TEXT
Situação Partida TEXT
Situação Chegada TEXT


In [40]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 86617 entries, 0 to 86616
Data columns (total 20 columns):
 #   Column                        Non-Null Count  Dtype  
---  ------                        --------------  -----  
 0   Sigla ICAO Empresa Aérea      86617 non-null  object 
 1   Empresa Aérea                 86617 non-null  object 
 2   Número Voo                    86617 non-null  object 
 3   Código DI                     86617 non-null  object 
 4   Código Tipo Linha             86617 non-null  object 
 5   Modelo Equipamento            86617 non-null  object 
 6   Número de Assentos            86617 non-null  int64  
 7   Sigla ICAO Aeroporto Origem   86617 non-null  object 
 8   Descrição Aeroporto Origem    86617 non-null  object 
 9   Partida Prevista              82370 non-null  object 
 10  Partida Real                  82197 non-null  object 
 11  Sigla ICAO Aeroporto Destino  86617 non-null  object 
 12  Descrição Aeroporto Destino   86617 non-null  object 
 13  C

In [None]:
df.columns

Index(['Sigla ICAO Empresa Aérea', 'Empresa Aérea', 'Número Voo', 'Código DI',
       'Código Tipo Linha', 'Modelo Equipamento', 'Número de Assentos',
       'Sigla ICAO Aeroporto Origem', 'Descrição Aeroporto Origem',
       'Partida Prevista', 'Partida Real', 'Sigla ICAO Aeroporto Destino',
       'Descrição Aeroporto Destino', 'Chegada Prevista', 'Chegada Real',
       'Situação Voo', 'Justificativa', 'Referência', 'Situação Partida',
       'Situação Chegada'],
      dtype='object')

In [45]:
df['Justificativa'].describe()

count    0.0
mean     NaN
std      NaN
min      NaN
25%      NaN
50%      NaN
75%      NaN
max      NaN
Name: Justificativa, dtype: float64

Partida Prevista datetime64
Partida Real datetime64
Chegada Prevista datetime64
Chegada Real datetime64
Justificativa TEXT

In [46]:
from sqlalchemy import DateTime, Text

schema_dict = {
    'Partida Prevista': DateTime,
    'Partida Real': DateTime,
    'Chegada Prevista': DateTime,
    'Chegada Real': DateTime,
    'Justificativa': Text,
}

In [47]:
from sqlalchemy import Table, Column

# Define the table schema dynamically using the dictionary
columns = [Column(col_name, col_type) for col_name, col_type in schema_dict.items()]

# Create the table
table = Table('vra_2024_01', metadata, *columns)

# Create the table in the database
metadata.create_all(engine)

In [52]:
# Convert the relevant columns to datetime (if they are not already)
datetime_columns = ['Partida Prevista', 'Partida Real', 'Chegada Prevista', 'Chegada Real', 'Referência']
for col in datetime_columns:
    #df[col] = pd.to_datetime(df[col], format='%d/%m/%Y %H:%M')
    df[col] = pd.to_datetime(df[col], format='mixed')

# Insert data into the table
df.to_sql('vra_2024_01', engine, if_exists='replace', index=False)

617

In [53]:
# Verify the schema
inspector = inspect(engine)
columns = inspector.get_columns('vra_2024_01')
for column in columns:
    print(column['name'], column['type'])

Sigla ICAO Empresa Aérea TEXT
Empresa Aérea TEXT
Número Voo TEXT
Código DI TEXT
Código Tipo Linha TEXT
Modelo Equipamento TEXT
Número de Assentos BIGINT
Sigla ICAO Aeroporto Origem TEXT
Descrição Aeroporto Origem TEXT
Partida Prevista TIMESTAMP
Partida Real TIMESTAMP
Sigla ICAO Aeroporto Destino TEXT
Descrição Aeroporto Destino TEXT
Chegada Prevista TIMESTAMP
Chegada Real TIMESTAMP
Situação Voo TEXT
Justificativa DOUBLE PRECISION
Referência TIMESTAMP
Situação Partida TEXT
Situação Chegada TEXT
