In [0]:
# Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType, BooleanType, TimestampType, LongType
import requests
import json
from pyspark.sql.utils import AnalysisException

In [0]:
"""Initializes and returns a Spark session."""

def init_spark_session(app_name='TfL Lines Data'):
    """Create (or reuse) the Spark session used by this pipeline.

    Args:
        app_name: Application name passed to ``SparkSession.builder.appName``.
            Defaults to the name this notebook has always used.

    Returns:
        The ``SparkSession`` returned by ``builder.getOrCreate()``.

    Raises:
        RuntimeError: If the session cannot be created. The underlying error is
            chained as ``__cause__`` so its traceback is preserved.
    """
    try:
        return SparkSession.builder.appName(app_name).getOrCreate()
    except Exception as err:
        # `from err` marks the Spark/py4j failure as the direct cause.
        raise RuntimeError(f"Error initializing Spark session: {err}") from err

In [0]:
"""
Defining the Spark schema for the nested JSON returned by the TfL Line Status API
"""
def define_schemas():
    """Return the Spark schema describing one line object of the TfL status feed.

    The schema is assembled bottom-up:
    - Validity periods and disruption details nest inside each line status.
    - Line statuses, service types and crowding nest inside the line.

    Every field is declared nullable.

    Returns:
        StructType for a single line object, suitable for ``spark.read.schema``.

    Raises:
        ValueError: Wraps a ValueError raised while building the schema.
        Exception: Wraps any other error raised while building the schema.
    """
    def nullable(name, data_type):
        # Every field in this schema is optional (nullable=True).
        return StructField(name, data_type, True)

    try:
        validity_period = StructType([
            nullable("$type", StringType()),
            nullable("fromDate", StringType()),
            nullable("toDate", StringType()),
            nullable("isNow", BooleanType()),
        ])

        disruption = StructType([
            nullable("$type", StringType()),
            nullable("category", StringType()),
            nullable("categoryDescription", StringType()),
            nullable("description", StringType()),
            # NOTE(review): declared as arrays of strings; confirm against the API
            # payload before consuming these fields downstream.
            nullable("affectedRoutes", ArrayType(StringType())),
            nullable("affectedStops", ArrayType(StringType())),
            nullable("closureText", StringType()),
        ])

        line_status = StructType([
            nullable("$type", StringType()),
            nullable("id", LongType()),
            nullable("lineId", StringType()),
            nullable("statusSeverity", LongType()),
            nullable("statusSeverityDescription", StringType()),
            nullable("reason", StringType()),
            nullable("created", StringType()),
            nullable("validityPeriods", ArrayType(validity_period)),
            nullable("disruption", disruption),
        ])

        service_type = StructType([
            nullable("$type", StringType()),
            nullable("name", StringType()),
            nullable("uri", StringType()),
        ])

        crowding = StructType([nullable("$type", StringType())])

        return StructType([
            nullable("$type", StringType()),
            nullable("id", StringType()),
            nullable("name", StringType()),
            nullable("modeName", StringType()),
            nullable("disruptions", ArrayType(StringType())),
            nullable("created", StringType()),
            nullable("modified", StringType()),
            nullable("lineStatuses", ArrayType(line_status)),
            nullable("routeSections", ArrayType(StringType())),
            nullable("serviceTypes", ArrayType(service_type)),
            nullable("crowding", crowding),
        ])
    except ValueError as err:
        raise ValueError(f"Error defining schemas: {err}")
    except Exception as err:
        raise Exception(f"Exception defining schemas: {err}")

In [0]:
"""
1. Fetching the JSON data from the API.
2. Storing the data in a temporary file.
3. Copying the data from the temporary local file to a DBFS location.
"""

