In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *
import os

Create the Spark instance and read the data

In [2]:
# Start a Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)

# Load the raw CSV, taking column names from the first row.
# inferSchema is not enabled here, so numeric columns are cast explicitly later.
df = spark.read.option("header", True).csv("./database.csv")
df.show()

24/02/10 22:51:12 WARN Utils: Your hostname, omen-OMEN-Laptop-15-ek0025TX-Refurb resolves to a loopback address: 127.0.1.1; using 192.168.1.9 instead (on interface wlo1)
24/02/10 22:51:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/10 22:51:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|                  NULL

Combine the Date and Time columns into a single column and convert it to a timestamp

In [3]:
# Join the Date and Time strings with a space and parse the result as a timestamp.
event_time = to_timestamp(
    concat(col("Date"), lit(" "), col("Time")),
    "MM/dd/yyyy HH:mm:ss",
)
df = df.withColumn("Timestamp", event_time)

# Cast the columns used for numeric work below to double.
for numeric_col in ("Magnitude", "Depth", "Latitude", "Longitude"):
    df = df.withColumn(numeric_col, col(numeric_col).cast(DoubleType()))
df.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|01/02/1965|13:44:18|  19.246| 

Filter the dataset to include only earthquakes with a magnitude greater than 5.0

In [4]:
# Keep rows whose Type is "Earthquake" and whose Magnitude is at least 5.0.
# NOTE(review): the comparison is inclusive (>= 5.0); confirm whether events of
# exactly 5.0 are meant to be kept.
is_earthquake = col("Type") == "Earthquake"
is_strong = col("Magnitude") >= 5.0
df_earthquake = df.filter(is_earthquake & is_strong)
df_earthquake.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|01/02/1965|13:44:18|  19.246| 

Calculate the average depth and magnitude of earthquakes for each earthquake type

In [5]:
# One output row per value of Type, holding the mean Depth and mean Magnitude.
avg_depth_magnitude = df.groupBy("Type").agg(
    avg(col("Depth")).alias("avg_depth"),
    avg(col("Magnitude")).alias("avg_magnitude"),
)
avg_depth_magnitude.show()

+-----------------+-----------------+-----------------+
|             Type|        avg_depth|    avg_magnitude|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



Implement a UDF to categorize the earthquakes into levels (e.g., Low, Moderate, High) based on their magnitudes


Calculate the distance of each earthquake from a reference location (e.g., (0, 0)).

In [6]:
def categorize_magnitude(magnitude):
    """Map an earthquake magnitude to a coarse severity level.

    Args:
        magnitude: Numeric magnitude, or None when the value is missing.

    Returns:
        "Low" for magnitudes below 4.0, "Moderate" for magnitudes from 4.0 up
        to (but not including) 6.0, "High" otherwise. Returns None when
        ``magnitude`` is None so that a missing input stays missing instead of
        raising a TypeError on the comparison.
    """
    # A missing value cannot be compared with a float; propagate it unchanged.
    if magnitude is None:
        return None
    if magnitude < 4.0:
        return "Low"
    # Reaching this point already implies magnitude >= 4.0 for ordinary numbers.
    if magnitude < 6.0:
        return "Moderate"
    return "High"

# Wrap the Python classifier as a Spark UDF that produces a string column.
categorize_magnitude_udf = udf(categorize_magnitude, StringType())

# Keep earthquake rows only, then add:
#   level    - the category returned by the UDF for each Magnitude
#   distance - sqrt(Latitude^2 + Longitude^2), i.e. the straight-line distance
#              from (0, 0) in coordinate space.
# NOTE(review): this is presumably measured in degrees, not a great-circle
# distance in km - confirm that this is what is wanted.
df_udf = (
    df.filter(col("Type") == "Earthquake")
    .withColumn("level", categorize_magnitude_udf(col("Magnitude")))
    .withColumn(
        "distance",
        sqrt(pow(col("Latitude"), 2) + pow(col("Longitude"), 2)),
    )
)

# Show the result
df_udf.show()


[Stage 7:>                                                          (0 + 1) / 1]

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+--------+------------------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|   level|          distance|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+------

                                                                                

In [8]:
import folium
from folium.plugins import MarkerCluster
import pandas

# Collect the Spark result to the driver as a pandas DataFrame.
earthquake_pd = df_udf.toPandas()

# Create a Folium map centered at (0, 0)
m = folium.Map(location=[0, 0], zoom_start=2)

# Create a marker cluster for earthquakes
marker_cluster = MarkerCluster().add_to(m)

# Only three columns are plotted. Rows without coordinates cannot be placed on
# the map, so they are skipped instead of aborting the whole loop.
plottable = earthquake_pd[["Latitude", "Longitude", "Magnitude"]].dropna(
    subset=["Latitude", "Longitude"]
)

# Add one marker per earthquake. Plain tuples are iterated instead of
# iterrows(), which builds a full Series object for every row.
for latitude, longitude, magnitude in plottable.itertuples(index=False, name=None):
    folium.Marker([latitude, longitude],
                  popup=f"Magnitude: {magnitude}").add_to(marker_cluster)

# Save the map as a standalone HTML file
m.save('./earthquake_map.html')

# Write the processed Spark DataFrame as CSV with a header row, replacing any
# earlier output in the target directory.
df_udf.write.mode("overwrite").option("header", "true").csv("./processed_data")