# Earthquake Data Processing Pipeline

This notebook processes earthquake data from the Bronze ADLS layer to the Silver ADLS layer. It reads one day's JSON file from the Bronze container, flattens and cleans the records, and appends the result to the Silver container in Parquet format.

## Configuration and Setup

First, set the processing date and the ADLS paths for the Bronze and Silver containers.

In [0]:
from datetime import date, timedelta

# Defaults to yesterday for interactive runs; remove this line before running the Data Factory pipeline
start_date = date.today() - timedelta(days=1)

bronze_adls = "abfss://bronze@earthquakedatadb.dfs.core.windows.net/"
silver_adls = "abfss://silver@earthquakedatadb.dfs.core.windows.net/"
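The Bronze input path is built from `start_date`, and `str()` of a Python `date` is ISO 8601 (`YYYY-MM-DD`), so the file name looks like `2025-04-19_earthquake_data.json`. The sketch below is a minimal illustration of that path logic with a hypothetical `resolve_start_date` helper that parses a date string when one is supplied and otherwise falls back to yesterday. The helper, and the assumption that the pipeline would pass the date as an ISO string, are illustrative only and not part of the original notebook.

```python
from datetime import date, timedelta
from typing import Optional


def resolve_start_date(param: Optional[str], today: Optional[date] = None) -> date:
    """Hypothetical helper: parse an ISO date string, else default to yesterday."""
    if param:
        return date.fromisoformat(param)
    if today is None:
        today = date.today()
    return today - timedelta(days=1)


def bronze_path(bronze_adls: str, start_date: date) -> str:
    # Same f-string as the load cell; str(date) renders as 'YYYY-MM-DD'
    return f"{bronze_adls}{start_date}_earthquake_data.json"


bronze_adls = "abfss://bronze@earthquakedatadb.dfs.core.windows.net/"
print(bronze_path(bronze_adls, resolve_start_date("2025-04-19")))
```

With no parameter (an interactive run), `resolve_start_date(None)` reproduces the notebook's `date.today() - timedelta(days=1)` default.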

## Import Libraries

Import the PySpark functions and types used in the transformations.

In [0]:
# Import Libraries
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType

## Load Data

Load the day's JSON file (`{start_date}_earthquake_data.json`) from the Bronze ADLS layer into a Spark DataFrame.

In [0]:
# Load the JSON data into a Spark DataFrame
df = spark.read.option("multiline", "true").json(f"{bronze_adls}{start_date}_earthquake_data.json")

In [0]:
df

DataFrame[geometry: struct<coordinates:array<double>,type:string>, id: string, properties: struct<alert:string,cdi:double,code:string,detail:string,dmin:double,felt:bigint,gap:double,ids:string,mag:double,magType:string,mmi:double,net:string,nst:bigint,place:string,rms:double,sig:bigint,sources:string,status:string,time:bigint,title:string,tsunami:bigint,type:string,types:string,tz:string,updated:bigint,url:string>, type: string]

In [0]:
df.head()

Row(geometry=Row(coordinates=[-115.9286, 36.7856, 7.2], type='Point'), id='nn00896479', properties=Row(alert=None, cdi=None, code='00896479', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nn00896479&format=geojson', dmin=0.017, felt=None, gap=199.50000000000006, ids=',nn00896479,', mag=0.3, magType='ml', mmi=None, net='nn', nst=12, place='33 km NW of Indian Springs, Nevada', rms=0.0905, sig=1, sources=',nn,', status='reviewed', time=1745107130799, title='M 0.3 - 33 km NW of Indian Springs, Nevada', tsunami=0, type='earthquake', types=',origin,phase-data,', tz=None, updated=1745108597444, url='https://earthquake.usgs.gov/earthquakes/eventpage/nn00896479'), type='Feature')
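By default, Spark's JSON reader expects JSON Lines (one complete JSON record per line); the `multiline` option tells it to parse each file as a whole JSON document instead. The plain-Python sketch here illustrates the difference on a small, trimmed sample. It assumes the Bronze file is a JSON array of GeoJSON `Feature` objects, which is consistent with the one-row-per-feature schema shown above; the sample keeps only a few fields from the first row.

```python
import json

# A tiny multi-line JSON array shaped like the Bronze file (values from the sample row)
document = """[
  {
    "type": "Feature",
    "id": "nn00896479",
    "geometry": {"type": "Point", "coordinates": [-115.9286, 36.7856, 7.2]},
    "properties": {"mag": 0.3, "place": "33 km NW of Indian Springs, Nevada"}
  }
]"""


def parses_as_json_lines(text: str) -> bool:
    """Return True if every non-blank line is a standalone JSON value (JSON Lines)."""
    try:
        for line in text.splitlines():
            if line.strip():
                json.loads(line)
        return True
    except json.JSONDecodeError:
        return False


# Line-by-line parsing fails on the first line, "[", so JSON Lines reading would not work
print(parses_as_json_lines(document))
# Whole-document parsing, analogous to multiline=true, yields one dict per feature
features = json.loads(document)
print(len(features), features[0]["id"])
```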

## Data Transformation

Flatten the nested GeoJSON records, fill missing values, and convert the Unix timestamps.

### Reshaping Data

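Flatten the nested `geometry` and `properties` structs into top-level columns: the `geometry.coordinates` array is split into `longitude`, `latitude`, and `depth`, and the selected `properties` fields are promoted to their own columns.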

In [0]:
# Reshape earthquake data
df = (
    df
    .select(
        'id',
        col('geometry.coordinates').getItem(0).alias('longitude'),
        col('geometry.coordinates').getItem(1).alias('latitude'),
        col('geometry.coordinates').getItem(2).alias('depth'),
        col('properties.title').alias('title'),
        col('properties.place').alias('place_description'),
        col('properties.sig').alias('sig'),
        col('properties.mag').alias('mag'),
        col('properties.magType').alias('magType'),
        col('properties.time').alias('time'),
        col('properties.updated').alias('updated')
    )
)

In [0]:
df

DataFrame[id: string, longitude: double, latitude: double, depth: double, title: string, place_description: string, sig: bigint, mag: double, magType: string, time: bigint, updated: bigint]

In [0]:
df.head()

Row(id='nn00896479', longitude=-115.9286, latitude=36.7856, depth=7.2, title='M 0.3 - 33 km NW of Indian Springs, Nevada', place_description='33 km NW of Indian Springs, Nevada', sig=1, mag=0.3, magType='ml', time=1745107130799, updated=1745108597444)
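The `select` above maps each nested feature to one flat record. As a minimal sketch, the same mapping can be written in plain Python over a feature dict; `flatten_feature` is a hypothetical name used only for illustration. Here missing properties or coordinate positions become `None`, a choice made to resemble nulls rather than a statement of Spark's exact behavior.

```python
from typing import Any, Dict, List, Optional


def _item(values: List[Any], index: int) -> Optional[Any]:
    # Return the array element at index, or None when the array is too short
    return values[index] if len(values) > index else None


def flatten_feature(feature: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror the Spark select: promote nested GeoJSON fields to flat keys."""
    coords = feature.get("geometry", {}).get("coordinates", [])
    props = feature.get("properties", {})
    return {
        "id": feature.get("id"),
        "longitude": _item(coords, 0),
        "latitude": _item(coords, 1),
        "depth": _item(coords, 2),
        "title": props.get("title"),
        "place_description": props.get("place"),  # renamed, as in the alias
        "sig": props.get("sig"),
        "mag": props.get("mag"),
        "magType": props.get("magType"),
        "time": props.get("time"),
        "updated": props.get("updated"),
    }


# Sample feature using values from the first row shown earlier
sample = {
    "id": "nn00896479",
    "geometry": {"type": "Point", "coordinates": [-115.9286, 36.7856, 7.2]},
    "properties": {
        "title": "M 0.3 - 33 km NW of Indian Springs, Nevada",
        "place": "33 km NW of Indian Springs, Nevada",
        "sig": 1, "mag": 0.3, "magType": "ml",
        "time": 1745107130799, "updated": 1745108597444,
    },
}
print(flatten_feature(sample))
```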

### Data Validation

Replace null `longitude`, `latitude`, and `time` values with 0 before converting the timestamps.

In [0]:
# Validate data: replace missing (null) longitude, latitude, and time values with 0
df = (
    df
    .withColumn('longitude', when(isnull('longitude'), 0).otherwise(col('longitude')))
    .withColumn('latitude', when(isnull('latitude'), 0).otherwise(col('latitude')))
    .withColumn('time', when(isnull('time'), 0).otherwise(col('time')))
)
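Each `when(isnull(c), 0).otherwise(col(c))` expression keeps the value when present and substitutes 0 otherwise. In PySpark, `df.fillna(0, subset=['longitude', 'latitude', 'time'])` should express the same rule in one call. The plain-Python sketch below shows the rule on a single record; `fill_nulls` and `FILL_COLUMNS` are hypothetical names for illustration.

```python
from typing import Any, Dict, Iterable

# Columns the notebook fills; other columns are left untouched
FILL_COLUMNS = ("longitude", "latitude", "time")


def fill_nulls(record: Dict[str, Any],
               columns: Iterable[str] = FILL_COLUMNS,
               default: Any = 0) -> Dict[str, Any]:
    """Return a copy of record with None in the given columns replaced by default."""
    out = dict(record)
    for name in columns:
        if out.get(name) is None:
            out[name] = default
    return out


print(fill_nulls({"longitude": None, "latitude": 36.7856, "time": None, "depth": None}))
```

Note that a 0 fill places events with missing coordinates at latitude 0, longitude 0; whether that is acceptable depends on how the Silver data is used downstream.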

In [0]:
# Convert 'time' and 'updated' from Unix time in milliseconds to timestamps
df = (
    df
    .withColumn('time', (col('time') / 1000).cast(TimestampType()))
    .withColumn('updated', (col('updated') / 1000).cast(TimestampType()))
)

In [0]:
df.head()

Row(id='nn00896479', longitude=-115.9286, latitude=36.7856, depth=7.2, title='M 0.3 - 33 km NW of Indian Springs, Nevada', place_description='33 km NW of Indian Springs, Nevada', sig=1, mag=0.3, magType='ml', time=datetime.datetime(2025, 4, 19, 23, 58, 50, 799000), updated=datetime.datetime(2025, 4, 20, 0, 23, 17, 444000))
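The USGS `time` and `updated` fields are milliseconds since the Unix epoch, while Spark's numeric-to-timestamp cast interprets the value as seconds, hence the division by 1000. The wall-clock values shown in the `head()` output match a UTC interpretation, which a plain-Python check of the same arithmetic confirms. The sketch uses integer `timedelta` arithmetic to avoid floating-point rounding; `millis_to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_utc(ms: int) -> datetime:
    """Convert Unix epoch milliseconds to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


print(millis_to_utc(1745107130799))  # 2025-04-19 23:58:50.799000+00:00
print(millis_to_utc(1745108597444))  # 2025-04-20 00:23:17.444000+00:00
```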

### Save Transformed Data

Save the transformed data to the Silver ADLS layer.

In [0]:
# Define the output path for the transformed data in the Silver container
silver_output_path = f"{silver_adls}earthquake_events_silver/"

In [0]:
# Append DataFrame to the Silver container in Parquet format
df.write.mode("append").parquet(silver_output_path)