 # Opis notatnika
Głównym celem w tym notatniku jest odpowiednie dostosowanie struktury danych z plików źródłowych do formatu zgodnego z `Postgres`, a następnie wgranie ich na nasz serwer. Dzięki temu w późniejszych krokach możemy niezależnie użyć danych do analizy czy raportowania.

## Połączenie z bazą danych

##### Importujemy potrzebne biblioteki oraz plik z ukrytymi danymi do logowania

In [2]:
import psycopg2
from dotenv import load_dotenv
import os
from sqlalchemy import create_engine
import pandas as pd

In [4]:
# Ładujemy zmienne środowiskowe z pliku .env
load_dotenv('DB_pass.env')

True

##### Podłączamy się do bazy danych

In [6]:
# Tworzymy url połączenia
url = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('N_DB_NAME')}"

# Tworzymy engine dla sqlalchemy
engine = create_engine(url)

 # Załadowanie i wczytanie ramek do obszaru roboczego

##### Tworzymy funkcję `load_raw_data`, która przyjmuje jeden parametr `file_name`, czyli nazwę pliku do zaczytania. Jej zadaniem jest wczytanie surowego pliku, zmodyfikowanie nazw kolumn z `NAZWA_KOLUMNY` na `nazwa_kolumny` oraz zwrócenie tak zmodyfikowanej ramki danych.

In [21]:
def load_raw_data(file_name):
    df = pd.read_csv(file_name)
    df.columns = [col.lower() for col in df.columns]
    
    return df

##### Zaczytujemy poszczególne pliki do ramek i sprawdzamy typy danych, czy pokrywają się z bazą danych w postgresql

In [29]:
aircraft_df = load_raw_data('../data/raw/aircraft.csv')
aircraft_df.head()

Unnamed: 0,manufacture_year,tail_num,number_of_seats
0,1944,N54514,0.0
1,1945,N1651M,0.0
2,1953,N100CE,0.0
3,1953,N141FL,0.0
4,1953,N151FL,0.0


In [42]:
aircraft_df.dtypes

manufacture_year      int64
tail_num             object
number_of_seats     float64
dtype: object

In [31]:
airport_weather_df = load_raw_data('../data/raw/airport_weather.csv')
airport_weather_df.head()

Unnamed: 0,wt18,station,name,date,awnd,prcp,snow,snwd,tavg,tmax,...,pgtm,wt10,wesd,sn32,sx32,psun,tsun,tobs,wt07,wt11
0,,USW00013874,ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPO...,2019-01-01,4.7,0.14,0.0,0.0,64.0,66.0,...,,,,,,,,,,
1,,USW00013874,ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPO...,2019-01-02,4.92,0.57,0.0,0.0,56.0,59.0,...,,,,,,,,,,
2,,USW00013874,ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPO...,2019-01-03,5.37,0.15,0.0,0.0,52.0,55.0,...,,,,,,,,,,
3,,USW00013874,ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPO...,2019-01-04,12.08,1.44,0.0,0.0,56.0,66.0,...,,,,,,,,,,
4,,USW00013874,ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPO...,2019-01-05,13.42,0.0,0.0,0.0,49.0,59.0,...,,,,,,,,,,


In [44]:
airport_weather_df.dtypes

wt18       float64
station     object
name        object
date        object
awnd       float64
prcp       float64
snow       float64
snwd       float64
tavg       float64
tmax       float64
tmin       float64
wdf2       float64
wdf5       float64
wsf2       float64
wsf5       float64
wt01       float64
wt08       float64
wt02       float64
wt03       float64
wt04       float64
wt09       float64
wt06       float64
wt05       float64
pgtm       float64
wt10       float64
wesd       float64
sn32       float64
sx32       float64
psun       float64
tsun       float64
tobs       float64
wt07       float64
wt11       float64
dtype: object

In [33]:
flight_df = load_raw_data('../data/raw/flight.csv')
flight_df.head()

