## Silver Layer: Data cleaning and transformation

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, lit, when, udf, struct, max as spark_max
from pyspark.sql.types import StringType
from pyspark.sql.functions import avg, count, max as spark_max, min as spark_min, date_format, window
import reverse_geocoder as rg


from pyspark.sql.types import TimestampType

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 10, Finished, Available, Finished)

In [7]:
# Run date that selects the input file read in the next cell
# (Files/<start_date>_earthquake_data.json).
# NOTE(review): this cell was marked "Remove", presumably because an orchestrating
# pipeline injects `start_date` as a notebook parameter — confirm. The guard keeps
# an injected value and only falls back to "two days before today" when nothing was
# provided, so the notebook also runs on a fresh kernel instead of raising NameError.
from datetime import date, timedelta

if "start_date" not in globals():
    start_date = date.today() - timedelta(2)
print(start_date)

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 11, Finished, Available, Finished)

2025-01-15


In [8]:
# Reuse the active Spark session when one exists (getOrCreate); otherwise start
# a new one under this application name.
spark = (
    SparkSession.builder
    .appName("EarthquakeDataTransformation")
    .getOrCreate()
)

# Read the run date's raw earthquake file. multiline=true lets Spark parse JSON
# records that span several lines instead of expecting one record per line.
df = (
    spark.read
    .option("multiline", "true")
    .json(f"Files/{start_date}_earthquake_data.json")
)

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 12, Finished, Available, Finished)

In [9]:
# Optional sanity check, disabled: uncomment to print how many raw records were
# loaded (note that count() triggers a full Spark job).
#print(df.count())

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 13, Finished, Available, Finished)

234


In [10]:
# The source epoch values are divided by 1000 (milliseconds -> seconds) because
# from_unixtime expects seconds. The result is a formatted date-time string,
# not a TimestampType column.
df = (
    df
    .withColumn("time", from_unixtime(col("properties.time") / 1000))
    .withColumn("updated", from_unixtime(col("properties.updated") / 1000))
)

# geometry.coordinates is read positionally: [0] longitude, [1] latitude, [2] depth.
df = (
    df
    .withColumn("latitude", col("geometry.coordinates")[1])
    .withColumn("longitude", col("geometry.coordinates")[0])
    .withColumn("depth", col("geometry.coordinates")[2])
)

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 14, Finished, Available, Finished)

In [11]:

# Magnitude categorization UDF
def categorize_magnitude(mag):
    """Map a magnitude value to a descriptive class name.

    Bands (half-open): < 2.0 Minor, [2.0, 4.0) Light, [4.0, 6.0) Moderate,
    [6.0, 7.0) Strong, [7.0, 8.0) Major, >= 8.0 Great.

    Args:
        mag: The event magnitude (``properties.mag``), or ``None`` when the
            record has no magnitude.

    Returns:
        The class name as a string, or ``None`` when ``mag`` is ``None``.
    """
    # Spark hands SQL NULLs to Python UDFs as None. Comparing None with a float
    # raises TypeError in Python 3, which would fail the whole job. Returning
    # None yields a NULL category, which the isNotNull() filter downstream drops.
    if mag is None:
        return None
    if mag < 2.0:
        return 'Minor'
    elif mag < 4.0:
        return 'Light'
    elif mag < 6.0:
        return 'Moderate'
    elif mag < 7.0:
        return 'Strong'
    elif mag < 8.0:
        return 'Major'
    else:
        return 'Great'

# Wrap the Python categorizer as a string-returning Spark UDF and attach its
# result to every row as `magCategory`.
mag_category_udf = udf(categorize_magnitude, StringType())
df = df.withColumn("magCategory", mag_category_udf(col("properties.mag")))

# Location enrichment (country code) is added in a later cell via reverse geocoding.

# Flatten: lift the needed `properties.*` fields to top-level names and carry
# over the derived columns already present on `df`.
df_flattened = df.select(
    col("properties.mag").alias("magnitude"),
    col("properties.place").alias("place"),
    col("properties.sig").alias("sig"),
    col("properties.magType").alias("magType"),
    *[col(name) for name in ("time", "updated", "latitude", "longitude", "depth", "magCategory")],
)

# Treat rows sharing magnitude, place and time as the same event and keep one
# of them (which duplicate survives is not specified).
df_deduped = df_flattened.dropDuplicates(["magnitude", "place", "time"])

# Keep only rows that were assigned a magnitude category.
df_filtered = df_deduped.filter(col("magCategory").isNotNull())



StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 15, Finished, Available, Finished)

In [12]:
# Define the function to get the country code using reverse geocoding
def get_country_code(lat, lon):
    """Return the country code of the best reverse-geocoding match for a point.

    Looks up ``(lat, lon)`` with ``reverse_geocoder`` (imported as ``rg``) and
    returns the ``'cc'`` field of the first result.

    Args:
        lat: Latitude, convertible with ``float()``, or ``None``.
        lon: Longitude, convertible with ``float()``, or ``None``.

    Returns:
        The ``'cc'`` value of the first match, or ``None`` when either
        coordinate is missing.
    """
    # Spark passes SQL NULLs to Python UDFs as None. float(None) raises
    # TypeError and would fail the job, so emit a NULL country code instead.
    if lat is None or lon is None:
        return None
    coordinates = (float(lat), float(lon))
    # NOTE(review): this runs once per row inside a UDF. reverse_geocoder's default
    # search mode is documented as multi-process, which may be slow here; consider
    # mode=1 or a batched lookup — confirm before changing.
    return rg.search(coordinates)[0].get('cc')

# Expose get_country_code to Spark as a UDF that produces a string column.
get_country_code_udf = udf(get_country_code, returnType=StringType())

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 16, Finished, Available, Finished)

In [13]:
# Enrich each event with a reverse-geocoded country code and a coarse
# significance class derived from the `sig` column.
df_with_location = df_filtered.withColumn(
    "country_code", get_country_code_udf(col("latitude"), col("longitude"))
)

# sig_class bands: < 100 Low, [100, 500) Moderate, >= 500 High.
# There is deliberately no otherwise(). A NULL `sig` makes every comparison
# NULL; previously that fell through to otherwise("High"), mislabelling events
# with no significance score. Now unmatched rows get a NULL sig_class.
df_with_location_sig_class = df_with_location.withColumn(
    "sig_class",
    when(col("sig") < 100, "Low")
    .when(col("sig") < 500, "Moderate")
    .when(col("sig") >= 500, "High"),
)

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 17, Finished, Available, Finished)

In [14]:
# Optional debug preview of the enriched rows, disabled and marked for removal
# by the author; uncomment only for interactive inspection.
##display(df_with_location_sig_class)

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 18, Finished, Available, Finished)

In [15]:
# Append this run's transformed events to the silver-layer table `events_silver`.
# NOTE(review): append mode does no de-duplication against rows already in the
# table, so re-running the notebook for the same start_date writes those events again.
(
    df_with_location_sig_class.write
    .mode("append")
    .saveAsTable("events_silver")
)

StatementMeta(, b67b6337-f5fc-4bfc-b28a-0182efc926b5, 19, Finished, Available, Finished)