def fetch_and_store_data(url, temp_file_path, file_path, timeout=30):
    """Download JSON from ``url``, write it to a local temp file, then copy it to DBFS.

    Args:
        url: Endpoint returning a JSON document.
        temp_file_path: Local (driver) path the JSON is written to first.
        file_path: Destination path passed to ``dbutils.fs.cp``.
        timeout: Seconds passed to ``requests.get``, so a stalled API call fails
            instead of blocking the job indefinitely.

    Raises:
        ValueError: On a non-200 status, an ``HTTPError``, or a body that is not
            valid JSON.
        ConnectionError: On any other ``requests`` failure (DNS, refused
            connection, timeout, ...).
        IOError: If writing the temp file or copying it fails with an ``OSError``.
        Exception: On any other failure while writing or copying.
    """
    # Stage 1: the HTTP request itself.
    try:
        response = requests.get(url, timeout=timeout)
    except requests.exceptions.HTTPError as http_err:
        raise ValueError(f"HTTP error occurred: {http_err}") from http_err
    except requests.exceptions.RequestException as req_err:
        raise ConnectionError(f"Request error: {req_err}") from req_err

    # Checked outside any try block so this ValueError reaches the caller
    # unchanged instead of being re-wrapped by a catch-all handler.
    if response.status_code != 200:
        raise ValueError(f"Request failed with status code {response.status_code}: {response.text}")

    # Stage 2: decode the body. json.JSONDecodeError and requests' own
    # JSONDecodeError both subclass ValueError. The latter is also a
    # RequestException, which is why decoding is handled separately from stage 1.
    try:
        json_data = response.json()
    except ValueError as json_err:
        raise ValueError(f"Error decoding JSON: {json_err}") from json_err

    # Stage 3: persist locally, then copy into DBFS.
    try:
        with open(temp_file_path, "w") as file:
            json.dump(json_data, file)
        dbutils.fs.cp('file://' + temp_file_path, file_path)
    except OSError as io_err:
        raise IOError(f"File operation error: {io_err}") from io_err
    except Exception as err:
        raise Exception(f"An unexpected error occurred: {err}") from err

    print(f"Data successfully fetched from {url} and stored at {file_path}")

In [0]:
"""
 1. Reading the JSON file written by the fetch step into a DataFrame.
 2. Selecting the columns needed further on (name, lineStatuses).
 3. Adding a current_timestamp column with the processing time.
 4. Exploding the lineStatuses array so each status becomes its own row.
"""

def read_and_transform_data(spark, file_path, line_schema):
    """Load the raw TfL JSON file and flatten it to one row per line status.

    Args:
        spark: Active SparkSession used for reading.
        file_path: Path of the JSON document to read.
        line_schema: Schema applied to each line object (see ``define_schemas``).

    Returns:
        DataFrame with columns ``name``, ``current_timestamp`` and
        ``exploded_lineStatuses`` (one row per element of ``lineStatuses``).

    Raises:
        ValueError: Wraps a ValueError raised while reading/transforming.
        Exception: Wraps any other error raised while reading/transforming.
    """
    try:
        reader = spark.read.schema(line_schema)
        # multiLine=True parses the file as a single JSON document rather than JSON Lines.
        raw_lines = reader.json(file_path, multiLine=True)

        narrowed = raw_lines.select(col('name'), col('lineStatuses'))
        stamped = narrowed.withColumn('current_timestamp', current_timestamp())
        # explode() emits one row per array element; rows with a null/empty array are dropped.
        exploded = stamped.withColumn("exploded_lineStatuses", explode(col('lineStatuses')))
        result = exploded.drop(col('lineStatuses'))

        print(f"Data at {file_path}, successfully read and transformed")
    except ValueError as err:
        raise ValueError(f"Error during data transformation: {err}")
    except Exception as err:
        raise Exception(f"Exception during data transformation: {err}")
    return result


In [0]:
"""
Creates and returns the final DataFrame with the desired columns and format.
"""

def create_final_dataframe(transformed_df):
    """Select and rename the reporting columns from the exploded status rows.

    Args:
        transformed_df: DataFrame produced by ``read_and_transform_data``, with
            columns ``current_timestamp``, ``name`` and the struct
            ``exploded_lineStatuses``.

    Returns:
        DataFrame with columns, in order: ``current_timestamp``, ``line``,
        ``status`` and ``disruption_reason``.

    Raises:
        ValueError: Wraps a ValueError raised while building the projection.
        Exception: Wraps any other error raised while building the projection.
    """
    # (source column, output name) pairs, in output order after current_timestamp.
    renamed_columns = (
        ('name', 'line'),
        ("exploded_lineStatuses.statusSeverityDescription", "status"),
        ("exploded_lineStatuses.reason", "disruption_reason"),
    )
    try:
        projection = [col('current_timestamp')]
        projection.extend(col(source).alias(target) for source, target in renamed_columns)
        result_df = transformed_df.select(*projection)
        print("Final DataFrame created with desired columns and format")
    except ValueError as err:
        raise ValueError(f"Error creating final DataFrame: {err}")
    except Exception as err:
        raise Exception(f"Exception creating final DataFrame: {err}")
    return result_df

In [0]:
"""
Saving the final DataFrame in Delta format to an external location.
"""

