In [0]:
# Fetch the payloads published under the "Bronze" and "Silver" task keys of this job.
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output")
# NOTE(review): silver_output is fetched but not read by any cell visible here.
silver_output = dbutils.jobs.taskValues.get(taskKey="Silver", key="silver_output")

# Every setting used below comes from the Bronze payload; a missing key falls back to "".
start_date, silver_dbfs, gold_dbfs = (
    bronze_output.get(setting, "")
    for setting in ("start_date", "silver_dbfs", "gold_dbfs")
)

print(f"start date : {start_date} and gold dbfs : {gold_dbfs}")

In [0]:
pip install reverse_geocoder

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from datetime import date, timedelta
import reverse_geocoder as rg

In [0]:
# Read the silver-layer parquet data and keep only rows whose `time` is greater than start_date.
silver_events = spark.read.parquet(f"{silver_dbfs}/earthquake_events_silver/")
df = silver_events.filter(col('time') > start_date)

In [0]:
# Cap the frame at 10 rows. Every later cell (including the gold write) works on
# this truncated frame, so at most 10 rows are written per run.
# NOTE(review): looks like a development/demo limit — confirm it is intended before
# running this as a production job. No ordering is applied before the limit, so which
# 10 rows are kept is presumably not guaranteed.
df= df.limit(10)

In [0]:
# Define the function (wrapped as a Spark UDF below) that returns the country code for a latitude/longitude pair
def get_country_code(lat,lon):
    """
    Return the country code for a latitude/longitude pair.

    The coordinates are converted to floats, validated, and passed to
    ``rg.search`` (the ``reverse_geocoder`` package); the ``'cc'`` field of
    the first result is returned.

    Parameters:
    lat (float or str) : Latitude of the location, expected in [-90, 90]
    lon (float or str) : Longitude of the location, expected in [-180, 180]

    Returns:
    str or None : The ``'cc'`` value of the first match, or None when the
    inputs are missing, non-numeric, NaN/infinite, out of range, or when the
    lookup itself fails. Failures are reported with ``print`` and never
    raised, so one bad row cannot abort the caller.

    Example:
    get_country_code(48.8588443, 2.2943506) returns 'FR' (coordinates in Paris).
    """

    try:
        coordinates = (float(lat), float(lon))
        latitude, longitude = coordinates
        # Reject values that are not real-world coordinates instead of letting the
        # lookup return the code of some arbitrary nearest place. NaN and +/-inf
        # fail these chained comparisons as well, so no separate check is needed.
        if not (-90.0 <= latitude <= 90.0 and -180.0 <= longitude <= 180.0):
            raise ValueError("coordinates out of range")
        result = rg.search(coordinates)[0].get('cc')
        print(f" Processed coordinates : {coordinates}-> { result}")
        return result
    except Exception as e:
        # Deliberate best-effort: log the problem and signal "unknown" with None.
        print(f"Error while processing coordinates : {lat}, {lon} -> {str(e)}")
        return None


In [0]:
# Wrap get_country_code as a Spark UDF returning a string, so it can be applied to DataFrame columns.
get_country_code_udf = udf(f=get_country_code, returnType=StringType())

In [0]:
# Quick check of the plain Python function (not the UDF) with a single coordinate pair.
# NOTE(review): these coordinates appear to be in Paris, so 'FR' is presumably the expected result.
get_country_code(48.8588443, 2.2943506)

In [0]:
# Add a country_code column computed by the UDF from each row's latitude and longitude.
# (Only the country code is added here. The variable name's spelling is kept as-is
# because a later cell reads it.)
df_with_loation = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

In [0]:
# Bucket the `sig` column into a categorical `sig_class` column:
#   sig < 100          -> "low"
#   100 <= sig < 500   -> "Moderate"
#   sig >= 500         -> "High"
# The top bucket is an explicit condition rather than `otherwise("High")`: a null
# `sig` fails every comparison, so with `otherwise` it was labelled "High". Without
# an `otherwise`, unmatched (null) rows get a null sig_class instead.
# NOTE(review): "low" is lowercase while the other labels are capitalised; it is left
# unchanged because existing gold data or consumers may depend on the exact value — confirm.
sig_col = col("sig")
df_with_location_sig_class = df_with_loation.withColumn(
    "sig_class",
    when(sig_col < 100, "low")
    .when((sig_col >= 100) & (sig_col < 500), "Moderate")
    .when(sig_col >= 500, "High"),
)

In [0]:
# Destination for the gold-layer parquet output, built from the gold_dbfs base location.
gold_output_path= f"{gold_dbfs}/earthquake_events_gold/"

In [0]:
# Persist the enriched DataFrame as parquet under the gold output path.
# Mode "append" adds to whatever is already stored there.
# NOTE(review): re-running with an overlapping start_date would presumably append the
# same events again — confirm whether de-duplication happens elsewhere.
(
    df_with_location_sig_class.write
    .mode("append")
    .parquet(gold_output_path)
)