# Real-time DFW Flight Data Using AviationStack API
This Python script retrieves real-time arrival and departure flight information for Dallas/Fort Worth International Airport (DFW) from the AviationStack API. The data is updated every minute and undergoes unit testing before being stored in the gold table.

**Fetch Data** → The script sends a GET request to the AviationStack API, filtering for today’s DFW arrivals and departures.

**Process & Structure** → The response is flattened into a structured PySpark DataFrame.

**Perform Unit Tests** → Before pushing the data, unit tests check for missing values, data integrity, and consistency.

**Store in Gold Table** → After validation, the clean data is written to the gold table, ensuring high-quality flight tracking information.

This automated pipeline ensures accurate, real-time flight data, supporting operational efficiency at DFW. ✈️📊

In [0]:
# IMPORT NECESSARY LIBRARIES
import uuid
import logging
import requests
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *

# DEPARTURE DATA
## USE AVIATION STACK API TO GET DFW AIRPORT DEPARTURE DATA

In [0]:
# AVIATIONSTACK ENDPOINT AND ACCESS KEY
# NOTE(REVIEW): API_KEY IS A PLACEHOLDER VALUE - SUPPLY A REAL KEY BEFORE RUNNING
BASE_URL = "http://api.aviationstack.com/v1/flights"
API_KEY = "your_api_key_here"

# LOAD THE AIRPORT GEOCODE GOLD TABLE (JOINED TO THE FLIGHT DATA FURTHER DOWN)
geocode_query = "SELECT * FROM tabular.dataexpert.josephgabbrielle62095_airport_geocode_gold"
airport_geocode = spark.sql(geocode_query)

## LET'S START WITH DEPARTURE DATA
THIS FIRST PART OF THE CODE WILL GET DFW DEPARTURE FLIGHT DATA USING THE AVIATIONSTACK API.

In [0]:
# AIRLINE IATA CODES TO REQUEST DEPARTURES FOR (ONE API CALL IS MADE PER CODE)
airline_iata_codes = [
    "AA", "AC", "AF", "AM", "AS", "AZ", "B6",
    "BA", "DL", "EK", "F9", "IB", "KL", "LH",
    "LX", "NK", "QR", "SQ", "TK", "UA", "WS",
]

# EACH API RECORD IS NESTED; TURN IT INTO A SINGLE-LEVEL DICT OF STRINGS
def flatten_flight(flight):
    """
    Return a one-level dict built from a single flight record.

    'flight_date' and 'flight_status' are copied unchanged (defaulting to ""
    when the key is absent). For the 'departure', 'arrival', 'airline',
    'flight', 'aircraft' and 'live' entries, when the value is a dict each
    inner item becomes '<section>_<inner key>'; an inner 'codeshared' dict is
    expanded into 'flight_codeshared_<key>' items. Every other top-level item
    is kept under its own key. All of these values are converted with str(),
    with None stored as "".
    """
    nested_sections = ("departure", "arrival", "airline", "flight", "aircraft", "live")
    passthrough = ("flight_date", "flight_status")

    def _as_text(item):
        # None becomes an empty string; anything else is stringified
        return "" if item is None else str(item)

    row = {name: flight.get(name, "") for name in passthrough}

    for section, payload in flight.items():
        if section in passthrough:
            # already copied as-is above
            continue

        if not (section in nested_sections and isinstance(payload, dict)):
            # scalar (or non-dict) value: keep it under its own key
            row[section] = _as_text(payload)
            continue

        for field, content in payload.items():
            if field == "codeshared" and isinstance(content, dict):
                for cs_field, cs_content in content.items():
                    row[f"flight_codeshared_{cs_field}"] = _as_text(cs_content)
            else:
                row[f"{section}_{field}"] = _as_text(content)

    return row

# LIST TO STORE FLIGHT DATA
all_flights_data = []

