# Pipeline

1. Maak SQL scripts voor schema's: RAW, ARCHIVED, CLEANSED
2. Importeer source data in RAW
3. Data cleaning => RAW naar ARCHIVED en CLEANSED
4. Maak SQL scripts voor Data Warehouse / Ster schema
5. Import van CLEANSED naar DWH
6. Prep Data lake: export tabellen naar Parquet files
7. Upload Parquet files naar S3 (eerst bucket aanmaken)
8. Maak Athena tables
9. Gebruik Athena in BI tool naar keuze

In [4]:
%pip install -q pandas sqlalchemy psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


## Stap 1: SQL scripts

In [5]:
import psycopg2

# Connection settings for the PostgreSQL service
host = "db"
dbname = "postgres"
user = "postgres"
password = "postgres"
port = "5432"  # default PostgreSQL port

conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password, port=port)

# Execute the SQL scripts for the RAW, ARCHIVED and CLEANSED schemas one after another
# on a single cursor; everything is committed together at the end.
cur = conn.cursor()
for script_path in ('./sql_scripts/raw.sql', './sql_scripts/archived.sql', './sql_scripts/cleansed.sql'):
    with open(script_path, 'r') as file:
        sql_script = file.read()
    cur.execute(sql_script)

conn.commit()  # persist whatever the scripts changed

## Stap 2: raw importeren

In [6]:
from sqlalchemy import create_engine, Integer, String, DateTime, Float, DECIMAL, CHAR, SmallInteger, DATE, TIME, Column
from datetime import datetime as dt

# SQLAlchemy engine for the same PostgreSQL database (user/password postgres, host db, port 5432)
DATABASE_URL = 'postgresql://postgres:postgres@db:5432/postgres'
engine = create_engine(DATABASE_URL)

import pandas as pd

In [9]:
SCHEMA = 'raw'
TABEL = 'aankomst'

# Tab-separated export; every field is read as text so nothing is coerced before cleansing
raw_columns = ["Vluchtid", "Vliegtuigcode", "Terminal", "Gate", "Baan", "Bezetting", "Vracht", "Aankomsttijd"]
df = pd.read_csv(f'./source_data/export_{TABEL}.txt', sep='\t', dtype=str, encoding='raw_unicode_escape')

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))


61

In [10]:
TABEL = 'banen'
file_path = f'./source_data/export_{TABEL}.csv'
# Semicolon-separated CSV export; all values stay text in the RAW layer
raw_columns = ["Baannummer", "Code", "Naam", "Lengte"]
df = pd.read_csv(file_path, sep=';', dtype=str)

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

6

In [11]:
TABEL = 'klant'
file_path = f'./source_data/export_{TABEL}.csv'
# Semicolon-separated CSV export; all values stay text in the RAW layer
raw_columns = ["Vluchtid", "Operatie", "Faciliteiten", "Shops"]
df = pd.read_csv(file_path, sep=';', dtype=str)

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

110

In [12]:
TABEL = 'luchthavens'
file_path = f'./source_data/export_{TABEL}.txt'
# Tab-separated export; note that "TZ" and "Tz" are two distinct columns
raw_columns = ["Airport", "City", "Country", "IATA", "ICAO", "Lat", "Lon", "Alt", "TZ", "DST", "Tz"]
df = pd.read_csv(file_path, sep='\t', dtype=str, encoding='raw_unicode_escape')

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

107

In [13]:
TABEL = 'maatschappijen'
file_path = f'./source_data/export_{TABEL}.txt'
# Tab-separated export; all values stay text in the RAW layer
raw_columns = ["Name", "IATA", "ICAO"]
df = pd.read_csv(file_path, sep='\t', dtype=str, encoding='raw_unicode_escape')

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

166

In [14]:
TABEL = 'planning'
file_path = f'./source_data/export_{TABEL}.txt'
# Tab-separated export; all values stay text in the RAW layer
raw_columns = ["Vluchtnr", "Airlinecode", "Destcode", "Planterminal", "Plangate", "Plantijd"]
df = pd.read_csv(file_path, sep='\t', dtype=str, encoding='raw_unicode_escape')

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

693

