# Gold Layer Notebook


In [23]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
# Ensure the reverse_geocoder library is installed in your Fabric environment
import reverse_geocoder as rg
from datetime import date, timedelta

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 27, Finished, Available, Finished)

In [24]:
# For manual runs only: remove the line below before running this notebook from the Data Factory pipeline
# start_date = date.today() - timedelta(7)

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 28, Finished, Available, Finished)

In [25]:
# Read the silver-layer earthquake events and keep only rows whose `time`
# is later than start_date (start_date must be defined before this cell runs).
df = (
    spark.read
    .table("earthquake_events_silver")
    .where(col("time") > start_date)
)

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 29, Finished, Available, Finished)

In [26]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str): Latitude of the location; may be None.
    lon (float or str): Longitude of the location; may be None.

    Returns:
    str or None: The 'cc' field of the first result returned by
    reverse_geocoder's ``rg.search``, or None when either coordinate
    is missing.

    Example:
    >>> get_country_code(48.8588443, 2.2943506)
    'FR'
    """
    # Spark hands SQL NULLs to a Python UDF as None; float(None) would raise
    # TypeError and fail the whole job, so missing coordinates map to None.
    if lat is None or lon is None:
        return None
    coordinates = (float(lat), float(lon))
    return rg.search(coordinates)[0].get('cc')

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 30, Finished, Available, Finished)

In [27]:
# Wrap get_country_code as a Spark UDF returning a string so it can be
# applied to DataFrame columns.
get_country_code_udf = udf(get_country_code, returnType=StringType())

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 31, Finished, Available, Finished)

In [28]:
# Add the Country_Code attribute, derived from each event's
# latitude/longitude via the reverse-geocoding UDF.
df_with_location = df.withColumn(
    "Country_Code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 32, Finished, Available, Finished)

In [29]:
# Classify each event's significance score into a sig_class label:
#   sig < 100          -> "Low"
#   100 <= sig < 500   -> "Moderate"
#   anything else      -> "High"
# NOTE(review): a null sig matches neither condition and therefore falls
# through to "High" — confirm this is intended.
df_with_location_sig_class = df_with_location.withColumn(
    "sig_class",
    (
        when(col("sig") < 100, "Low")
        .when((col("sig") >= 100) & (col("sig") < 500), "Moderate")
        .otherwise("High")
    ),
)

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 33, Finished, Available, Finished)

In [30]:
# Append the enriched events to the gold table.
# NOTE(review): append mode adds rows on every run, so re-running over an
# overlapping start_date window would write the same events again — confirm
# duplicates are prevented elsewhere.
(
    df_with_location_sig_class
    .write
    .mode("append")
    .saveAsTable("earthquake_events_gold")
)

StatementMeta(, f5af7a07-1a4f-4cba-a551-30727e4a7587, 34, Finished, Available, Finished)