# LOOP OVER EACH AIRLINE USING THE GIVEN PARAMETERS
for airline in airline_iata_codes:
    params = {
        "access_key": API_KEY,
        "dep_iata": "DFW",  # SET DFW AS DEPARTURE AIRPORT
        "airline_iata": airline,  # ONE REQUEST PER AIRLINE IN THE LIST ABOVE
    }

    # A TIMEOUT KEEPS ONE UNRESPONSIVE REQUEST FROM HANGING THE WHOLE RUN.
    # NETWORK FAILURES ARE REPORTED AND THE LOOP MOVES ON TO THE NEXT AIRLINE.
    try:
        response = requests.get(BASE_URL, params=params, timeout=30)
    except requests.exceptions.RequestException as error:
        print(f"Error fetching data for airline {airline}: {error}")
        continue

    if response.status_code != 200:
        print(f"Error fetching data for airline {airline}: {response.status_code}, {response.text}")
        continue

    # A 200 RESPONSE WHOSE BODY IS NOT VALID JSON IS TREATED AS A FAILED FETCH
    try:
        data = response.json().get("data", [])
    except ValueError as error:
        print(f"Error fetching data for airline {airline}: invalid JSON response ({error})")
        continue

    if data:
        # FLATTEN EVERY RETURNED FLIGHT AND ADD IT TO THE COMBINED LIST
        all_flights_data.extend(flatten_flight(flight) for flight in data)
    else:
        print(f"No flight data available for airline {airline}.")

# BUILD A PYSPARK DATAFRAME FROM THE COLLECTED ROWS AND SAVE IT AS CSV
if not all_flights_data:
    print("No flight data available for the selected airlines.")
else:
    # STAMP EVERY ROW WITH THE TIME OF THIS UPLOAD
    dep_flights_df = spark.createDataFrame(all_flights_data).withColumn(
        "uploaded_timestamp", current_timestamp()
    )

    # A RANDOM UUID IN THE PATH GIVES EACH RUN ITS OWN OUTPUT LOCATION
    dep_csv_path = f"/Volumes/tabular/dataexpert/josephgabbrielle62095/capstone_flight/departing_flight_csv/dep_data_{uuid.uuid4()}.csv"
    dep_flights_df.write.csv(dep_csv_path, header=True)

    print("Data successfully written to csv.")


## LET'S CLEAN THE DEPARTURE BRONZE DATA
TO GET THE DFW DEPARTURE DATA INTO A USEABLE FORMAT, THE DATA MUST BE IMPROVED BY CHANGING THE DATA TYPES AND THE COLUMN NAMES. THIS STEP WILL EXCLUDE ANY UNNEEDED DATA.

In [0]:
# LOAD THE DEPARTURE BRONZE TABLE
# NOTE(REVIEW): THE STEP THAT LOADS THE CSV OUTPUT INTO THIS TABLE IS NOT VISIBLE HERE - PRESUMABLY AN EXTERNAL INGESTION JOB; CONFIRM
depart_bronze = spark.sql("SELECT * FROM tabular.dataexpert.josephgabbrielle62095_departing_flight_bronze")

In [0]:
# PREVIEW THE BRONZE ROWS AND PRINT THEIR SCHEMA
display(depart_bronze)
depart_bronze.printSchema()

In [0]:
# REMOVE COLUMNS THAT ARE NOT NEEDED DOWNSTREAM
unneeded_columns = ['departure', 'arrival', 'airline', 'flight', 'aircraft', 'live', '_rescued_data', 'flight_codeshared']
depart_silver = depart_bronze.drop(*unneeded_columns)

# TARGET TYPE FOR EACH COLUMN THAT NEEDS CASTING
as_date = DateType()
as_timestamp = TimestampType()
as_integer = IntegerType()
type_conversions = {
    "flight_date": as_date,
    "arrival_actual": as_timestamp,
    "arrival_actual_runway": as_timestamp,
    "arrival_delay": as_integer,
    "arrival_estimated": as_timestamp,
    "arrival_estimated_runway": as_timestamp,
    "arrival_scheduled": as_timestamp,
    "arrival_terminal": as_integer,
    "departure_actual": as_timestamp,
    "departure_actual_runway": as_timestamp,
    "departure_delay": as_integer,
    "departure_estimated": as_timestamp,
    "departure_estimated_runway": as_timestamp,
    "departure_scheduled": as_timestamp,
    "uploaded_timestamp": as_timestamp,
}

# REPLACE EACH LISTED COLUMN WITH ITS CAST VERSION
for column_name, target_type in type_conversions.items():
    converted = col(column_name).cast(target_type)
    depart_silver = depart_silver.withColumn(column_name, converted)