In [15]:
TABEL = 'vertrek'
file_path = f'./source_data/export_{TABEL}.txt'
# Tab-separated export; all values stay text in the RAW layer
raw_columns = ["Vluchtid", "Vliegtuigcode", "Terminal", "Gate", "Baan", "Bezetting", "Vracht", "Vertrektijd"]
df = pd.read_csv(file_path, sep='\t', dtype=str, encoding='raw_unicode_escape')

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

447

In [16]:
TABEL = 'vliegtuig'
file_path = f'./source_data/export_{TABEL}.txt'
df = pd.read_csv(file_path, sep='\t', dtype=str, encoding='raw_unicode_escape')

# The dtype keys must be the columns of the vliegtuig export itself (the same four columns that
# schema_cleansed['vliegtuig'] validates), so every column is stored as String like the other RAW tables.
df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False, dtype={
    "Airlinecode": String,
    "Vliegtuigcode": String,
    "Vliegtuigtype": String,
    "Bouwjaar": String
})

557

In [17]:
TABEL = 'vliegtuigtype'
file_path = f'./source_data/export_{TABEL}.csv'
# Semicolon-separated CSV export; all values stay text in the RAW layer
raw_columns = ["IATA", "ICAO", "Merk", "Type", "Wake", "Cat", "Capaciteit", "Vracht"]
df = pd.read_csv(file_path, sep=';', dtype=str)

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

327

In [18]:
TABEL = 'vlucht'
file_path = f'./source_data/export_{TABEL}.txt'
# Tab-separated export; all values stay text in the RAW layer
raw_columns = ["Vluchtid", "Vluchtnr", "Airlinecode", "Destcode", "Vliegtuigcode", "Datum"]
df = pd.read_csv(file_path, sep='\t', dtype=str, encoding='raw_unicode_escape')

df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

512

In [19]:
TABEL = 'weer'
file_path = f'./source_data/export_{TABEL}.txt'
df = pd.read_csv(file_path, sep='\t', dtype=str, encoding='raw_unicode_escape')

# All columns of the weer export (Datum followed by the measurement codes), stored as text in RAW
raw_columns = [
    "Datum", "DDVEC", "FHVEC", "FG", "FHX", "FHXH", "FHN", "FHNH", "FXX", "FXXH",
    "TG", "TN", "TNH", "TX", "TXH", "T10N", "T10NH", "SQ", "SP", "Q",
    "DR", "RH", "RHX", "RHXH", "PG", "PX", "PXH", "PN", "PNH", "VVN",
    "VVNH", "VVX", "VVXH", "NG", "UG", "UX", "UXH", "UN", "UNH", "EV2",
]
df.to_sql(TABEL, con=engine, schema=SCHEMA, if_exists='replace', index=False,
          dtype=dict.fromkeys(raw_columns, String))

644

TODO: `awswrangler.s3.to_parquet` zeker uitzoeken!

In [95]:
# Load tables from a database schema into DataFrames
def load_data(engine, schema, table_names=None):
    """Read tables from *schema* into a dict of DataFrames keyed by table name.

    engine: connectable passed to ``pd.read_sql_table``.
    schema: database schema to read from (e.g. 'raw' or 'cleansed').
    table_names: iterable of table names to read; defaults to the eleven source tables
        of this pipeline.

    Returns ``{table_name: DataFrame}`` in the order of *table_names*.
    """
    if table_names is None:
        table_names = ("aankomst", "banen", "klant", "luchthavens", "maatschappijen", "planning",
                       "vertrek", "vliegtuig", "vliegtuigtype", "vlucht", "weer")
    return {
        table: pd.read_sql_table(table_name=table, con=engine, schema=schema)
        for table in table_names
    }

raw_data = load_data(engine, schema='raw')  # every source table as imported into RAW

In [165]:
# Characters that make a value invalid, per column name. Only IATA codes are screened.
illegal_chars = {
    "IATA": ['-', ' ', ';', '^', '+', '\\']
}

# A raw value containing this substring is treated as missing, exactly like NaN/None.
_NULL_MARKER = '\\N'

# Inclusive bounds of PostgreSQL smallint / integer. A single out-of-range value would make the
# whole DataFrame.to_sql insert of a table fail, so rows carrying one are archived instead.
_SMALLINT_BOUNDS = (-32768, 32767)
_INTEGER_BOUNDS = (-2147483648, 2147483647)


