## Aidetic Data Engineer - Assignment PySpark 2023

### Assignment Description
In this assignment, I demonstrate my ability to work with PySpark by loading an earthquake dataset into MySQL and analysing it with Spark DataFrames.

### Assignment Instructions
- Set up your development environment with the necessary libraries.
- Access the provided dataset from here and save it into your local directory.
- Set up a local MySQL database.
- Create a new table named neic_earthquakes.
- Run a Python script to read the given data and push the data into the neic_earthquakes table.
- Read the data from the table into a PySpark DataFrame and answer the following questions:
  - How does the Day of a Week affect the number of earthquakes?
  - What is the relation between Day of the month and Number of earthquakes that happened in a year?
  - What does the average frequency of earthquakes in a month from the year 1965 to 2016 tell us?
  - What is the relation between Year and Number of earthquakes that happened in that year?
  - How has the average earthquake magnitude varied over the years?
  - How does year impact the standard deviation of the earthquakes?
  - Does geographic location have anything to do with earthquakes?
  - Where do earthquakes occur very frequently?
  - What is the relation between Magnitude, Magnitude Type, Status and Root Mean Square of the earthquakes?

## 1. Install Libraries

In [1]:
%pip install pandas pymysql sqlalchemy pyspark findspark

Note: you may need to restart the kernel to use updated packages.





## 2. Python Script to Load Data into MySQL

Let's create a Python script, load_data.py, that reads the provided CSV file and pushes it into the MySQL database. It uses pandas to read the data and SQLAlchemy with the pymysql driver to connect to MySQL.

In [2]:
import pandas as pd
import pymysql
from sqlalchemy import create_engine

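# Read the CSV file into a pandas DataFrame
data = pd.read_csv(r'E:\neic_earthquake\database.csv')

# Connect to the local MySQL server (database "MySQL") through the pymysql driver.
# A password containing special characters such as "@" must be URL-encoded
# (for example with urllib.parse.quote_plus) before it is placed in the URL.
engine = create_engine("mysql+pymysql://root:Nithin2001@localhost/MySQL")

# Write the rows to the neic_earthquakes table, replacing the table if it already exists;
# to_sql returns the number of rows written
data.to_sql('neic_earthquakes', con=engine, if_exists='replace', index=False)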

23412
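
A password that contains characters such as `@` cannot be placed in a connection URL verbatim, because `@` also separates the credentials from the host. A minimal sketch of one way to handle this, using the standard library's `urllib.parse.quote_plus`; the helper name `build_mysql_url` and the sample password are hypothetical and not part of the original script:

```python
from urllib.parse import quote_plus


def build_mysql_url(user: str, password: str, host: str, database: str) -> str:
    """Return a SQLAlchemy-style MySQL URL with the credentials URL-encoded."""
    return f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}/{database}"


# Hypothetical password containing "@"; quote_plus encodes it as %40.
url = build_mysql_url("root", "p@ssword", "localhost", "MySQL")
print(url)
```

The resulting string can be passed to `create_engine` in place of a hand-written URL.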

## 3. PySpark Analysis

Create another Python script to analyze the data using PySpark. Let's call it analyze_data.py.

In [3]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("EarthquakeAnalysis").getOrCreate()

In [4]:
spark

In [5]:
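# Load the CSV file directly into a Spark DataFrame; inferSchema lets Spark guess the column types
df = spark.read.csv(r"E:\neic_earthquake\database.csv", header=True, inferSchema=True)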
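
The assignment asks for the data to be read from the `neic_earthquakes` table, whereas the cell above reads the CSV file directly. A hedged sketch of reading the table over JDBC instead: it assumes the MySQL Connector/J jar is available to Spark and that the server listens on the default port 3306. The helper names `mysql_jdbc_options` and `read_table` are hypothetical, and the password is a placeholder.

```python
def mysql_jdbc_options(host, database, table, user, password, port=3306):
    """Build the option dictionary for Spark's JDBC data source (MySQL)."""
    return {
        "url": f"jdbc:mysql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        # Driver class shipped with MySQL Connector/J 8.x (assumed to be on the classpath)
        "driver": "com.mysql.cj.jdbc.Driver",
    }


def read_table(spark, options):
    """Load a table into a Spark DataFrame through JDBC, given an active SparkSession."""
    return spark.read.format("jdbc").options(**options).load()


options = mysql_jdbc_options("localhost", "MySQL", "neic_earthquakes", "root", "<password>")
print(options["url"])
```

With a running session, `read_table(spark, options)` would return the DataFrame; nothing here contacts the database until that call is made.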

In [6]:
df.show()

+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|      Date|               Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|
+----------+-------------------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+
|01/02/1965|2023-11-21 13:44:18|  19.246|  145.616|Earthqu

In [7]:
df.head(2)

[Row(Date='01/02/1965', Time=datetime.datetime(2023, 11, 21, 13, 44, 18), Latitude=19.246, Longitude=145.616, Type='Earthquake', Depth=131.6, Depth Error=None, Depth Seismic Stations=None, Magnitude=6.0, Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic'),
 Row(Date='01/04/1965', Time=datetime.datetime(2023, 11, 21, 11, 29, 49), Latitude=1.863, Longitude=127.352, Type='Earthquake', Depth=80.0, Depth Error=None, Depth Seismic Stations=None, Magnitude=5.8, Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860737', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [8]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Depth Error: double (nullable = true)
 |-- Depth Seismic Stations: integer (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Magnitude Error: double (nullable = true)
 |-- Magnitude Seismic Stations: integer (nullable = true)
 |-- Azimuthal Gap: double (nullable = true)
 |-- Horizontal Distance: double (nullable = true)
 |-- Horizontal Error: double (nullable = true)
 |-- Root Mean Square: double (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location Source: string (nullable = true)
 |-- Magnitude Source: string (nullable = true)
 |-- Status: string (nullable = true)
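
The schema lists `Date` as a string, and the sample rows show values such as `01/02/1965`, so the column has to be parsed before any calendar function can use it. A small plain-Python sketch of the parsing logic, assuming month/day/year order; the ISO-style fallback and the second sample value are assumptions for illustration, and `parse_event_date` is a hypothetical helper:

```python
from datetime import date, datetime
from typing import Optional


def parse_event_date(raw: str) -> Optional[date]:
    """Parse a Date value, trying MM/dd/yyyy first and an ISO yyyy-MM-dd prefix second."""
    for text, fmt in ((raw, "%m/%d/%Y"), (raw[:10], "%Y-%m-%d")):
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None  # neither format matched


print(parse_event_date("01/02/1965"))                # value taken from the sample rows
print(parse_event_date("1975-02-23T02:58:41.000Z"))  # hypothetical ISO-style value
print(parse_event_date("not a date"))
```

In PySpark the same idea corresponds to calling `to_date` with an explicit format string rather than relying on an implicit cast.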



In [9]:
df.columns

['Date',
 'Time',
 'Latitude',
 'Longitude',
 'Type',
 'Depth',
 'Depth Error',
 'Depth Seismic Stations',
 'Magnitude',
 'Magnitude Type',
 'Magnitude Error',
 'Magnitude Seismic Stations',
 'Azimuthal Gap',
 'Horizontal Distance',
 'Horizontal Error',
 'Root Mean Square',
 'ID',
 'Source',
 'Location Source',
 'Magnitude Source',
 'Status']

In [10]:
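from pyspark.sql.functions import (dayofweek, dayofmonth, month, year, avg, stddev, count, desc,
                                   coalesce, substring, to_date)

# "Date" is a string (mostly MM/dd/yyyy) that Spark cannot cast to a date implicitly, so parse it
# explicitly; the yyyy-MM-dd fallback assumes the few remaining rows are ISO-formatted
df = df.withColumn("Date", coalesce(to_date("Date", "MM/dd/yyyy"), to_date(substring("Date", 1, 10), "yyyy-MM-dd")))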

In [11]:
# How does the Day of a Week affect the number of earthquakes?
df_day_of_week = df.groupBy(dayofweek("Date").alias("day_of_week")).count().orderBy("day_of_week")
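
When reading the `day_of_week` column, note that Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), unlike Python's `isoweekday`, which runs from 1 (Monday) to 7 (Sunday). A short sketch of the conversion in plain Python; the helper name `spark_dayofweek` is hypothetical:

```python
from datetime import date

DAY_NAMES = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]


def spark_dayofweek(d: date) -> int:
    """Map a date to Spark's dayofweek convention: 1 = Sunday, ..., 7 = Saturday."""
    return d.isoweekday() % 7 + 1


d = date(1965, 1, 2)  # first date in the sample rows (01/02/1965)
n = spark_dayofweek(d)
print(n, DAY_NAMES[n - 1])
```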


In [12]:
# What is the relation between Day of the month and Number of earthquakes that happened in a year?
df_day_of_month = df.groupBy(year("Date").alias("year"), dayofmonth("Date").alias("day_of_month")).count().orderBy("year", "day_of_month")


In [13]:
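# What does the average frequency of earthquakes in a month from the year 1965 to 2016 tell us?
# Count earthquakes per (year, month) first, then average those counts for each calendar month
monthly_counts = df.filter(year("Date").between(1965, 2016)).groupBy(year("Date").alias("year"), month("Date").alias("month")).count()
df_avg_frequency = monthly_counts.groupBy("month").agg(avg("count").alias("avg_earthquakes")).orderBy("month")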
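
To make "average frequency in a month" concrete: count the events in each (year, month) pair, then average those counts for each calendar month across the years. A plain-Python sketch with made-up dates; `average_monthly_frequency` is a hypothetical helper, not part of the notebook:

```python
from collections import Counter, defaultdict
from datetime import date
from statistics import mean


def average_monthly_frequency(dates):
    """Return {month: average number of events per year-month in which events occurred}."""
    per_year_month = Counter((d.year, d.month) for d in dates)
    counts_by_month = defaultdict(list)
    for (_, m), n in per_year_month.items():
        counts_by_month[m].append(n)
    return {m: mean(counts) for m, counts in sorted(counts_by_month.items())}


# Made-up event dates: two in Jan 2000, one in Jan 2001, one in Feb 2001
sample = [date(2000, 1, 5), date(2000, 1, 20), date(2001, 1, 9), date(2001, 2, 1)]
result = average_monthly_frequency(sample)
print(result)
```

Year-months with no events contribute nothing to the average here, which is also how a group-by over existing rows behaves.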


In [14]:
# What is the relation between Year and Number of earthquakes that happened in that year?
df_yearly_count = df.groupBy(year("Date").alias("year")).agg(count("*").alias("earthquake_count")).orderBy("year")


In [15]:
# How has the earthquake magnitude on average been varied over the years?
df_avg_magnitude_over_years = df.groupBy(year("Date").alias("year")).agg(avg("Magnitude").alias("avg_Magnitude")).orderBy("year")


In [16]:
# How does year impact the standard deviation of the earthquakes?
df_stddev_over_years = df.groupBy(year("Date").alias("year")).agg(stddev("Magnitude").alias("Magnitude_stddev")).orderBy("year")
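
Spark's `stddev` is an alias for `stddev_samp`, the sample standard deviation (dividing by n - 1), so a year containing a single earthquake has no defined value. The distinction can be illustrated with the standard library; the magnitudes below are made up:

```python
import statistics

magnitudes = [5.5, 6.0, 6.5]  # made-up magnitudes for one year

sample_sd = statistics.stdev(magnitudes)       # divides by n - 1, like stddev_samp
population_sd = statistics.pstdev(magnitudes)  # divides by n, like stddev_pop
print(sample_sd, population_sd)

# With a single observation the sample standard deviation is undefined
try:
    statistics.stdev([5.6])
    single_value_defined = True
except statistics.StatisticsError:
    single_value_defined = False
print(single_value_defined)
```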

In [17]:
# Does geographic location have anything to do with earthquakes?
# Note: You may need to adapt this based on your specific dataset columns related to geographic location.
df_geo_analysis = df.groupBy("Latitude", "Longitude").agg(count("*").alias("earthquake_count")).orderBy(desc("earthquake_count"))


In [18]:
# Where do earthquakes occur very frequently?
df_frequent_locations = df.groupBy("Latitude", "Longitude").agg(count("*").alias("earthquake_count")).orderBy(desc("earthquake_count"))
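
Grouping on exact `Latitude` and `Longitude` values tends to produce groups of size one, because precise coordinates rarely repeat. One possible refinement is to bin the coordinates into grid cells before counting. A plain-Python sketch of that idea, assuming a 10-degree cell size; `grid_cell` is a hypothetical helper, the first two coordinate pairs come from the sample rows, and the rest are made up:

```python
import math
from collections import Counter


def grid_cell(lat: float, lon: float, size: int = 10):
    """Return the south-west corner of the size-degree grid cell containing (lat, lon)."""
    return (math.floor(lat / size) * size, math.floor(lon / size) * size)


events = [
    (19.246, 145.616),    # from the sample rows
    (1.863, 127.352),     # from the sample rows
    (11.9, 146.2),        # made up
    (-20.579, -173.972),  # made up
]

cell_counts = Counter(grid_cell(lat, lon) for lat, lon in events)
print(cell_counts.most_common(1))
```

In PySpark the same binning could be expressed with `floor` on the two columns inside `groupBy`.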


In [19]:
# What is the relation between Magnitude, Magnitude Type, Status, and Root Mean Square of the earthquakes?
df_magnitude_relation = df.groupBy("Magnitude", "Magnitude Type", "Status", "Root Mean Square").count().orderBy(desc("count"))
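
Counting rows per exact (Magnitude, Magnitude Type, Status, Root Mean Square) combination mostly shows which combinations repeat. An alternative way to look at the relation, sketched here in plain Python as an assumption rather than the notebook's method, is to group by the two categorical columns and average the two numeric ones, skipping missing values. The first two rows mirror the sample rows; the others are made up, and `summarize` is a hypothetical helper:

```python
from collections import defaultdict
from statistics import mean

# (Magnitude Type, Status, Magnitude, Root Mean Square)
rows = [
    ("MW", "Automatic", 6.0, None),  # from the sample rows
    ("MW", "Automatic", 5.8, None),  # from the sample rows
    ("MB", "Reviewed", 5.5, 1.2),    # made up
    ("MB", "Reviewed", 5.7, 0.8),    # made up
]


def summarize(records):
    """Return {(type, status): (mean magnitude, mean RMS or None if all RMS values are missing)}."""
    groups = defaultdict(list)
    for mag_type, status, magnitude, rms in records:
        groups[(mag_type, status)].append((magnitude, rms))
    summary = {}
    for key, values in groups.items():
        rms_values = [r for _, r in values if r is not None]
        summary[key] = (mean(m for m, _ in values), mean(rms_values) if rms_values else None)
    return summary


summary = summarize(rows)
for key, (avg_mag, avg_rms) in summary.items():
    print(key, avg_mag, avg_rms)
```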


In [20]:
# Print or save the results as needed
df_day_of_week.show()
df_day_of_month.show()
df_avg_frequency.show()
df_yearly_count.show()
df_avg_magnitude_over_years.show()
df_stddev_over_years.show()
df_geo_analysis.show()
df_frequent_locations.show()
df_magnitude_relation.show()

The output below was produced with `Date` still a plain `MM/dd/yyyy` string. Spark's date functions cannot interpret that format without an explicit `to_date` conversion, so `dayofweek`, `dayofmonth`, `month`, and `year` return NULL for 23,409 of the 23,412 rows; only three rows, presumably stored in a directly parseable format, yield values. The third table also reports the average magnitude per month, not a count of earthquakes.

+-----------+-----+
|day_of_week|count|
+-----------+-----+
|       NULL|23409|
|          1|    3|
+-----------+-----+

+----+------------+-----+
|year|day_of_month|count|
+----+------------+-----+
|NULL|        NULL|23409|
|1975|          23|    1|
|1985|          28|    1|
|2011|          13|    1|
+----+------------+-----+

+-----+-------------+
|month|avg_Magnitude|
+-----+-------------+
|    2|          5.6|
|    3|          5.8|
|    4|          5.6|
+-----+-------------+

+----+----------------+
|year|earthquake_count|
+----+----------------+
|NULL|           23409|
|1975|               1|
|1985|               1|
|2011|               1|
+----+----------------+

+----+-----------------+
|year|    avg_Magnitude|
+----+-----------------+
|NULL|5.882558417702829|
|1975|              5.6|
|1985|              5.6|
|2011|              5.8|
+----+-----------------+

+----+------------------+
|year|  Magnitude_stddev|
+----+------------------+
|NULL|0.4230843439717061|
|1975|           

In [22]:
# Stop Spark session
spark.stop()