# Project Data engineering 2023-2024
### Bers Goudantov, 2ITAI

## Data Cleaning

In [9]:
# requirements install
# NOTE(review): pandas and sqlalchemy are imported in the next cell but are not
# installed here — presumably already present in the environment; confirm.
# The 'postgresql://' connection URL used below also needs a PostgreSQL DBAPI
# driver (e.g. psycopg2) — TODO confirm it is installed.
# pandera, boto3, pandasql, awswrangler and awscli are not imported anywhere in
# this part of the notebook; they may be used by other cells.
!pip install pandera
!pip install boto3
!pip install pandasql
!pip install numpy
!pip install awswrangler
!pip install awscli

'"\n!pip install pandera\n!pip install boto3\n!pip install pandasql\n!pip install numpy\n!pip install awswrangler\n!pip install awscli'

In [13]:
import os

import pandas as pd
from sqlalchemy import create_engine, text, types as sqlalchemytypes

# Database connection string.
# Read from the DB_CONNECTION environment variable so credentials do not have to
# live in the notebook. The literal below is only a fallback that keeps the
# original local setup working.
# NOTE(review): remove the hard-coded password once DB_CONNECTION is configured
# in every environment that runs this notebook.
DB_CONNECTION = os.environ.get(
    'DB_CONNECTION',
    'postgresql://postgres:Newpassword@192.168.1.4:5432/postgres',
)

# Create SQLAlchemy engine shared by all extract/load helpers below.
engine = create_engine(DB_CONNECTION)

def load_data(schema, table_name, engine):
    """Read an entire database table into a DataFrame.

    Args:
        schema: Name of the schema that holds the table.
        table_name: Name of the table inside ``schema``.
        engine: Connectable handed straight to ``pandas.read_sql``.

    Returns:
        pandas.DataFrame containing every row and column of
        ``schema.table_name``.

    Note:
        ``schema`` and ``table_name`` are pasted into the SQL text unquoted,
        so they must come from trusted code, never from user input.
    """
    qualified_name = f'{schema}.{table_name}'
    return pd.read_sql('SELECT * FROM ' + qualified_name, engine)

def save_first_row_to_csv(df, file_name):
    """Write the header plus the first row of ``df`` to ``file_name``.

    The DataFrame index is not written. A confirmation message naming the
    file is printed afterwards. Used as a lightweight debugging snapshot.
    """
    preview = df.iloc[:1]
    preview.to_csv(file_name, index=False)
    print(f"First row saved to {file_name}")


In [14]:

