In [0]:
# Retrieve the task values published by the upstream Bronze and Silver tasks

bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output")
silver_data = dbutils.jobs.taskValues.get(taskKey="Silver", key="silver_output")

# Pull the run parameters out of the Bronze payload; a missing key falls back to ""
start_date, silver_adls, gold_adls = (
    bronze_output.get(field, "")
    for field in ("start_date", "silver_adls", "gold_adls")
)


In [0]:
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import StringType
import reverse_geocoder as rg # Make sure that it is installed on the used cluster 
from datetime import date, timedelta

In [0]:
# Load the silver dataset and keep only the rows whose `time` is later than start_date
df = (
    spark.read
    .parquet(silver_data)
    .where(col("time") > start_date)
)

In [0]:

def get_country_code(lat, long)-> str:
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str): Latitude of the location.
    long (float or str): Longitude of the location.

    Returns:
    str or None: The 'cc' field of the first result returned by the
    reverse_geocoder lookup, or None when either coordinate is missing,
    cannot be converted to float, or the lookup itself fails.
    """
    # Null coordinates cannot be geocoded; return early instead of logging an
    # error line for every such row.
    if lat is None or long is None:
        return None

    try:
        coordinates = (float(lat), float(long))
        # mode=1 asks reverse_geocoder for its single-process lookup instead of
        # the default multi-process mode (mode=2), which is unnecessary overhead
        # when resolving one point at a time inside a Spark UDF.
        # No per-row success logging here: this runs once per record on the
        # executors and would flood their logs.
        return rg.search(coordinates, mode=1)[0].get('cc')
    except Exception as e:
        # Best effort: a row that cannot be geocoded gets a null country code
        # rather than failing the whole job.
        print(f"Error processing coordinates: {lat}, {long} -> {str(e)}")
        return None


In [0]:
# Wrap get_country_code as a Spark UDF (user-defined function) with a string
# return type so it can be applied to dataframe columns

get_country_code_udf = udf(f=get_country_code, returnType=StringType())


In [0]:
# Add a country_code column derived from each row's latitude and longitude

df_with_location = df.withColumn(
    colName="country_code",
    col=get_country_code_udf(col("latitude"), col("longitude")),
)

In [0]:
# Replace the numeric significance score with a significance class:
#   sig < 100         -> "Low"
#   100 <= sig < 500  -> "Moderate"
#   sig >= 500        -> "High"
# `when` branches are evaluated in order, so the second branch only sees rows
# that already failed `sig < 100`. Testing just `< 500` there (rather than
# `> 100 & < 500`) keeps sig == 100 in "Moderate" instead of letting it fall
# through to "High".
# NOTE: a null sig matches no branch and therefore ends up as "High".
# NOTE: this overwrites the numeric `sig` column, so the cell must not be
# re-run on its own output.
df_with_location = df_with_location.withColumn(
    "sig",
    when(col('sig') < 100, 'Low')
    .when(col('sig') < 500, "Moderate")
    .otherwise("High"),
)

In [0]:
# Build the destination path of the gold dataset inside the gold container.
# gold_adls is used as a plain prefix — presumably it already ends with "/"; verify against the Bronze task.
gold_output_path = "{}earthquake_events_gold/".format(gold_adls)

In [0]:
# Write the transformed dataframe to the gold path as parquet, appending to
# whatever data is already stored there
df_with_location.write.parquet(gold_output_path, mode="append")