In [1]:
import findspark
findspark.init()
import pymongo
import pandas as pd



In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession\
    .builder\
    .master('local[2]')\
    .appName('quake_etl')\
    .config('spark.jars.package', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
    .getOrCreate()
    

In [3]:
#Loading dataset 
df_load = spark.read.csv(r"C:\Users\LOQ\Desktop\BDT\database.csv",header = True)

#previewing dataset
df_load.take(1)

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [4]:
#removing fields of less significance
lst_dropped_columns = ['Depth Error', 'Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap', 'Horizontal Distance','Horizontal Error',
    'Root Mean Square','Source','Location Source','Magnitude Source','Status']

df_load = df_load.drop("lst_dropped_columns")

df_load.show(5)

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|                  NULL|        6|            MW|       

In [5]:
# Creating a year field and add it to the df_load dataframe
df_load = df_load.withColumn ('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))

df_load.show(5)

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+----+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|Year|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+----+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|                  NULL|        6|       

In [6]:
# Creating the quakes freq dataframe from the year
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')

df_quake_freq.show

<bound method DataFrame.show of DataFrame[Year: int, Counts: bigint]>

In [7]:
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Depth Error: string (nullable = true)
 |-- Depth Seismic Stations: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Magnitude Error: string (nullable = true)
 |-- Magnitude Seismic Stations: string (nullable = true)
 |-- Azimuthal Gap: string (nullable = true)
 |-- Horizontal Distance: string (nullable = true)
 |-- Horizontal Error: string (nullable = true)
 |-- Root Mean Square: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location Source: string (nullable = true)
 |-- Magnitude Source: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Year: integer (nullable = true)



In [8]:
# Transformation of fields from string to double
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

df_load.show(5)


+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+----+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|Year|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+----+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|                  NULL|      6.0|       

In [9]:
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Depth Error: string (nullable = true)
 |-- Depth Seismic Stations: string (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Magnitude Error: string (nullable = true)
 |-- Magnitude Seismic Stations: string (nullable = true)
 |-- Azimuthal Gap: string (nullable = true)
 |-- Horizontal Distance: string (nullable = true)
 |-- Horizontal Error: string (nullable = true)
 |-- Root Mean Square: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location Source: string (nullable = true)
 |-- Magnitude Source: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Year: integer (nullable = true)



In [10]:
# Creating avg and max magnitude fields
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Mgnitude)', 'Max_Magnitude')

df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Mgnitude)', 'Avg_Magnitude')

# Join the max and avg dfs to df_quake_freq
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max, ['Year'])

df_quake_freq.show(5)

+----+------+-----------------+--------------+
|Year|Counts|   avg(Magnitude)|max(Magnitude)|
+----+------+-----------------+--------------+
|1990|   196|5.858163265306125|           7.6|
|1975|   150| 5.84866666666667|           7.8|
|1977|   148|5.757432432432437|           7.6|
|2003|   187|5.850802139037435|           7.6|
|2007|   211| 5.89099526066351|           8.4|
+----+------+-----------------+--------------+
only showing top 5 rows



In [11]:
df_load.show(5)

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+----+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|Year|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+----+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|                  NULL|      6.0|       

In [12]:
df_quake_freq.show(5)

+----+------+-----------------+--------------+
|Year|Counts|   avg(Magnitude)|max(Magnitude)|
+----+------+-----------------+--------------+
|1990|   196|5.858163265306125|           7.6|
|1975|   150| 5.84866666666667|           7.8|
|1977|   148|5.757432432432437|           7.6|
|2003|   187|5.850802139037435|           7.6|
|2007|   211| 5.89099526066351|           8.4|
+----+------+-----------------+--------------+
only showing top 5 rows



In [13]:
print(df_load.show())


+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+----+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|                  ID|   Source|Location Source|Magnitude Source|   Status|Year|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+--------------------+---------+---------------+----------------+---------+----+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       NULL|       

In [14]:
df_load.dropna()
df_quake_freq.dropna()

DataFrame[Year: int, Counts: bigint, avg(Magnitude): double, max(Magnitude): double]