In [0]:
pip install reverse_geocoder

In [0]:

import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Ensure the below library is installed on your cluster
import reverse_geocoder as rg
from datetime import *
     

In [0]:
# Source file for the raw earthquake records (JSON).
input_path = '/Volumes/earth_data/bronze/operationaldata/earthquake_data.json'

# Configure the reader with the 'multiline' option first, then read the file
# into a Spark DataFrame.
multiline_json_reader = spark.read.option('multiline', 'true')
earthquake_df = multiline_json_reader.json(input_path)

In [0]:

# Flatten the nested records into one row of plain columns per earthquake.
# Positions 0, 1 and 2 of geometry.coordinates are exposed as longitude,
# latitude and elevation respectively.
coordinates = col('geometry.coordinates')
coordinate_names = ('longitude', 'latitude', 'elevation')

# (nested source field, output column name) pairs taken from `properties`.
property_columns = [
    ('properties.title', 'title'),
    ('properties.place', 'place_description'),
    ('properties.sig', 'sig'),
    ('properties.mag', 'magnitude'),
    ('properties.magType', 'magType'),
    ('properties.time', 'time'),
    ('properties.updated', 'updated'),
    ('properties.sources', 'sources'),
]

earthquake_selected = earthquake_df.select(
    'id',
    *[coordinates.getItem(position).alias(name)
      for position, name in enumerate(coordinate_names)],
    *[col(source).alias(target) for source, target in property_columns],
)
    

In [0]:

# Null handling: a missing longitude, latitude or time is replaced with 0;
# every non-null value is passed through unchanged.
for defaulted_column in ('longitude', 'latitude', 'time'):
    current_value = col(defaulted_column)
    earthquake_selected = earthquake_selected.withColumn(
        defaulted_column,
        when(current_value.isNull(), 0).otherwise(current_value),
    )

In [0]:

# Turn the numeric 'time' and 'updated' columns into timestamps. Each value is
# divided by 1000 before the cast (the raw values are presumably epoch
# milliseconds -- confirm against the feed).
event_time = (col('time') / 1000).cast(TimestampType())
updated_time = (col('updated') / 1000).cast(TimestampType())

earthquake_selected = (
    earthquake_selected
    .withColumn('time', event_time)
    .withColumn('updated', updated_time)
)

In [0]:
@pandas_udf(StringType())
def get_county_batch(lat_series: pd.Series, lon_series: pd.Series) -> pd.Series:
    """Reverse-geocode a batch of coordinates to a normalised ``admin2`` name.

    Args:
        lat_series: Latitudes for the batch.
        lon_series: Longitudes for the batch, paired positionally with
            ``lat_series``.

    Returns:
        A Series with exactly one entry per input row. For rows with both
        coordinates present, the entry is the ``admin2`` field of the
        ``rg.search`` match (empty string if absent), upper-cased, with
        ' COUNTY' removed and surrounding whitespace stripped. Rows where
        either coordinate is missing yield ``None``.
    """
    # Track which rows are usable so results can be written back in place.
    # Simply dropping incomplete rows would return a shorter, misaligned
    # Series, and a Series-to-Series pandas UDF must return one value per
    # input row.
    has_coords = [pd.notna(lat) and pd.notna(lon)
                  for lat, lon in zip(lat_series, lon_series)]
    coords = [(float(lat), float(lon))
              for lat, lon, usable in zip(lat_series, lon_series, has_coords)
              if usable]

    if not coords:
        return pd.Series([None] * len(lat_series), dtype=object)

    # Relies on rg.search returning one match per coordinate, in input order.
    results = rg.search(coords)
    counties = iter(
        r.get('admin2', '').upper().replace(' COUNTY', '').strip()
        for r in results
    )

    # Re-expand to the input length: consume the next lookup result for each
    # usable row, and emit None for rows that were skipped.
    return pd.Series(
        [next(counties) if usable else None for usable in has_coords],
        dtype=object,
    )

In [0]:
# County lookup expression, evaluated per batch by the pandas UDF.
county_lookup = get_county_batch(col('latitude'), col('longitude'))

# Spread the rows over 1000 partitions before applying the UDF so the lookups
# can run in parallel; tune the partition count to the cluster size.
earthquake_with_location = earthquake_selected.repartition(1000).withColumn(
    'county', county_lookup
)

In [0]:
# Preview the enriched rows (first 20, long values truncated -- the defaults).
earthquake_with_location.show(n=20, truncate=True)

In [0]:
# Fully qualified name of the target table.
silver_path = 'earth_data.silver.earthquake'

# Persist the enriched data as a Delta table, replacing any existing contents.
silver_writer = earthquake_with_location.write.format('delta').mode('overwrite')
silver_writer.saveAsTable(silver_path)