def save_data_as_delta(final_df, delta_path, mode="overwrite"):
    """Write ``final_df`` as Delta files at ``delta_path``.

    Args:
        final_df: DataFrame to persist.
        delta_path: Target directory (the external location of the Delta table).
        mode: Spark save mode passed to ``DataFrameWriter.mode``. Defaults to
            ``"overwrite"``, which replaces the existing contents on every run.
            Pass ``"append"`` to accumulate one snapshot per run instead.

    Raises:
        IOError: Wraps an IOError/OSError raised during the write.
        Exception: Wraps any other failure. In both cases the original error is
            chained as ``__cause__``.
    """
    try:
        final_df.write.format("delta").mode(mode).save(delta_path)
    except IOError as err:
        raise IOError(f"Error saving data as Delta format: {err}") from err
    except Exception as err:
        raise Exception(f"Exception saving data as Delta format: {err}") from err
    print(f"Data successfully saved in a delta format at a external location: {delta_path}")

In [0]:
"""
Creating a Delta table with Spark SQL on top of the external location the data was saved to above
"""

def create_delta_table(spark, delta_path, table_name="line_statuses_table"):
    """Register a Delta table whose data lives at ``delta_path``.

    Uses ``CREATE TABLE IF NOT EXISTS``, so re-running does not fail once the
    table exists.

    Args:
        spark: Active SparkSession used to run the DDL.
        delta_path: External location holding the Delta files.
        table_name: Name of the table to create. It is inserted into the
            statement verbatim, so it must be a trusted, valid SQL identifier.

    Raises:
        RuntimeError: If building or running the statement fails. The original
            error is chained as ``__cause__``.
    """
    try:
        # delta_path goes inside a SQL string literal. Escape backslashes and
        # single quotes (Spark's default literal escaping) so such characters
        # cannot terminate the literal and break or alter the statement.
        escaped_path = str(delta_path).replace("\\", "\\\\").replace("'", "\\'")
        spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            current_timestamp TIMESTAMP,
            line STRING,
            status STRING,
            disruption_reason STRING
        )
        USING DELTA
        LOCATION '{escaped_path}'
        """)
    except Exception as err:
        raise RuntimeError(f"Error creating Delta table: {err}") from err
    print(f"Delta Table successfully created using an external location: {delta_path}")

In [0]:
# Main execution
def main():
    spark = init_spark_session()
    line_schema = define_schemas()
    
    url = 'https://api.tfl.gov.uk/Line/Mode/tube/Status'
    temp_file_path = '/tmp/input_data.json'
    file_path = '/FileStore/tables/input_data.json'
    delta_path = "/FileStore/external/"
    
    fetch_and_store_data(url, temp_file_path, file_path)
    
    transformed_df = read_and_transform_data(spark, file_path, line_schema)
    
    final_df = create_final_dataframe(transformed_df)
    final_df.display()  # Displaying the DataFrame for verification
    
    save_data_as_delta(final_df, delta_path)
    
    create_delta_table(spark, delta_path)

In [0]:
# Entry point: run the whole pipeline when this notebook/script is executed
# directly rather than imported as a module.
if __name__ == "__main__":
    main()

Data successfully fetched from https://api.tfl.gov.uk/Line/Mode/tube/Status and stored at /FileStore/tables/input_data.json
Data at /FileStore/tables/input_data.json, successfully read and transformed
Final DataFrame created with desired columns and format


current_timestamp,line,status,disruption_reason
2024-07-03T15:16:44.807+0000,Bakerloo,Good Service,
2024-07-03T15:16:44.807+0000,Central,Good Service,
2024-07-03T15:16:44.807+0000,Circle,Good Service,
2024-07-03T15:16:44.807+0000,District,Part Suspended,District Line: No service between Turnham Green and Richmond while Network Rail fix a track fault at Gunnersbury. Tickets will be accepted on London Buses and South West Rail services via any reasonable route. GOOD SERVICE on the rest of the line.
2024-07-03T15:16:44.807+0000,Hammersmith & City,Good Service,
2024-07-03T15:16:44.807+0000,Jubilee,Good Service,
2024-07-03T15:16:44.807+0000,Metropolitan,Good Service,
2024-07-03T15:16:44.807+0000,Northern,Good Service,
2024-07-03T15:16:44.807+0000,Piccadilly,Good Service,
2024-07-03T15:16:44.807+0000,Victoria,Good Service,


Data successfully saved in a delta format at a external location: /FileStore/external/
Delta Table successfully created using an external location: /FileStore/external/
