In [1]:
# Importing Spark libs.
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
import findspark
findspark.init()

from pyspark.sql.functions import min, max, avg, count, expr, array_contains, col, explode_outer, explode, struct, map_keys, arrays_zip
from pyspark.sql.types import ArrayType, StructType, MapType, StringType, DoubleType, TimestampType, IntegerType, FloatType, LongType, DataType

# Importing Python Data Stat. and vis. libs.
import pandas as pd

import matplotlib.pyplot as plt


# For error handling and utilities 
import json
import requests
import urllib.request
from io import StringIO
from urllib.parse import urlparse
import sys
import os
import tempfile
import logging

In [2]:
# Create (or reuse) a local Spark session that uses all available cores.
# The startup WARN lines printed below (loopback hostname, native-hadoop
# library) were checked and are expected for a local run.
spark = (
    SparkSession.builder
    .appName("AirportFlightDataViz")
    .master("local[*]")
    .getOrCreate()
)

24/10/17 23:32:55 WARN Utils: Your hostname, codespaces-358acb resolves to a loopback address: 127.0.0.1; using 10.0.11.149 instead (on interface eth0)
24/10/17 23:32:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/17 23:32:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Raw GitHub locations of the two OAG JSON samples used in this notebook.
_OAG_BASE_URL = "https://raw.githubusercontent.com/JM-AE/Airport-Flight-Data/refs/heads/main/"
url_oag = _OAG_BASE_URL + "oag.json"
url_oag_multi = _OAG_BASE_URL + "oag_multiple.json"

In [4]:
def load_data(url):
    """Fetch a JSON document over HTTP and turn it into a Spark DataFrame.

    Pipeline: download -> print a structural summary -> parse from memory.
    If the in-memory parse raises, the text is written to a temporary file
    named after the URL's last path segment and loaded from that file instead.
    The first row of the result is shown before it is returned.

    Args:
        url: Location of the JSON document.

    Returns:
        The DataFrame, or ``None`` when the download, the temp-file write or
        the file-based load fails.
    """
    # Name for the temp file, only needed if the file-based fallback runs.
    target_name = os.path.basename(urlparse(url).path)

    payload = stream_download(url)
    if payload is None:
        return None

    # Print a quick structural summary (top-level keys / first list element).
    analyze_json_structure(payload)

    try:
        frame = load_spark_dataframe_from_memory(payload)
        print("#4# DataFrame loaded successfully from memory #4#")
    except Exception as exc:
        print(f"#4# Error loading DataFrame from memory: {str(exc)} #4#")
        print("#4# Attempting to load DataFrame using file-based approach #4#")

        # Fallback: round-trip the text through a temporary file on disk.
        saved_path = save_to_temp_file(payload, target_name)
        frame = None if saved_path is None else load_spark_dataframe(saved_path)
        if frame is None:
            return None

    print("#5# DataFrame 1st line #5#")
    frame.show(1)
    return frame

def stream_download(url, timeout=30):
    """Download ``url`` and return the response body as text (the raw JSON).

    Args:
        url: Address of the JSON document.
        timeout: Seconds passed to ``requests.get`` as its connect/read
            timeout (per socket operation, not a cap on total download time).
            Without a timeout, a stalled server would block the notebook
            indefinitely.

    Returns:
        The decoded body (``str``) when the server answers HTTP 200,
        otherwise ``None``. Network errors (including timeouts) are reported
        and also yield ``None``.
    """
    try:
        # The whole body is needed anyway, so there is no point streaming.
        # The context manager releases the connection once we are done.
        with requests.get(url, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"#1# Failed to load {url}. Status code: {response.status_code} #1#")
                return None
            print(f"#1# Data from {url} successfully loaded #1#")
            return response.text  # Return the raw JSON data as a string
    except requests.RequestException as e:
        print(f"#1# Error while downloading data from {url}: {str(e)} #1#")
        return None

def analyze_json_structure(data):
    """Print a short structural summary of a JSON document.

    For a JSON object, the top-level keys are printed; for a JSON array, its
    first element (wrapped in a list) is pretty-printed; anything else is
    reported as having no usable structure. Text that fails to parse is
    reported and otherwise ignored. Always returns ``None``.
    """
    try:
        parsed = json.loads(data)
    except json.JSONDecodeError:
        print("#2# Failed to parse JSON structure. Skipping analysis. #2#")
        return

    if isinstance(parsed, dict):
        print(f"#2# Top-level keys found: {list(parsed)} #2#")
    elif isinstance(parsed, list):
        preview = json.dumps(parsed[:1], indent=4)
        print(f"#2# Sample data from the list: {preview} #2#")
    else:
        print("#2# No valid data structure found. #2#")