# Function to check for illegal characters
def contains_illegal_chars(value, illegal_chars):
    """Return True when ``str(value)`` contains any of the characters in *illegal_chars*."""
    text = str(value)
    return any(char in text for char in illegal_chars)


def _checked_int(value, bounds, column):
    """Parse *value* as int; raise ValueError when it lies outside the inclusive *bounds*."""
    number = int(value)
    low, high = bounds
    if not low <= number <= high:
        raise ValueError(f"Integer value out of range for column: {column}")
    return number


def _checked_decimal(value, column_type, column):
    """Round *value* to the column's scale; raise ValueError if it would overflow its precision."""
    rounded = round(float(value), column_type.scale)
    precision, scale = column_type.precision, column_type.scale
    # NUMERIC(p, s) holds at most p - s digits before the decimal point
    if precision is not None and scale is not None and abs(rounded) >= 10 ** (precision - scale):
        raise ValueError(f"Decimal value out of range for column: {column}")
    return rounded


def _convert_row(row, schema, illegal_chars):
    """Convert one raw row to the types in *schema*.

    Raises ValueError/TypeError/AttributeError for an invalid value; KeyError when a schema
    column is missing from the row.
    """
    converted_row = {}
    for column, column_info in schema.items():
        value = row[column]
        # NaN/None and the "\N" marker both mean "missing" and must respect nullability.
        if pd.isna(value) or _NULL_MARKER in str(value):
            if not column_info.nullable:
                raise ValueError(f"Null value in non-nullable column: {column}")
            converted_row[column] = None
            continue

        if column in illegal_chars and contains_illegal_chars(value, illegal_chars[column]):
            raise ValueError(f"Illegal character found in column: {column}")

        column_type = column_info.type
        # SmallInteger subclasses Integer, so it is tested first to apply its narrower bounds.
        if isinstance(column_type, SmallInteger):
            converted_row[column] = _checked_int(value, _SMALLINT_BOUNDS, column)
        elif isinstance(column_type, Integer):
            converted_row[column] = _checked_int(value, _INTEGER_BOUNDS, column)
        elif isinstance(column_type, Float):
            converted_row[column] = float(value)
        elif isinstance(column_type, String):
            # CHAR subclasses String, so fixed-width columns get the same length check;
            # a String without a length has no limit to check.
            if column_type.length is not None and len(str(value)) > column_type.length:
                raise ValueError(f"String value too long for column: {column}")
            converted_row[column] = str(value)
        elif isinstance(column_type, DateTime):
            converted_row[column] = pd.to_datetime(value)
        elif isinstance(column_type, DATE):
            converted_row[column] = pd.to_datetime(value).date()
        elif isinstance(column_type, TIME):
            # Raw times are 12-hour clock strings such as "07:45 PM"
            converted_row[column] = dt.strptime(value, '%I:%M %p').time()
        elif isinstance(column_type, DECIMAL):
            converted_row[column] = _checked_decimal(value, column_type, column)
    return converted_row


# Validation and conversion functions for each table
def validate_and_convert(df, schema, illegal_chars):
    """Split *df* into rows that convert cleanly to *schema* and rows that do not.

    df: raw table with text values.
    schema: column name -> sqlalchemy Column (its ``type`` and ``nullable`` are used).
    illegal_chars: column name -> characters that invalidate a value in that column.

    Returns ``(validated, archived)``: *validated* holds the converted rows with the columns
    of *schema*; *archived* holds the rejected rows exactly as they appeared in *df*.
    A schema column missing from *df* raises KeyError instead of archiving the row.
    """
    validated_rows = []
    archived_rows = []
    for _, row in df.iterrows():
        try:
            validated_rows.append(_convert_row(row, schema, illegal_chars))
        except (ValueError, TypeError, AttributeError):
            archived_rows.append(row.to_dict())
    return pd.DataFrame(validated_rows), pd.DataFrame(archived_rows)

# Define schemas
# Target types and nullability for the CLEANSED layer, per table. validate_and_convert() parses the
# raw text values with these types, and insert_data() hands the same types to DataFrame.to_sql.
# Column order here is also the column order of the cleansed DataFrames.

