# PIPLELINE FOR DATA WAREHOUSING AND DASHBOARDING

## Connection to Data Source
This funstion connects to the OLTP database named Airline in MySQL dbms.

In [74]:
import mysql.connector

def connect_to_data_source():
    connection_params = {
        'host': 'localhost',
        'database': 'airline',
        'user': 'root',
        'password': 'hina1234'
    }
    
    connection = mysql.connector.connect(
        host=connection_params['host'],
        user=connection_params['user'],
        password=connection_params['password'],
        database=connection_params['database']
    )
    
    return connection

connection=connect_to_data_source()

## Data ingestion
This function extract relevant attributes from the OLTP database that are to be used in Star Schema creation.

These attrbutes are then stored in a satging area which is a dataframe.

In [75]:
import mysql.connector
import pandas as pd

def ingest_data(connection):
    # Define the query
    query = """
    SELECT 
        A.Aircraft_Type, A.Manufacturer, A.Capacity,
        AMH.Maintenance_Date, 
        R.Distance, 
        T.Amount, 
        CF.Cancellation_Reason, CF.Cancellation_date, CF.Cancellation_time,
        RF.Reimbursement_fee, 
        DF.Airport_Name as DepartureAirport, AF.Airport_Name as ArrivalAirport, F.FlightDuration, F.TicketPrice, F.DepartureDate, F.DepartureTime, F.ArrivalDate, F.ArrivalTime,F.Over_Booked,
        DF.City as Dep_City, DF.Country as Dep_Country, DF.Latitude as Dep_Latitude, DF.Longitude as Dep_Longitude,
        AF.City as Arrival_City, AF.Country as Arrival_Country, AF.Latitude as Arrival_Latitude, AF.Longitude as Arrival_Longitude,
        CCF.Compensation_Amount
    FROM Cancelled_Flights CF
    LEFT JOIN Flight F ON CF.FlightID=F.FlightID
    LEFT JOIN Aircraft A ON F.AircraftID=A.AircraftID
    LEFT JOIN aircraft_maintenance_history AMH ON A.AircraftID=AMH.AircraftID
    LEFT JOIN Airport DF ON F.DepartureAirportID=DF.AirportID
    LEFT JOIN Airport AF ON F.ArrivalAirportID=AF.AirportID
    LEFT JOIN Routes R ON DF.AirportID = R.Departure_AirportID AND AF.AirportID = R.Arrival_AirportID
    LEFT JOIN Reservation RS ON F.FlightID=RS.FlightID
    LEFT JOIN Transaction T ON RS.ReservationID=T.ReservationID
    LEFT JOIN Cancelled_Flight_Compensation CCF ON CF.FlightID=CCF.FlightID
    LEFT JOIN Rescheduled_Flights RF ON CF.CancellationID=RF.CancellationID;
    """
    # Execute the query and fetch the data
    cursor = connection.cursor()
    cursor.execute(query)
    data = cursor.fetchall()
    
    # Get column names
    columns = [desc[0] for desc in cursor.description]
    
    # Create DataFrame
    df = pd.DataFrame(data, columns=columns)
    
    # Close the cursor and connection
    cursor.close()
    connection.close()
    
    return df
staging_area = ingest_data(connection)

## Data Cleaning
This function cleans the data by removing any null values in the dataframe staging area.

In [77]:
def clean_data(staging_area):
    # Interpolate numerical columns
    cleaned_data = staging_area.interpolate(method='ffill', axis=0)
    
    # Fill missing values in categorical columns with mode
    for column in cleaned_data.select_dtypes(include='object'):
        cleaned_data[column] = cleaned_data[column].fillna(cleaned_data[column].mode().iloc[0])
    
    return cleaned_data

staging_area = clean_data(staging_area)


## Creation of New Attributes
These functions create new attributes which are required in the Star Schema. 
These includes:
1. Primary keys for the dimension tables
2. Derived Facts
3. Extraction of data and time components.

In [78]:
import pandas as pd