def save_to_temp_file(data, file_name):
    """Write a JSON string to ``<system temp dir>/<file_name>`` and return its path.

    Args:
        data: JSON text to write.
        file_name: Name of the file inside the temp directory. An empty name
            (e.g. derived from a URL ending in "/") falls back to
            ``"data.json"``; previously it made the path point at the temp
            directory itself and the write always failed.

    Returns:
        The path of the written file, or ``None`` if writing fails (the error
        is printed). An existing file with the same name is overwritten.

    Note:
        The file is intentionally not deleted. Spark DataFrames built from a
        file re-read that path when actions run, so it must outlive the load.
    """
    file_name = file_name or "data.json"
    try:
        tmp_file_path = os.path.join(tempfile.gettempdir(), file_name)
        # Explicit UTF-8: the open() default is locale-dependent, while Spark's
        # JSON reader expects UTF-8 by default.
        with open(tmp_file_path, "w", encoding="utf-8") as f:
            f.write(data)
        print(f"#3# Temporary file {file_name} saved #3#")
        return tmp_file_path
    except Exception as e:
        print(f"#3# Error saving temporary file {file_name}: {str(e)} #3#")
        return None

def load_spark_dataframe_from_memory(data):
    """Load a JSON string into a Spark DataFrame without touching the disk.

    The whole string is parallelized as a single record, so a multi-line
    (pretty-printed) document is parsed as one JSON value.

    Args:
        data: The JSON document as a string.

    Returns:
        The parsed ``DataFrame``.

    Raises:
        ValueError: If Spark could not parse the text, so the caller can fall
            back to another loading strategy. Any exception raised by Spark
            itself propagates unchanged. The caller is responsible for
            reporting errors, which avoids printing the same message twice.
    """
    json_rdd = spark.sparkContext.parallelize([data])
    df = spark.read.json(json_rdd)
    # Spark's default PERMISSIVE mode does not raise on malformed JSON; it
    # returns the bad input in a `_corrupt_record` column instead. Turn that
    # into an exception, otherwise the caller's fallback can never trigger.
    if "_corrupt_record" in df.columns:
        raise ValueError("in-memory JSON could not be parsed (Spark produced _corrupt_record)")
    return df

def load_spark_dataframe(file_path):
    """Load a JSON file into a Spark DataFrame, retrying in multi-line mode.

    The first attempt uses Spark's default reader (one JSON record per line).
    If that raises, or yields a ``_corrupt_record`` column, the file is read
    again with ``multiline=true``. A pretty-printed document produces the
    ``_corrupt_record`` column rather than an exception.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The loaded ``DataFrame``, or ``None`` if both attempts fail (errors
        are printed).
    """
    try:
        df = spark.read.json(file_path)
        # PERMISSIVE mode (the default) puts unparsable lines into
        # `_corrupt_record` instead of raising; treat that as a failed attempt
        # so the multiline retry below is actually reachable.
        if "_corrupt_record" in df.columns:
            raise ValueError("file is not line-delimited JSON (Spark produced _corrupt_record)")
        print("#4# DataFrame loaded successfully from file #4#")
        return df
    except Exception as e:
        print(f"#4# Error loading DataFrame from file: {str(e)} #4#")
        print("#4# Attempting to load DataFrame using multiline option #4#")

    try:
        df = spark.read.option("multiline", "true").json(file_path)
        if "_corrupt_record" in df.columns:
            raise ValueError("file could not be parsed as JSON (Spark produced _corrupt_record)")
        print("#4# DataFrame loaded successfully in multiline option #4#")
        return df
    except Exception as e:
        print(f"#4# Error loading DataFrame in multiline option: {str(e)} #4#")
        return None

In [5]:
df_oag = load_data(url_oag)
# load_data reports failures by printing and returning None; stop here with a
# clear message instead of an AttributeError on None.drop(...).
if df_oag is None:
    raise RuntimeError(f"Failed to load {url_oag}; see the messages printed above.")
# Drop the `paging` column (presumably API pagination metadata, not flight records).
df_oag_d = df_oag.drop("paging")

#1# Data from https://raw.githubusercontent.com/JM-AE/Airport-Flight-Data/refs/heads/main/oag.json successfully loaded #1#
#2# Top-level keys found: ['data', 'paging'] #2#


                                                                                

#4# DataFrame loaded successfully from memory #4#
#5# DataFrame 1st line #5#


                                                                                

