In [None]:
# Resolve the ADLS Gen2 storage account name used to build every abfss:// path below.
#
# The same cell now serves both execution modes, so no lines need to be (un)commented:
#   * Azure Data Factory (ADF): pass a "storage_account_name" base parameter; it is read
#     through the Databricks widget of the same name.
#   * Interactive run: the widget is created with the default below, so the default is used
#     unless you type another value into the widget.
#   * Outside Databricks (dbutils undefined): the default is used directly.
# IMPORTANT: Replace 'earthquakedataluke' with your actual ADLS Gen2 storage account name.
DEFAULT_STORAGE_ACCOUNT_NAME = "earthquakedataluke"  # Replace with your storage account name

try:
    dbutils.widgets.text("storage_account_name", DEFAULT_STORAGE_ACCOUNT_NAME, "Storage Account Name")
    # A blank widget value (e.g. ADF passed an empty string) falls back to the default.
    storage_account_name = dbutils.widgets.get("storage_account_name").strip() or DEFAULT_STORAGE_ACCOUNT_NAME
except NameError:
    # dbutils is not defined in this environment.
    storage_account_name = DEFAULT_STORAGE_ACCOUNT_NAME

print(f"Using storage_account_name: {storage_account_name}")

In [None]:
from datetime import date, timedelta

# Interactive default for the lower time bound: yesterday's date.
# Remove this before running Data Factory Pipeline
start_date = date.today() - timedelta(days=1)

# Root URIs of the silver and gold containers in the configured storage account.
_adls_host = f"{storage_account_name}.dfs.core.windows.net"
silver_adls = f"abfss://silver@{_adls_host}/"
gold_adls = f"abfss://gold@{_adls_host}/"

# Folder holding the silver earthquake events that this notebook reads.
silver_data = silver_adls + "earthquake_events_silver/"


In [None]:
# Azure Data Factory (ADF) variant of the parameter setup -- currently DISABLED.
# Everything in this cell is one triple-quoted string literal, so none of the code inside runs.
# To enable it for an ADF run, delete the opening and closing triple-quote lines.
# When enabled, the code:
#   - reads the "bronze_params", "silver_params" and "storage_account_name" widgets;
#   - requires bronze_params to be valid JSON (json.loads raises otherwise) and takes
#     start_date / end_date from it;
#   - picks storage_account_name from the direct widget first, then silver_params, then
#     bronze_params, falling back to an already-defined value or "earthquakedataluke";
#   - rebuilds silver_adls / gold_adls and chooses the silver_data path to read
#     (silver_output_path from JSON, else a full abfss:// widget value, else the default folder).
''' Data Factory

import json

# Get base parameters
dbutils.widgets.text("bronze_params", "")
dbutils.widgets.text("silver_params", "")
dbutils.widgets.text("storage_account_name", "", "Storage Account Name")

bronze_params = dbutils.widgets.get("bronze_params")
silver_params = dbutils.widgets.get("silver_params")
storage_account_name_direct = dbutils.widgets.get("storage_account_name") # If passed directly by ADF
silver_data_params = {}
if silver_params and isinstance(silver_params, str) and silver_params.strip().startswith('{'):
    try:
        silver_data_params = json.loads(silver_params)
    except json.JSONDecodeError:
        print(f"Warning: Could not decode silver_params as JSON: {silver_params}")
        # silver_params might be just a path as in original notebook, handle accordingly
        # If it's just a path, storage_account_name should come from bronze_data or direct widget
        pass
storage_account_name_silver = silver_data_params.get("storage_account_name", "") # From silver_params (if JSON and contains the key)

# Debug: Print the raw input values for troubleshooting
print(f"Raw bronze_params: {bronze_params}")
print(f"Raw silver_params: {silver_params}")

# Parse the JSON string
bronze_data = json.loads(bronze_params)

# Access individual variables
storage_account_name_bronze = bronze_data.get("storage_account_name", "") # From bronze_data
start_date = bronze_data.get("start_date", "")
end_date = bronze_data.get("end_date", "")
silver_adls_param = bronze_data.get("silver_adls", "") # Path from bronze_data
gold_adls_param = bronze_data.get("gold_adls", "")   # Path from bronze_data
silver_data_input_path = silver_params # This is the direct input from silver_params widget

# Determine storage_account_name to use
if 'storage_account_name_direct' in locals() and storage_account_name_direct:
    storage_account_name = storage_account_name_direct
    print(f"Using direct ADF storage_account_name: {storage_account_name}")
elif 'storage_account_name_silver' in locals() and storage_account_name_silver:
    storage_account_name = storage_account_name_silver
    print(f"Using storage_account_name from silver_params: {storage_account_name}")
elif 'storage_account_name_bronze' in locals() and storage_account_name_bronze:
    storage_account_name = storage_account_name_bronze
    print(f"Using storage_account_name from bronze_data: {storage_account_name}")
else:
    print("Warning: storage_account_name not found in ADF or upstream params. Using pre-defined or default from interactive cell.")
    if 'storage_account_name' not in locals(): # Fallback to value from widget cell if running interactively
        storage_account_name = "earthquakedataluke" # Default if not found anywhere else
print(f"Final storage_account_name for Gold: {storage_account_name}")

# Re-construct ADLS paths using the determined storage_account_name
silver_adls = f"abfss://silver@{storage_account_name}.dfs.core.windows.net/"
gold_adls = f"abfss://gold@{storage_account_name}.dfs.core.windows.net/"

# Determine the correct silver_data path to read from
if isinstance(silver_data_params, dict) and silver_data_params.get('silver_output_path'):
    silver_data = silver_data_params['silver_output_path']
    print(f"Using silver_output_path from parsed silver_params (JSON): {silver_data}")
elif silver_data_input_path and silver_data_input_path.startswith('abfss://'):
    silver_data = silver_data_input_path
    print(f"Using silver_params widget value as full silver_data path: {silver_data}")
else:
    # Fallback if silver_data_input_path is not a full path (e.g. only a date or relative part)
    silver_data = f"{silver_adls}earthquake_events_silver/"
    print(f"Defaulting silver_data path using determined silver_adls: {silver_data}")

# Debug: Print the extracted values for verification
print(f"Start Date: {start_date}, End Date: {end_date}")
print(f"Silver ADLS (used for constructing silver_data path if needed): {silver_adls}")
print(f"Gold ADLS Path: {gold_adls}")
print(f"Final silver_data path to be read: {silver_data}")
'''

