In [0]:
# For Static Pipeline

"""from datetime import date, timedelta

# Remove this before running Data Factory Pipeline
start_date = date.today() - timedelta(1)

silver_adls = "abfss://silver@prj1storage.dfs.core.windows.net/"
gold_adls = "abfss://gold@prj1storage.dfs.core.windows.net/"

silver_data = f"{silver_adls}earthquake_events_silver/"
"""

## FOR DYNAMIC PIPELINE

import json

# Get base parameters
dbutils.widgets.text('bronze_params', '')
dbutils.widgets.text('silver_params', '')

bronze_params = dbutils.widgets.get('bronze_params')
silver_params = dbutils.widgets.get('silver_params')

# Debug: Print the raw input values for troubleshooting
print(f"Raw bronze_params: {bronze_params}")
print(f"Raw silver_params: {silver_params}")

bronze_data = json.loads(bronze_params)

# Access individual variables
start_date = bronze_data.get('start_date', '')
end_date = bronze_data.get('end_date', '')
silver_adls = bronze_data.get('silver_adls', '')
gold_adls = bronze_data.get('gold_adls', '')
silver_data = silver_params

# Debug: Print the extracted values for verification
print(f"Start Date: {start_date}, End Date: {end_date}")
print(f"Silver ADLS path: {silver_adls}, Gold ADLS path: {gold_adls}")

Raw bronze_params: 
Raw silver_params: 


[0;31m---------------------------------------------------------------------------[0m
[0;31mJSONDecodeError[0m                           Traceback (most recent call last)
File [0;32m<command-2971894761681486>, line 29[0m
[1;32m     26[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mRaw bronze_params: [39m[38;5;132;01m{[39;00mbronze_params[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m     27[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mRaw silver_params: [39m[38;5;132;01m{[39;00msilver_params[38;5;132;01m}[39;00m[38;5;124m"[39m)
[0;32m---> 29[0m bronze_data [38;5;241m=[39m json[38;5;241m.[39mloads(bronze_params)
[1;32m     31[0m [38;5;66;03m# Access individual variables[39;00m
[1;32m     32[0m start_date [38;5;241m=[39m bronze_data[38;5;241m.[39mget([38;5;124m'[39m[38;5;124mstart_date[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m)

File [0;32m/usr/lib/python3.11/json/__init__.py:346[0m, in

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta

# Ensure the below library is installed on your computer
import reverse_geocoder as rg

In [0]:
df = spark.read.parquet(silver_data).filter(col('time') > start_date)

In [0]:
df = df.limit(100) # added to speed up processings as during testing it was proving a bottleneck
# The problem is caused by the Python UDF (reverse_geocoder) being a bottleneck due to its non-parallel nature and high computational cost per task

In [0]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    parameters:
    lat (float or string): Latitude of the location.
    lon (float or string): Longitude of the location.

    Returns:
    str: Country code of the location, retrieve using reverse_geocoder API.

    Example:
    >>> get_country_details(48.8588433, 2.2943506)
    'FR'
    """
    try:
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates)[0].get('cc')
        print(f"Processed coordinates: {coordinates} -> {result}")
        return result
    except Exception as e:
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return None

In [0]:
# register the udfs so they can be used on spark dataframes
get_country_code_udf = udf(get_country_code, StringType())

In [0]:
get_country_code(49.89598, 10.81953)

Processed coordinates: (49.89598, 10.81953) -> DE


'DE'

In [0]:
# Adding country_code and city attributes to the dataframe
df_with_location = df \
                    .withColumn('country_code', get_country_code_udf(col('latitude'), col('longitude')))

In [0]:
# Adding significance classification
df_with_location_sig_class = df_with_location \
                            .withColumn('sig_class', when(col('sig') < 100, 'Low') \
                                                    .when((col('sig') >= 100) & (col('sig') < 1000), 'Moderate') \
                                                    .otherwise('High') )

In [0]:
# Save the transformed DataFrame to the Gold Container
gold_output_path = f"{gold_adls}earthquake_events_gold/"

In [0]:
# Append DataFrame to Gold Container in Parquet format
df_with_location_sig_class.write.mode('append').parquet(gold_output_path)