In [1]:
from FlightRadar24 import FlightRadar24API
from pyspark.sql import SparkSession
from utils.calculate_distance import calculate_distance
from utils.map_zone import map_zone
import os
from datetime import datetime
import logging
import json
from config import airport_schema, flight_schema, airline_schema, zone_schema  # Import schemas

os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk-11'  # Update to your Java path
os.environ['SPARK_HOME'] = 'C:/spark'  # Update to your Spark installation path
os.environ['HADOOP_HOME'] = 'C:/hadoop'  # Required for Windows
os.environ["PYSPARK_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"

class FlightPipeline:
    def __init__(self):
        self.fr_api = FlightRadar24API()
        self.spark = SparkSession.builder.appName("FlightPipeline").getOrCreate()
        self.df_flights = None
        self.df_airports = None
        self.df_airlines = None
        self.df_zones = None

        # Initialize logging
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler("pipeline.log"),
                logging.StreamHandler()
            ]
        )

    def extract(self):
        try:
            # Step 1: Ingest Flights
            flights = self.fr_api.get_flights()
            data_flights = []
            for flight in flights:
                try:
                    data_flights.append({
                        "flightID": flight.id,
                        "aircraft": flight.aircraft_code,
                        "airlineCode": flight.airline_icao,
                        "airportOrigineCode": flight.origin_airport_iata,
                        "airportDestinationCode": flight.destination_airport_iata,
                        "flightStatus": flight.on_ground
                    })
                except Exception as e:
                    logging.error(f"Error retrieving flight details for flight {flight}: {e}")
            self.df_flights = self.spark.createDataFrame(data_flights, schema=flight_schema)  # Use imported schema
            # self.df_flights.show()
            logging.info('Step 1: Ingest Flights Finished')

            # Step 2: Ingest Airports
            airports = self.fr_api.get_airports()
            data_airports = []
            for airport in airports:
                try:
                    data_airports.append({
                        "airportID": airport.iata,
                        "airportName": airport.name,
                        "airportLat": float(airport.latitude) if airport.latitude else None,
                        "airportLong": float(airport.longitude) if airport.longitude else None,
                        "airportCountryName": airport.country,
                    })
                except Exception as e:
                    logging.error(f"Error retrieving airport details for airport {airport}: {e}")
            self.df_airports = self.spark.createDataFrame(data_airports, schema=airport_schema)  # Use imported schema
            # self.df_airports.show()
            logging.info('Step 2: Ingest Airports Finished')

            # Step 3: Ingest Airlines
            data_airlines = self.fr_api.get_airlines()
            self.df_airlines = self.spark.createDataFrame(data_airlines, schema=airline_schema)  # Use imported schema
            # self.df_airlines.show()
            logging.info('Step 3: Ingest Airlines Finished')

            # Step 4: Ingest Zones
            zones = self.fr_api.get_zones()
            data_zones = []
            for continent, details in zones.items():
                if "subzones" in details:
                    for subzone, sub_details in details["subzones"].items():
                        data_zones.append({
                            "continent": continent,
                            "subzone": subzone,
                            "tl_y": float(sub_details["tl_y"]),
                            "tl_x": float(sub_details["tl_x"]),
                            "br_y": float(sub_details["br_y"]),
                            "br_x": float(sub_details["br_x"])
                        })
                else:
                    data_zones.append({
                        "continent": continent,
                        "subzone": None,
                        "tl_y": float(details["tl_y"]),
                        "tl_x": float(details["tl_x"]),
                        "br_y": float(details["br_y"]),
                        "br_x": float(details["br_x"])
                    })
            self.df_zones = self.spark.createDataFrame(data_zones, schema=zone_schema)  # Use imported schema
            self.df_zones.show()
            logging.info('Step 4: Ingest Zones Finished')

            # Return all DataFrames
            return self.df_flights, self.df_airports, self.df_airlines, self.df_zones

        except Exception as e:
            logging.critical(f"Critical error during ingestion: {e}")
            return None, None, None, None


