In [0]:
# Fetch the values published by the upstream "bronze" and "silver" job tasks.
bronze_output = dbutils.jobs.taskValues.get(
    taskKey="bronze",
    key="bronze_output",
)
silver_output = dbutils.jobs.taskValues.get(
    taskKey="silver",
    key="silver_output",
)

# Unpack the settings carried in the bronze payload; a missing key falls back to "".
start_date, silver_adls, gold_adls = (
    bronze_output.get(setting, "")
    for setting in ("start_date", "silver_adls", "gold_adls")
)

print(f"Start date: {start_date}, Gold adls: {gold_adls}")


In [0]:
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import StringType

import reverse_geocoder as rg
from datetime import date, timedelta

In [0]:
# Load the silver parquet data, keeping only rows whose `time` is greater than start_date.
df = (
    spark.read
    .parquet(silver_output)
    .filter(col("time") > start_date)
)

In [0]:
# Cap the frame at a fixed number of rows before the per-row geocoding UDF is applied.
# NOTE(review): every downstream frame, including the one written to gold, derives
# from this capped `df` — confirm the cap is intended and not a debugging leftover.
MAX_ROWS = 100
df = df.limit(MAX_ROWS)

In [0]:
def get_coutry_code(lat, lon):
    """Reverse-geocode a latitude/longitude pair and return its 'cc' value.

    Both inputs are converted with ``float()``, the pair is passed to
    ``rg.search``, and the ``'cc'`` entry of the first match is returned
    (presumably a country code, going by the log message — confirm against
    the reverse_geocoder documentation).

    Args:
        lat: Latitude; anything ``float()`` accepts.
        lon: Longitude; anything ``float()`` accepts.

    Returns:
        The ``'cc'`` value of the first search result, or ``None`` when any
        step raises (e.g. a ``None`` or non-numeric coordinate).
    """
    try:
        point = (float(lat), float(lon))
        nearest_match = rg.search(point)[0]
        country = nearest_match.get('cc')
        print(f"Processed coordinates: {point}, Country code: {country}")
        return country
    except Exception as err:
        # Best-effort lookup: log the failure and yield None rather than failing the caller.
        print(f"Error processing coordinates: {lat}, {lon} -> {str(err)}")
        return None
    

In [0]:
# Wrap the Python lookup as a Spark UDF that returns a string column.
get_country_code_udf = udf(f=get_coutry_code, returnType=StringType())

In [0]:
# Add a `country_code` column computed from each row's latitude/longitude via the UDF.
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

In [0]:
# Classify `significance` into a `sig_class` column:
#   below 100            -> "low"
#   100 up to (not incl.) 500 -> "moderate"
#   anything else        -> "high"
# NOTE(review): a null `significance` matches neither condition and therefore
# lands in "high" — confirm that is the intended treatment.
df_with_location_sig_class = df_with_location.withColumn(
    "sig_class",
    when(col("significance") < 100, "low")
    .when(
        (col("significance") >= 100) & (col("significance") < 500),
        "moderate",
    )
    .otherwise("high"),
)

In [0]:
# Destination for the gold dataset: the folder name is appended directly to gold_adls.
# NOTE(review): assumes gold_adls already ends with a path separator — TODO confirm.
gold_output_path = f"{gold_adls}earthquake_events_gold/"

In [0]:
# Append the enriched rows to the gold location as parquet.
# NOTE(review): append mode adds to whatever is already there, so re-running with
# the same start_date would presumably write the same rows again — confirm.
df_with_location_sig_class.write.parquet(gold_output_path, mode="append")