+--------------------+--------------------+
|                data|              paging|
+--------------------+--------------------+
|[{{773, NULL}, {{...|{10, https://api....|
+--------------------+--------------------+



24/10/17 23:33:14 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [10]:
def get_nested_info(schema, prefix='', depth=0, DataTyp=''):
    """Recursively map every field path in a Spark schema to its nesting depth.

    NOTE(review): a later cell redefines ``get_nested_info`` (without
    ``DataTyp``); that later definition is the one in effect afterwards.
    This version is kept behaviorally consistent with it.

    Args:
        schema: A ``StructType`` or ``ArrayType``, or any object whose
            ``.schema`` is a ``StructType`` (e.g. a DataFrame). Anything else
            yields an empty dict.
        prefix: Dotted path prepended to field names (used by the recursion).
        depth: Nesting level assigned to the fields found at this level.
        DataTyp: Unused; kept only so existing calls remain valid.

    Returns:
        dict mapping a dotted field path (e.g. ``"data.arrival.terminal"``)
        to its depth (0 = top level). Array element types are descended into
        transparently, without adding a path segment.
    """
    # Accept a DataFrame too: called with one (as below), the isinstance
    # checks never matched and the result was silently {}.
    if not isinstance(schema, (StructType, ArrayType)) and isinstance(getattr(schema, "schema", None), StructType):
        schema = schema.schema

    nested_info = {}

    if isinstance(schema, StructType):
        for field in schema.fields:
            field_name = f"{prefix}{field.name}"
            # Record every field whatever its type; the draft only kept
            # struct/array/string/long fields and dropped e.g. doubles/booleans.
            nested_info[field_name] = depth
            child_type = field.dataType
            if isinstance(child_type, StructType):
                nested_info.update(get_nested_info(child_type, f"{field_name}.", depth + 1))
            elif isinstance(child_type, ArrayType):
                nested_info.update(get_nested_info(child_type.elementType, f"{field_name}.", depth + 1))

    elif isinstance(schema, ArrayType):
        # An array adds no path segment; its elements share the same prefix.
        nested_info.update(get_nested_info(schema.elementType, prefix, depth))

    return nested_info


print(get_nested_info(df_oag_d))

In [23]:
def get_nested_info(schema, prefix='', depth=0):
    """Recursively map every field path in a Spark schema to its nesting depth.

    Args:
        schema: A ``StructType`` or ``ArrayType``, or any object whose
            ``.schema`` is a ``StructType`` (e.g. a DataFrame). Anything else
            yields an empty dict.
        prefix: Dotted path prepended to field names (used by the recursion).
        depth: Nesting level assigned to the fields found at this level.

    Returns:
        dict mapping a dotted field path (e.g. ``"data.arrival.terminal"``)
        to its depth (0 = top level). Every field is recorded, whatever its
        type. Array element types are descended into transparently, without
        adding a path segment.
    """
    # Accept a DataFrame directly: passing one used to fall through both
    # isinstance checks and silently return {}.
    if not isinstance(schema, (StructType, ArrayType)) and isinstance(getattr(schema, "schema", None), StructType):
        schema = schema.schema

    nested_info = {}

    # Check if the schema is a StructType
    if isinstance(schema, StructType):
        for field in schema.fields:
            field_name = f"{prefix}{field.name}"
            nested_info[field_name] = depth
            # Handle StructType
            if isinstance(field.dataType, StructType):
                nested_info.update(get_nested_info(field.dataType, f"{field_name}.", depth + 1))
            # Handle ArrayType
            elif isinstance(field.dataType, ArrayType):
                nested_info.update(get_nested_info(field.dataType.elementType, f"{field_name}.", depth + 1))

    # Check if the schema is an ArrayType
    elif isinstance(schema, ArrayType):
        nested_info.update(get_nested_info(schema.elementType, prefix, depth))

    return nested_info

# Example usage
nested_info = get_nested_info(df_oag_d.schema)
print(nested_info)

{'data': 0, 'data.aircraftType': 1, 'data.aircraftType.iata': 2, 'data.aircraftType.icao': 2, 'data.arrival': 1, 'data.arrival.airport': 2, 'data.arrival.airport.faa': 3, 'data.arrival.airport.iata': 3, 'data.arrival.airport.icao': 3, 'data.arrival.date': 2, 'data.arrival.date.local': 3, 'data.arrival.date.utc': 3, 'data.arrival.terminal': 2, 'data.arrival.time': 2, 'data.arrival.time.local': 3, 'data.arrival.time.utc': 3, 'data.carrier': 1, 'data.carrier.iata': 2, 'data.carrier.icao': 2, 'data.codeshare': 1, 'data.codeshare.aircraftOwner': 2, 'data.codeshare.aircraftOwner.code': 3, 'data.codeshare.aircraftOwner.name': 3, 'data.codeshare.cockpitCrewEmployer': 2, 'data.codeshare.cockpitCrewEmployer.code': 3, 'data.codeshare.cockpitCrewEmployer.name': 3, 'data.codeshare.cockpitCrewEmployer.number': 3, 'data.codeshare.jointOperationAirlineDesignators': 2, 'data.codeshare.marketingFlights': 2, 'data.codeshare.marketingFlights.code': 3, 'data.codeshare.marketingFlights.serviceNumber': 3, 'd

In [15]:
# Example of selecting nested fields: one row per element of the top-level
# `data` array, then a deeply nested field read from each element.
try:
    df_exploded = df_oag_d.select(F.explode(col("data")).alias("data_exploded"))
    (
        df_exploded
        .select(col("data_exploded.statusDetails.departure.actualTime.outGateTimeliness"))
        .show()
    )
except Exception as err:
    print(f"Error: {err}")



+-----------------+
|outGateTimeliness|
+-----------------+
|        [Delayed]|
|        [Delayed]|
|           [NULL]|
|          [Early]|
|        [Delayed]|
|        [Delayed]|
|           [NULL]|
|           [NULL]|
|          [Early]|
|        [Delayed]|
+-----------------+



In [42]:
def flatten_nested_json(df, column_dict):
    """Explode the top-level ``data`` array and promote nested fields to columns.

    Args:
        df: Spark DataFrame. If it has a ``data`` column, that column is
            exploded (presumably an array of structs, as in the OAG payload).
        column_dict: Mapping whose keys are dotted field paths such as
            ``"data.arrival.terminal"`` (e.g. ``get_nested_info`` output).
            Only the keys are used.

    Returns:
        A DataFrame with one row per element of ``data`` (``F.explode`` drops
        rows whose array is null or empty). The exploded element is renamed
        back to ``data``. Each resolvable key becomes a top-level column named
        by its literal dotted path. Keys that cannot be resolved are printed
        and skipped.
    """
    # Step 1: Explode the main array if it exists
    if "data" in df.columns:
        df = df.withColumn("data_exploded", F.explode("data"))

    # Step 2: Map each requested path onto the exploded struct. Only the
    # leading "data." is rewritten; str.replace would also mangle a nested
    # field that happens to be called "data".
    new_columns = {}
    for col_name in column_dict:
        if col_name.startswith("data."):
            source = "data_exploded." + col_name[len("data."):]
        else:
            source = col_name
        if source == col_name and col_name in df.columns:
            continue  # assigning a column to itself is a no-op (e.g. "data")
        try:
            df.select(F.col(source))  # analysis only: fails fast on a bad path
        except Exception as e:
            print(f"Error accessing column: {source} - {e}")
            continue
        new_columns[col_name] = F.col(source).alias(col_name)

    # Step 3: Add the columns in as few projections as possible. One
    # withColumn per column in a loop grows the plan per call; the PySpark
    # docs warn this can be slow or even raise StackOverflowException.
    for name in [c for c in new_columns if c in df.columns]:
        df = df.withColumn(name, new_columns.pop(name))  # replace in place
    if new_columns:
        df = df.select("*", *new_columns.values())

    # Step 4: Clean up by dropping the original 'data' column
    return df.drop("data").withColumnRenamed("data_exploded", "data")

In [45]:
# Flatten df_oag_d: explode `data` and promote every nested path to a column.
# get_nested_info walks StructType/ArrayType schemas, so hand it the schema
# explicitly; with the bare DataFrame it returned {} and nothing was promoted.
flattened_df = flatten_nested_json(df_oag_d, get_nested_info(df_oag_d.schema))



In [46]:
# Show the result: read one deeply nested field back out of the exploded `data` struct.
flattened_df.select(
    F.col("data.statusDetails.departure.actualTime.outGateTimeliness")
).show()

+-----------------+
|outGateTimeliness|
+-----------------+
|        [Delayed]|
|        [Delayed]|
|           [NULL]|
|          [Early]|
|        [Delayed]|
|        [Delayed]|
|           [NULL]|
|           [NULL]|
|          [Early]|
|        [Delayed]|
+-----------------+



In [48]:
# Print the schema tree of the flattened DataFrame (the exploded `data` struct plus any promoted columns).
flattened_df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- aircraftType: struct (nullable = true)
 |    |    |-- iata: string (nullable = true)
 |    |    |-- icao: string (nullable = true)
 |    |-- arrival: struct (nullable = true)
 |    |    |-- airport: struct (nullable = true)
 |    |    |    |-- faa: string (nullable = true)
 |    |    |    |-- iata: string (nullable = true)
 |    |    |    |-- icao: string (nullable = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- local: string (nullable = true)
 |    |    |    |-- utc: string (nullable = true)
 |    |    |-- terminal: string (nullable = true)
 |    |    |-- time: struct (nullable = true)
 |    |    |    |-- local: string (nullable = true)
 |    |    |    |-- utc: string (nullable = true)
 |    |-- carrier: struct (nullable = true)
 |    |    |-- iata: string (nullable = true)
 |    |    |-- icao: string (nullable = true)
 |    |-- codeshare: struct (nullable = true)
 |    |    |-- aircraftOwner: struct (nullabl