In [None]:
# Load the Silver-layer parquet snapshot for the run date.
# NOTE(review): `current_date` is not defined in any visible cell (the recorded
# output of this cell is a NameError); presumably it is supplied by a parameters
# cell or the calling pipeline — confirm before Restart & Run All.
df = spark.read.parquet(f"Files/Silver/{current_date}_earthquake_data")

StatementMeta(, 377edef6-7f0d-4104-8998-cc2b135f5925, 3, Finished, Available, Finished)

NameError: name 'current_date' is not defined

### `status`: keep only reviewed events

In [None]:
from pyspark.sql.functions import col

# Keep only rows whose `status` is exactly 'reviewed'; every other status value is dropped.
reviewed_df = df.where(col("status") == "reviewed")

StatementMeta(, , , Cancelled, , Cancelled)

### `place`: expand compass-direction abbreviations to full names
(e.g., "77 km ESE of Sand Point, Alaska" → "77 km east-south-east of Sand Point, Alaska")

In [None]:
from functools import reduce
from pyspark.sql.functions import regexp_replace, col

# Compass-point abbreviations found in `place` strings, mapped to full names.
direction_mapping = {
    "N": "north",
    "S": "south",
    "E": "east",
    "W": "west",
    "NE": "north-east",
    "NW": "north-west",
    "SE": "south-east",
    "SW": "south-west",
    "NNE": "north-north-east",
    "NNW": "north-north-west",
    "SSE": "south-south-east",
    "SSW": "south-south-west",
    "ENE": "east-north-east",
    "ESE": "east-south-east",
    "WSW": "west-south-west",
    "WNW": "west-north-west"
}

# Fold the mapping into one nested regexp_replace expression (one call per
# abbreviation). Each pattern only matches the direction token that sits
# between "km " and " of", as in "77 km ESE of Sand Point, Alaska" (also
# "5km NNE of ..."). A bare \bABBR\b pattern would also rewrite unrelated
# capitals elsewhere in the string, e.g. "U.S. Virgin Islands" ->
# "U.south. Virgin Islands".
# Patterns are case-sensitive and every replacement is lowercase, so a token
# that has already been expanded is never matched again by a later pattern;
# the lookahead on " of " also stops "N" from matching inside "NNE".
replace_expr = reduce(
    lambda expr, item: regexp_replace(expr, rf"(?<=km ){item[0]}(?= of )", item[1]),
    direction_mapping.items(),
    col("place"),
)

direction_mapped_df = reviewed_df.withColumn("place", replace_expr)

StatementMeta(, , , Cancelled, , Cancelled)

### Convert `time` (epoch milliseconds) to a Europe/Warsaw timestamp

In [None]:
from pyspark.sql.functions import from_unixtime, from_utc_timestamp, to_timestamp, to_utc_timestamp

# `time` holds epoch milliseconds; from_unixtime expects seconds and renders the
# instant as a 'yyyy-MM-dd HH:mm:ss' string in the Spark session time zone
# (sub-second precision is dropped). The string is cast back to a timestamp below.
# NOTE(review): the column is only literally UTC if the session time zone is UTC — confirm.
df_with_utc = direction_mapped_df.withColumn('utc_datetime', from_unixtime(col('time') / 1000))

# Shift to Europe/Warsaw wall-clock time. from_utc_timestamp treats its input as
# UTC and renders it in the target zone. to_utc_timestamp would do the reverse
# (treat the value as Warsaw local time and convert it to UTC), moving the
# result 1-2 hours in the wrong direction.
df_with_warsaw_time = (
    df_with_utc
    .withColumn('warsaw_datetime', from_utc_timestamp(col('utc_datetime'), 'Europe/Warsaw'))
    .drop('utc_datetime')
)


StatementMeta(, , , Cancelled, , Cancelled)

### Add `country_code` column based on geolocation

In [None]:
from pyspark.sql.functions import when, col, udf, count
from pyspark.sql.types import StringType
import reverse_geocoder as rg

In [None]:

def get_country_code(lat, lon):
    """Return the country code reverse_geocoder reports for (lat, lon).

    Args:
        lat: Latitude; any value ``float()`` accepts, or None.
        lon: Longitude; any value ``float()`` accepts, or None.

    Returns:
        The ``'cc'`` field of reverse_geocoder's first match, or None when either
        coordinate is missing. A SQL NULL reaches a Python UDF as None, and
        ``float(None)`` would otherwise raise TypeError and fail the Spark task.
    """
    if lat is None or lon is None:
        return None
    coordinates = (float(lat), float(lon))
    # mode=1: single-process lookup. This function runs once per row, so the
    # default multi-process mode (mode=2) only adds per-call overhead.
    return rg.search(coordinates, mode=1)[0].get('cc')

# Wrap get_country_code as a Spark UDF that yields a string column, so it can be
# applied to DataFrame columns.
get_country_code_udf = udf(f=get_country_code, returnType=StringType())

In [None]:
# Add a `country_code` column computed row by row from latitude/longitude.
df_with_location = df_with_warsaw_time.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

### Add significance classification

In [None]:
from pyspark.sql.functions import when
# Bucket `significance` into coarse ratings: <=100 low, (100, 250] medium,
# >250 high. Branches are evaluated in order, so the 'medium' test does not need
# a lower bound. A null significance satisfies no branch and is labelled 'check'.
df_with_sig_rating = df_with_location.withColumn(
    'significance_rating',
    when(col('significance') <= 100, 'low')
    .when(col('significance') <= 250, 'medium')
    .when(col('significance') > 250, 'high')
    .otherwise('check'),
)

### Write to the `earthquakes` Delta table

In [None]:
# NOTE(review): `mode` is not defined in any visible cell; presumably it is
# supplied by a parameters cell or the calling pipeline — confirm.
if mode not in ('overwrite', 'append'):
    # ValueError subclasses Exception, so existing `except Exception` handlers still catch it.
    raise ValueError(
        f"Table not populated. Check variable: mode "
        f"(expected 'overwrite' or 'append', got {mode!r})"
    )

earthquakes_writer = df_with_sig_rating.write.mode(mode).format("delta")
if mode == 'overwrite':
    # Let a full overwrite also replace the table schema (e.g. after column changes).
    earthquakes_writer = earthquakes_writer.option("overwriteSchema", "true")
earthquakes_writer.saveAsTable('earthquakes')

StatementMeta(, , , Cancelled, , Cancelled)