In [0]:
pip install reverse_geocoder

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import json

# Get base parameters
dbutils.widgets.text("bronze_params", "")
dbutils.widgets.text("silver_params", "")

bronze_params = dbutils.widgets.get("bronze_params")
silver_params = dbutils.widgets.get("silver_params")

# Debug: Print the raw input values for troubleshooting
print(f"Raw bronze_params: {bronze_params}")
print(f"Raw silver_params: {silver_params}")

# Parse the JSON string
bronze_data = json.loads(bronze_params)

# Access individual variables
start_date = bronze_data.get("start_date", "")
end_date = bronze_data.get("end_date", "")
silver_adls = bronze_data.get("silver_adls", "")
gold_adls = bronze_data.get("gold_adls", "")
silver_data = silver_params

# Debug: Print the extracted values for verification
print(f"Start Date: {start_date}, End Date: {end_date}")
print(f"Silver ADLS Path: {silver_adls}, Gold ADLS Path: {gold_adls}")


In [0]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
# Ensure the below library is installed on your cluster
import reverse_geocoder as rg
from datetime import date, timedelta
from pyspark.sql.functions import col, when, dayofweek, round


In [0]:
df = spark.read.parquet(silver_data)

In [0]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str: Country code of the location, retrieved using the reverse geocoding API.

    Example:
    >>> get_country_details(48.8588443, 2.2943506)
    'FR'
    """
    try:
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates)[0].get('cc')
        print(f"Processed coordinates: {coordinates} -> {result}")
        return result
    except Exception as e:
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return None
     



In [0]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)



In [0]:
# registering the udfs so they can be used on spark dataframes
get_country_code_udf = udf(get_country_code, StringType())

In [0]:
# adding country_code and city attributes
df = \
                df.\
                    withColumn("country_code", get_country_code_udf(col("latitude"), col("longitude")))

In [0]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- country_code: string (nullable = true)



In [0]:

df_final = df.withColumn('sig_class', 
    when(col("sig") < 100, "Low")
    .when((col("sig") >= 100) & (col("sig") < 500), "Moderate")
    .otherwise("High")
).withColumn("day_of_week", dayofweek(col("time"))) \
 .withColumn("depth_category", 
    when(col("elevation") >= -70, "Shallow")
    .when((col("elevation") < -70) & (col("elevation") >= -300), "Intermediate")
    .otherwise("Deep")
).withColumn("intensity", 
    when(col("mag") < 3.0, "Minor")
    .when((col("mag") >= 3.0) & (col("mag") < 5.0), "Moderate")
    .otherwise("Strong") # Added missing ) here
).withColumn("mag_rounded", round(col("mag"), 1))

In [0]:
df_final.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- country_code: string (nullable = true)
 |-- sig_class: string (nullable = false)
 |-- day_of_week: integer (nullable = true)
 |-- depth_category: string (nullable = false)
 |-- intensity: string (nullable = false)
 |-- mag_rounded: double (nullable = true)



In [0]:

# Save the transformed DataFrame to the Silver container
gold_output_path = f"{gold_adls}earthquake_events_gold/"
     

# Append DataFrame to Silver container in Parquet format
df_final.write.mode('append').parquet(gold_output_path)