In [None]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
# Ensure the below library is installed on your cluster
import reverse_geocoder as rg
from datetime import date, timedelta

In [None]:
# Load the silver Parquet data and keep only events whose `time` is later than start_date.
df = spark.read.parquet(silver_data).where(col("time") > start_date)

In [None]:
# Known performance issue: the slow runtime of this notebook is caused by the Python UDF defined below (reverse_geocoder), which is a bottleneck due to its non-parallel nature and high computational cost per task.

In [None]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    The lookup is done with the `reverse_geocoder` library (imported as `rg`
    elsewhere in this notebook) and returns the `cc` field of the nearest match.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str: Country code of the location, or the sentinel "ERROR_GEOCODING" when the
    inputs cannot be converted to finite floats (None, NaN, inf, non-numeric text)
    or the lookup itself fails.

    Example:
    >>> get_country_code(48.8588443, 2.2943506)
    'FR'
    """
    # Function-scope import: the notebook's import cell does not import math.
    import math

    try:
        coordinates = (float(lat), float(lon))
        # Reject NaN/inf explicitly instead of handing them to the geocoder.
        if not all(math.isfinite(value) for value in coordinates):
            raise ValueError("coordinates must be finite numbers")
        # mode=1 selects reverse_geocoder's single-process K-D tree; the default
        # multi-process mode is needless overhead for a one-point query per row.
        # No per-row success print: this runs once per row when used as a UDF.
        return rg.search(coordinates, mode=1)[0].get('cc')
    except Exception as e:
        # Best-effort by design: a failed row is tagged with a sentinel rather than failing the job.
        # Note: print() output from a UDF goes to the worker logs, not the driver.
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return "ERROR_GEOCODING"


In [None]:
# Wrap the plain Python geocoding function as a Spark UDF returning a string,
# so it can be applied to DataFrame columns.
get_country_code_udf = udf(get_country_code, returnType=StringType())

In [None]:
# Driver-side smoke test of the plain Python function (not the UDF) on a single coordinate pair.
get_country_code(lat=48.8588443, lon=2.2943506)

### IMPORTANT: Performance Consideration for `get_country_code_udf`

The `get_country_code_udf` uses the `reverse_geocoder` library within a standard Python UDF. As noted in the project's `guide.md` (Step 7) and in earlier comments, this approach can be a **significant performance bottleneck** for large datasets due to the row-by-row processing nature of standard UDFs and the computational cost of geocoding.

**For production or large-scale processing, consider the following optimizations as recommended in `guide.md`:**
- Using Pandas UDFs (vectorized UDFs) for better performance with Python native libraries.
- Pre-calculating a lookup table for common coordinates if the geographic range is somewhat limited.
- Performing geocoding in batches outside of the main Spark transformation if feasible.

The `df.limit(100)` line, previously used for faster testing, has been removed to allow full data processing. Ensure your cluster is appropriately sized if running this notebook on large datasets with the current UDF implementation.

In [None]:
# Derive a country_code column from each event's latitude/longitude via the geocoding UDF.
# (Only country_code is added here; no city column is produced.)
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

In [None]:
# Inspect the schema of the filtered silver DataFrame, before any columns are added.
df.printSchema()

In [None]:
# Inspect the schema after the country_code column has been added.
df_with_location.printSchema()

In [None]:
# Bucket the `sig` column into a categorical sig_class column:
#   sig < 100          -> "Low"
#   100 <= sig < 500   -> "Moderate"
#   anything else      -> "High"
# Note: a null sig matches neither when() branch, so it also ends up as "High".
df_with_location_sig_class = df_with_location.withColumn(
    'sig_class',
    when(col("sig") < 100, "Low")
    .when((col("sig") >= 100) & (col("sig") < 500), "Moderate")
    .otherwise("High"),
)

In [None]:
# Inspect the final schema, now including the sig_class column.
df_with_location_sig_class.printSchema()

In [None]:
# Destination folder inside the Gold container for the transformed events (stored as Parquet).
gold_output_path = gold_adls + "earthquake_events_gold/"

In [None]:
# Persist the enriched DataFrame to the Gold container as Parquet.
# Append mode adds to whatever is already stored at gold_output_path, so re-running the
# notebook over the same time window will write the same events again.
df_with_location_sig_class.write.parquet(gold_output_path, mode='append')