# Model Efektywności Operacyjnej Lotów Komercyjnych na Danych ADS-B

In [None]:
# Optional one-time dependency installation (uncomment to install into the kernel's environment).
# NOTE: pathlib ships with the Python standard library (3.4+); the PyPI "pathlib" package is an
# obsolete backport and does not need to be installed.
# NOTE(review): later cells pass Python 3.14 t-strings (t"...") to cursor.execute, which presumably
# requires a psycopg release with template-string support (3.3+) - confirm and pin versions.
#%pip install meteostat
#%pip install pandas
#%pip install requests
#%pip install "psycopg[binary]"
#%pip install pyarrow
#%pip install pathlib 
#%pip install airportsdata

In [1]:
import os
import io
import csv
import zipfile
import requests
import psycopg
import psycopg.sql as sql
import pandas as pd
import numpy as np
from pathlib import Path
import airportsdata

# Connection string for the PostgreSQL database. Set FLIGHTS_DB_CONN_STR in the environment
# instead of editing credentials into the notebook; the literal is only a local-dev fallback.
conn_str = os.environ.get('FLIGHTS_DB_CONN_STR', 'postgresql://admin:pass@localhost:5432/flights_db')

## Pobranie danych ze strony Bureau of Transportation Statistics (BTS)

In [None]:
base_url = "https://transtats.bts.gov/PREZIP/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2024_{}.zip"

# Download the 12 monthly BTS on-time archives for 2024.
# open(..., 'wb') does not create parent folders, so make sure the target directory exists.
Path("./data/raw_data/flights").mkdir(parents=True, exist_ok=True)

for month in range(1, 13):
    url = base_url.format(month)
    filename = f"./data/raw_data/flights/flights_2024_{month}.zip"
    # Stream into a temporary ".part" file and rename it only after a complete download, so a
    # failed transfer never leaves a truncated zip (nor deletes a previously downloaded good one).
    part_path = Path(filename + ".part")
    print(f"Pobieranie miesiąca {month}/12...")
    try:
        # `with` closes the response (releasing the connection) even if writing fails;
        # the (connect, read) timeout keeps a stalled server from hanging the cell forever.
        with requests.get(url, stream=True, timeout=(10, 300)) as res:
            res.raise_for_status()
            with open(part_path, 'wb') as f:
                for chunk in res.iter_content(chunk_size=8192):
                    f.write(chunk)
        part_path.replace(filename)
        print(f"Zapisano: {filename}")
    except Exception as ex:
        part_path.unlink(missing_ok=True)
        print(ex)

In [2]:
file_path_preview = './data/raw_data/flights/flights_2024_1.zip'

# Peek at the first rows of one monthly archive to inspect the BTS schema.
with zipfile.ZipFile(file_path_preview, 'r') as z:
    csv_file = next((f for f in z.namelist() if f.endswith('.csv')), None)
    if csv_file is None:
        raise FileNotFoundError(f"No .csv member found in {file_path_preview}")
    # Close the member stream explicitly rather than leaving it to the garbage collector.
    with z.open(csv_file) as csv_handle:
        df_preview = pd.read_csv(csv_handle, nrows=5, skipinitialspace=True)

In [3]:
# Display the five sampled rows.
df_preview.head(n=5)

Unnamed: 0,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,Marketing_Airline_Network,Operated_or_Branded_Code_Share_Partners,DOT_ID_Marketing_Airline,IATA_Code_Marketing_Airline,...,Div5Airport,Div5AirportID,Div5AirportSeqID,Div5WheelsOn,Div5TotalGTime,Div5LongestGTime,Div5WheelsOff,Div5TailNum,Duplicate,Unnamed: 119
0,2024,1,1,14,7,2024-01-14,UA,UA_CODESHARE,19977,UA,...,,,,,,,,,N,
1,2024,1,1,14,7,2024-01-14,UA,UA_CODESHARE,19977,UA,...,,,,,,,,,N,
2,2024,1,1,14,7,2024-01-14,UA,UA_CODESHARE,19977,UA,...,,,,,,,,,N,
3,2024,1,1,14,7,2024-01-14,UA,UA_CODESHARE,19977,UA,...,,,,,,,,,N,
4,2024,1,1,14,7,2024-01-14,UA,UA_CODESHARE,19977,UA,...,,,,,,,,,N,


In [4]:
# List every raw column name; note some names carry padding (e.g. 'Operating_Airline ').
print(list(df_preview.columns))

['Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'FlightDate', 'Marketing_Airline_Network', 'Operated_or_Branded_Code_Share_Partners', 'DOT_ID_Marketing_Airline', 'IATA_Code_Marketing_Airline', 'Flight_Number_Marketing_Airline', 'Originally_Scheduled_Code_Share_Airline', 'DOT_ID_Originally_Scheduled_Code_Share_Airline', 'IATA_Code_Originally_Scheduled_Code_Share_Airline', 'Flight_Num_Originally_Scheduled_Code_Share_Airline', 'Operating_Airline ', 'DOT_ID_Operating_Airline', 'IATA_Code_Operating_Airline', 'Tail_Number', 'Flight_Number_Operating_Airline', 'OriginAirportID', 'OriginAirportSeqID', 'OriginCityMarketID', 'Origin', 'OriginCityName', 'OriginState', 'OriginStateFips', 'OriginStateName', 'OriginWac', 'DestAirportID', 'DestAirportSeqID', 'DestCityMarketID', 'Dest', 'DestCityName', 'DestState', 'DestStateFips', 'DestStateName', 'DestWac', 'CRSDepTime', 'DepTime', 'DepDelay', 'DepDelayMinutes', 'DepDel15', 'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOut', 'WheelsOff', 'Wheels

### Zapisanie wybranych kolumn do bazy danych.

In [None]:
# Long-lived connection and cursor shared by the flights_2024 cells that follow.
conn = psycopg.connect(conninfo=conn_str)
cursor = conn.cursor()

In [None]:
# Create the flights_2024 table (the subset of BTS columns that is imported) and its lookup
# indexes. Every statement is idempotent (IF NOT EXISTS). Without it on the indexes, a second
# run would fail and leave the shared connection in an aborted transaction.
cursor.execute("""--sql
CREATE TABLE IF NOT EXISTS flights_2024 (
    -- Czas i Data
    "Quarter" SMALLINT,
    "Month" SMALLINT,
    "DayofMonth" SMALLINT,
    "DayOfWeek" SMALLINT,
    "FlightDate" DATE,

    -- Przewoźnik i Lot
        --Analiza
    "Marketing_Airline_Network" VARCHAR(10),  -- Np. AA, DL 
    "Operating_Airline" VARCHAR(30),
    "IATA_Code_Operating_Airline" VARCHAR(30),
    "IATA_Code_Marketing_Airline" VARCHAR(30),
    "Flight_Number_Operating_Airline" INT, -- faktyczny przewoznik, bilety sprzedaje duża linia(Marketing) i a lot faktycznie obsługuje inna linia (Operating). Dlatego są różne numery
               
        --Model
    "DOT_ID_Operating_Airline"  VARCHAR(30),
    "DOT_ID_Marketing_Airline"  VARCHAR(30),                           
    "Tail_Number" VARCHAR(10),

    -- Trasa (Origin / Destination)
    "Origin" VARCHAR(5),       -- Kod lotniska wylotu (np. JFK)
    "OriginCityName" VARCHAR(100),
    "OriginState" VARCHAR(5),
               
    "Dest" VARCHAR(5),         -- Kod lotniska przylotu
    "DestCityName" VARCHAR(100),
    "DestState" VARCHAR(5),
               
    "Distance" FLOAT,

    -- Planowany Czas (Scheduled)
    "CRSDepTime" INT,          -- Format HHMM (np. 1430)
    "CRSArrTime" INT,
    "CRSElapsedTime" FLOAT,

    -- Rzeczywisty Czas i Wykonanie (Performance)
    "DepTime" INT,             -- Rzeczywisty wylot
    "DepDelay" FLOAT,          -- Opóźnienie w minutach (może być ujemne)
    "DepDelayMinutes" FLOAT,   -- Opóźnienie (0 dla ujemnych)
    "DepDel15" FLOAT,               
    
    "ArrTime" INT,             -- Rzeczywisty przylot
    "ArrDelay" FLOAT,          -- TARGET: Opóźnienie przylotu
    "ArrDelayMinutes" FLOAT,
    "ArrDel15" FLOAT,
    
    -- Status lotu (0 lub 1)
    "Cancelled" FLOAT,     
    "Diverted" FLOAT,
    
    "ActualElapsedTime" FLOAT,
    "AirTime" FLOAT,

    -- Przyczyny opóźnień (Dla analizy SQL)
    "CarrierDelay" FLOAT,
    "WeatherDelay" FLOAT,
    "NASDelay" FLOAT,
    "SecurityDelay" FLOAT,
    "LateAircraftDelay" FLOAT
);
""")
conn.commit()

cursor.execute("""--sql
    CREATE INDEX IF NOT EXISTS "idx_FlightDate" ON "flights_2024" ("FlightDate");

    --sql                         
    -- Indeks linii lotniczych
    CREATE INDEX IF NOT EXISTS "idx_Airline" ON "flights_2024" ("Marketing_Airline_Network");

    --sql               
    -- Indeks trasy (Skąd -> Dokąd)
    CREATE INDEX IF NOT EXISTS "idx_Route" ON "flights_2024" ("Origin", "Dest");

    --sql               
    -- Indeks wspierający modelowanie (Miesiąc + Dzień tygodnia + Wylot)
    CREATE INDEX IF NOT EXISTS "idx_Features" ON "flights_2024" ("Month", "DayOfWeek", "Origin");

    --sql                
    -- Indeks na TARGET (Nowe kolumny) - przydatne, by szybko sprawdzić ile było opóźnień
    CREATE INDEX IF NOT EXISTS "idx_ArrDel15" ON "flights_2024" ("ArrDel15");
               
    --sql                 
    CREATE INDEX IF NOT EXISTS idx_flights_origin_date_time ON flights_2024 ("Origin", "FlightDate", ("CRSDepTime" / 100));
               
    --sql                 
    CREATE INDEX IF NOT EXISTS idx_flights_dest_date_time ON flights_2024 ("Dest", "FlightDate", ("CRSArrTime" / 100));

""")
conn.commit()

In [None]:
# BTS columns imported into flights_2024, in the order used for the COPY column list
# (groups: date, carrier, aircraft, route, schedule, performance, delay causes).
col_names = """
    Quarter Month DayofMonth DayOfWeek FlightDate
    Marketing_Airline_Network Operating_Airline IATA_Code_Operating_Airline IATA_Code_Marketing_Airline Flight_Number_Operating_Airline
    DOT_ID_Operating_Airline DOT_ID_Marketing_Airline Tail_Number
    Origin OriginCityName OriginState
    Dest DestCityName DestState Distance
    CRSDepTime CRSArrTime CRSElapsedTime
    DepTime DepDelay DepDelayMinutes DepDel15
    ArrTime ArrDelay ArrDelayMinutes ArrDel15
    Cancelled Diverted ActualElapsedTime AirTime
    CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
""".split()

In [5]:
# Marketing carrier IATA code of the sampled rows.
df_preview.loc[:, "IATA_Code_Marketing_Airline"]

0    UA
1    UA
2    UA
3    UA
4    UA
Name: IATA_Code_Marketing_Airline, dtype: str

In [None]:
# Bulk-load the 12 monthly BTS CSVs into flights_2024 through COPY ... FROM STDIN.
# The COPY lists its target columns explicitly, in `col_names` order, and NULL '' turns
# empty CSV fields into SQL NULL.
cols_sql = sql.SQL(', ').join([sql.Identifier(col) for col in col_names])
copy_query = sql.SQL('''COPY flights_2024 ({0}) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, NULL '')''').format(cols_sql)

zip_files = [f"./data/raw_data/flights/flights_2024_{m}.zip" for m in range(1, 13)]
wanted_columns = set(col_names)
failed_files = []

for zip_file in zip_files:
    try:
        with zipfile.ZipFile(zip_file, 'r') as z:
            csv_file = [f for f in z.namelist() if f.endswith('.csv')][0]
            with z.open(csv_file) as file_handler:
                # Single pass over the stream. Some BTS header names are padded
                # (e.g. 'Operating_Airline '), so columns are selected and renamed on their
                # stripped form. A separate header-only read_csv on the same handle would
                # let the parser buffer past the header and silently drop the first block
                # of data rows from the second reader.
                reader = pd.read_csv(
                    file_handler,
                    usecols=lambda name: name.strip() in wanted_columns,
                    chunksize=50000,
                    dtype=str,
                )
                print("Rozpoczynam kopiowanie...")
                rows_sent = 0
                with cursor.copy(copy_query) as copy:
                    for i, chunk in enumerate(reader):
                        chunk = chunk.rename(columns=str.strip)
                        # Missing values are written as '' which the COPY maps to NULL.
                        copy.write(chunk[col_names].to_csv(index=False, header=False, sep=',', na_rep='', quoting=csv.QUOTE_MINIMAL))
                        rows_sent += len(chunk)
                        print(f"Wysłano paczkę nr {i+1} (razem {rows_sent} wierszy)")
        conn.commit()
        print(f"Zapisano {zip_file} w bazie danych")
    except Exception as e:
        # A failed COPY leaves the transaction aborted; roll back so the next month can load.
        conn.rollback()
        failed_files.append(zip_file)
        print(e)

if failed_files:
    print(f"Import nieudany dla plików: {failed_files}")
else:
    print("Import do bazy danych zakończiony pomyślnie")

In [None]:
print('Obczyszczenie kolumny "Tail_Number" pod przyszłe łączenie tabel w bazie danych')
update_query = t'UPDATE flights_2024 SET "Tail_Number" = UPPER(TRIM("Tail_Number"));'
cursor.execute(update_query)   
conn.commit()
print('Obczyszczenie kolumny "Tail_Number" zakończono pomyślnie')   

In [None]:
# Manual recovery cell (presumably run after a failed statement): roll back the current
# transaction on the shared connection so it accepts new commands again.
conn.rollback()

In [None]:
# Manual commit of any pending work on the shared connection.
conn.commit()

In [None]:
# Close the shared connection opened for the flights_2024 import cells.
conn.close()


## Wczytanie danych o samolotach do DB

In [6]:
# Small sample of the aircraft database. Text values are wrapped in single quotes
# (quotechar="'"), and skipinitialspace drops blanks after separators. Missing values are
# shown as None.
df_preview = (
    pd.read_csv(
        'data/raw_data/aircraft-database/aircraft-database-complete-2025-08.csv',
        nrows=15,
        skipinitialspace=True,
        quotechar="'",
    )
    .replace({np.nan: None})
)
df_preview.dtypes

icao24                    str
timestamp                 str
acars                   int64
adsb                    int64
built                  object
categoryDescription    object
country                object
engines                object
firstFlightDate        object
firstSeen              object
icaoAircraftClass      object
lineNumber             object
manufacturerIcao       object
manufacturerName       object
model                  object
modes                   int64
nextReg                object
notes                  object
operator               object
operatorCallsign       object
operatorIata           object
operatorIcao           object
owner                  object
prevReg                object
regUntil               object
registered             object
registration           object
selCal                 object
serialNumber           object
status                 object
typecode               object
vdl                     int64
dtype: object

In [7]:
# Display the whole aircraft sample (only 15 rows were read, via nrows=15).
df_preview

Unnamed: 0,icao24,timestamp,acars,adsb,built,categoryDescription,country,engines,firstFlightDate,firstSeen,...,owner,prevReg,regUntil,registered,registration,selCal,serialNumber,status,typecode,vdl
0,000000,2017-10-19 18:30:18,0,0,,,,,,,...,,,,,,,,,,0
1,000001,2016-11-27 00:10:39,0,0,,,,,,,...,Private,,,,SP-FGR,,,,AN2,0
2,000002,2018-05-29 02:00:00,0,0,,Light (< 15500 lbs),,,,,...,,,,,,,,,,0
3,000003,2018-04-01 02:00:00,0,0,,No ADS-B Emitter Category Information,,,,,...,,,,,,,,,,0
4,000004,2018-05-29 02:00:00,0,0,,No ADS-B Emitter Category Information,,,,,...,,,,,,,,,,0
5,000005,1970-01-01 01:00:00,0,0,,,,,,,...,,,,,,,,,,0
6,000006,2017-09-01 02:00:00,0,0,,No ADS-B Emitter Category Information,,,,,...,,,,,,,,,,0
7,000007,2018-01-01 01:00:00,0,0,,No ADS-B Emitter Category Information,,,,,...,,,,,,,,,,0
8,000008,2017-12-01 01:00:00,0,0,,Surface Vehicle â€“ Service Vehicle,,,,,...,,,,,,,,,,0
9,000009,2017-09-15 00:10:26,0,0,,,,,,,...,Private,,,,TC-,,AT3-069,,AAT3,0


In [None]:
# Create the raw `aircrafts` table (primary key: icao24) and its lookup indexes.
# IF NOT EXISTS on the indexes makes the cell re-runnable, matching CREATE TABLE IF NOT EXISTS.
# NOTE(review): `built` and `firstflightdate` are typed DATE although their inline comments
# say "safely as TEXT" - confirm the CSV values always parse as dates.
with psycopg.connect(conn_str) as conn:
    with conn.cursor() as cur:
        cur.execute("""--sql
                CREATE TABLE IF NOT EXISTS aircrafts (
                    icao24              CHAR(6) PRIMARY KEY, -- Unikalny kod hex
                    timestamp           TEXT,                -- Czas aktualizacji rekordu
                    acars               BOOLEAN,             -- Czy obsługuje ACARS
                    adsb                BOOLEAN,             -- Czy obsługuje ADS-B
                    built               DATE,                -- Rok/Data budowy (bezpiecznie jako TEXT)
                    categorydescription TEXT,                -- Opis kategorii statku
                    country             TEXT,                -- Kraj rejestracji
                    engines             TEXT,                -- Silniki
                    firstflightdate     DATE,                -- Data pierwszego lotu (bezpiecznie jako TEXT)
                    firstseen           DATE,                -- Kiedy po raz pierwszy widziany w OpenSky
                    icaoaircraftclass   TEXT,                -- Klasa statku według ICAO
                    linenumber          TEXT,                -- Numer linii produkcyjnej
                    manufacturericao    TEXT,                -- Kod ICAO producenta
                    manufacturername    TEXT,                -- Nazwa producenta
                    model               TEXT,                -- Model samolotu
                    modes               BOOLEAN,             -- Czy obsługuje Mode S
                    nextreg             TEXT,                -- Następna rejestracja (jeśli zmieniono)
                    notes               TEXT,                -- Notatki
                    operator            TEXT,                -- Nazwa operatora
                    operatorcallsign    TEXT,                -- Znak wywoławczy operatora
                    operatoriata        TEXT,                -- Kod IATA operatora
                    operatoricao        TEXT,                -- Kod ICAO operatora
                    owner               TEXT,                -- Właściciel
                    prevreg             TEXT,                -- Poprzednia rejestracja
                    reguntil            DATE,                -- Data ważności rejestracji
                    registered          DATE,                -- Data rejestracji
                    registration        TEXT,                -- Numer rejestracyjny (np. SP-LSA)
                    selcal              TEXT,                -- Kod SELCAL
                    serialnumber        TEXT,                -- Numer seryjny
                    status              TEXT,                -- Status samolotu
                    typecode            TEXT,                -- Kod typu
                    vdl                 BOOLEAN              -- Czy obsługuje VDL
                );
            """)
        conn.commit()
        cur.execute("""--sql
            CREATE INDEX IF NOT EXISTS idx_aircraft_registration ON aircrafts(registration);
            --sql
            CREATE INDEX IF NOT EXISTS idx_aircraft_typecode ON aircrafts(typecode);
            --sql
            CREATE INDEX IF NOT EXISTS idx_aircraft_operator ON aircrafts(operator);
        """)
        conn.commit()

In [None]:
print("Początek kopiowania aircraft-database")
try:
    with psycopg.connect(conn_str) as conn:
        with conn.cursor() as cur:
            df = pd.read_csv('data/raw_data/aircraft-database/aircraft-database-complete-2025-08.csv',
            quotechar="'",
            skipinitialspace=True,
            dtype=str
            )
            # Empty / 'nan' strings become None so COPY stores them as NULL.
            df = df.replace({np.nan: None,'':None,'nan':None})
            # Name the target columns explicitly. The CSV's camelCase header lower-cases to the
            # (unquoted) table column names, so COPY no longer depends on both sides listing
            # the columns in the same order.
            aircraft_target_cols = sql.SQL(', ').join(
                [sql.Identifier(col.strip().lower()) for col in df.columns]
            )
            aircraft_copy_query = sql.SQL("COPY aircrafts ({0}) FROM STDIN").format(aircraft_target_cols)
            with cur.copy(aircraft_copy_query) as copy:
                for row in df.itertuples(index=False,name=None):
                    copy.write_row(row)
        conn.commit()
except Exception as e:
    print(e)
else:
    # Report success only when the COPY actually committed.
    print("Kopiowania aircraft-database zakończone sukcesem")

### Utworzenie widoku

In [None]:
# Build mv_aircraft_clean: a materialized view over `aircrafts` that parses the dates
# defensively and normalises text columns (TRIM, optional UPPER, blank -> NULL), plus indexes.
# IF NOT EXISTS makes re-runs harmless; DROP the view first to rebuild it with a new definition.
print("Tworzenie widoku mv_aircraft_clean")
try:
    with psycopg.connect(conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute("""--sql
                CREATE MATERIALIZED VIEW IF NOT EXISTS mv_aircraft_clean AS
                    SELECT 
                        icao24,
                        timestamp,
                        acars,
                        adsb,
                        
                        -- 1. BEZPIECZNE WYCIĄGANIE ROKU BUDOWY (built)
                        -- Jeśli ciąg zaczyna się od 4 cyfr, wyciąga je i rzutuje na INT. W przeciwnym razie wstawia NULL.
                        CASE 
                            WHEN built::TEXT ~ '^\\d{4}' THEN SUBSTRING(built::TEXT, 1, 4)::INT 
                            ELSE NULL 
                        END AS built_year,
                        
                        -- 2. BEZPIECZNE PARSOWANIE DATY PIERWSZEGO LOTU (firstflightdate)
                        -- Zabezpiecza przed błędem, jeśli w kolumnie są śmieci. Szuka formatu YYYY-MM-DD.
                        CASE 
                            WHEN firstflightdate::TEXT ~ '^\\d{4}-\\d{2}-\\d{2}' THEN TO_DATE(SUBSTRING(firstflightdate::TEXT, 1 , 10), 'YYYY-MM-DD')
                            ELSE NULL 
                        END AS first_flight_clean,

                        -- 3. BEZPIECZNE PARSOWANIE POZOSTAŁYCH DAT (registered, reguntil)
                        CASE 
                            WHEN registered::TEXT ~ '^\\d{4}-\\d{2}-\\d{2}' THEN TO_DATE(SUBSTRING(registered::TEXT, 1 , 10), 'YYYY-MM-DD')
                            ELSE NULL 
                        END AS registered_clean,
                        
                        CASE 
                            WHEN reguntil::TEXT ~ '^\\d{4}-\\d{2}-\\d{2}' THEN TO_DATE(SUBSTRING(reguntil::TEXT, 1 , 10), 'YYYY-MM-DD')
                            ELSE NULL 
                        END AS reguntil_clean,
                        
                        categorydescription,
                        firstseen,
                        icaoaircraftclass,
                        linenumber,
                        modes,
                        selcal,
                        vdl,

                        -- 4. STANDARYZACJA TEKSTU (WIELKIE LITERY + TRIM + ZAMIANA PUSTYCH NA NULL)
                        -- NULLIF(TRIM(kolumna), '') zamieni puste stringi oraz te składające się z samych spacji na prawdziwy NULL
                        NULLIF(TRIM(UPPER(manufacturericao)), '') AS manufacturericao,
                        NULLIF(TRIM(UPPER(manufacturername)), '') AS manufacturername,
                        NULLIF(TRIM(UPPER(model)), '')            AS model,
                        NULLIF(TRIM(UPPER(operator)), '')         AS operator,
                        NULLIF(TRIM(UPPER(operatorcallsign)), '') AS operatorcallsign,
                        NULLIF(TRIM(UPPER(operatoriata)), '')     AS operatoriata,
                        NULLIF(TRIM(UPPER(operatoricao)), '')     AS operatoricao,
                        NULLIF(TRIM(UPPER(registration)), '')     AS registration,
                        NULLIF(TRIM(UPPER(status)), '')           AS status,
                        NULLIF(TRIM(UPPER(typecode)), '')         AS typecode,

                        -- 5. TYLKO USUWANIE ZBĘDNYCH SPACJI I PUSTYCH CIĄGÓW (bez zmiany na wielkie litery)
                        NULLIF(TRIM(country), '')      AS country,
                        NULLIF(TRIM(engines), '')      AS engines,
                        NULLIF(TRIM(nextreg), '')      AS nextreg,
                        NULLIF(TRIM(notes), '')        AS notes,
                        NULLIF(TRIM(owner), '')        AS owner,
                        NULLIF(TRIM(prevreg), '')      AS prevreg,
                        NULLIF(TRIM(serialnumber), '') AS serialnumber

                    FROM aircrafts;

                --sql            
                CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_icao24 ON mv_aircraft_clean(icao24);

                --sql    
                -- Indeksy na najczęściej wyszukiwanych polach tekstowych
                CREATE INDEX IF NOT EXISTS idx_mv_registration ON mv_aircraft_clean(registration);
                        
                --sql    
                CREATE INDEX IF NOT EXISTS idx_mv_operator ON mv_aircraft_clean(operator);
                        
                --sql    
                CREATE INDEX IF NOT EXISTS idx_mv_model ON mv_aircraft_clean(model);   
                """)
        conn.commit()
except Exception as e:
    print(e)
else:
    # Report success only when the statements actually committed.
    print("Widok utworzono")

## Pogoda

In [None]:
# Create weather_2024: one row per (station, flight_date, hour). According to the notebook's
# later note, these date/hour values are in UTC.
print("Tworzenie tabli weather_2024")
try:
    with psycopg.connect(conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute("""--sql
                CREATE TABLE IF NOT EXISTS weather_2024 (
                    station VARCHAR(4),
                    flight_date DATE,
                    hour SMALLINT,
                    tmpf REAL,
                    sknt REAL,
                    gust REAL,
                    p01m REAL,
                    vsby REAL,
                    is_thunderstorm SMALLINT,
                    is_snow SMALLINT,
                    is_freezing SMALLINT,
                    PRIMARY KEY (station, flight_date, hour)
                );
                """)
        conn.commit()
except Exception as e:
    print(e)
else:
    # Report success only when the DDL actually committed.
    print("Tabela utworzona")

### Wyszukanie wszystkich unikalnych kodów lotnisk w tabeli flights_2024

In [9]:
# Every airport code that appears in flights_2024 as an origin OR a destination, so that
# weather and time-zone data can cover both ends of each flight.
query = 'SELECT "Origin" FROM flights_2024 UNION SELECT "Dest" FROM flights_2024'
airport_codes = []
try:
    with psycopg.connect(conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            airport_codes = cur.fetchall()
except Exception as e:
    print(e)
# Sorted for a deterministic order downstream; NULL codes are dropped.
clean_airport_codes: list[str] = sorted(row[0] for row in airport_codes if row[0] is not None)
df_clean_airport_codes = pd.DataFrame(clean_airport_codes, columns=["airport_code"])
if clean_airport_codes:
    airport_codes_path = Path('./data/clean_data/airports/airports_codes.parquet')
    airport_codes_path.parent.mkdir(parents=True, exist_ok=True)
    df_clean_airport_codes.to_parquet(airport_codes_path, compression='snappy')
else:
    # Don't overwrite a previously saved list with an empty one after a failed query.
    print("Brak kodów lotnisk - plik parquet nie został nadpisany")


In [None]:
# Local helper module (not shown here): download weather data for every airport code
# collected above. NOTE(review): presumably writes files into ./data/raw_data/weather,
# which the next cells read - confirm.
import weather 
weather.download_weather_for_airports(clean_airport_codes)

### Brakujące lotniska

In [10]:
# Station codes that already have a weather file, taken from characters [8:11] of each
# file name (assumes the naming scheme produced by the weather module - TODO confirm).
file_list = [fname[8:11] for fname in os.listdir('./data/raw_data/weather')]
# Airports from the flights table for which no weather file was found.
diff_list = [code for code in clean_airport_codes if code not in file_list]
print(f'Pogody nie znaleziono dla {len(diff_list)} lotnisk')

Pogody nie znaleziono dla 0 lotnisk


Pogoda była pobierana według kodów IATA lotnisk; w celu uzupełnienia braków zostanie pobrana za pomocą kodów ICAO.

In [None]:
# Map every airport without weather to its ICAO code (airportsdata keyed by IATA), so that
# the download can be retried with ICAO station identifiers. Codes unknown to airportsdata,
# or listed with an empty ICAO code, are reported instead of raising KeyError.
airports_empty_weather_IATA_codes = airportsdata.load('IATA')
airports_empty_weather_map = {}
unmapped_codes = []
for airport_code in diff_list:
    icao_code = airports_empty_weather_IATA_codes.get(airport_code, {}).get('icao')
    if icao_code:
        airports_empty_weather_map[airport_code] = icao_code
    else:
        unmapped_codes.append(airport_code)
if unmapped_codes:
    print(f"Brak kodu ICAO dla: {unmapped_codes}")
airports_empty_weather_map

In [None]:
# Retry the weather download for the airports still missing weather. Unlike the first call,
# this one passes an {IATA: ICAO} dict (assumes the weather module accepts both forms - TODO confirm).
weather.download_weather_for_airports(airports_empty_weather_map)

### Zapisanie plików w bazie danych do tabeli weather_2024

In [None]:
# Columns taken from each weather parquet file, in COPY order.
cols_to_insert = ['station', 'FlightDate', 'Hour', 'tmpf', 'sknt', 'gust', 'p01m', 'vsby', 'is_thunderstorm', 'is_snow', 'is_freezing']
# weather_2024 target columns, position-matched to cols_to_insert. Naming them explicitly
# keeps the COPY valid after local_flight_date / local_hour are appended to the table.
weather_target_cols = ['station', 'flight_date', 'hour', 'tmpf', 'sknt', 'gust', 'p01m', 'vsby', 'is_thunderstorm', 'is_snow', 'is_freezing']
weather_copy_query = sql.SQL("COPY weather_2024 ({0}) FROM STDIN").format(
    sql.SQL(', ').join([sql.Identifier(col) for col in weather_target_cols])
)
weather_file_list = os.listdir('./data/raw_data/weather')

try:
    with psycopg.connect(conn_str) as conn:
        with conn.cursor() as cur:
            for weather_file in weather_file_list:
                print(f'Kopiowanie pliku: {weather_file} do tabeli weather_2024 w bazie danych')
                p_path = Path('./data/raw_data/weather') / weather_file
                df = pd.read_parquet(p_path)
                # Station code = characters [8:11] of the file name (assumes the naming
                # produced by the weather module - TODO confirm).
                df["station"] = weather_file[8:11]
                df = df[cols_to_insert]
                # psycopg would send a float NaN as the value 'NaN', not as SQL NULL; convert
                # missing measurements to None so they are stored as NULL.
                df = df.astype(object).where(df.notna(), None)
                with cur.copy(weather_copy_query) as copy:
                    for row in df.itertuples(index=False,name=None):
                        copy.write_row(row)
                conn.commit()
                print(f'Kopiowanie pliku: {weather_file} do tabeli weather_2024 zakończono')
    print("Kopiowanie zakończono")
except Exception as ex:
    print(ex)


Dane w tabeli weather_2024 są przechowywane w czasie UTC, natomiast dane we flights_2024 są w czasie lokalnym.
Dlatego w celu dopasowania danych zostanie utworzona tabela pomocnicza (airports_tz) ze strefą czasową każdego lotniska, pozwalająca przeliczać czas UTC na czas lokalny.

In [None]:
airports_db = airportsdata.load('IATA')
my_airports_codes = pd.read_parquet('./data/clean_data/airports/airports_codes.parquet')
timezone_data = []
missing_data = []
DDL_airports_tz = t"""CREATE TABLE IF NOT EXISTS airports_tz(iata_code VARCHAR(3) PRIMARY KEY,tz_name VARCHAR(50));"""


for airport_code in my_airports_codes['airport_code']:
    if airport_code in airports_db:
        timezone_name = airports_db[airport_code]['tz']
        timezone_data.append((airport_code,timezone_name))
    else:
        missing_data.append(airport_code) 

if missing_data:
    print(f"Uwaga: Nie znaleziono w bazie następujących kodów: {missing_data}")        

print(f"Przygotowano dane dla {len(timezone_data)} lotnisk. Kopiowanie do PostgreSQL...") 

try:
    with psycopg.connect(conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute(DDL_airports_tz)
            cur.execute("TRUNCATE TABLE airports_tz")
            with cur.copy("COPY airports_tz (iata_code, tz_name) FROM STDIN") as copy:
                for row in timezone_data:
                    copy.write_row(row)
                    
            conn.commit()            
            print("Dane skopiowano do tabeli airports_tz")
            print("Obliczenie czasu lokalnego dla danych pogodowych")
            cur.execute("""
                        ALTER TABLE weather_2024 ADD COLUMN local_flight_date DATE;
                        ALTER TABLE weather_2024 ADD COLUMN local_hour SMALLINT;
                        CREATE INDEX idx_weather_local_fast ON weather_2024(station, local_flight_date, local_hour);
                        """)
            conn.commit()
            print("Zmodyfikowano table weather_2024 pod obliczenie czasu lokalnego dla danych pogodowych")
            cur.execute("""--sql
                        UPDATE weather_2024 w
                            SET 
                                local_flight_date = DATE((w.flight_date + w.hour * INTERVAL '1 hour') AT TIME ZONE 'UTC' AT TIME ZONE tz.tz_name),
                                local_hour = EXTRACT(HOUR FROM ((w.flight_date + w.hour * INTERVAL '1 hour') AT TIME ZONE 'UTC' AT TIME ZONE tz.tz_name))::SMALLINT
                            FROM airports_tz tz
                            WHERE w.station = tz.iata_code;
                        """)
            conn.commit()
            print("Obliczenie czasu lokalnego dla danych pogodowych zakończono")   
except Exception as ex:
    print(ex)

## Przygotowanie danych i ich złączenie

### Znalezienie 30 najpopularniejszych lotnisk w USA oraz najpopularniejszego lotniska w każdym stanie.
Obie listy zostaną połączone w jedną listę bez powtórzeń.

In [11]:
query = t"""
WITH Top30Kraju AS (
    -- 30 najpopularniejszych lotnisk w całym USA
    SELECT "Origin", "OriginState", COUNT(*) as liczba_lotow
    FROM flights_2024
    GROUP BY "Origin", "OriginState"
    ORDER BY liczba_lotow DESC
    LIMIT 30
),
TopWStanach AS (
    -- Najpopularniejsze lotnisko w każdym stanie
    SELECT DISTINCT ON ("OriginState") 
        "Origin", "OriginState", COUNT(*) as liczba_lotow
    FROM flights_2024
    GROUP BY "OriginState", "Origin"
    ORDER BY "OriginState", liczba_lotow DESC
)

-- Połączenie obu zbiorów (UNION usunie duplikaty)
SELECT "Origin", "OriginState", liczba_lotow FROM Top30Kraju
UNION
SELECT "Origin", "OriginState", liczba_lotow FROM TopWStanach
ORDER BY liczba_lotow DESC;
"""
df_top_airports:pd.DataFrame = pd.DataFrame()
try:
    with psycopg.connect(conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            if cur.description is not None:
                data = cur.fetchall()
                colnames = [desc[0] for desc in cur.description]
                df_top_airports = pd.DataFrame(data,columns=colnames)
            else:
                print("Zapytanie nie zwróciło żadnych kolumn.")
                df_top_airports = pd.DataFrame()
except Exception as ex:
    print(ex)
df_top_airports

Unnamed: 0,Origin,OriginState,liczba_lotow
0,ATL,GA,342509
1,ORD,IL,317886
2,DFW,TX,315390
3,DEN,CO,313277
4,CLT,NC,252351
...,...,...,...
57,FAR,ND,6946
58,JAC,WY,5625
59,STT,VI,5505
60,CRW,WV,3433


In [None]:
create_master_view = t"""
-- Zwiększamy pamięć na łączenie tabel do 2 GB
SET work_mem = '2GB';

-- Zwiększamy pamięć na tworzenie widoków i indeksów do 4 GB
SET maintenance_work_mem = '4GB';

-- (Opcjonalnie) Upewniamy się, że silnik chętniej użyje szybkiego Hash Join
SET enable_nestloop = off;


CREATE MATERIALIZED VIEW mv_flights_2026_02_analysis AS
WITH top_airports AS (
	WITH Top30Kraju AS (
	    SELECT "Origin", "OriginState", COUNT(*) as liczba_lotow
	    FROM flights_2024
	    GROUP BY "Origin", "OriginState"
	    ORDER BY liczba_lotow DESC
	    LIMIT 30
	),
	TopWStanach AS (
	    SELECT DISTINCT ON ("OriginState") 
	        "Origin", "OriginState", COUNT(*) as liczba_lotow
	    FROM flights_2024
	    GROUP BY "OriginState", "Origin"
	    ORDER BY "OriginState", liczba_lotow DESC
	)
	
	-- Połączenie obu zbiorów (UNION usunie duplikaty)
	SELECT "Origin", "OriginState", liczba_lotow FROM Top30Kraju
	UNION
	SELECT "Origin", "OriginState", liczba_lotow FROM TopWStanach
	ORDER BY liczba_lotow DESC
)
SELECT 
    f.*, 
    
    -- DANE O SAMOLOCIE
    ac.*,
    (2024 - ac.built_year) AS ac_age,

    -- POGODA - WYLOT (Origin)
    wo.tmpf AS origin_tmpf,
    wo.sknt AS origin_sknt,
    wo.gust AS origin_gust,
    wo.p01m AS origin_precip,
    wo.vsby AS origin_vsby,
    wo.is_thunderstorm AS origin_thunderstorm,
    wo.is_snow AS origin_snow,
    wo.is_freezing AS origin_freezing,

    -- POGODA - PRZYLOT (Dest)
    wd.tmpf AS dest_tmpf,
    wd.sknt AS dest_sknt,
    wd.gust AS dest_gust,
    wd.p01m AS dest_precip,
    wd.vsby AS dest_vsby,
    wd.is_thunderstorm AS dest_thunderstorm,
    wd.is_snow AS dest_snow,
    wd.is_freezing AS dest_freezing

FROM 
    flights_2024 f

-- ==========================================
-- FILTR: WYLOT I PRZYLOT TYLKO Z/DO TOP 30
-- Łączymy z naszą wirtualną tabelą z bloku WITH
-- ==========================================
INNER JOIN top_airports top_airports_o ON f."Origin" = top_airports_o."Origin"
INNER JOIN top_airports top_airports_d ON f."Dest" = top_airports_d."Origin"

-- DANE O SAMOLOCIE
LEFT JOIN mv_aircraft_clean ac 
    ON f."Tail_Number" = ac.registration

-- STREFA I POGODA WYLOTU
LEFT JOIN weather_2024 wo 
    ON f."Origin" = wo.station 
    AND f."FlightDate" = wo.local_flight_date 
    AND (
        (f."CRSDepTime" / 100) + 
        CASE WHEN (f."CRSDepTime" % 100) >= 30 THEN 1 ELSE 0 END
    )::SMALLINT = wo.local_hour

LEFT JOIN weather_2024 wd 
    ON f."Dest" = wd.station 
    AND f."FlightDate" = wd.local_flight_date 
    AND (
        (f."CRSArrTime" / 100) + 
        CASE WHEN (f."CRSArrTime" % 100) >= 30 THEN 1 ELSE 0 END
    )::SMALLINT = wd.local_hour;
"""

In [None]:
create_idexes_for_master_view = t"""
CREATE INDEX idx_mv_top_flightdate ON mv_flights_2026_02_analysis ("FlightDate");

CREATE INDEX idx_mv_top_origin ON mv_flights_2026_02_analysis ("Origin");

CREATE INDEX idx_mv_top_dest ON mv_flights_2026_02_analysis ("Dest");


CREATE INDEX idx_mv_top_deptime ON mv_flights_2026_02_analysis ("CRSDepTime");    

CREATE INDEX idx_mv_top_arrtime ON mv_flights_2026_02_analysis ("CRSArrTime"); 

CREATE INDEX "idx_mv_top_FlightDate" ON mv_flights_2026_02_analysis ("FlightDate");

CREATE INDEX "idx_mv_top_Airline" ON mv_flights_2026_02_analysis ("Marketing_Airline_Network");

CREATE INDEX "idx_mv_top_Route" ON mv_flights_2026_02_analysis ("Origin", "Dest");

CREATE INDEX "idx_mv_top_Features" ON mv_flights_2026_02_analysis ("Month", "DayOfWeek", "Origin");

CREATE INDEX "idx_mv_top_ArrDel15" ON mv_flights_2026_02_analysis ("ArrDel15");
"""

In [None]:
# Build the analytical materialized view, then its indexes, committing after each step.
try:
    with psycopg.connect(conn_str) as conn, conn.cursor() as cur:
        steps = (
            ("Tworzenie Widoku", create_master_view),
            ("Tworzenie indeksów dla widoku", create_idexes_for_master_view),
        )
        for message, statement in steps:
            print(message)
            cur.execute(statement)
            conn.commit()
        print("Operacje zakończono pomyślnie")
except Exception as ex:
    print(ex)