In [0]:
from datetime import date, timedelta

# abfss:// roots of the "silver" and "gold" containers in the
# earthquendtoend storage account.
silver_adls = "abfss://silver@earthquendtoend.dfs.core.windows.net/"
gold_adls = "abfss://gold@earthquendtoend.dfs.core.windows.net/"

# Folder under the silver root that this notebook reads from.
silver_data = f"{silver_adls}earthquake_events_silver/"

In [0]:
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import StringType
from datetime import date, timedelta
import reverse_geocoder as rg

In [0]:
# Cut-off is yesterday's date; only rows whose `time` is later than it are kept.
start_date = date.today() - timedelta(days=1)
df = (
    spark.read
    .parquet(silver_data)
    .filter(col("time") > start_date)
)

In [0]:
# Preview the first 10 rows of the filtered silver data.
df.show(10)

+-----------+-----------------+----------------+----------------+--------------------+--------------------+---------+-------+---+--------------------+--------------------+
|         id|        longitude|        latitude|       elevation|               title|   place_description|magnitude|magtype|sig|                time|             updated|
+-----------+-----------------+----------------+----------------+--------------------+--------------------+---------+-------+---+--------------------+--------------------+
| nc75112231| -122.77799987793|38.7946662902832|2.32999992370605|M 0.7 - 3 km NNW ...|3 km NNW of The G...|     0.71|     md|  8|2025-01-02 23:49:...|2025-01-02 23:51:...|
| ci40832719|         -116.447|         33.4075|            13.5|M 1.2 - 18 km NNW...|18 km NNW of Borr...|     1.19|     ml| 22|2025-01-02 23:49:...|2025-01-03 00:00:...|
| tx2025adqo|          -104.41|          31.672|          6.4331|M 1.3 - 55 km S o...|55 km S of Whites...|      1.3|     ml| 26|2025-01-02 

In [0]:
# Keep at most 300 rows; every later cell (including the gold write) works on
# this truncated frame.
# NOTE(review): limit() is applied without an ordering, so which rows survive is
# not specified here — confirm this cap is intended outside of development runs.
df= df.limit(300)

In [0]:
def getcountrycode(lat, long):
    """Return the country code for a latitude/longitude pair.

    The coordinates are converted to floats and looked up with
    ``reverse_geocoder`` (``rg.search``); the ``'cc'`` field of the first
    match is returned.

    Args:
        lat: Latitude; any value accepted by ``float()``.
        long: Longitude; any value accepted by ``float()``.

    Returns:
        The ``'cc'`` value of the first search result, or ``None`` if the
        inputs cannot be converted to float or the lookup raises.
    """
    try:
        coordinates = (float(lat), float(long))
        return rg.search(coordinates)[0].get('cc')
    except Exception as e:
        # Best-effort lookup: report the offending values and yield None so a
        # single bad row does not abort the caller. The f-prefix is required
        # for the placeholders to be filled in.
        print(f"error processing {lat}, {long} - {e}")
        return None

In [0]:
# Wrap getcountrycode as a Spark UDF that returns a string column.
get_country_code_udf = udf(
    getcountrycode,
    StringType(),
)

In [0]:
# Add `country_code`, computed by the UDF from each row's latitude and longitude.
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

In [0]:
# Print the schema to confirm the new `country_code` column is present.
df_with_location.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- magtype: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- country_code: string (nullable = true)



In [0]:
# Bucket `sig` into a label: < 100 -> "Low", >= 100 and < 500 -> "Moderate",
# anything else -> "High".
# NOTE(review): a null `sig` satisfies neither condition and therefore falls
# through to "High" — confirm that is the intended label for missing values.
df_with_location_sig_class = df_with_location.withColumn(
    "sig_class",
    when(col("sig") < 100, "Low")
    .when((col("sig") >= 100) & (col("sig") < 500), "Moderate")
    .otherwise("High"),
)

In [0]:
# Preview 4 rows, now including `country_code` and `sig_class`.
df_with_location_sig_class.show(4)

+----------+-----------------+----------------+----------------+--------------------+--------------------+---------+-------+---+--------------------+--------------------+------------+---------+
|        id|        longitude|        latitude|       elevation|               title|   place_description|magnitude|magtype|sig|                time|             updated|country_code|sig_class|
+----------+-----------------+----------------+----------------+--------------------+--------------------+---------+-------+---+--------------------+--------------------+------------+---------+
|nc75112231| -122.77799987793|38.7946662902832|2.32999992370605|M 0.7 - 3 km NNW ...|3 km NNW of The G...|     0.71|     md|  8|2025-01-02 23:49:...|2025-01-02 23:51:...|          US|      Low|
|ci40832719|         -116.447|         33.4075|            13.5|M 1.2 - 18 km NNW...|18 km NNW of Borr...|     1.19|     ml| 22|2025-01-02 23:49:...|2025-01-03 00:00:...|          US|      Low|
|tx2025adqo|          -104.41|

In [0]:
# Print the final schema that will be written to the gold location.
df_with_location_sig_class.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- magtype: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- country_code: string (nullable = true)
 |-- sig_class: string (nullable = false)



In [0]:
# Destination folder for the enriched events, under the gold container root.
gold_output_path = f"{gold_adls}earthquake_events_gold/"

In [0]:
# Append the enriched rows to the gold location as Parquet (existing files are kept).
(
    df_with_location_sig_class.write
    .mode("append")
    .parquet(gold_output_path)
)