In [0]:

#Mount ADLS Gen2
#Required each time the cluster is restarted which should be only on the first notebook as they run in order

tiers = ["bronze", "silver", "gold"]
adls_paths = {tier: f"abfss://{tier}@rgdatabricksaccount.dfs.core.windows.net/" for tier in tiers}

# Accessing paths
bronze_adls = adls_paths["bronze"]
silver_adls = adls_paths["silver"]
gold_adls = adls_paths["gold"]

silver_data = f"{silver_adls}earthquake_events_silver/"

dbutils.fs.ls(bronze_adls)
dbutils.fs.ls(silver_adls)
dbutils.fs.ls(gold_adls)

[]

In [0]:
from datetime import date, timedelta
start_date = date.today() - timedelta(1)
end_date = date.today()

In [0]:
# Retrieve the task value from the previous task (bronze & silver)
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output", debugValue={})
silver_output = dbutils.jobs.taskValues.get(taskKey="silver", key="silver_output", debugValue={})

# Access individual variables
start_date = bronze_output.get("start_date", "")
silver_adls = bronze_output.get("silver_adls", "")
gold_adls = bronze_output.get("gold_adls", "")

print(f"Start Date: {start_date}, Gold ADLS: {gold_adls}")

Start Date: , Gold ADLS: 


In [0]:
from pyspark.sql.functions import  when, col, udf
from pyspark.sql.types import StringType

# Ensure the below library is installed on your cluster
import reverse_geocoder as rg
from datetime import date, timedelta

In [0]:

df = spark.read.parquet(silver_data).filter(col("time") > start_date)




In [0]:
# Limit the DataFrame to speed up processing as during testing it was proving a bottleneck
df = df.limit(10)

In [0]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float): Latitude of the location.
    lon (float): Longitude of the location.

    Returns:
    str: Country code of the location, retrieved using the reverse geocoding API.
    """
    try:
        # Processed coordinates
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates) [0].get('cc')
        print("Processed coordinates: {coordinates} -> {result}")
        return result
    except Exception as e:
        print(f"Error processing coordinates: (lat: {lat}, lon: {lon}) -> {str(e)}")
        return None


In [0]:
get_country_code_udf = udf(get_country_code, StringType())

In [0]:
get_country_code(48.858443, 2.294536)

Processed coordinates: {coordinates} -> {result}


'FR'

In [0]:
# Adding country_code and city attributes
df_with_location = \
     df.\
          withColumn("country_code", get_country_code_udf(col("latitude"), col("longitude")))

In [0]:
# Adding significance classification
df_with_location_sig_class =\
    df_with_location.\
    withColumn("sig_class", 
        when(col("sig") < 100, "Low").\
        when((col("sig") >= 100) & (col("sig") < 500), "Moderate").\
        otherwise("High")
    )

In [0]:
df_with_location_sig_class.head()

In [0]:
# Define the output path for the Silver container
#gold_output_path = f"{gold_adls}/earthquake_events_gold/"
gold_output_path = "abfss://gold@rgdatabricksaccount.dfs.core.windows.net/earthquake_events_gold/"

In [0]:

# Append DataFrame to Silver container in Parquet format
df_with_location_sig_class.write.mode("append").parquet(gold_output_path)