<mark>Business-Gold → Aggregated, curated data (ready for analytics/reporting)</mark>

In [None]:
# Parameter cell (toggle "parameter cell" on this cell in the notebook UI).
# The empty defaults below stand in for the values handed over from the
# Refined-Silver notebook when the notebook is run from the pipeline.

bronze_output: str = ""
silver_data: str = ""

In [None]:
import json

# The parameter cell declares `bronze_output`, but this cell used to read
# `bronze_params`, which fails with a NameError unless the pipeline happens to
# inject that exact name. Accept either name so both configurations work.
_bronze_json = globals().get("bronze_params") or globals().get("bronze_output") or ""

# Parse the JSON string. An empty value (e.g. an interactive run where nothing
# was injected) yields empty settings instead of a JSONDecodeError.
if _bronze_json:
    bronze_data = json.loads(_bronze_json)
else:
    print("Warning: no bronze parameters were supplied; using empty defaults.")
    bronze_data = {}

# Access individual variables (each falls back to "" when the key is absent)
start_date = bronze_data.get("start_date", "")
silver_adls = bronze_data.get("silver_adls", "")
gold_adls = bronze_data.get("gold_adls", "")

print(f"Start Date: {start_date}")
print(f"Silver ADLS: {silver_adls}")
print(f"Gold ADLS: {gold_adls}")

In [38]:
# For manual testing only: uncomment the lines below when running the notebook interactively; keep them commented out when it runs in the Data Factory pipeline
#import json
#from datetime import date, timedelta

#start_date = date.today() - timedelta(1)

#silver_adls = "abfss://refined-silver@synapsetest1298.dfs.core.windows.net/"
#gold_adls = "abfss://business-gold@synapsetest1298.dfs.core.windows.net/"

#silver_data = f"{silver_adls}earthquake_events_silver/"

StatementMeta(Synapsedemo, 4, 3, Finished, Available, Finished)

In [39]:
# Import Libraries

from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
from datetime import date, timedelta
# Ensure the below library is installed on your cluster
import reverse_geocoder as rg


StatementMeta(Synapsedemo, 4, 4, Finished, Available, Finished)

In [40]:
# Load the silver-layer events, keeping only rows whose `time` is later than start_date

df = (
    spark.read
    .parquet(silver_data)
    .filter(col("time") > start_date)
)

StatementMeta(Synapsedemo, 4, 5, Finished, Available, Finished)

In [41]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str or None): Latitude of the location.
    lon (float or str or None): Longitude of the location.

    Returns:
    str or None: The 'cc' field of the first result returned by
    ``rg.search`` for the coordinates, or None when either coordinate is
    missing or the lookup raises.

    Example:
    >>> get_country_code(48.8588443, 2.2943506)
    'FR'
    """
    # A missing coordinate (null column value) is an expected data condition,
    # not an error: skip the lookup instead of relying on float(None) raising.
    if lat is None or lon is None:
        return None

    try:
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates)[0].get('cc')
        # NOTE(review): this runs once per row when used as a UDF, so this line
        # is printed for every processed event — consider removing once stable.
        print(f"Processed coordinates: {coordinates} -> {result}")
        return result
    except Exception as e:
        # Best effort: a failed lookup leaves the country code empty rather
        # than failing the whole job.
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return None

StatementMeta(Synapsedemo, 4, 6, Finished, Available, Finished)

In [42]:
# Wrap get_country_code as a Spark UDF returning a string column,
# so it can be applied to DataFrame columns
get_country_code_udf = udf(get_country_code, returnType=StringType())

StatementMeta(Synapsedemo, 4, 7, Finished, Available, Finished)

In [43]:
# Add a country_code column derived from each event's latitude/longitude
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

StatementMeta(Synapsedemo, 4, 8, Finished, Available, Finished)

In [44]:
# Printing schemas

# printSchema() writes the schema to stdout itself and returns None, so wrapping
# it in print() only adds a stray "None" line after each schema.
df.printSchema()
df_with_location.printSchema()

StatementMeta(Synapsedemo, 4, 9, Finished, Available, Finished)

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)

None
root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- country_code: string (nullable = true)

None


In [45]:
# Adding significance classification based on the `sig` column:
#   sig < 100          -> "Low"
#   100 <= sig < 500   -> "Moderate"
#   anything else      -> "High"
# NOTE(review): a null `sig` matches neither `when` branch and therefore ends up
# as "High" — confirm this is intended.
df_with_location_sig_class = df_with_location.withColumn(
    "sig_class",
    when(col("sig") < 100, "Low")
    .when((col("sig") >= 100) & (col("sig") < 500), "Moderate")
    .otherwise("High"),
)

# printSchema() prints the schema itself and returns None; wrapping it in print()
# would only emit an extra "None" line.
df_with_location_sig_class.printSchema()

StatementMeta(Synapsedemo, 4, 10, Finished, Available, Finished)

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- country_code: string (nullable = true)
 |-- sig_class: string (nullable = false)

None


In [46]:
# Target folder for the transformed events inside the gold container
# (gold_adls is expected to already end with a trailing "/")
gold_output_path = "{}earthquake_events_gold/".format(gold_adls)

StatementMeta(Synapsedemo, 4, 11, Finished, Available, Finished)

In [47]:
# Write the DataFrame to the gold container in Parquet format.
# NOTE(review): mode is currently "overwrite" (kept for testing), which replaces
# everything under gold_output_path on each run; switch to "append" for
# incremental pipeline loads.
(
    df_with_location_sig_class.write
    .mode("overwrite")
    .parquet(gold_output_path)
)

StatementMeta(Synapsedemo, 4, 12, Finished, Available, Finished)