Unnamed: 0,month,day_of_month,day_of_week,op_unique_carrier,tail_num,op_carrier_fl_num,origin_airport_id,dest_airport_id,crs_dep_time,dep_time,...,crs_elapsed_time,actual_elapsed_time,distance,distance_group,year,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay
0,1,20,7,WN,N204WN,682,10397,11292,605,602.0,...,205,204.0,1199,5,2019,,,,,
1,1,20,7,WN,N8682B,2622,10397,11292,2120,2114.0,...,210,205.0,1199,5,2019,,,,,
2,1,20,7,WN,N717SA,2939,10397,11292,1800,1807.0,...,210,220.0,1199,5,2019,4.0,0.0,10.0,0.0,3.0
3,1,20,7,WN,N709SW,3848,10397,11292,1355,1354.0,...,205,204.0,1199,5,2019,,,,,
4,1,20,7,WN,N7864B,1352,10397,11697,1125,1125.0,...,120,124.0,581,3,2019,,,,,


In [48]:
flight_df.dtypes

month                    int64
day_of_month             int64
day_of_week              int64
op_unique_carrier       object
tail_num                object
op_carrier_fl_num        int64
origin_airport_id        int64
dest_airport_id          int64
crs_dep_time             int64
dep_time               float64
dep_delay_new          float64
dep_time_blk            object
crs_arr_time             int64
arr_time               float64
arr_delay_new          float64
arr_time_blk            object
cancelled                int64
crs_elapsed_time         int64
actual_elapsed_time    float64
distance                 int64
distance_group           int64
year                     int64
carrier_delay          float64
weather_delay          float64
nas_delay              float64
security_delay         float64
late_aircraft_delay    float64
dtype: object

In [36]:
airport_list_df = load_raw_data('../data/raw/airport_list.csv')
airport_list_df.head()

Unnamed: 0,origin_airport_id,display_airport_name,origin_city_name,name
0,11638,Fresno Air Terminal,"Fresno, CA","FRESNO YOSEMITE INTERNATIONAL, CA US"
1,13342,General Mitchell Field,"Milwaukee, WI","MILWAUKEE MITCHELL AIRPORT, WI US"
2,13244,Memphis International,"Memphis, TN","MEMPHIS INTERNATIONAL AIRPORT, TN US"
3,15096,Syracuse Hancock International,"Syracuse, NY","SYRACUSE HANCOCK INTERNATIONAL AIRPORT, NY US"
4,10397,Atlanta Municipal,"Atlanta, GA",ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPO...


In [50]:
airport_list_df.dtypes

origin_airport_id        int64
display_airport_name    object
origin_city_name        object
name                    object
dtype: object

 # Eksport danych na bazę

##### Tworzymy funkcję `export_table_to_db`, która przyjmuje dwa parametry "df" i "table_name", czyli nazwę wczytywanej ramki danych i tabeli docelowej w bazie postgresql.

In [60]:
def export_table_to_db(df, table_name):
    print(f"Loading data into {table_name}...")

    # Eksportujemy ramkę do bazy danych
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists='append',
        index=False,
        chunksize=1000
    )

    print(f"Data loaded successfully into {table_name}.")

 ## Wgrywanie danych

 ### Wgranie `aircraft_df` do tabeli `aircraft`

In [62]:
export_table_to_db(aircraft_df, 'aircraft')

Loading data into aircraft...
Data loaded successfully into aircraft.


 ### Wgranie `airport_weather_df` do tabeli `airport_weather`

In [64]:
export_table_to_db(airport_weather_df, 'airport_weather')

Loading data into airport_weather...
Data loaded successfully into airport_weather.


 ### Wgranie `flight_df` do tabeli `flight`

In [68]:
export_table_to_db(flight_df, 'flight')

Loading data into flight...
Data loaded successfully into flight.


 ### Wgranie `airport_list_df` do tabeli `airport_list`

In [69]:
export_table_to_db(airport_list_df, 'airport_list')

Loading data into airport_list...
Data loaded successfully into airport_list.


 # Sprawdzenie poprawności wykonania notatnika