def create_new_attributes(staging_area):
    
    # Define the function to create new attributes
    def add_attribute(staging_area, new_att, operation, column1, column2):
        if operation == 'add':
            staging_area[new_att] = staging_area[column1] + staging_area[column2]
        elif operation == 'subtract':
            staging_area[new_att] = staging_area[column1] - staging_area[column2]
        elif operation == 'multiply':
            staging_area[new_att] = staging_area[column1] * staging_area[column2]
        elif operation == 'divide':
            staging_area[new_att] = staging_area[column1] / staging_area[column2]
        elif operation == 'time_difference':
            staging_area[new_att] = (staging_area[column1] - staging_area[column2]).dt.components.hours
        elif operation == 'date_difference':
            staging_area[new_att] = (pd.to_datetime(staging_area[column1]) - pd.to_datetime(staging_area[column2])).dt.days
        else:
            print("Invalid operation. Please choose from 'add', 'subtract', 'multiply', 'divide', 'time_difference', or 'date_difference'.")
        
        return staging_area
    
    # Call the function to create new attributes
    staging_area = add_attribute(staging_area, 'Duration_of_Flight', 'time_difference', 'DepartureTime', 'ArrivalTime')
    staging_area = add_attribute(staging_area, 'Lead_Time', 'time_difference', 'Cancellation_time', 'DepartureTime')
    staging_area = add_attribute(staging_area, 'Cost', 'add', 'Compensation_Amount', 'Reimbursement_fee')
    staging_area = add_attribute(staging_area, 'Revenue_Loss', 'subtract', 'Amount', 'Cost')
    
    return staging_area

# Call the function to perform all operations
staging_area = create_new_attributes(staging_area)



In [79]:
def generate_primary_keys(df):
    df['DateID'] = df.index.map(lambda x: 'D' + str(x + 1))
    df['TimeID'] = df.index.map(lambda x: 'T' + str(x + 1))
    df['Dep_LocID'] = df.index.map(lambda x: 'DL' + str(x + 1))
    df['Arrival_LocID'] = df.index.map(lambda x: 'AL' + str(x + 1))
    df['AircraftID'] = df.index.map(lambda x: 'A' + str(x + 1))
    df['ReasonID'] = df.index.map(lambda x: 'R' + str(x + 1))
    return df

staging_area = generate_primary_keys(staging_area)

In [80]:
def extract_date_components(df, prefix, date_column):
    df[date_column] = pd.to_datetime(df[date_column])
    df[prefix + 'Day'] = df[date_column].dt.day
    df[prefix + 'Quarter'] = df[date_column].dt.quarter
    df[prefix + 'Month'] = df[date_column].dt.month
    df[prefix + 'Week'] = df[date_column].dt.isocalendar().week
    df[prefix + 'Year'] = df[date_column].dt.year
    return df

staging_area = extract_date_components(staging_area, '', 'Cancellation_date')

In [81]:
def extract_time_components(df, prefix, time_column):
    
    df[prefix + 'Hour'] = df[time_column].dt.components.hours
    df[prefix + 'Minute'] = df[time_column].dt.components.minutes
    df[prefix + 'Second'] = df[time_column].dt.components.seconds
    return df

staging_area = extract_time_components(staging_area, '', 'Cancellation_time')


## Creation of Star Schema Tables
This function create the Dimension and Fact tables in a new database in MySQL called data warehouse.

In [44]:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, ForeignKey, Date, DECIMAL