# KEEP ONLY THE COLUMNS USED BY THE DEPARTURE SILVER TABLE
departure_keep_columns = [
    "airline_iata", "airline_name",
    "arrival_iata",
    "departure_actual", "departure_delay",
    "departure_estimated", "departure_estimated_runway",
    "departure_iata", "departure_scheduled",
    "flight_iata", "flight_date", "flight_status",
    "uploaded_timestamp",
]
depart_silver = depart_silver.select(*departure_keep_columns)

# ADD THE LOCATION OF EACH FLIGHT'S ARRIVAL AIRPORT
# LEFT JOIN: FLIGHTS WITHOUT A MATCHING GEOCODE ROW ARE KEPT WITH NULL LOCATION FIELDS
arrival_airport_match = depart_silver.arrival_iata == airport_geocode.iata
arrival_location_columns = [
    airport_geocode['city'].alias('arrival_city'),
    airport_geocode['country'].alias('arrival_country'),
    airport_geocode['latitude_decimal_degrees'].alias('arrival_latitude'),
    airport_geocode['longitude_decimal_degrees'].alias('arrival_longitude'),
]

depart_geocode = depart_silver.join(
    broadcast(airport_geocode), arrival_airport_match, "left"
).select(depart_silver['*'], *arrival_location_columns)

# CONDITIONS REUSED BY THE STATUS RULES BELOW
is_scheduled = col("flight_status") == "scheduled"
has_departure_delay = col("departure_delay") > 0

# STATUS RULES ARE EVALUATED TOP TO BOTTOM; THE FIRST MATCH WINS
specific_status = (
    when(col("flight_status") == "landed", "LANDED")
    .when(is_scheduled & has_departure_delay, "DELAYED")
    .when((col("departure_delay").isNotNull()) & has_departure_delay, "ENROUTE DELAYED")
    .otherwise(upper(col("flight_status")))
)

# NOTE(REVIEW): A NULL ARRIVAL_COUNTRY (NO GEOCODE MATCH) FALLS THROUGH TO "Y" - CONFIRM THAT IS INTENDED
international_flag = when(col("arrival_country") == "USA", "N").otherwise("Y")

# APPLY: UPPERCASE AIRLINE NAME, INTERNATIONAL FLAG, THEN THE MORE SPECIFIC STATUS
depart_geocode = (
    depart_geocode
    .withColumn("airline_name", upper(col("airline_name")))
    .withColumn("arrival_international", international_flag)
    .withColumn("flight_status", specific_status)
)

# EXPECTED COLUMNS OF THE DEPARTURE SILVER TABLE
# EVERY NAME NEEDS ITS OWN TRAILING COMMA: TWO ADJACENT STRING LITERALS ARE
# SILENTLY JOINED INTO ONE NAME BY PYTHON
schema_columns = [
    "airline_iata",
    "airline_name",
    "arrival_city",
    "arrival_country",
    "arrival_iata",
    "arrival_international",
    "arrival_latitude",
    "arrival_longitude",
    "departure_actual",
    "departure_delay",
    "departure_estimated",
    "departure_estimated_runway",
    "departure_iata",
    "departure_scheduled",
    "flight_iata",
    "flight_date",
    "flight_status",
    "uploaded_timestamp",
]

# PUT THE COLUMNS IN ALPHABETICAL ORDER
sorted_columns = list(depart_geocode.columns)
sorted_columns.sort()
depart_geocode = depart_geocode.select(*sorted_columns)

# PREVIEW THE RESULT AND ITS SCHEMA
display(depart_geocode)
depart_geocode.printSchema()

# REPLACE THE CONTENTS OF THE DEPARTURE SILVER TABLE
depart_silver_writer = depart_geocode.write.mode("overwrite")
depart_silver_writer.saveAsTable("tabular.dataexpert.josephgabbrielle62095_departing_flight_silver")

## LET'S PERFORM THE UNIT TESTS
BY PERFORMING UNIT TESTS, END USERS CAN BE SURE OF THE QUALITY OF THE DATA. THIS WILL AVOID PUTTING INCORRECT OR MISSING DATA INTO PRODUCTION. API OWNERS FREQUENTLY CHANGE THE DATA SCHEMA OR DATA TYPE. THIS WILL FIND ANY CHANGES THAT WILL AFFECT PRODUCTION DATA.

In [0]:
# READ BACK THE DEPARTURE SILVER TABLE (THIS REBINDS depart_silver), THEN PREVIEW ITS ROWS AND SCHEMA
depart_silver = spark.sql("SELECT * FROM tabular.dataexpert.josephgabbrielle62095_departing_flight_silver")
display(depart_silver)
depart_silver.printSchema()

