In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, concat, lit, to_timestamp, udf, when
from pyspark.sql.types import DoubleType, StringType

In [2]:
# Initialize Spark session.
# Pin the session time zone to UTC: the ISO timestamps below carry a 'Z' (UTC)
# suffix that the parse pattern treats as a literal, so without this setting
# to_timestamp interprets every parsed wall-clock time in the machine's local
# zone. That makes results machine-dependent, and times inside a local DST gap
# get shifted. NOTE(review): assumes the MM/dd/yyyy rows are UTC as well — confirm.
spark = (
    SparkSession.builder.appName("Earthquake Analysis")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

24/02/22 08:19:20 WARN Utils: Your hostname, Ambikas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
24/02/22 08:19:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/22 08:19:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the dataset into a DataFrame.
# inferSchema=True lets Spark type the numeric columns (Magnitude, Depth,
# Latitude, Longitude, ...) as numbers instead of leaving every column a string.
# The Python UDFs further down compare/subtract these values and would raise
# TypeError on str input. Costs one extra pass over the file.
# Date/Time presumably stay strings because they mix two layouts (see Task 2).
df = spark.read.csv(
    "database.csv",
    header=True,
    inferSchema=True,
)

In [4]:
# Task 2: Convert the Date and Time columns into a timestamp column named Timestamp
#
# Date comes in two layouts: either a full ISO-8601 timestamp in Date itself,
# or an MM/dd/yyyy date whose clock time lives in the Time column. Parse both
# and keep whichever succeeds. to_timestamp yields null on a pattern mismatch,
# so coalesce picks the first non-null parse.
# NOTE(review): with spark.sql.ansi.enabled=true (the default in Spark 4) a
# mismatched to_timestamp raises instead of returning null; use
# try_to_timestamp there.
iso_timestamp = to_timestamp(col("Date"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
non_iso_timestamp = to_timestamp(
    concat(col("Date"), lit(" "), col("Time")), "MM/dd/yyyy HH:mm:ss"
)

# Put Timestamp first, followed by every source column in file order. An
# existing "Timestamp" column is skipped so re-running this cell is idempotent.
source_columns = [c for c in df.columns if c != "Timestamp"]
df = df.select(
    coalesce(iso_timestamp, non_iso_timestamp).alias("Timestamp"),
    *source_columns,
)

In [5]:
# Task 3: Keep only earthquakes whose magnitude is strictly greater than 5.0
# (rows with a null Magnitude fail the comparison and are dropped too).
is_above_five = col("Magnitude") > 5.0
df_filtered = df.where(is_above_five)

In [6]:
# Task 4: Average depth and average magnitude per earthquake Type,
# computed over the magnitude-filtered rows from Task 3.
averages_per_column = {"Depth": "avg", "Magnitude": "avg"}
avg_depth_magnitude = df_filtered.groupby("Type").agg(averages_per_column)

In [7]:
# Task 5: Implement a UDF to categorize the earthquakes into levels (e.g., Low, Moderate, High) based on their magnitudes
def categorize_magnitude(magnitude, moderate_threshold=6.0, high_threshold=7.0):
    """Map an earthquake magnitude to a coarse severity label.

    Args:
        magnitude: Magnitude as a number or a numeric string (Spark passes a
            str when the CSV was read without a schema), or None for a Spark null.
        moderate_threshold: Lowest magnitude labelled "Moderate" (default 6.0).
        high_threshold: Lowest magnitude labelled "High" (default 7.0).

    Returns:
        "Low" below ``moderate_threshold``, "Moderate" below ``high_threshold``,
        "High" otherwise, or None when ``magnitude`` is None.
    """
    if magnitude is None:
        # None < float raises TypeError, which would abort the whole Spark task.
        return None
    value = float(magnitude)
    if value < moderate_threshold:
        return "Low"
    if value < high_threshold:
        return "Moderate"
    return "High"


# Register categorize_magnitude as a string-returning Spark UDF and attach
# its label to every filtered row as the MagnitudeLevel column.
categorize_magnitude_udf = udf(f=categorize_magnitude, returnType=StringType())
magnitude_level = categorize_magnitude_udf("Magnitude")
df_filtered = df_filtered.withColumn("MagnitudeLevel", magnitude_level)

In [8]:
# Task 6: Calculate the distance of each earthquake from a reference location (e.g., (0, 0))
# The reference location defaults to (0, 0) and can be overridden per call.


def calculate_distance(lat, lon, ref_lat=0.0, ref_lon=0.0):
    """Return the straight-line distance between (lat, lon) and a reference point.

    The distance is Euclidean in raw coordinate space (degrees), not a
    great-circle distance on the Earth's surface; use a haversine formula if a
    physical distance (e.g. km) is needed.

    Args:
        lat: Latitude as a number or numeric string, or None (Spark null).
        lon: Longitude as a number or numeric string, or None (Spark null).
        ref_lat: Reference latitude (default 0.0).
        ref_lon: Reference longitude (default 0.0).

    Returns:
        The distance as a float, or None if either coordinate is None.
    """
    if lat is None or lon is None:
        # Arithmetic on None would raise TypeError inside the Spark task.
        return None
    d_lat = float(lat) - ref_lat
    d_lon = float(lon) - ref_lon
    return (d_lat**2 + d_lon**2) ** 0.5


# udf() defaults to returnType=StringType(), which would store the distance as
# text; declare DoubleType so DistanceFromReference stays a numeric column.
calculate_distance_udf = udf(calculate_distance, DoubleType())
df_filtered = df_filtered.withColumn(
    "DistanceFromReference", calculate_distance_udf(col("Latitude"), col("Longitude"))
)

In [10]:
# Task 7: Save the final DataFrame as a CSV file (Spark writes a directory of
# part files), with a header row of column names.
# The final result is df_filtered: the Magnitude > 5.0 rows enriched with the
# MagnitudeLevel and DistanceFromReference columns from Tasks 5-6. Writing `df`
# would discard that work.
# mode("overwrite") keeps the notebook re-runnable; the default save mode fails
# once the output directory exists.
df_filtered.write.mode("overwrite").option("header", True).csv(
    "final_analysis_data"
)

In [None]:
# Stop Spark session. Run this last: once the session (and its SparkContext) is
# stopped, none of the DataFrames above can be evaluated any more.
spark.stop()