def combine_and_clean_data(luchthavens, vliegtuig, vliegtuigtype, weer, maatschappijen, aankomst, vertrek, vlucht, klant, debug=True):
    """Combine and clean data from the cleansed schema into DW dimension/fact frames.

    The input DataFrames are never modified: every frame whose columns are
    rewritten is copied first.

    Args:
        luchthavens: Airports; needs 'iata' plus the columns kept in luchthaven_dim.
        vliegtuig: Aircraft; joined to vliegtuigtype on 'vliegtuigtype' == 'iata'.
            Together the two frames must provide the columns kept in vliegtuig_dim.
        vliegtuigtype: Aircraft types keyed by 'iata'.
        weer: Weather rows; 'datum' is normalised to a 'YYYY-MM-DD' string
            (unparseable values become the string 'NaT').
        maatschappijen: Airlines keyed by 'iata'.
        aankomst: Arrivals with 'vluchtid', 'bezetting', 'vracht', 'aankomsttijd'.
        vertrek: Departures with 'vluchtid', 'vertrektijd'.
        vlucht: Flights with 'vluchtid', 'vluchtnr', 'airlinecode', 'destcode',
            'vliegtuigcode'.
        klant: Customer rows with a 'vluchtid' column.
        debug: When True (the default, matching the previous behaviour), print
            intermediate frames and write first-row CSV snapshots to the
            working directory.

    Returns:
        dict with 'luchthaven_dim', 'vliegtuig_dim', 'weer_dim',
        'maatschappij_dim', 'vlucht_fct' and 'klant_dim' DataFrames.

    Raises:
        KeyError: if a required column is missing (explicitly checked for
            vliegtuig['vliegtuigtype'] and vliegtuigtype['iata']).
    """
    def _show(label, frame):
        # Debug-only console dump of an intermediate frame.
        if debug:
            print(label, frame)

    def _snapshot(frame, file_name):
        # Debug-only first-row CSV snapshot.
        if debug:
            save_first_row_to_csv(frame, file_name)

    # Ensure the key columns exist
    if 'vliegtuigtype' not in vliegtuig.columns:
        raise KeyError("Column 'vliegtuigtype' not found in 'vliegtuig' table")
    if 'iata' not in vliegtuigtype.columns:
        raise KeyError("Column 'iata' not found in 'vliegtuigtype' table")

    # Copy every frame whose columns are rewritten below so the caller's data is untouched.
    vliegtuig = vliegtuig.copy()
    vliegtuigtype = vliegtuigtype.copy()
    maatschappijen = maatschappijen.copy()
    vlucht = vlucht.copy()
    weer = weer.copy()

    # Strip leading/trailing whitespace from the join keys so padded codes still match.
    vliegtuig['vliegtuigtype'] = vliegtuig['vliegtuigtype'].str.strip()
    vliegtuigtype['iata'] = vliegtuigtype['iata'].str.strip()
    maatschappijen['iata'] = maatschappijen['iata'].str.strip()
    vlucht['airlinecode'] = vlucht['airlinecode'].str.strip()

    # --- vliegtuig_dim: aircraft enriched with their type, one row per vliegtuigcode ---
    vliegtuig_dim = vliegtuig.merge(vliegtuigtype, how='left', left_on='vliegtuigtype', right_on='iata')
    vliegtuig_dim = vliegtuig_dim[['airlinecode', 'vliegtuigcode', 'vliegtuigtype', 'bouwjaar',
                                   'merk', 'type', 'wake', 'cat', 'capaciteit', 'vracht']].drop_duplicates(subset=['vliegtuigcode'])
    # Unparseable numbers become NaN instead of raising.
    vliegtuig_dim['capaciteit'] = pd.to_numeric(vliegtuig_dim['capaciteit'], errors='coerce')
    vliegtuig_dim['vracht'] = pd.to_numeric(vliegtuig_dim['vracht'], errors='coerce')

    # --- vlucht_fct: flights enriched with arrival and departure details ---
    vlucht_fct = vlucht[['vluchtid', 'vluchtnr', 'airlinecode', 'destcode', 'vliegtuigcode']].copy()

    _snapshot(aankomst, "first_row_aankomst.csv")
    _snapshot(vertrek, "first_row_vertrek.csv")

    vlucht_fct = vlucht_fct.merge(aankomst[['vluchtid', 'bezetting', 'vracht', 'aankomsttijd']], on='vluchtid', how='left')
    _show("Print 1 aankomst merge", vlucht_fct)
    vlucht_fct = vlucht_fct.merge(vertrek[['vluchtid', 'vertrektijd']], on='vluchtid', how='left')
    _show("Print 2 vertrek merge", vlucht_fct)
    _snapshot(vlucht_fct, "first_row_vlucht_fct_after_merge.csv")

    # Keep only 2-character airline codes that exist in maatschappijen.
    vlucht_fct = vlucht_fct[vlucht_fct['airlinecode'].str.len() == 2]
    _show("Print 3", vlucht_fct)
    vlucht_fct = vlucht_fct[vlucht_fct['airlinecode'].isin(maatschappijen['iata'])]
    _show("Print 4", vlucht_fct)

    # Attach the destination airport key (NaN when destcode is not a known airport).
    luchthaven_keys = luchthavens[['iata']].drop_duplicates(subset=['iata'])
    vlucht_fct = vlucht_fct.merge(luchthaven_keys, how='left', left_on='destcode', right_on='iata')
    vlucht_fct = vlucht_fct.rename(columns={'iata': 'dest_luchthaven_id'})
    _show("Print 4 Join with luchthaven_dim based on destcode and iata", vlucht_fct)

    # Reduce aankomsttijd to a calendar date and derive a string key for the weather join.
    # NOTE(review): this overwrites aankomsttijd in the fact table and drops the time of
    # day; confirm that the dw.vlucht_fct column is meant to hold a date.
    vlucht_fct['aankomsttijd'] = pd.to_datetime(vlucht_fct['aankomsttijd'], errors='coerce').dt.date
    vlucht_fct['aankomst_date'] = vlucht_fct['aankomsttijd'].astype(str)
    _show("Print 4  Convert aankomsttijd to date and join with weer_dim", vlucht_fct)
    _snapshot(vlucht_fct, "first_row_vlucht_fct_with_date.csv")

    # Normalise weer.datum to the same string form (unparseable dates -> 'NaT').
    weer['datum'] = pd.to_datetime(weer['datum'], errors='coerce').dt.date.astype(str)

    # Keep one key row per valid date. Duplicate dates would multiply fact rows,
    # and the 'NaT' placeholder string would otherwise match every flight that
    # has no arrival date, giving it a bogus weer_id.
    weer_keys = weer.loc[weer['datum'] != 'NaT', ['datum']].drop_duplicates()
    vlucht_fct = vlucht_fct.merge(weer_keys, how='left', left_on='aankomst_date', right_on='datum')
    _show("Print 4 Merge with weer to get weer_id", vlucht_fct)
    vlucht_fct = vlucht_fct.rename(columns={'datum': 'weer_id'})
    _show("Print 5 Merge with weer to get weer_id", vlucht_fct)

    # Drop the helper column used only for the weather join.
    vlucht_fct = vlucht_fct.drop(columns=['aankomst_date'])

    vlucht_fct['bezetting'] = pd.to_numeric(vlucht_fct['bezetting'], errors='coerce')
    vlucht_fct['vracht'] = pd.to_numeric(vlucht_fct['vracht'], errors='coerce')

    if debug:
        print(vlucht_fct.dtypes)
    _snapshot(vlucht_fct, "output_before_renaming.csv")

    fct_columns = ['vluchtid', 'vluchtnr', 'airlinecode', 'destcode', 'vliegtuigcode', 'bezetting',
                   'vracht', 'aankomsttijd', 'vertrektijd', 'weer_id', 'dest_luchthaven_id']
    try:
        vlucht_fct = vlucht_fct[fct_columns]
    except KeyError:
        # Show what is actually there before propagating the error.
        print("Available columns in vlucht_fct:", vlucht_fct.columns)
        raise
    vlucht_fct = vlucht_fct.rename(columns={'airlinecode': 'maatschappij_id'})

    # --- remaining dimensions ---
    luchthaven_dim = luchthavens[['airport', 'city', 'country', 'iata', 'icao', 'lat', 'lon', 'alt', 'tz', 'dst', 'tzname']].drop_duplicates(subset=['iata'])
    maatschappij_dim = maatschappijen[['name', 'iata', 'icao']].drop_duplicates(subset=['iata'])
    # Only customers whose flight survived the fact-table filters.
    klant_dim = klant[klant['vluchtid'].isin(vlucht_fct['vluchtid'])]

    return {
        'luchthaven_dim': luchthaven_dim,
        'vliegtuig_dim': vliegtuig_dim,
        'weer_dim': weer,
        'maatschappij_dim': maatschappij_dim,
        'vlucht_fct': vlucht_fct,
        'klant_dim': klant_dim
    }
    

