<a href="https://colab.research.google.com/github/akhil64/Pyspark_exercise/blob/main/pyspark_assessment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

database.csv  spark-3.1.1-bin-hadoop3.2      spark-3.1.1-bin-hadoop3.2.tgz.1
sample_data   spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
# Load the raw CSV, treating its first line as column names.
# No schema is supplied or inferred, so every column is read as a string.
csv_reader = spark.read.option("header", True)
mydata = csv_reader.csv("database.csv")
mydata.show()

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       null|                  null

In [None]:
from pyspark.sql.functions import *

# Join the separate Date and Time strings with a single space, parse the
# result into one Timestamp column, then discard the two source columns.
date_time_text = concat_ws(" ", col("Date"), col("Time"))
df = (
    mydata
    .withColumn("Timestamp", to_timestamp(date_time_text, "M/d/yyyy H:mm:ss"))
    .drop("Date", "Time")
)

df.show()

+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|
+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|  19.246|  145.616|Earthquake|131.6|       null|                  null|        6|         

In [None]:
# Keep only the rows whose Magnitude is strictly greater than 5.0.
stronger_than_five = mydata.Magnitude > 5.0
df = df.where(stronger_than_five)
df.show()

+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|
+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+
|  19.246|  145.616|Earthquake|131.6|       null|                  null|        6|         

In [None]:
import pyspark.sql.functions as sqlfunc
# Mean Depth and mean Magnitude per event Type.
# NOTE: this aggregates the unfiltered `mydata`, not the filtered `df`.
events_by_type = mydata.groupBy("Type")
mean_columns = [sqlfunc.avg(column_name) for column_name in ("Depth", "Magnitude")]
df1 = events_by_type.agg(*mean_columns)
df1.show()

+-----------------+-----------------+-----------------+
|             Type|       avg(Depth)|   avg(Magnitude)|
+-----------------+-----------------+-----------------+
|        Explosion|              0.0|             5.85|
|       Rock Burst|              1.0|              6.2|
|Nuclear Explosion|              0.3|5.850685714285718|
|       Earthquake|71.31391348140497|5.882762568870756|
+-----------------+-----------------+-----------------+



In [None]:
def categorize_level(type,magnitude):
  """Classify an earthquake's severity from its magnitude.

  Args:
    type: Event type label; only the exact string "Earthquake" is classified.
    magnitude: Numeric magnitude, or None when the value is missing.

  Returns:
    "Low" for magnitude < 3.0, "Moderate" for 3.0 <= magnitude <= 6.0,
    "High" for magnitude > 6.0. None for any other event type, for a
    missing magnitude, and for NaN.
  """
  # Comparing None with a float raises TypeError, which would abort the whole
  # Spark job when this runs as a UDF over a null value; treat it as "no level".
  if type != "Earthquake" or magnitude is None:
    return None
  if magnitude < 3.0:
    return "Low"
  # 6.0 itself is "Moderate": this check runs before the "High" one.
  if magnitude <= 6.0:
    return "Moderate"
  if magnitude > 6.0:
    return "High"
  # Only NaN reaches this point (every comparison above is False for NaN).
  return None

# Wrap the Python classifier as a Spark UDF that yields strings.
categorize_level_udf = udf(f=categorize_level, returnType=StringType())

# The UDF compares against float thresholds, so Magnitude is cast to float
# before being passed in alongside the Type column.
magnitude_as_float = mydata.Magnitude.cast("float")
level_column = categorize_level_udf("Type", magnitude_as_float)
df = df.withColumn("Level", level_column)

df.show()




+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+--------+
|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|   Level|
+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+--------+
|  19.246|  145.616|Earthquake|131.6|       null|               

In [None]:


reference_location = (0, 0)

# Straight-line (Pythagorean) distance from the reference point, computed
# directly on the Latitude/Longitude values, so the result is in the same
# units as those columns rather than a distance over the Earth's surface.
latitude_offset = col("Latitude") - reference_location[0]
longitude_offset = col("Longitude") - reference_location[1]
squared_offsets = pow(latitude_offset, 2) + pow(longitude_offset, 2)
df = df.withColumn("Distance", sqrt(squared_offsets))
df.show()



+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+--------+------------------+
|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|          Timestamp|   Level|          Distance|
+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+-------------------+--------+------------------+
|  19.2

In [None]:
import folium

# Collect the Spark DataFrame onto the driver as a pandas DataFrame
pandas_df = df.toPandas()

# World-level map centred on latitude 0, longitude 0
map_centre = [0, 0]
earthquake_map = folium.Map(location=map_centre, zoom_start=2)

# One red marker per row, with the event type and magnitude in its popup
for row_label, row in pandas_df.iterrows():
    marker_position = [row['Latitude'], row['Longitude']]
    marker = folium.Marker(
        marker_position,
        popup=f"<b>Type:</b> {row['Type']}<br><b>Magnitude:</b> {row['Magnitude']}",
        icon=folium.Icon(color='red'),
    )
    marker.add_to(earthquake_map)

# Persist the finished map as a standalone HTML page
earthquake_map.save("earthquake_map.html")


  series = series.astype(t, copy=False)


In [None]:
# Export the final DataFrame as CSV with a header row. Spark writes a
# directory of part files at this path rather than a single file.
# mode="overwrite" replaces a previous export; the default mode raises an
# error when the path already exists, which breaks any re-run of the notebook.
df.write.csv("Final_output.csv", header=True, mode="overwrite")