def create_star_schema_tables(engine, schema_definition):
    metadata = MetaData()
    for table_name, columns in schema_definition.items():
        columns_list = []
        for col_name, col_type in columns.items():
            column_kwargs = {}
            is_foreign_key = False
            if 'primary_key=True' in col_type:
                column_kwargs['primary_key'] = True
                col_type = col_type.replace(', primary_key=True', '')
            if 'ForeignKey' in col_type:
                foreign_key = col_type.split('ForeignKey(')[1].split(')')[0]
                is_foreign_key = True
                col_type = col_type.split(', ForeignKey(')[0]
            
            if col_type.startswith('String'):
                length = int(col_type.split('(')[1].split(')')[0])
                column = Column(col_name, String(length), **column_kwargs)
            elif col_type.startswith('Integer'):
                column = Column(col_name, Integer, **column_kwargs)
            elif col_type.startswith('Date'):
                column = Column(col_name, Date, **column_kwargs)
            elif col_type.startswith('Decimal'):
                precision, scale = map(int, col_type.split('(')[1].split(')')[0].split(','))
                column = Column(col_name, DECIMAL(precision=precision, scale=scale), **column_kwargs)
            else:
                raise ValueError(f"Unsupported column type: {col_type}")

            if is_foreign_key:
                column.foreign_keys.add(ForeignKey(foreign_key))

            columns_list.append(column)

        Table(table_name, metadata, *columns_list)
    metadata.create_all(engine)

schema_definition = {
    'DimDate': {
        'DateID': 'String(50), primary_key=True',
        'Day': 'Integer',
        'Week': 'Integer',
        'Month': 'String(50)',
        'Quarter': 'Integer',
        'Year': 'Integer'
    },
    'DimTime': {
        'TimeID': 'String(50), primary_key=True',
        'Hour': 'Integer',
        'Minute': 'Integer',
        'Second': 'Integer'
    },
    'DimDep_Location': {
        'Dep_LocID': 'String(50), primary_key=True',
        'Dep_Country': 'String(50)',
        'Dep_City': 'String(50)',
        'Dep_Latitude': 'Decimal(10,2)',
        'Dep_Longitude': 'Decimal(10,2)'
    },
    'DimArrival_Location': {
        'Arrival_LocID': 'String(50), primary_key=True',
        'Arrival_Country': 'String(50)',
        'Arrival_City': 'String(50)',
        'Arrival_Latitude': 'Decimal(10,2)',
        'Arrival_Longitude': 'Decimal(10,2)'
    },  
    'DimAircraft': {
        'AircraftID': 'String(50), primary_key=True',
        'Aircraft_Type': 'String(50)',
        'Manufacturer': 'String(50)',
        'Maintenance_Date': 'Date'
    },  
    'DimReason': {
        'ReasonID': 'String(50), primary_key=True',
        'Cancellation_Reason': 'String(50)'
    },
    'FactCanc_Flight': {
        'FactID': 'String(50), primary_key=True',
        'DateID': 'String(50), ForeignKey(DimDate.DateID)',
        'TimeID': 'String(50), ForeignKey(DimTime.TimeID)',
        'Dep_LocID': 'String(50), ForeignKey(DimDep_Location.Dep_LocID)',
        'Arrival_LocID': 'String(50), ForeignKey(DimArrival_Location.Arrival_LocID)',
        'AircraftID': 'String(50), ForeignKey(DimAircraft.AircraftID)',
        'ReasonID': 'String(50), ForeignKey(DimReason.ReasonID)',
        'Over_Booked': 'Integer',
        'Capacity': 'Integer',
        'Distance': 'Integer',
        'Amount': 'Integer',
        'Lead_Time': 'Integer',
        'Duration_of_Flight': 'Integer',
        'Revenue_Loss': 'Integer',
        'Cost': 'Integer'
    }
}

engine = create_engine('mysql+mysqlconnector://root:hina1234@localhost/data warehouse')
create_star_schema_tables(engine, schema_definition)


## Mapping of Data to Dimension Tables
This function maps data from the staging area to the dimension tables in our data warehouse.

In [82]:
from sqlalchemy import create_engine