In [12]:
def remove_duplicates(df, subset):
    """Return ``df`` without rows that repeat an earlier row on ``subset``.

    The first occurrence of each key combination is kept and the original
    index labels are preserved. ``df`` itself is not modified.
    """
    return df.drop_duplicates(subset=subset, keep='first')


def write_to_dw(df, table_name, engine, dtype=None):
    """Replace the contents of ``dw.<table_name>`` with the rows of ``df``.

    The table is truncated (``RESTART IDENTITY CASCADE``) and ``df`` is
    appended inside a single transaction. A failed insert therefore rolls the
    truncate back instead of leaving the table empty.

    Args:
        df: Rows to load. Its columns must match the target table.
        table_name: Target table in the ``dw`` schema. It is interpolated into
            the SQL text unquoted, so pass trusted names only.
        engine: SQLAlchemy engine for the warehouse.
        dtype: Optional column -> SQLAlchemy type mapping forwarded to
            ``DataFrame.to_sql``.

    Raises:
        Exception: any error from the database is printed and re-raised.
    """
    # An empty frame is skipped entirely; the existing table contents stay as they are.
    if df.empty:
        print(f"No data to write for table {table_name}")
        return

    try:
        # engine.begin() commits on a clean exit and rolls back on error. A bare
        # engine.connect() block does not autocommit in SQLAlchemy 2.x, so the
        # TRUNCATE would be rolled back and rows would accumulate on every run.
        with engine.begin() as connection:
            truncate_query = text(f"TRUNCATE TABLE dw.{table_name} RESTART IDENTITY CASCADE;")
            connection.execute(truncate_query)
            # Same connection/transaction as the TRUNCATE, so both commit or neither does.
            df.to_sql(table_name, connection, schema='dw', if_exists='append', index=False, dtype=dtype)
        print(f"Data successfully written to table {table_name}")

    except Exception as e:
        print(f"Failed to write data to table {table_name}: {e}")
        raise


def etl_flow():
    """Run the full ETL: extract from ``cleansed``, transform, load into ``dw``.

    Reads every source table from the ``cleansed`` schema and builds the
    dimension and fact frames with ``combine_and_clean_data``. It then
    deduplicates ``vliegtuig_dim`` on ``vliegtuigcode``, writes a first-row
    snapshot of the fact table, and finally replaces each ``dw`` table's
    contents, dimensions first.
    """
    # Source tables in extraction order. Each name is also the matching
    # keyword argument of combine_and_clean_data.
    source_tables = (
        'luchthavens', 'vliegtuig', 'vliegtuigtype', 'weer', 'maatschappijen',
        'aankomst', 'vertrek', 'vlucht', 'klant',
    )
    extracted = {name: load_data('cleansed', name, engine) for name in source_tables}

    combined_data = combine_and_clean_data(**extracted)

    combined_data['vliegtuig_dim'] = remove_duplicates(combined_data['vliegtuig_dim'], subset=['vliegtuigcode'])
    print("vliegtuig_dim after deduplication:", combined_data['vliegtuig_dim'])

    save_first_row_to_csv(combined_data['vlucht_fct'], 'first_row_vlucht_fct.csv')

    # (table name, explicit column types) in load order; None means let pandas infer.
    klant_types = {
        'operatie': sqlalchemytypes.DECIMAL(2, 1),
        'faciliteiten': sqlalchemytypes.DECIMAL(2, 1),
        'shops': sqlalchemytypes.DECIMAL(2, 1)
    }
    load_plan = (
        ('luchthaven_dim', None),
        ('vliegtuig_dim', None),
        ('weer_dim', None),
        ('maatschappij_dim', None),
        ('vlucht_fct', None),
        ('klant_dim', klant_types),
    )
    for table, column_types in load_plan:
        write_to_dw(combined_data[table], table, engine, dtype=column_types)