In [0]:
# COLUMNS THE DEPARTURE SILVER TABLE IS EXPECTED TO CONTAIN
depart_columns = [
    "airline_iata", "airline_name",
    "arrival_city", "arrival_country", "arrival_iata", "arrival_international",
    "arrival_latitude", "arrival_longitude",
    "departure_actual", "departure_delay", "departure_estimated",
    "departure_estimated_runway", "departure_iata", "departure_scheduled",
    "flight_date", "flight_iata", "flight_status",
    "uploaded_timestamp",
]

# REPORT EACH COLUMN THAT IS PRESENT; STOP AT THE FIRST ONE THAT IS MISSING
for expected_column in depart_columns:
    if expected_column not in depart_silver.columns:
        raise ValueError(f"Missing column: {expected_column}")
    print(f"Column '{expected_column}' exists in DataFrame")

# CHECK THAT THE DATA ISN'T EMPTY
# A SINGLE ROW COUNTS AS DATA, SO THE TEST IS "AT LEAST ONE ROW";
# limit(1) MEANS AT MOST ONE ROW HAS TO BE COUNTED
if depart_silver.limit(1).count() == 0:
    raise ValueError("There is no data!")
print("Data found")

# COLUMNS THAT MUST NOT CONTAIN NULLS, CHECKED IN THIS ORDER
required_departure_columns = [
    "airline_iata",
    "airline_name",
    "arrival_iata",
    "departure_iata",
    "departure_scheduled",
    "flight_iata",
    "flight_date",
    "flight_status",
]

# FAIL ON THE FIRST COLUMN FOUND TO HOLD A NULL
for required_column in required_departure_columns:
    null_rows = depart_silver.filter(col(required_column).isNull())
    if null_rows.limit(1).count() > 0:
        raise ValueError(f"There is a null in the {required_column} column!")

print("No nulls found in the dataset")

## DEPARTURE GOLD TABLES
IF THE DATA PASSES THE UNIT TESTS, THEN THE DATA CAN BE WRITTEN INTO THE GOLD TABLE.

In [0]:
# WRITE TO THE GOLD TABLE ONCE THE CHECKS ABOVE HAVE PASSED
# (A FAILED CHECK RAISES ValueError; mode("overwrite") REPLACES THE EXISTING TABLE CONTENTS)
depart_silver.write.mode("overwrite").saveAsTable("tabular.dataexpert.josephgabbrielle62095_departing_flight_gold")

# READ THE GOLD TABLE BACK AND PREVIEW IT
depart_gold = spark.sql("SELECT * FROM tabular.dataexpert.josephgabbrielle62095_departing_flight_gold")
display(depart_gold)

# ARRIVAL DATA
## USE AVIATION STACK API TO GET DFW AIRPORT ARRIVAL DATA

In [0]:
# AIRLINE IATA CODES TO REQUEST ARRIVALS FOR (ONE API CALL IS MADE PER CODE)
# NOTE(REVIEW): UNLIKE THE DEPARTURE LIST EARLIER IN THIS NOTEBOOK, "AZ" IS NOT INCLUDED HERE - CONFIRM THIS IS INTENDED
airline_iata_codes = [
    "AA", "AC", "AF", "AM", "AS",
    "B6", "BA", "DL", "EK", "F9",
    "IB", "KL", "LH", "LX", "NK",
    "QR", "SQ", "TK", "UA", "WS",
]

def flatten_flight(flight):
    """
    Collapse one nested flight record into a flat dict.

    The 'flight_date' and 'flight_status' values are carried over untouched
    ("" if the key is missing). Dict values under 'departure', 'arrival',
    'airline', 'flight', 'aircraft' or 'live' are spread into
    '<parent>_<child>' keys, except that a child named 'codeshared' holding a
    dict is spread into 'flight_codeshared_<key>' keys. Any other top-level
    entry keeps its own key. Apart from the two untouched fields, every value
    is stored as str(value), or "" when it is None.
    """
    untouched = ("flight_date", "flight_status")
    expandable = ("departure", "arrival", "airline", "flight", "aircraft", "live")

    def text_or_blank(raw):
        # None -> "", everything else -> its string form
        if raw is None:
            return ""
        return str(raw)

    flat = {
        "flight_date": flight.get("flight_date", ""),
        "flight_status": flight.get("flight_status", ""),
    }

    for name, content in flight.items():
        if name in expandable and isinstance(content, dict):
            for child, child_value in content.items():
                if child == "codeshared" and isinstance(child_value, dict):
                    flat.update(
                        (f"flight_codeshared_{share_key}", text_or_blank(share_value))
                        for share_key, share_value in child_value.items()
                    )
                    continue
                flat[f"{name}_{child}"] = text_or_blank(child_value)
        elif name not in untouched:
            flat[name] = text_or_blank(content)

    return flat