def map_to_dimtable(staging_area):
    engine = create_engine('mysql+mysqlconnector://root:hina1234@localhost/data warehouse')
    dimension_mappings = {
    'dimdate': ['DateID','Day', 'Week', 'Month', 'Quarter', 'Year'],
    'dimtime': [ 'TimeID','Hour', 'Minute', 'Second'],
    'dimdep_location': ['Dep_LocID','Dep_Country', 'Dep_City', 'Dep_Latitude', 'Dep_Longitude'],
    'dimarrival_location': ['Arrival_LocID','Arrival_Country', 'Arrival_City', 'Arrival_Latitude', 'Arrival_Longitude'],
    'dimaircraft':['AircraftID','Aircraft_Type','Manufacturer','Maintenance_Date'],
    'dimreason':['ReasonID','Cancellation_Reason']
    }
    with engine.connect() as connection:
        for table_name, columns in dimension_mappings.items():
            dimension_data = staging_area[columns].copy()
            dimension_data.to_sql(table_name, connection, if_exists='replace', index=False)
            
map_to_dimtable(staging_area)


## Mapping of Data to Fact Table
This function maps data from the staging area to the fact table in our data warehouse.

In [83]:
from sqlalchemy import create_engine
import pandas as pd

def map_to_facttable(staging_area):
    engine = create_engine('mysql+mysqlconnector://root:hina1234@localhost/data warehouse')
    dimension_mappings = {
    'dimdate': ['DateID'],
    'dimtime': ['TimeID'],
    'dimdep_location': ['Dep_LocID'],
    'dimarrival_location': ['Arrival_LocID'],
    'dimaircraft': ['AircraftID'],
    'dimreason': ['ReasonID'],
    
    }
    fact_mappings = {
    'factcanc_flight': ['DateID', 'TimeID', 'Dep_LocID', 'Arrival_LocID', 'AircraftID', 'ReasonID','Over_Booked', 'Capacity', 'Distance', 'Amount', 'Lead_Time', 'Duration_of_Flight', 'Revenue_Loss', 'Cost']
    }
    with engine.connect() as connection:
        for table_name, columns in fact_mappings.items():
            fact_data = staging_area[columns].copy()
            fact_data['FactID'] = range(1, len(fact_data) + 1)
            # Join with dimension tables to get foreign keys
            for dim_table, dim_columns in dimension_mappings.items():
                for dim_col in dim_columns:
                    if dim_col in fact_data.columns:
                        dim_data = pd.read_sql(f"SELECT {dim_col} FROM {dim_table}", connection)
                        fact_data = fact_data.merge(dim_data, on=dim_col, how='left')
            
            # Writing data to SQL, including primary key column
            
            fact_data.to_sql(table_name, connection, if_exists='replace', index=False)

map_to_facttable(staging_area)

## Execution of Dimensional Queiries:

### The following code executes these Dimensional Queries.
1. WHAT IS THE AVERAGE NO. OF FLIGHTS CANCELLED IN CANADA BECUASE OF CREW UNAVAILABILITY?
2. WHICH AIRPLANE'S FLIGHTS GETS CANCELLED THE MOST?
3. WHAT IS THE TOTAL REVENUE_LOSS IN 2023?


In [84]:
from sqlalchemy import create_engine
import pandas as pd

def execute_dimensional_queries():
    queries = {
        "Dimensional Query 1: Average Cancelled Flights in Canada": """
            SELECT AVG(cancelled_flights) AS average_cancelled_flights
            FROM (
                SELECT COUNT(*) AS cancelled_flights
                FROM factcanc_flight f
                JOIN DimDep_Location as DL on f.Dep_LocID=DL.Dep_LocID
                JOIN DimReason as R on f.ReasonID=R.ReasonID
                WHERE Dep_Country = 'Canada'
                  AND cancellation_reason = 'Crew Unavailability'
                GROUP BY DATE(DateID)
            ) AS daily_cancelled_flights;
        """,
        "Dimensional Query 2: Aircraft with highest no. of cancelled flights":"""
        select a.aircraft_type, COUNT(FactID) as cancelled_flights from DimAircraft a
        join factcanc_flight f on a.AircraftID=f.AircraftID
        GROUP BY a.aircraft_type
        ORDER BY cancelled_flights desc limit 1;
        """,
        "Dimensional Query 3: Total Revenue loss in 2023": """
        SELECT sum(f.revenue_loss) from factcanc_flight f 
        join DimDate d on f.DateID=d.DateID
        where d.year=2023
        """
    }
    
    engine = create_engine('mysql+mysqlconnector://root:hina1234@localhost/data warehouse')
    results = {}
    for query_name, query in queries.items():
        results[query_name] = pd.read_sql(query, engine)
        
    # Print the results
    for query_name, result in results.items():
        print(f"{query_name}:")
        print(f"{result}\n")