# Example usage
pipeline = FlightPipeline()
df_flights, df_airports, df_airlines, df_zones = pipeline.extract()



2024-12-14 14:32:27,168 - INFO - Step 1: Ingest Flights Finished
2024-12-14 14:32:27,427 - INFO - Step 2: Ingest Airports Finished
2024-12-14 14:32:27,644 - INFO - Step 3: Ingest Airlines Finished
2024-12-14 14:32:37,895 - INFO - Step 4: Ingest Zones Finished


+-------------+-----------+-----+-------+------+------+
|    continent|    subzone| tl_y|   tl_x|  br_y|  br_x|
+-------------+-----------+-----+-------+------+------+
|       europe|     poland|56.86|  11.06| 48.22| 28.26|
|       europe|    germany|57.92|   1.81| 45.81| 16.83|
|       europe|         uk|62.61| -13.07| 49.71|  3.46|
|       europe|      spain|44.36| -11.06| 35.76|  4.04|
|       europe|     france|51.07|  -5.18| 42.17|   8.9|
|       europe|       ceur|51.39|  11.25| 39.72| 32.55|
|       europe|scandinavia|72.12|  -0.73| 53.82| 40.67|
|       europe|      italy|47.67|   5.26| 36.27| 20.64|
| northamerica|       na_n|72.82|-177.97| 41.92|-52.48|
| northamerica|       na_c|54.66|-134.68| 22.16|-56.91|
| northamerica|       na_s|41.92|-177.83|  3.82|-52.48|
| southamerica|       NULL| 16.0|  -96.0| -57.0| -31.0|
|      oceania|       NULL|19.62|   88.4|-55.08| 180.0|
|         asia|      japan|60.38|  113.5| 22.58|176.47|
|       africa|       NULL| 39.0|  -29.0| -39.0|

In [4]:
spark = SparkSession.builder.appName("myapp").getOrCreate()
output_path = "./data/ingested_data"
os.makedirs(output_path, exist_ok=True)
df_zones.write.mode("overwrite").parquet(output_path)
print(f"DataFrame successfully saved to {output_path}")




DataFrame successfully saved to ./data/ingested_data


In [3]:
spark = SparkSession.builder.appName("myapp").getOrCreate()

print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())


3.3.4


In [1]:
import os
from pyspark.sql import SparkSession
from datetime import datetime