# LIST TO STORE FLIGHT DATA
all_flights_data = []

# LOOP OVER EACH AIRLINE AND FETCH FLIGHT DATA
for airline in airline_iata_codes:
    params = {
        "access_key": API_KEY,
        "arr_iata": "DFW",  # FETCH ARRIVAL DATA FOR DFW
        "airline_iata": airline,  # FILTER FOR A SELECTED AIRLINE
    }

    # A TIMEOUT KEEPS ONE UNRESPONSIVE REQUEST FROM HANGING THE WHOLE RUN.
    # NETWORK FAILURES ARE REPORTED AND THE LOOP MOVES ON TO THE NEXT AIRLINE.
    try:
        response = requests.get(BASE_URL, params=params, timeout=30)
    except requests.exceptions.RequestException as error:
        print(f"Error fetching data for airline {airline}: {error}")
        continue

    if response.status_code != 200:
        print(f"Error fetching data for airline {airline}: {response.status_code}, {response.text}")
        continue

    # A 200 RESPONSE WHOSE BODY IS NOT VALID JSON IS TREATED AS A FAILED FETCH
    try:
        data = response.json().get("data", [])
    except ValueError as error:
        print(f"Error fetching data for airline {airline}: invalid JSON response ({error})")
        continue

    if data:
        # FLATTEN EVERY RETURNED FLIGHT AND ADD IT TO THE COMBINED LIST
        all_flights_data.extend(flatten_flight(flight) for flight in data)
    else:
        print(f"No flight data available for airline {airline}.")

# BUILD A PYSPARK DATAFRAME FROM THE COLLECTED ROWS AND SAVE IT AS CSV
if not all_flights_data:
    print("No flight data available for the selected airlines.")
else:
    # STAMP EVERY ROW WITH THE TIME OF THIS UPLOAD
    arr_flights_df = spark.createDataFrame(all_flights_data).withColumn(
        "uploaded_timestamp", current_timestamp()
    )

    # A RANDOM UUID IN THE PATH GIVES EACH RUN ITS OWN OUTPUT LOCATION
    arr_csv_path = f"/Volumes/tabular/dataexpert/josephgabbrielle62095/capstone_flight/arriving_flight_csv/arr_data_{uuid.uuid4()}.csv"
    arr_flights_df.write.csv(arr_csv_path, header=True)

    print("Data successfully written to csv.")

## LET'S CLEAN THE ARRIVAL BRONZE DATA
TO GET THE DFW ARRIVAL DATA INTO A USEABLE FORMAT, THE DATA MUST BE IMPROVED BY CHANGING THE DATA TYPES AND THE COLUMN NAMES. THIS STEP WILL EXCLUDE ANY UNNEEDED DATA.

In [0]:
# LOAD THE ARRIVAL BRONZE TABLE, THEN PREVIEW ITS ROWS AND SCHEMA
# NOTE(REVIEW): THE STEP THAT LOADS THE CSV OUTPUT INTO THIS TABLE IS NOT VISIBLE HERE - PRESUMABLY AN EXTERNAL INGESTION JOB; CONFIRM
arrival_bronze = spark.sql("SELECT * FROM tabular.dataexpert.josephgabbrielle62095_arriving_flight_bronze")
display(arrival_bronze)
arrival_bronze.printSchema()

In [0]:
# COLUMNS CARRIED FORWARD FROM THE ARRIVAL BRONZE TABLE
arrival_keep_columns = [
    "airline_iata", "airline_name",
    "arrival_actual", "arrival_actual_runway", "arrival_baggage", "arrival_delay",
    "arrival_estimated", "arrival_estimated_runway", "arrival_gate",
    "arrival_iata", "arrival_scheduled",
    "departure_actual", "departure_delay", "departure_estimated",
    "departure_estimated_runway", "departure_iata", "departure_scheduled",
    "flight_iata", "flight_date", "flight_number", "flight_status",
    "uploaded_timestamp",
]
arrival_bronze = arrival_bronze.select(*arrival_keep_columns)