# Measurement columns of the weer table (everything except Datum); all stored as SmallInteger.
_WEER_MEASUREMENT_COLUMNS = [
    "DDVEC", "FHVEC", "FG", "FHX", "FHXH", "FHN", "FHNH", "FXX", "FXXH",
    "TG", "TN", "TNH", "TX", "TXH", "T10N", "T10NH", "SQ", "SP", "Q",
    "DR", "RH", "RHX", "RHXH", "PG", "PX", "PXH", "PN", "PNH", "VVN",
    "VVNH", "VVX", "VVXH", "NG", "UG", "UX", "UXH", "UN", "UNH", "EV2",
]

schema_cleansed = {
    "aankomst": {
        # Vluchtid is required here just like in vertrek, klant and vlucht
        "Vluchtid": Column(Integer, nullable=False),
        "Vliegtuigcode": Column(String(8)),
        "Terminal": Column(CHAR(1)),
        "Gate": Column(String(2)),
        "Baan": Column(CHAR(1)),
        "Bezetting": Column(SmallInteger),
        "Vracht": Column(CHAR(2)),
        "Aankomsttijd": Column(DateTime)
    },
    "banen": {
        "Baannummer": Column(CHAR(1), nullable=False),
        "Code": Column(String(7)),
        "Naam": Column(String(30)),
        "Lengte": Column(SmallInteger)
    },
    "klant": {
        "Vluchtid": Column(Integer, nullable=False),
        "Operatie": Column(DECIMAL(2, 1)),
        "Faciliteiten": Column(DECIMAL(2, 1)),
        "Shops": Column(DECIMAL(2, 1))
    },
    "luchthavens": {
        "Airport": Column(String(100)),
        "City": Column(String(100)),
        "Country": Column(String(100)),
        "IATA": Column(CHAR(3), nullable=False),
        "ICAO": Column(CHAR(4)),
        "Lat": Column(Float),
        "Lon": Column(Float),
        "Alt": Column(SmallInteger),
        "TZ": Column(DECIMAL(3, 1)),
        "DST": Column(CHAR(1)),
        "Tz": Column(String(100))
    },
    "maatschappijen": {
        "Name": Column(String(50)),
        "IATA": Column(String(3), nullable=False),
        "ICAO": Column(String(3))
    },
    "planning": {
        "Vluchtnr": Column(String(7)),
        "Airlinecode": Column(String(3)),
        "Destcode": Column(CHAR(3)),
        "Planterminal": Column(CHAR(1)),
        "Plangate": Column(String(2)),
        "Plantijd": Column(TIME, nullable=False)
    },
    "vertrek": {
        "Vluchtid": Column(Integer, nullable=False),
        "Vliegtuigcode": Column(String(8)),
        "Terminal": Column(CHAR(1)),
        "Gate": Column(String(2)),
        "Baan": Column(CHAR(1)),
        "Bezetting": Column(SmallInteger),
        "Vracht": Column(CHAR(2)),
        "Vertrektijd": Column(DateTime, nullable=False)
    },
    "vliegtuig": {
        "Airlinecode": Column(String(3)),
        "Vliegtuigcode": Column(String(8), nullable=False),
        "Vliegtuigtype": Column(CHAR(3)),
        "Bouwjaar": Column(Integer)
    },
    "vliegtuigtype": {
        "IATA": Column(CHAR(3), nullable=False),
        "ICAO": Column(String(4)),
        "Merk": Column(String(50)),
        "Type": Column(String(100)),
        "Wake": Column(String(3)),
        "Cat": Column(String(10)),
        "Capaciteit": Column(String(3)),
        "Vracht": Column(String(2))
    },
    "vlucht": {
        "Vluchtid": Column(Integer, nullable=False),
        "Vluchtnr": Column(String(7), nullable=False),
        "Airlinecode": Column(String(3)),
        "Destcode": Column(CHAR(3)),
        "Vliegtuigcode": Column(String(8)),
        "Datum": Column(DATE)
    },
    "weer": {
        "Datum": Column(DATE, nullable=False),
        **{name: Column(SmallInteger) for name in _WEER_MEASUREMENT_COLUMNS},
    }
}

# Run every RAW table through validation: convertible rows go to cleansed_data,
# rejected rows (unchanged) go to archived_data, both keyed by table name.
cleansed_data = {}
archived_data = {}