os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk-11'  # Update to your Java path
os.environ['SPARK_HOME'] = 'C:/spark'  # Update to your Spark installation path
os.environ['HADOOP_HOME'] = 'C:/hadoop'  # Required for Windows
os.environ["PYSPARK_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"
# Get the current datetime
now = datetime.now()
tech_year = now.strftime("%Y")
tech_month = now.strftime("%m")
tech_day = now.strftime("%d")
tech_hour = now.strftime("%H")


# Step 1: Initialize SparkSession
spark = SparkSession.builder \
    .appName("ReadResultsData") \
    .getOrCreate()

# Step 2: Base path to the results folder
base_results_path = "./data/results/airline_with_most_flights"

# Step 3: Dynamically detect the latest folder
latest_path = max(
    [os.path.join(base_results_path, d) for d in os.listdir(base_results_path)],
    key=os.path.getmtime
)

# Step 4: Load the Parquet data
print(latest_path)
df_results = spark.read.parquet(f"{base_results_path}/tech_year={tech_year}/tech_month={tech_month}/tech_day={tech_day}/tech_hour={tech_hour}")

# Step 5: Show the loaded data
df_results.show()


./data/results/airline_with_most_flights\tech_year=2024
+-----------+------------+------------+
|airlineCode|airline_name|flight_count|
+-----------+------------+------------+
|        UAE|    Emirates|          89|
+-----------+------------+------------+



In [3]:
import os
from pyspark.sql import SparkSession
from datetime import datetime

os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk-11'  # Update to your Java path
os.environ['SPARK_HOME'] = 'C:/spark'  # Update to your Spark installation path
os.environ['HADOOP_HOME'] = 'C:/hadoop'  # Required for Windows
os.environ["PYSPARK_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"
# Get the current datetime
now = datetime.now()
tech_year = now.strftime("%Y")
tech_month = now.strftime("%m")
tech_day = now.strftime("%d")
tech_hour = now.strftime("%H")


# Step 1: Initialize SparkSession
spark = SparkSession.builder \
    .appName("ReadResultsData") \
    .getOrCreate()

# Step 2: Base path to the results folder
base_results_path = "./data/results/airline_with_most_regional_flights"

# Step 3: Dynamically detect the latest folder
latest_path = max(
    [os.path.join(base_results_path, d) for d in os.listdir(base_results_path)],
    key=os.path.getmtime
)

# Step 4: Load the Parquet data
print(latest_path)
df_results = spark.read.parquet(f"{base_results_path}/tech_year={tech_year}/tech_month={tech_month}/tech_day={tech_day}/tech_hour={tech_hour}")

# Step 5: Show the loaded data
df_results.show()


./data/results/airline_with_most_regional_flights\tech_year=2024
+---------------+-----------+------------------+---------------------+
|originContinent|airlineCode|      airline_name|regional_flight_count|
+---------------+-----------+------------------+---------------------+
|         africa|        ETH|Ethiopian Airlines|                    7|
|           asia|        CHH|   Hainan Airlines|                    2|
|         europe|        TAP|  TAP Air Portugal|                    2|
|   northamerica|        AAL| American Airlines|                   20|
|        oceania|        QFA|            Qantas|                    8|
|   southamerica|        GLO| GOL Linhas Aereas|                    1|
+---------------+-----------+------------------+---------------------+



In [10]:
import os
from pyspark.sql import SparkSession
from datetime import datetime

os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk-11'  # Update to your Java path
os.environ['SPARK_HOME'] = 'C:/spark'  # Update to your Spark installation path
os.environ['HADOOP_HOME'] = 'C:/hadoop'  # Required for Windows
os.environ["PYSPARK_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "C:\\Users\\tefte\\anaconda3\\envs\\kata-env\\python"
# Get the current datetime
now = datetime.now()
tech_year = now.strftime("%Y")
tech_month = now.strftime("%m")
tech_day = now.strftime("%d")
tech_hour = now.strftime("%H")


# Step 1: Initialize SparkSession
spark = SparkSession.builder \
    .appName("ReadResultsData") \
    .getOrCreate()

# Step 2: Base path to the results folder
base_results_path = "./data/results/Aircraft_manufacturer_with_most_active_flights"

# Step 3: Dynamically detect the latest folder
latest_path = max(
    [os.path.join(base_results_path, d) for d in os.listdir(base_results_path)],
    key=os.path.getmtime
)

# Step 4: Load the Parquet data
print(latest_path)
df_results = spark.read.parquet(f"{base_results_path}/tech_year={tech_year}/tech_month={tech_month}/tech_day={tech_day}/tech_hour={tech_hour}")

# Step 5: Show the loaded data
df_results.show()



./data/results/Aircraft_manufacturer_with_most_active_flights\tech_year=2024
+--------+-------------------+
|aircraft|active_flight_count|
+--------+-------------------+
|    B77W|                202|
+--------+-------------------+



In [22]:
import os
import sys
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, count
from pyspark.sql.window import Window
from utils.load_latest_data import load_latest_data
from utils.save_kpi_data import save_kpi_data

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler()
    ]
)

# Initialize Spark session
spark = SparkSession.builder.appName("AircraftManufacturerWithMostActiveFlights").getOrCreate()

# Base paths for loading and saving data
base_extracted_path ="./data/ingested_data"
results_path = "./data/results"
df_flights = load_latest_data(spark, base_extracted_path, "flights")


2024-12-15 03:16:59,223 - INFO - Loading data for flights from ./data/ingested_data\flights\tech_year=2024\tech_month=12\tech_day=15\tech_hour=03


In [26]:
active_flights = df_flights.filter(col("flightStatus") == "0")
active_flights.show()

+--------+--------+-----------+------------------+----------------------+------------+
|flightID|aircraft|airlineCode|airportOrigineCode|airportDestinationCode|flightStatus|
+--------+--------+-----------+------------------+----------------------+------------+
|3857547f|    B77W|        QTR|               DOH|                   HKG|           0|
|38581e7d|    A359|        ETH|               PVG|                   ADD|           0|
|38583115|    A359|        DLH|               PVG|                   MUC|           0|
|38583161|    B789|        MSR|               PVG|                   CAI|           0|
|38583688|    A359|        SIA|               SIN|                   JFK|           0|
|3858372c|    B77W|        THY|               ICN|                   IST|           0|
|3858376c|    A359|        SJX|               TPE|                   SFO|           0|
|38583d05|    A359|        CCA|               PEK|                   MEL|           0|
|3858403d|    B789|        CSH|            

In [27]:
top_manufacturer = (
    active_flights.groupBy("aircraft")  # Group by manufacturer
    .agg(count("*").alias("active_flight_count"))  # Count active flights
    .withColumn(  # Add a rank column to identify the top manufacturer
        "rank",
        row_number().over(Window.orderBy(col("active_flight_count").desc()))
    )
    .filter(col("rank") == 1)  # Filter to keep only the top manufacturer
    .drop("rank")  # Drop the rank column
)

In [28]:
# Select necessary columns
top_manufacturer = top_manufacturer.select("aircraft", "active_flight_count")

In [30]:
top_manufacturer.show()

+--------+-------------------+
|aircraft|active_flight_count|
+--------+-------------------+
|    B77W|                188|
+--------+-------------------+



In [31]:
# Load data
df_flights = load_latest_data(spark, base_extracted_path, "flights")
df_airlines = load_latest_data(spark, base_extracted_path, "airlines")

2024-12-15 03:22:26,692 - INFO - Loading data for flights from ./data/ingested_data\flights\tech_year=2024\tech_month=12\tech_day=15\tech_hour=03
2024-12-15 03:22:26,774 - INFO - Loading data for airlines from ./data/ingested_data\airlines\tech_year=2024\tech_month=12\tech_day=15\tech_hour=03


In [35]:
df_flights.show()

+--------+--------+-----------+------------------+----------------------+------------+
|flightID|aircraft|airlineCode|airportOrigineCode|airportDestinationCode|flightStatus|
+--------+--------+-----------+------------------+----------------------+------------+
|3857547f|    B77W|        QTR|               DOH|                   HKG|           0|
|3857e59a|    B763|        FDX|               RNO|                   OAK|           1|
|3857e905|    B77L|        FDX|               LGG|                   OAK|           1|
|38581e7d|    A359|        ETH|               PVG|                   ADD|           0|
|38582e0d|    MD11|        FDX|               OAK|                   ANC|           1|
|38583038|    B763|        FDX|               MEM|                   OAK|           1|
|38583115|    A359|        DLH|               PVG|                   MUC|           0|
|38583161|    B789|        MSR|               PVG|                   CAI|           0|
|38583688|    A359|        SIA|            

In [32]:
df_flights = df_flights.join(
    df_airlines.select(
        col("ICAO").alias("airlineCode"),
        col("Country").alias("airlineCountry")
    ),
    on="airlineCode",
    how="left"
)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Country` cannot be resolved. Did you mean one of the following? [`Code`, `ICAO`, `Name`].;
'Project [ICAO#444 AS airlineCode#449, 'Country AS airlineCountry#450]
+- Relation [Code#443,ICAO#444,Name#445] parquet
