In [0]:
# Retrieve the task values published by the upstream Bronze and Silver tasks
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output")
silver_data = dbutils.jobs.taskValues.get(taskKey="Silver", key="silver_output")

# Unpack the individual settings from the Bronze payload; a missing key
# falls back to a single space " ".
start_date, gold_adls, silver_adls = (
    bronze_output.get(setting_name, " ")
    for setting_name in ("start_date", "gold_adls", "silver_adls")
)

print(f"Start Date : {start_date}, GOLD ADLS : {gold_adls}")


In [0]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
# Ensure the below library is installed in the cluster
import reverse_geocoder as rg
from datetime import timedelta

In [0]:
df = (
    spark.read
    .parquet(silver_data)          # Silver output path from the Silver task
    .where(col('time') > start_date)  # keep only events after the Bronze start date
)

In [0]:
# NOTE(review): this caps how many rows flow through the rest of the notebook,
# including the final write to the Gold container. It looks like a development
# shortcut to keep the per-row reverse-geocoding UDF cheap. Set ROW_LIMIT to
# None to process every filtered row.
ROW_LIMIT = 10
if ROW_LIMIT is not None:
    df = df.limit(ROW_LIMIT)

In [0]:

def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Intended as the body of a Spark UDF. It does not raise on ordinary
    errors: any failure (non-numeric or missing coordinates, lookup error)
    is printed and reported as ``None``.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str or None: The ``'cc'`` field of the first ``reverse_geocoder``
    match, or ``None`` if the coordinates could not be processed.

    Example:
        get_country_code(40.7128, -74.0060)  # -> 'US'
    """
    # Bind the raw inputs first so the error message below can always be
    # built, even when the float() conversion is what failed
    # (e.g. lat/lon is None or a non-numeric string).
    coordinates = (lat, lon)
    try:
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates)[0].get('cc')

        print(f'Processed coordinates {coordinates} -> {result}')
        return result
    except Exception as e:
        print(f'Error processing coordinates {coordinates} -> {str(e)}')
        return None

In [0]:
# Wrap get_country_code as a string-returning Spark UDF so it can be applied to DataFrame columns
get_country_code_udf = udf(f=get_country_code, returnType=StringType())

In [0]:
# Quick driver-side check of the plain Python function before using it as a UDF
# (these coordinates are presumably Paris, so 'FR' is expected — confirm in the output)
get_country_code(lat=48.8566, lon=2.3522)


In [0]:
# Add a 'country_code' column by reverse-geocoding each row's latitude/longitude
df_with_location = df.withColumn(
    'country_code',
    get_country_code_udf(col('latitude'), col('longitude')),
)


In [0]:
# Add a significance classification: sig < 100 -> "Low", 100 <= sig < 500 -> "Medium",
# sig >= 500 -> "High".
# The branches are evaluated in order, so the "Medium" test only needs its upper bound.
# "High" is an explicit condition rather than .otherwise(): a row whose sig is null
# matches none of the branches and gets a null sig_class instead of being labelled "High".
df_with_location_sig_class = df_with_location.withColumn(
    'sig_class',
    when(col("sig") < 100, "Low")
    .when(col("sig") < 500, "Medium")
    .when(col("sig") >= 500, "High"),
)

In [0]:
# Preview the first classified row (first() is an alias of head())
df_with_location_sig_class.first()

In [0]:
# Build the destination folder for the transformed events inside the Gold container
# (gold_adls is concatenated as-is, so it is expected to end with "/")
gold_output_path = "{}earthquake_events_gold/".format(gold_adls)

In [0]:
# Append the classified DataFrame to the Gold container in Parquet format
df_with_location_sig_class.write.parquet(gold_output_path, mode="append")