for table, df in raw_data.items():
    print(f"Processing {table}")
    table_schema = schema_cleansed[table]
    valid_rows, rejected_rows = validate_and_convert(df, table_schema, illegal_chars)
    cleansed_data[table] = valid_rows
    archived_data[table] = rejected_rows

Processing aankomst
Processing banen
Processing klant
Processing luchthavens
Processing maatschappijen
Processing planning
Processing vertrek
Processing vliegtuig
Processing vliegtuigtype
Processing vlucht
Processing weer


In [167]:
# ARCHIVED layer: rejected rows are kept as the raw text they arrived in, so every column is a
# plain String. Tables and column order mirror schema_cleansed one-to-one.
schema_archived = {
    table_name: {column_name: Column(String) for column_name in table_columns}
    for table_name, table_columns in schema_cleansed.items()
}


In [168]:
# Convert SQLAlchemy Column objects to the {column: type} mapping DataFrame.to_sql expects
def get_sqlalchemy_dtypes(schema):
    """Return ``{column_name: column.type}`` for every column in *schema*."""
    return {column_name: column_info.type for column_name, column_info in schema.items()}


# Insert data into cleansed and archived schemas using df.to_sql
def insert_data(data, schema, engine, schemas):
    """Write each DataFrame in *data* to ``<schema>.<table>``, replacing any existing table.

    data: table name -> DataFrame to write.
    schema: target database schema name.
    engine: connection passed to ``DataFrame.to_sql``.
    schemas: table name -> {column: Column}; supplies the column types.

    A table without rows is still replaced, with an empty table that has the schema's columns,
    so a rerun never leaves stale rows from an earlier run behind and every table exists.
    """
    for table, df in data.items():
        print(f'Inserting data into {schema}.{table}')
        table_schema = schemas[table]
        if len(df.columns) == 0:
            # pd.DataFrame([]) (what validate_and_convert returns for "no rows") has no columns
            df = pd.DataFrame(columns=list(table_schema))
        df.to_sql(name=table, con=engine, schema=schema, if_exists='replace', index=False,
                  dtype=get_sqlalchemy_dtypes(table_schema))

# Validated rows go to CLEANSED with typed columns; rejected rows go to ARCHIVED as text
for target_schema, frames, column_schemas in (
    ('cleansed', cleansed_data, schema_cleansed),
    ('archived', archived_data, schema_archived),
):
    insert_data(frames, target_schema, engine, column_schemas)

Inserting data into cleansed.aankomst
Inserting data into cleansed.banen
Inserting data into cleansed.klant
Inserting data into cleansed.luchthavens
Inserting data into cleansed.maatschappijen
Inserting data into cleansed.planning
Inserting data into cleansed.vertrek
Inserting data into cleansed.vliegtuig
Inserting data into cleansed.vliegtuigtype
Inserting data into cleansed.vlucht
Inserting data into cleansed.weer
Inserting data into archived.aankomst
Inserting data into archived.banen
Inserting data into archived.klant
Inserting data into archived.luchthavens
Inserting data into archived.maatschappijen
Inserting data into archived.planning
Inserting data into archived.vertrek
Inserting data into archived.vliegtuig
Inserting data into archived.vliegtuigtype
Inserting data into archived.vlucht
Inserting data into archived.weer


In [169]:
cleansed_data = load_data(engine, schema='cleansed')  # re-read with the database's column types

In [None]:
# Cleansed tables that feed the fact table
df_vlucht, df_vertrek, df_planning = (cleansed_data[name] for name in ('vlucht', 'vertrek', 'planning'))

In [186]:
def time_to_minutes(t):
    """Return time-of-day *t* as minutes since midnight; seconds count as a fraction of a minute."""
    whole_minutes = 60 * t.hour + t.minute
    return whole_minutes + t.second / 60