In [70]:
def test_data_export(table_name, expected_count, expected_schema):
    real_count = pd.read_sql(f"SELECT COUNT(*) as cnt FROM {table_name}", engine).iloc[0][0]
    
    real_schema = pd.read_sql(f"SELECT * FROM {table_name} LIMIT 0", engine)
    real_schema = set(real_schema.columns)

    expected_schema = set(expected_schema)

    diff = real_schema.symmetric_difference(expected_schema)

    assert len(diff) == 0, ('Nie zgadzają się kolumny tabel....'
    f'\tOczekiwano: {expected_schema}'
    f'\tOtrzymano: {real_schema}'
    f'\tRóżnica: {diff}')

    assert expected_count == real_count, \
        f'Nie zgadza się liczba wierszy, oczekiwano {expected_count}, otrzymano {real_count} - sprawdź, czy nie dane nie zostały wgrane do tabeli "{table_name}" więcej niż raz.'

 ## Sprawdzenie tabeli `aircraft`

In [71]:
aircraft_expected_count = 7383
aircraft_expected_schema = ['id', 'manufacture_year', 'tail_num', 'number_of_seats']

test_data_export('aircraft', aircraft_expected_count, aircraft_expected_schema)

  real_count = pd.read_sql(f"SELECT COUNT(*) as cnt FROM {table_name}", engine).iloc[0][0]


 ## Sprawdzenie tabeli `airport_weather`

In [72]:
airport_weather_expected_count = 46226
airport_weather_expected_schema = [
       'id', 'station', 'name', 'date', 'awnd', 'prcp', 'snow', 'snwd', 'tavg', 
       'tmax', 'tmin', 'wdf2', 'wdf5', 'wsf2', 'wsf5', 'wt01', 'wt08', 'wt02',
       'wt03', 'wt04', 'wt09', 'wt06', 'wt05', 'pgtm', 'wt10', 'wesd', 'sn32',
       'sx32', 'psun', 'tsun', 'tobs', 'wt07', 'wt11', 'wt18']

test_data_export('airport_weather', airport_weather_expected_count, airport_weather_expected_schema)

  real_count = pd.read_sql(f"SELECT COUNT(*) as cnt FROM {table_name}", engine).iloc[0][0]


 ## Sprawdzenie tabeli `flight`

In [73]:
flight_expected_count = 1386120
flight_expected_schema = [
       'id', 'month', 'day_of_month', 'day_of_week', 'op_unique_carrier', 'tail_num',
       'op_carrier_fl_num', 'origin_airport_id', 'dest_airport_id',
       'crs_dep_time', 'dep_time', 'dep_delay_new', 'dep_time_blk',
       'crs_arr_time', 'arr_time', 'arr_delay_new', 'arr_time_blk',
       'cancelled', 'crs_elapsed_time', 'actual_elapsed_time', 'distance',
       'distance_group', 'year', 'carrier_delay', 'weather_delay', 'nas_delay',
       'security_delay', 'late_aircraft_delay']

test_data_export('flight', flight_expected_count, flight_expected_schema)

  real_count = pd.read_sql(f"SELECT COUNT(*) as cnt FROM {table_name}", engine).iloc[0][0]


 ## Sprawdzenie tabeli `airport_list`


In [80]:
airport_list_expected_count = 97
airport_list_expected_schema = ['id', 'origin_airport_id', 'display_airport_name', 'origin_city_name', 'name']

test_data_export('airport_list', airport_list_expected_count, airport_list_expected_schema)


  real_count = pd.read_sql(f"SELECT COUNT(*) as cnt FROM {table_name}", engine).iloc[0][0]


In [90]:
msg = "Wszystko wygląda OK :) Przechodzimy do kolejnego zadania."
print(msg)

Wszystko wygląda OK :) Przechodzimy do kolejnego zadania.


 # Podsumowanie
 W tym notatniku załadowaliśmy pobrane wcześniej pliki na bazę danych. Dzięki temu stworzyliśmy centralne miejsce ich magazynowania, co wykorzystamy zarówno przy analizie danych, jak i przy późniejszej budowie systemu raportowego.