In [0]:
# Pull the values published by the upstream Bronze and Silver tasks of this job.
task_values = dbutils.jobs.taskValues
bronze_output = task_values.get(taskKey="Bronze", key="bronze_output")
silver_output = task_values.get(taskKey="Silver", key="silver_output")

# Unpack the settings read from the Bronze payload; a missing key falls back to "".
#bronze_adls = bronze_output.get("bronze_output_path","")
start_date, silver_adls, gold_adls = (
    bronze_output.get(setting, "") for setting in ("start_date", "silver_adls", "gold_adls")
)

print(f"Start Date: {start_date}, Gold ADLS:{gold_adls}")

In [0]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
#Ensure the below library is installed on your cluster
import reverse_geocoder as rg
from datetime import date, timedelta

In [0]:
# Load the Silver task's Parquet output, then keep only rows whose `time`
# is strictly later than start_date.
silver_df = spark.read.parquet(silver_output)
df = silver_df.filter(col("time") > start_date)

In [0]:
# Cap the DataFrame at 500 rows to speed up processing.
# No ordering is applied before the limit, so which rows are kept is not guaranteed.
# NOTE(review): every later step, including the final write to the gold path,
# is derived from this capped DataFrame — confirm the cap is intended outside of testing.
df = df.limit(500) 
# Added this step to speed up processing as we read the data.
# The problem is caused by the Python UDF (reverse_geocoder) being a bottleneck due to its non-parallel nature and high computational cost per task.

In [0]:
# Plain Python function that performs the reverse geocoding; it is wrapped as a Spark UDF separately.
def get_county_code(lat, lon):
    '''
    Retrieve the country code for a given latitude and longitude.

    The lookup goes through `rg.search` from the `reverse_geocoder` package
    and reads the 'cc' field of the first result.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str or None: Country code of the location, or None when a coordinate is
    missing, cannot be converted to a finite float, or the lookup fails.

    Example:
    >>> get_county_code(48.8588443, 2.2943506)
    'FR'
    '''
    # Local import so the function carries its own dependency when used as a UDF.
    import math

    # Missing coordinates cannot be geocoded; return None without raising.
    if lat is None or lon is None:
        return None

    try:
        coordinates = (float(lat), float(lon))
        # NaN / infinity are not valid positions; skip the lookup for them.
        if not all(math.isfinite(value) for value in coordinates):
            return None
        # mode=1 selects the library's single-process lookup: this function is
        # called once per row, so the default multiprocess mode (mode=2) only
        # adds overhead. verbose=False silences the library's loading message.
        # No per-row success print: it would flood the logs for large inputs.
        return rg.search(coordinates, mode=1, verbose=False)[0].get('cc')
    except Exception as e:
        # Best-effort: one bad row must not fail the whole job, so report and return None.
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return None

In [0]:
# Wrap the Python function as a Spark UDF with a string return type so it can be
# applied to DataFrame columns.
get_country_code_udf = udf(f=get_county_code, returnType=StringType())

In [0]:
# Quick check of the plain Python function (not the UDF) with the coordinates
# from its docstring example, where the documented result is 'FR'.
get_county_code(48.8588443, 2.2943506)

In [0]:
# Add a country_code column by applying the reverse-geocoding UDF to each
# row's latitude and longitude.
country_code_col = get_country_code_udf(col("latitude"), col("longitude"))
df_with_location = df.withColumn("country_code", country_code_col)

In [0]:
# Add a significance classification column, sig_class, derived from `sig`:
#   sig < 100         -> "Low"
#   100 <= sig < 500  -> "Moderate"
#   anything else     -> "High"
# NOTE(review): a null `sig` satisfies neither condition and so falls through
# to "High" — confirm that is the intended handling.
sig_score = col("sig")
sig_class_col = (
    when(sig_score < 100, "Low")
    .when((sig_score >= 100) & (sig_score < 500), "Moderate")
    .otherwise("High")
)
df_with_location_sig_class = df_with_location.withColumn("sig_class", sig_class_col)

In [0]:
# Show the enriched DataFrame in the notebook output for a visual check.
# NOTE(review): presumably this evaluates the geocoding UDF over the displayed rows,
# so it adds run time when executed as a job task — confirm it is still needed.
df_with_location_sig_class.display()

In [0]:
# Build the destination path for the transformed DataFrame in the Gold container.
# The folder name is concatenated directly onto gold_adls with no separator added,
# so this assumes gold_adls already ends with "/" — TODO confirm against the Bronze task.
gold_output_path = f"{gold_adls}earthquakes_events_gold/"

In [0]:
# Write the enriched rows to the gold path as Parquet, appending to whatever
# data is already stored there.
df_with_location_sig_class.write.parquet(gold_output_path, mode="append")