# ADD THE LOCATION OF EACH FLIGHT'S DEPARTURE AIRPORT
# LEFT JOIN: FLIGHTS WITHOUT A MATCHING GEOCODE ROW ARE KEPT WITH NULL LOCATION FIELDS
departure_airport_match = arrival_bronze.departure_iata == airport_geocode.iata
departure_location_columns = [
    airport_geocode['city'].alias('depart_city'),
    airport_geocode['country'].alias('depart_country'),
    airport_geocode['latitude_decimal_degrees'].alias('depart_latitude'),
    airport_geocode['longitude_decimal_degrees'].alias('depart_longitude'),
]

arrival_geocode = arrival_bronze.join(
    broadcast(airport_geocode), departure_airport_match, "left"
).select(arrival_bronze['*'], *departure_location_columns)

# FLAG FLIGHTS THAT DEPARTED FROM OUTSIDE THE USA
# NOTE(REVIEW): A NULL DEPART_COUNTRY (NO GEOCODE MATCH) FALLS THROUGH TO "Y" - CONFIRM THAT IS INTENDED
departed_from_usa = col("depart_country") == "USA"
arrival_geocode = arrival_geocode.withColumn(
    "depart_international", when(departed_from_usa, "N").otherwise("Y")
)

# TARGET TYPE FOR EACH COLUMN THAT NEEDS CASTING
as_date = DateType()
as_timestamp = TimestampType()
as_integer = IntegerType()
type_conversions = {
    "flight_date": as_date,
    "arrival_actual": as_timestamp,
    "arrival_actual_runway": as_timestamp,
    "arrival_delay": as_integer,
    "arrival_estimated": as_timestamp,
    "arrival_estimated_runway": as_timestamp,
    "arrival_scheduled": as_timestamp,
    "departure_actual": as_timestamp,
    "departure_delay": as_integer,
    "departure_estimated": as_timestamp,
    "departure_estimated_runway": as_timestamp,
    "departure_scheduled": as_timestamp,
    "uploaded_timestamp": as_timestamp,
}

# REPLACE EACH LISTED COLUMN WITH ITS CAST VERSION
for column_name, target_type in type_conversions.items():
    converted = col(column_name).cast(target_type)
    arrival_geocode = arrival_geocode.withColumn(column_name, converted)

# EDIT FLIGHT_STATUS FOR SPECIFICITY
# RULES ARE EVALUATED TOP TO BOTTOM AND THE FIRST MATCH WINS:
#   1. LANDED FLIGHTS STAY "LANDED"
#   2. ANY POSITIVE ARRIVAL DELAY            -> "DELAYED"
#   3. OTHERWISE ANY POSITIVE DEPARTURE DELAY -> "DEPART DELAYED"
#   4. REMAINING SCHEDULED FLIGHTS            -> "ON TIME"
#   5. ANYTHING ELSE KEEPS ITS STATUS, UPPERCASED
# RULE 3 ALREADY CAPTURES EVERY ROW WITH departure_delay > 0, SO NO LATER RULE
# MAY DEPEND ON THAT CONDITION - IT COULD NEVER MATCH.
# NOTE(REVIEW): IF "ENROUTE DELAYED" IS A WANTED STATUS, ITS RULE MUST BE PLACED BEFORE RULE 3.
arrival_geocode = arrival_geocode.withColumn(
    "flight_status",
    when(col("flight_status") == "landed", "LANDED")
    .when(col("arrival_delay") > 0, "DELAYED")
    .when(col("departure_delay") > 0, "DEPART DELAYED")
    .when(col("flight_status") == "scheduled", "ON TIME")
    .otherwise(upper(col("flight_status")))
)

# SHOW "UNASSIGNED" WHEN NO BAGGAGE CLAIM OR GATE HAS BEEN ASSIGNED (VALUE IS NULL)
for unassigned_column in ("arrival_baggage", "arrival_gate"):
    filled = when(col(unassigned_column).isNull(), "UNASSIGNED").otherwise(
        arrival_geocode[unassigned_column]
    )
    arrival_geocode = arrival_geocode.withColumn(unassigned_column, filled)

