In [0]:
#Load the  dataset into a PySpark DataFrame.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Earthquake_Analysis").getOrCreate()
file_path = "/FileStore/tables/database-2.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(truncate=False)

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+-------------------------+---------+---------------+----------------+---------+
|Date      |Time               |Latitude|Longitude|Type      |Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|ID                       |Source   |Location Source|Magnitude Source|Status   |
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+-------------------------+---------+---------------+----------------+---------+
|01/02/1965|2024-02-07 13:44:18|19.246  |14

In [0]:
#Convert the Date and Time columns into a timestamp column named Timestamp
from pyspark.sql.functions import col, concat, to_timestamp
df = df.withColumn("Timestamp", to_timestamp(concat(col("Date"), col("Time")), "dd/MM/yyyy" "HH:mm:ss"))
df.show()

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|Timestamp|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+---------+
|01/02/1965|2024-02-07 13:44

In [0]:
#Filter the dataset to include only earthquakes with a magnitude greater than 5.0.
filter_df = df.filter(col("Magnitude") >5.0)
filter_df.select("Magnitude").show()

+---------+
|Magnitude|
+---------+
|      6.0|
|      5.8|
|      6.2|
|      5.8|
|      5.8|
|      6.7|
|      5.9|
|      6.0|
|      6.0|
|      5.8|
|      5.9|
|      8.2|
|      5.5|
|      5.6|
|      6.0|
|      6.1|
|      8.7|
|      6.0|
|      5.7|
|      5.8|
+---------+
only showing top 20 rows



In [0]:
#Calculate the average depth and magnitude of earthquakes for each earthquake type.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

result_df = filter_df.groupBy("Type").agg(
    avg("Depth").alias("Avg_Depth"),
    avg("Magnitude").alias("Avg_Magnitude")
)

result_df.show()

+-----------------+-----------------+-----------------+
|             Type|        Avg_Depth|    Avg_Magnitude|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



In [0]:
#Implement a UDF to categorize the earthquakes into levels (e.g., Low, Moderate, High) based on their magnitudes.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Define a UDF to catogorize earthquakes based on their magnitudes

def categorize_magnitude(magnitude):
    if magnitude<4.0:
        return"Low"
    elif 4.0 <= magnitude < 6.0:
        return"Moderate"
    else:
        return"High"
    

#Registerd UDF 
categorize_magnitude_udf = udf(categorize_magnitude,StringType())

#Apply UDF and Create a New Column 

df = df.withColumn("Magnitude_Level", categorize_magnitude_udf(col("Magnitude")))

df.select("Magnitude_Level").show(truncate=False)

+---------------+
|Magnitude_Level|
+---------------+
|High           |
|Moderate       |
|High           |
|Moderate       |
|Moderate       |
|High           |
|Moderate       |
|High           |
|High           |
|Moderate       |
|Moderate       |
|High           |
|Moderate       |
|Moderate       |
|High           |
|High           |
|High           |
|High           |
|Moderate       |
|Moderate       |
+---------------+
only showing top 20 rows



In [0]:
#Calculate the distance of each earthquake from a reference location (e.g., (0, 0)).

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sqrt

#Reference Location 
reference_latitude =0
reference_longitude = 0

#For Calculate Distance We use Pythagores Theorem
df = df.withColumn(
    "Distance",
    sqrt((col("Latitude") - reference_latitude)**2 + (col("Longitude") - reference_longitude)**2)
)

df.select("Distance").show()


+------------------+
|          Distance|
+------------------+
| 146.8823609968195|
|127.36562594750595|
|175.18490809713035|
| 63.59957566682344|
|126.98937818967381|
|167.16733432701497|
| 92.02725214848046|
|166.74399067132822|
|62.595144803730584|
|180.16922639008027|
| 109.2003635204572|
|125.97899812270298|
|170.68377985327137|
|178.84401109626234|
| 82.30239686667699|
|149.04674663004224|
|185.91857418235546|
|182.51257311758005|
| 179.8488221618368|
|183.17177837210622|
+------------------+
only showing top 20 rows



In [0]:
#Visualize the geographical distribution of earthquakes on a world map using appropriate libraries (e.g., Basemap or Folium).

from pyspark.sql.functions import col, concat, to_timestamp
import folium

# Initialize a Folium map centered at (0, 0)
map_center = [0, 0]
mymap = folium.Map(location=map_center, zoom_start=2)

# Define a function to add markers to the map
def add_marker(row):
    latitude = row['Latitude']
    longitude = row['Longitude']
    magnitude = row['Magnitude']
    popup_text = f"Magnitude: {magnitude}<br>Latitude: {latitude}<br>Longitude: {longitude}"
    folium.Marker([latitude, longitude], popup=popup_text).add_to(mymap)

# Use foreach to apply the add_marker function to each row
df.foreach(add_marker)

# Display the map in Databricks notebook using display
display(mymap)

In [0]:
#Final CSV format
final_csv_path = "final_earthquake_data.csv"
df.write.csv(final_csv_path, header=True, mode="overwrite")

In [0]:
final_csv_path = "/dbfs/FileStore/your_custom_path/final_earthquake_data.csv"
print(f"The file is stored at: {final_csv_path}")

The file is stored at: /dbfs/FileStore/your_custom_path/final_earthquake_data.csv
