## **Earthquake Data Processing - Silver Layer**
#### **Read the incremental data, explode the array, flatten the structure**

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date, timedelta

StatementMeta(, ae075154-9abd-4877-b4dc-7ce6347f1499, 3, Finished, Available, Finished)

#### **Read the JSON payload into a Spark DataFrame**

In [None]:
start_date = date.today()

# Full reload: read every JSON file in the landing folder on each run.
# The "multiline" option is required because each file is a whole JSON document
# rather than one JSON record per line.
# To process only today's delta file instead, read the dated file:
# df = spark.read.option("multiline", "true").json(f"Files/Earthquake_Data/{start_date}_earthquake_data.json")
df = spark.read.option("multiline", "true").json("Files/Earthquake_Data/*.json")

# df now holds the records parsed from every Files/Earthquake_Data/*.json file.
# display(df)

StatementMeta(, ae075154-9abd-4877-b4dc-7ce6347f1499, 16, Finished, Available, Finished)

#### **Now flatten the data by extracting and renaming the attributes**
##### **Below is the structure of the source data**

```
{
		"type": "Feature",
		"properties": {
			"mag": 1.45,
			"place": "15 km ENE of San Simeon, CA",
			"time": 1738559830510,
			"updated": 1738559924099,
			"tz": null,
			"url": "https://earthquake.usgs.gov/earthquakes/eventpage/nc75127356",
			"detail": "https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nc75127356&format=geojson",
			"felt": null,
			"cdi": null,
			"mmi": null,
			"alert": null,
			"status": "automatic",
			"tsunami": 0,
			"sig": 32,
			"net": "nc",
			"code": "75127356",
			"ids": ",nc75127356,",
			"sources": ",nc,",
			"types": ",nearby-cities,origin,phase-data,",
			"nst": 7,
			"dmin": 0.09188,
			"rms": 0.03,
			"gap": 107,
			"magType": "md",
			"type": "earthquake",
			"title": "M 1.5 - 15 km ENE of San Simeon, CA"
		},
		"geometry": {
			"type": "Point",
			"coordinates": [
				-121.047332763672,
				35.7131652832031,
				3.98000001907349
			]
		},
		"id": "nc75127356"
	}
```

#### **Transform the data and create a mock dimension solely to demonstrate the join**

In [None]:

# Quantitative Data
# Flatten each nested feature into one flat row per event:
# - geometry.coordinates is read positionally (0 -> longitude, 1 -> latitude,
#   2 -> elevation), matching the sample payload above;
# - the needed properties.* fields are promoted to top-level columns.
df = df.select(
    'id',
    col('geometry.coordinates').getItem(0).alias('longitude'),
    col('geometry.coordinates').getItem(1).alias('latitude'),
    col('geometry.coordinates').getItem(2).alias('elevation'),
    col('properties.title').alias('title'),
    col('properties.place').alias('place_description'),
    col('properties.type').alias('type'),
    col('properties.mag').alias('magnitude'),
    col('properties.magType').alias('magType'),
    col('properties.time').alias('time'),
).withColumn(
    'timestamp',
    # `time` is epoch milliseconds (the previous version divided it by 1000 to get seconds).
    # timestamp_millis converts the integer directly, with no floating-point
    # division and truncating double->timestamp cast. A null `time` yields a null
    # timestamp, so no explicit null guard is needed.
    expr('timestamp_millis(time)'),
)


# Qualitative Data
# Mock dimension: one row per distinct magnitude type (nulls included as a
# single row). It exists only to demonstrate a dimension join.
df_magType = df.select(col('magType').alias('magType_id')).dropDuplicates(['magType_id'])
display(df_magType)


StatementMeta(, ae075154-9abd-4877-b4dc-7ce6347f1499, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 65571efd-800d-4e3d-814b-5b958f5ef88a)

#### **Join the DataFrames and demonstrate a broadcast join**

In [None]:
# Left-join every event to the magType dimension so no event rows are dropped.
# broadcast() hints Spark to ship the (small, de-duplicated) dimension to every
# executor instead of shuffling both sides. The dimension key is then renamed
# so the joined column is easy to tell apart from the event's own magType.
df = (
    df.join(
        broadcast(df_magType),
        on=df["magType"] == df_magType["magType_id"],
        how='left',
    )
    .withColumnRenamed("magType_id", "magType_joined")
)
display(df)

StatementMeta(, ae075154-9abd-4877-b4dc-7ce6347f1499, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 79ec4612-2627-481f-a1a0-36f3a81aecca)

### **Write the table to the lakehouse**
###### **This is a minimal project to demonstrate the medallion architecture; a production silver layer would also need dimension handling such as slowly changing dimensions (SCDs).**

In [None]:
# Persist the flattened events as a Delta table in the lakehouse.
# mode('overwrite') replaces the table contents on every run (full refresh).
# mergeSchema enables Delta schema evolution, so new columns in df are added
# to the table schema instead of failing the write.
# NOTE: a spark.catalog.tableExists(...) check could separate a first-time
# create from an incremental append/merge if this moves to delta loads.
(
    df.write
    .mode('overwrite')
    .format('delta')
    .option("mergeSchema", "true")
    .saveAsTable("earthquakes_lakehouse.earthquake_events_silver")
)

StatementMeta(, ae075154-9abd-4877-b4dc-7ce6347f1499, 19, Finished, Available, Finished)