## Import Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
import requests
import json
from datetime import datetime

## Initial Variable Declaration

In [0]:
# Bronze landing zone: the "bronze" container of the tflopendatalogs
# ADLS Gen2 account, folder Crowding_JSON_Logs.
bronze_dir = (
    "abfss://bronze@tflopendatalogs.dfs.core.windows.net"
    "/Crowding_JSON_Logs"
)

## Extract NaPTAN Codes

In [0]:
# Location of the NaPTAN CSV inside a Unity Catalog volume
location = '/Volumes/tfl_crowding_analysis_we/default/naptancodes/naptan.csv'

# Read the CSV without schema inference: only the naptanID column is used and
# it is an identifier, so keeping it as a string avoids an extra pass over the
# file and prevents an all-digit ID from being retyped as a number.
naptan = spark.read.csv(location, header=True, inferSchema=False)

# Collect naptanID into a list. Null/empty IDs are skipped (they would only
# produce failing API calls). Duplicates are dropped while keeping first-seen
# order, so no station is fetched and written twice.
napton_codes = list(dict.fromkeys(
    row['naptanID']
    for row in naptan.select('naptanID').collect()
    if row['naptanID']
))

## Extract Crowding Data

In [0]:
def _fetch_crowding_record(session, code, timeout):
    """Fetch the live crowding reading for a single NaPTAN code.

    Args:
        session (requests.Session): Session reused across calls.
        code: NaPTAN code to query.
        timeout (float): Seconds to wait for the TFL API before giving up.

    Returns:
        tuple | None: ``(percentageOfBaseline, timeLocal, naptonId)``, each a
        ``str`` or ``None``. Returns ``None`` when the request fails, the body
        is not valid JSON, or the response does not report ``dataAvailable``.
        Failures are printed rather than raised, so one bad station does not
        abort the whole run.
    """
    url = f"https://api.tfl.gov.uk/crowding/{code}/Live"

    try:
        # Without a timeout, requests can wait forever on a stalled connection.
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.HTTPError as errh:
        print(f"HTTP Error for {code}: {errh}")
        return None
    except requests.exceptions.ConnectionError as errc:
        print(f"Connection Error for {code}: {errc}")
        return None
    except requests.exceptions.Timeout as errt:
        print(f"Timeout Error for {code}: {errt}")
        return None
    except requests.exceptions.RequestException as err:
        print(f"Request Exception for {code}: {err}")
        return None
    except ValueError as errv:
        # Older requests versions raise a plain ValueError for a non-JSON body.
        print(f"Invalid JSON for {code}: {errv}")
        return None

    # Skip payloads that are not objects or that report no data.
    if not isinstance(data, dict) or not data.get('dataAvailable'):
        return None

    def _as_str(value):
        # Stringify so the row matches the explicit all-string schema;
        # Spark performs the real type casts afterwards.
        return None if value is None else str(value)

    return (
        _as_str(data.get("percentageOfBaseline")),
        _as_str(data.get("timeLocal")),
        str(code),
    )


def fetch_and_store_crowding_data(
    napton_codes,
    bronze_dir,
    *,
    timeout=30,
    table_name="tfl_crowding_analysis_we.bronze.raw_crowding_logs",
):
    """
    Fetch live crowding data from the TFL API for a list of NaPTAN codes
    and append the relevant fields to the Bronze layer.

    Each code is requested from ``https://api.tfl.gov.uk/crowding/{code}/Live``.
    Codes whose request fails or whose response lacks ``dataAvailable`` are
    skipped. All collected rows are written in one batch rather than two
    appends per station. One copy goes to the Delta path ``bronze_dir``,
    partitioned by ``naptonId``; the other goes to the table ``table_name``.

    Args:
        napton_codes (list): List of naptan codes to fetch data for.
        bronze_dir (str): Path to Bronze Delta table in Databricks.
        timeout (float): Per-request timeout in seconds for the TFL API.
        table_name (str): Unity Catalog table that also receives the rows.
    """
    records = []

    # One session for all codes so HTTP connections are reused.
    with requests.Session() as session:
        for code in napton_codes:
            # Debug: Fetching REST API
            print(f"Fetching from REST API for: {code}")
            record = _fetch_crowding_record(session, code, timeout)
            if record is not None:
                records.append(record)

    if not records:
        print("No crowding data available; nothing written to Bronze.")
        return

    # Build the frame from an explicit schema instead of inferring one from the
    # raw JSON: inference fails on unknown/None-valued fields. The column order
    # matches the original layout: percentageOfBaseline, timeLocal, naptonId.
    final_df = (
        spark.createDataFrame(
            records,
            "percentageOfBaseline string, timeLocal string, naptonId string",
        )
        .withColumn("percentageOfBaseline", col("percentageOfBaseline").cast("double"))
        .withColumn("timeLocal", col("timeLocal").cast("timestamp"))
    )

    print(f"Writing Bronze data for {len(records)} naptonIds")

    # Write data to Bronze Delta path, partitioned by naptonId
    final_df.write.partitionBy("naptonId").format("delta").mode("append").save(bronze_dir)

    # Write the same data to the Bronze schema in UC
    final_df.write.mode("append").saveAsTable(table_name)

    print(f"Writing Bronze data completed for {len(records)} naptonIds")

In [0]:
# Run the crowding ingestion over every collected code, landing results in Bronze
fetch_and_store_crowding_data(napton_codes=napton_codes, bronze_dir=bronze_dir)