execute_dimensional_queries()


Dimensional Query 1: Average Cancelled Flights in Canada:
   average_cancelled_flights
0                      113.0

Dimensional Query 2: Aircraft with highest no. of cancelled flights:
   aircraft_type  cancelled_flights
0  Regional Jets              13359

Dimensional Query 3: Total Revenue loss in 2023:
   sum(f.revenue_loss)
0         3.916041e+09



## Fact Table snapshot
This function creates a dataframe of Fact table snapshot.

In [23]:
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime

def extract_fact_table_snapshot():
    fact_table = 'factcanc_flight'
    dimension_tables = ['dimdate', 'dimtime', 'dimdep_location', 'dimarrival_location', 'dimaircraft', 'dimreason']
    
    snapshot_query = f"""
        SELECT f.FactID,f.Capacity,f.Distance,f.Amount,f.Lead_Time,f.Duration_of_Flight,f.Revenue_Loss,f.Cost,f.Over_Booked,
               d1.Day, d1.Week, d1.Month, d1.Quarter, d1.Year,
               d2.Hour, d2.Minute, d2.Second, 
               d3.Dep_Country, d3.Dep_City, d3.Dep_Latitude, d3.Dep_Longitude,
               d4.Arrival_Country,d4.Arrival_City,d4.Arrival_Latitude,d4.Arrival_Longitude,
               d5.Aircraft_Type,d5.Manufacturer,d5.Maintenance_Date,
               d6.Cancellation_Reason
        FROM {fact_table} f
        JOIN dimdate d1 ON f.DateID = d1.DateID
        JOIN dimtime d2 ON f.TimeID = d2.TimeID
        JOIN dimdep_location d3 ON f.Dep_LocID = d3.Dep_LocID
        JOIN dimarrival_location d4 ON f.Arrival_LocID = d4.Arrival_LocID
        JOIN dimaircraft d5 ON f.AircraftID = d5.AircraftID
        JOIN dimreason d6 ON f.ReasonID = d6.ReasonID
    """
    
    engine = create_engine('mysql+mysqlconnector://root:hina1234@localhost/data warehouse')
    
    snapshot = pd.read_sql(snapshot_query, engine)
    
    # Adding date of data entry to track percentage change later
    def add_date_col(snapshot):
        def current_time_hour_minute():
            return datetime.now().replace(second=0, microsecond=0)

        snapshot['DateTime'] = current_time_hour_minute()
        return snapshot

    snapshot = add_date_col(snapshot)
    
    return snapshot

# Call the function to extract the fact table snapshot
snapshot_ft = extract_fact_table_snapshot()

## Exporting Fact Table Snapshot to SQL for Dashboarding
 This snapshot is then exported to MySQL database 'data warehouse' which is then exported to Power BI for dashboarding.

In [24]:
def export_ft(snapshot):
    engine = create_engine('mysql+mysqlconnector://root:hina1234@localhost/data warehouse')
    snapshot.to_sql('fact_table_snapshot', con=engine, if_exists='replace', index=False)
export_ft(snapshot_ft)

## POWER BI DASHBOARD

The Dynamic Power BI Dashboard is hosted on the web.

After that more OLTP data was generated and pipleine was executed to update the dashboard. The percentage change in updates was also recorded in a dashboard. Both of the dashbboards were hosted.