In [209]:
# Fact table: flights joined with their departures, then with the schedule on
# flight number + airline + destination, plus the departure delay ('Vertraging') in minutes.
merged_df = pd.merge(df_vlucht, df_vertrek, on=['Vluchtid', 'Vliegtuigcode'])
# The fact keeps only the time of day of the actual departure
merged_df['Vertrektijd'] = merged_df['Vertrektijd'].dt.time
final_df = pd.merge(merged_df, df_planning, on=['Vluchtnr', 'Airlinecode', 'Destcode'])
# Only times of day are compared, so e.g. planned 23:55 / departed 00:05 would give -1430.
# Fold the raw difference into the window [-720, 720) minutes around the planned time.
raw_delay = final_df['Vertrektijd'].apply(time_to_minutes) - final_df['Plantijd'].apply(time_to_minutes)
final_df['Vertraging'] = (raw_delay + 720) % 1440 - 720
final_df.sort_values(by='Vluchtid', ascending=True, inplace=True)
fact = final_df

In [210]:
# Cleansed aircraft tables used for the vliegtuig dimension
df_vliegtuig, df_vliegtuigtype = cleansed_data['vliegtuig'], cleansed_data['vliegtuigtype']

In [211]:
# Aircraft dimension: each aircraft with the attributes of its type (Vliegtuigtype = type IATA code).
# Left join so aircraft whose type is missing from vliegtuigtype stay in the dimension
# (with empty type attributes) instead of silently disappearing.
dim_vliegtuig = pd.merge(df_vliegtuig, df_vliegtuigtype, how='left', left_on='Vliegtuigtype', right_on='IATA')

In [217]:
dim_banen = cleansed_data["banen"]  # banen dimension: cleansed table used unchanged

In [222]:
dim_klant = cleansed_data["klant"]  # klant dimension: cleansed table used unchanged

In [219]:
# Airport dimension; IATA is renamed to Destcode so it lines up with the fact table's Destcode
dim_luchthavens = cleansed_data['luchthavens'].rename(columns={'IATA': 'Destcode'})

In [221]:
# Airline dimension; IATA is renamed to Airlinecode so it lines up with the fact table's Airlinecode
dim_maatschappijen = cleansed_data['maatschappijen'].rename(columns={'IATA': 'Airlinecode'})

In [None]:
dim_weer = cleansed_data["weer"]  # weer dimension: cleansed table used unchanged

In [232]:
import os
def write_to_parquet(df, file_path):
    """Write *df* to *file_path* as a Parquet file using the pyarrow engine.

    The parent directory of *file_path* is created when it does not exist yet, so any
    target location works (not only ``parquet_data``).
    """
    directory = os.path.dirname(file_path)
    if directory:
        # exist_ok avoids a race between an existence check and the creation
        os.makedirs(directory, exist_ok=True)
    df.to_parquet(file_path, engine='pyarrow')

In [233]:
# Export the fact table and every dimension to ./parquet_data/<name>.parquet
for parquet_name, frame in (
    ('fact', fact),
    ('vliegtuig', dim_vliegtuig),
    ('banen', dim_banen),
    ('klant', dim_klant),
    ('luchthavens', dim_luchthavens),
    ('maatschappijen', dim_maatschappijen),
    ('weer', dim_weer),
):
    write_to_parquet(frame, f'./parquet_data/{parquet_name}.parquet')

In [79]:
# Save cleansed data to CSV files
def save_to_csv(data, directory='cleansed_data'):
    """Write each DataFrame in *data* to ``<directory>/<table>.csv`` without the index.

    data maps table name -> DataFrame. The directory is created if it does not exist,
    and one confirmation line is printed per file written.
    """
    import os

    if not os.path.exists(directory):
        os.makedirs(directory)
    targets = {table_name: os.path.join(directory, f"{table_name}.csv") for table_name in data}
    for table_name, frame in data.items():
        target = targets[table_name]
        frame.to_csv(target, index=False)
        print(f"Saved {table_name} to {target}")

save_to_csv(cleansed_data, directory='cleansed_data')  # one CSV per cleansed table

Saved aankomst to cleansed_data/aankomst.csv
Saved banen to cleansed_data/banen.csv
Saved klant to cleansed_data/klant.csv
Saved luchthavens to cleansed_data/luchthavens.csv
Saved maatschappijen to cleansed_data/maatschappijen.csv
Saved planning to cleansed_data/planning.csv
Saved vertrek to cleansed_data/vertrek.csv
Saved vliegtuig to cleansed_data/vliegtuig.csv
Saved vliegtuigtype to cleansed_data/vliegtuigtype.csv
Saved vlucht to cleansed_data/vlucht.csv
Saved weer to cleansed_data/weer.csv