# DROP THE DEPARTURE DELAY (ALREADY USED BY THE STATUS RULES) AND UPPERCASE THE AIRLINE NAME
arrival_geocode = (
    arrival_geocode
    .drop('departure_delay')
    .withColumn("airline_name", upper(col("airline_name")))
)

# PUT THE COLUMNS IN ALPHABETICAL ORDER
sorted_columns = list(arrival_geocode.columns)
sorted_columns.sort()
arrival_geocode = arrival_geocode.select(*sorted_columns)

# PREVIEW THE RESULT
display(arrival_geocode)

# REPLACE THE CONTENTS OF THE ARRIVAL SILVER TABLE
arrival_silver_writer = arrival_geocode.write.mode("overwrite")
arrival_silver_writer.saveAsTable("tabular.dataexpert.josephgabbrielle62095_arrival_flight_silver")

## LET'S PERFORM THE UNIT TESTS
BY PERFORMING UNIT TESTS, END USERS CAN BE SURE OF THE QUALITY OF THE DATA. THIS WILL AVOID PUTTING INCORRECT OR MISSING DATA INTO PRODUCTION. API OWNERS FREQUENTLY CHANGE THE DATA SCHEMA OR DATA TYPE. THIS WILL FIND ANY CHANGES THAT WILL AFFECT PRODUCTION DATA.

In [0]:
# READ BACK THE ARRIVAL SILVER TABLE, THEN PREVIEW ITS ROWS AND SCHEMA
arrival_silver = spark.sql("SELECT * FROM tabular.dataexpert.josephgabbrielle62095_arrival_flight_silver")
display(arrival_silver)
arrival_silver.printSchema()

In [0]:
# COLUMNS THE ARRIVAL SILVER TABLE IS EXPECTED TO CONTAIN
arrival_columns = [
    "airline_iata", "airline_name",
    "arrival_actual", "arrival_actual_runway", "arrival_baggage", "arrival_delay",
    "arrival_estimated", "arrival_estimated_runway", "arrival_gate",
    "arrival_iata", "arrival_scheduled",
    "depart_city", "depart_country", "depart_international",
    "depart_latitude", "depart_longitude",
    "departure_actual", "departure_estimated", "departure_estimated_runway",
    "departure_iata", "departure_scheduled",
    "flight_date", "flight_iata", "flight_number", "flight_status",
    "uploaded_timestamp",
]

# REPORT EACH COLUMN THAT IS PRESENT; STOP AT THE FIRST ONE THAT IS MISSING
for expected_column in arrival_columns:
    if expected_column not in arrival_silver.columns:
        raise ValueError(f"Missing column: {expected_column}")
    print(f"Column '{expected_column}' exists in DataFrame")

# CHECK THE DATA ISN'T EMPTY
# A SINGLE ROW COUNTS AS DATA, SO THE TEST IS "AT LEAST ONE ROW";
# limit(1) MEANS AT MOST ONE ROW HAS TO BE COUNTED
if arrival_silver.limit(1).count() == 0:
    raise ValueError("There is no data!")
print("Data found")

# COLUMNS THAT MUST NOT CONTAIN NULLS, CHECKED IN THIS ORDER
columns_to_check = [
    "airline_name",
    "arrival_estimated",
    "arrival_iata",
    "arrival_scheduled",
    "departure_estimated",
    "departure_iata",
    "departure_scheduled",
    "flight_date",
    "flight_number",
    "flight_status",
]

# FAIL ON THE FIRST COLUMN FOUND TO HOLD A NULL
for required_column in columns_to_check:
    null_rows = arrival_silver.filter(col(required_column).isNull())
    if null_rows.limit(1).count() > 0:
        raise ValueError(f"There is a null in the {required_column} column!")

print("No nulls found in the dataset")

# PREVIEW THE VALIDATED DATA
display(arrival_silver)

## ARRIVAL GOLD TABLES
IF THE DATA PASSES THE UNIT TESTS, THEN THE DATA CAN BE WRITTEN INTO THE GOLD TABLE.

In [0]:
# WRITE TO THE GOLD TABLE IF THE CHECKS ABOVE PASSED
# (A FAILED CHECK RAISES ValueError; mode("overwrite") REPLACES THE EXISTING TABLE CONTENTS)
arrival_silver.write.mode("overwrite").saveAsTable("tabular.dataexpert.josephgabbrielle62095_arrival_flight_gold")