In [88]:
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Aidetic-assignment').getOrCreate()



### 	1.Load the  dataset into a PySpark DataFrame.

In [89]:
df = spark.read.csv('/content/drive/MyDrive/Programs/Sample-data-files/database.csv',header=True, inferSchema=True)
df.show()
df.printSchema()

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|2024-02-05 13:44:18|  19.246|  145.616|Earthqu

### 2.Convert the Date and Time columns into a timestamp column named Timestamp.

In [120]:
from pyspark.sql.functions import *
df_timestamp = df.withColumn('Time',to_timestamp('Time'))
df_timestamp.show()
df_timestamp.printSchema()

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|2024-02-05 13:44:18|  19.246|  145.616|Earthqu

### 	3.Filter the dataset to include only earthquakes with a magnitude greater than 5.0.

In [121]:
df_filter = df_timestamp.filter('Magnitude>5.0')
df_filter.show()

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|2024-02-05 13:44:18|  19.246|  145.616|Earthqu

### 	4.Calculate the average depth and magnitude of earthquakes for each earthquake type.

In [122]:
 df_avg= df_filter.groupBy('Type').avg('Depth','Magnitude')
 df_avg.show()

+-----------------+-----------------+-----------------+
|             Type|       avg(Depth)|   avg(Magnitude)|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



### 	5.Implement a UDF to categorize the earthquakes into levels (e.g., Low, Moderate, High) based on their magnitudes.

In [123]:
from pyspark.sql.types import *
df_udf = df.withColumn("magnitude", df["magnitude"].cast(FloatType()))
df_udf.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Depth Error: double (nullable = true)
 |-- Depth Seismic Stations: integer (nullable = true)
 |-- magnitude: float (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Magnitude Error: double (nullable = true)
 |-- Magnitude Seismic Stations: integer (nullable = true)
 |-- Azimuthal Gap: double (nullable = true)
 |-- Horizontal Distance: double (nullable = true)
 |-- Horizontal Error: double (nullable = true)
 |-- Root Mean Square: double (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location Source: string (nullable = true)
 |-- Magnitude Source: string (nullable = true)
 |-- Status: string (nullable = true)



In [124]:
def categorize_mag(magnitude):
    if magnitude < 4.0:
        return "Low"
    elif 4.0 <= magnitude < 6.0:
        return "Moderate"
    else:
        return "High"

In [125]:
category_udf = udf(categorize_mag, StringType())

mag_cate_df = df.withColumn('Magnitude_category', category_udf('magnitude'))
mag_cate_df.show()

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+------------------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|Magnitude_category|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+------------------+
|

###	6.Calculate the distance of each earthquake from a reference location (e.g., (0, 0)).

In [126]:
from pyspark.sql.functions import col, lit, sqrt

earthquakes_distance = df.withColumn("distance_from_reference",sqrt(col("Latitude")**2 + col("Longitude")**2) )

# earthquakes_distance.select("Latitude", "Longitude", "distance_from_reference").show()
earthquakes_distance.show()


+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-----------------------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|distance_from_reference|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-----------