In [0]:
from datetime import date, timedelta

# 1. Build the storage URL for every tier of the Medallion Architecture.
#    The tier name is plugged into the URL as the container name.
tiers = ["bronze", "silver", "gold"]
adls_paths = {tier: f"abfss://{tier}@earthquakestorageaccount.dfs.core.windows.net/" for tier in tiers}

bronze_adls = adls_paths["bronze"]
silver_adls = adls_paths["silver"]

# 2. Local defaults. They are used when the notebook is run by hand (outside a
#    job) or when an upstream task did not publish a value.
#    Dates are kept as ISO strings so they can travel as task values unchanged.
default_start_date = (date.today() - timedelta(1)).isoformat()
default_end_date = date.today().isoformat()
default_silver_data = f"{silver_adls}earthquake_events_silver/"

# 3. Read the values published by the upstream tasks.
#    taskKey is the task NAME shown in the job run editor ("Bronze" / "Silver").
#    debugValue is what the call returns outside a job context; without it the
#    call raises when the notebook is run interactively.
bronze_output = dbutils.jobs.taskValues.get(
    taskKey="Bronze",
    key="bronze_output",
    debugValue={
        "start_date": default_start_date,
        "end_date": default_end_date,
        "gold_adls": adls_paths["gold"],
    },
)
silver_data = dbutils.jobs.taskValues.get(
    taskKey="Silver",
    key="silver_output",
    debugValue=default_silver_data,
) or default_silver_data

# 4. Unpack the Bronze "package". Upstream values win; a missing or empty entry
#    falls back to the local default instead of being silently overwritten.
start_date = bronze_output.get("start_date") or default_start_date
end_date = bronze_output.get("end_date") or default_end_date
gold_adls = bronze_output.get("gold_adls") or adls_paths["gold"]

# Later cells build paths as f"{gold_adls}<folder>/", so guarantee exactly one
# trailing slash regardless of how the upstream task formatted the URL.
gold_adls = gold_adls.rstrip("/") + "/"

# 5. Print the resolved values to verify we have the right "keys" to the storage account.
print(f"Start Date: {start_date}, Gold ADLS: {gold_adls}")

# 6. List each container. The listings are not displayed (only the last
#    expression of a cell is), but each call fails if the location is unreachable.
dbutils.fs.ls(bronze_adls)
dbutils.fs.ls(silver_adls)
dbutils.fs.ls(gold_adls)

start_date, end_date

In [0]:
# 1. Import specialized Spark functions.
# 'when' is like an IF/ELSE statement for columns.
# 'col' allows you to reference specific columns by name.
# 'udf' (User Defined Function) lets you create your own custom Python logic for Spark.
from pyspark.sql.functions import when, col, udf

# 2. Tell Spark what kind of data your custom function will return.
# In this case, we are returning text (a country code such as 'FR').
from pyspark.sql.types import StringType

# 3. Import 'reverse_geocoder'. 
# This is a powerful library that takes Latitude/Longitude and tells you where that is on a map.
# NOTE: This is a standard Python library, not a Spark one, which is why we need a UDF later.
import reverse_geocoder as rg

# 4. Standard tools for date math.
# We use these to find the specific files we want to process (e.g., "yesterday's data").
from datetime import date, timedelta

In [0]:
# Maximum number of rows passed on to the country-code lookup.
# The cap was added because the Python UDF (reverse_geocoder) proved to be the
# bottleneck during testing. Set to None to process every row.
ROW_LIMIT = 100

# Silver events newer than start_date.
silver_events = spark.read.parquet(silver_data).filter(col('time') > start_date)

# Apply the cap only when one is configured; `df` is the name later cells read.
df = silver_events if ROW_LIMIT is None else silver_events.limit(ROW_LIMIT)

In [0]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    The lookup is delegated to the `reverse_geocoder` library (imported as `rg`).

    Parameters:
    lat (float or str): Latitude of the location. May be None.
    lon (float or str): Longitude of the location. May be None.

    Returns:
    str or None: The 'cc' field of the first match returned by `rg.search`,
    or None when either coordinate is None, cannot be converted to float,
    or the lookup raises.

    Example:
    >>> get_country_code(48.8588443, 2.2943506)
    'FR'
    """
    # A missing coordinate is not an error: there is simply nothing to look up.
    # Returning early keeps null rows out of the error log.
    if lat is None or lon is None:
        return None

    try:
        coordinates = (float(lat), float(lon))
        # rg.search returns a list of matches; the first entry is the one used.
        # No per-row success print here: this function runs once per row.
        # NOTE(review): reverse_geocoder documents a single-threaded `mode=1`;
        # it may suit per-row calls better — confirm before changing.
        return rg.search(coordinates)[0].get('cc')
    except Exception as e:
        # Best-effort lookup: log the failure and return None for this row
        # rather than failing the whole Spark job.
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return None

In [0]:
# Wrap the plain-Python get_country_code so Spark can apply it to columns;
# the declared return type is a string.
get_country_code_udf = udf(f=get_country_code, returnType=StringType())

# Column expression that feeds each row's latitude and longitude to the UDF.
country_code_expr = get_country_code_udf(col("latitude"), col("longitude"))

# New DataFrame carrying the result in a "country_code" column.
df_with_location = df.withColumn("country_code", country_code_expr)

In [0]:
# Classify each event by its "sig" (significance) value:
#   sig < 100         -> "Low"
#   100 <= sig < 500  -> "Moderate"
#   everything else   -> "High"
# NOTE(review): a null "sig" satisfies neither condition and therefore ends up
# in the "High" bucket via otherwise() — confirm that this is intended.
sig_score = col("sig")
sig_class_expr = (
    when(sig_score < 100, "Low")
    .when((sig_score >= 100) & (sig_score < 500), "Moderate")
    .otherwise("High")
)

df_with_location_sig_class = df_with_location.withColumn('sig_class', sig_class_expr)

# Show the first row as a quick sanity check.
df_with_location_sig_class.head()

In [0]:
# Destination folder for the enriched events, built from the Gold container URL.
gold_output_path = f"{gold_adls}earthquake_events_gold/"

# Write the DataFrame in Parquet format. 'append' adds to whatever is already
# stored at the path, so re-running this cell writes the same rows again.
df_with_location_sig_class.write.parquet(gold_output_path, mode='append')
     