[View Dashboard](https://app.powerbi.com/view?r=eyJrIjoiOWY3MTc3ODktMjE4Ny00Zjc3LTgwMWQtN2Q0NWY5ODkzOTNmIiwidCI6ImZlZTNiOTE2LTAxYzEtNDk4Ny1hNjQ2LWUxOTM0MzJiOWVhYSIsImMiOjl9)

## MASTER FUNCTION
This function automates everything. When new data is updated in the OLTP database, this fucntion runs and updates data in data warehouse and also export it to power BI so the dashboard updates automatically.

In [53]:
def Master_Controller():
    connection=connect_to_data_source()
    staging_area = ingest_data(connection)
    staging_area = clean_data(staging_area)
    staging_area = create_new_attributes(staging_area)
    staging_area = generate_primary_keys(staging_area)
    staging_area = extract_date_components(staging_area, '', 'Cancellation_date')
    staging_area = extract_time_components(staging_area, '', 'Cancellation_time')
    map_to_dimtable(staging_area)
    map_to_facttable(staging_area)
    execute_dimensional_queries()
    snapshot_ft = extract_fact_table_snapshot()
    export_ft(snapshot_ft)
    
Master_Controller()
snapshot_ft

Dimensional Query 1: Average Cancelled Flights in Canada:
   average_cancelled_flights
0                      113.0

Dimensional Query 2: Aircraft with highest no. of cancelled flights:
   aircraft_type  cancelled_flights
0  Regional Jets              12977

Dimensional Query 3: Total Revenue loss in 2023:
   sum(f.revenue_loss)
0         3.916041e+09



Unnamed: 0,FactID,Capacity,Distance,Amount,Lead_Time,Duration_of_Flight,Revenue_Loss,Cost,Over_Booked,Day,...,Dep_Longitude,Arrival_Country,Arrival_City,Arrival_Latitude,Arrival_Longitude,Aircraft_Type,Manufacturer,Maintenance_Date,Cancellation_Reason,DateTime
0,1,250,14214,458052.0,2,14,426034.0,32018,1,6,...,-65.85,Canada,Plano,-36.97,-59.19,Commercial Airliner,Airbus,2024-03-30,Crew Unavailability,2024-06-02 14:53:00
1,2,250,14214,275331.0,2,14,243313.0,32018,1,6,...,-65.85,Canada,Plano,-36.97,-59.19,Commercial Airliner,Airbus,2024-03-30,Crew Unavailability,2024-06-02 14:53:00
2,3,250,14214,458052.0,2,14,426034.0,32018,1,6,...,-65.85,Canada,Plano,-36.97,-59.19,Commercial Airliner,Airbus,2024-09-04,Crew Unavailability,2024-06-02 14:53:00
3,4,250,14214,275331.0,2,14,243313.0,32018,1,6,...,-65.85,Canada,Plano,-36.97,-59.19,Commercial Airliner,Airbus,2024-09-04,Crew Unavailability,2024-06-02 14:53:00
4,5,250,14214,458052.0,2,14,426034.0,32018,1,6,...,-65.85,Canada,Plano,-36.97,-59.19,Commercial Airliner,Airbus,2023-10-18,Crew Unavailability,2024-06-02 14:53:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
28800,28801,500,12383,322222.0,10,20,293243.0,28979,1,17,...,63.79,Mongolia,Miami,20.70,102.60,Commercial Airliner,Airbus,2024-06-04,Weather Conditions (departure location),2024-06-02 14:53:00
28801,28802,500,12383,322222.0,10,20,293243.0,28979,1,17,...,63.79,Mongolia,Miami,20.70,102.60,Commercial Airliner,Airbus,2024-02-24,Weather Conditions (departure location),2024-06-02 14:53:00
28802,28803,500,12383,322222.0,10,20,293243.0,28979,1,17,...,63.79,Mongolia,Miami,20.70,102.60,Commercial Airliner,Airbus,2023-10-26,Weather Conditions (departure location),2024-06-02 14:53:00
28803,28804,500,12383,322222.0,10,20,293243.0,28979,1,17,...,63.79,Mongolia,Miami,20.70,102.60,Commercial Airliner,Airbus,2023-10-11,Weather Conditions (departure location),2024-06-02 14:53:00
