In [1]:
# findspark locates the local Spark installation and makes pyspark importable;
# run init() before the pyspark imports in the next cell.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession  
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Build (or reuse) the Spark session for the quake ETL job:
#   - master 'local[2]' runs Spark locally
#   - 'spark.jars.packages' requests the MongoDB Spark connector used by the
#     'mongo' writes later in the notebook
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('quake_etl')
    .config(
        'spark.jars.packages',
        'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1',
    )
    .getOrCreate()
)

In [3]:
# Load the raw earthquake data file. header=True takes the column names from the
# first row; no schema is inferred, so every column is read as a string.
df_load = spark.read.csv(
    path=r"C:\Users\romai\Dropbox\Data Engineer\Project\database.csv",
    header=True,
)

# Preview the first row of df_load
df_load.take(1)

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [4]:
# Remove the fields that are not needed downstream.
lst_dropped_columns = [
    'Depth Error',
    'Time',
    'Depth Seismic Stations',
    'Magnitude Error',
    'Magnitude Seismic Stations',
    'Azimuthal Gap',
    'Horizontal Distance',
    'Horizontal Error',
    'Root Mean Square',
    'Source',
    'Location Source',
    'Magnitude Source',
    'Status',
]
df_load = df_load.drop(*lst_dropped_columns)

# Preview the table after dropping the columns
df_load.show(n=4)


+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 4 rows



In [5]:
# Create a Year field from the Date string and add it to the dataframe.
#
# The dates in the file appear to be month/day/year (the first rows are
# 01/02/1965, 01/04/1965, 01/05/1965, ...), so the pattern is 'MM/dd/yyyy'.
# In the date-pattern syntax uppercase 'MM' is month-of-year while lowercase
# 'mm' is minute-of-hour; the previous 'dd/mm/yyyy' pattern therefore parsed the
# month as a day and the day as minutes, and only yielded the right year by accident.
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'MM/dd/yyyy')))

# Preview the dataframe
df_load.show(4)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 4 rows



In [6]:
# Build the earthquake-frequency dataframe: one row per Year with the number of
# records in that year, exposed as 'Count'.
df_quake_freq = (
    df_load
    .groupBy('Year')
    .count()
    .withColumnRenamed('count', 'Count')
)

# Preview df_quake_freq
df_quake_freq.show(n=4)

+----+-----+
|Year|Count|
+----+-----+
|1990|  528|
|1975|  411|
|1977|  425|
|2003|  485|
+----+-----+
only showing top 4 rows



In [7]:
# Preview the df_load schema (column names and types)

df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [8]:
# Transform the numeric fields from string to double; withColumn on an existing
# column name replaces that column in place.
df_load = (
    df_load
    .withColumn('Latitude', df_load['Latitude'].cast('double'))
    .withColumn('Longitude', df_load['Longitude'].cast('double'))
    .withColumn('Depth', df_load['Depth'].cast('double'))
    .withColumn('Magnitude', df_load['Magnitude'].cast('double'))
)

# Preview the dataframe and its schema
df_load.show(n=4)
df_load.printSchema()

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 4 rows

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-

In [9]:
# Compute the per-year maximum and average magnitude; both frames are joined
# onto df_quake_freq in the next cell.
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')

# Preview both aggregates (the stray trailing comma after the first show() call,
# which built a throwaway 1-tuple, has been removed).
df_max.show(4)
df_avg.show(4)


+----+-------------+
|Year|Max_Magnitude|
+----+-------------+
|1990|          7.8|
|1975|          7.9|
|1977|          7.9|
|2003|          8.3|
+----+-------------+
only showing top 4 rows

+----+-----------------+
|Year|    Avg_Magnitude|
+----+-----------------+
|1990|5.860624999999987|
|1975|  5.8488807785888|
|1977|5.783764705882346|
|2003|5.885731958762881|
+----+-----------------+
only showing top 4 rows



In [10]:
# Attach the average and maximum magnitude to the per-year counts, joining on Year.
df_quake_freq = (
    df_quake_freq
    .join(df_avg, on='Year')
    .join(df_max, on='Year')
)

df_quake_freq.show(n=4)

+----+-----+-----------------+-------------+
|Year|Count|    Avg_Magnitude|Max_Magnitude|
+----+-----+-----------------+-------------+
|1990|  528|5.860624999999987|          7.8|
|1975|  411|  5.8488807785888|          7.9|
|1977|  425|5.783764705882346|          7.9|
|2003|  485|5.885731958762881|          8.3|
+----+-----+-----------------+-------------+
only showing top 4 rows



In [11]:
# Drop rows that contain nulls.
# DataFrame.dropna() returns a new DataFrame and leaves the original unchanged,
# so the result must be assigned back; otherwise the rows with nulls are still
# present when the dataframes are written to MongoDB.
df_load = df_load.dropna()
df_quake_freq = df_quake_freq.dropna()

DataFrame[Year: int, Count: bigint, Avg_Magnitude: double, Max_Magnitude: double]

In [12]:
# Preview the first 4 rows of df_load
df_load.show(4)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 4 rows



In [13]:
# Preview the first 4 rows of df_quake_freq
df_quake_freq.show(4)

+----+-----+-----------------+-------------+
|Year|Count|    Avg_Magnitude|Max_Magnitude|
+----+-----+-----------------+-------------+
|1990|  528|5.860624999999987|          7.8|
|1975|  411|  5.8488807785888|          7.9|
|1977|  425|5.783764705882346|          7.9|
|2003|  485|5.885731958762881|          8.3|
+----+-----+-----------------+-------------+
only showing top 4 rows



In [15]:
# Build the 'quakes' collection (database 'Quake') in MongoDB from the static
# data source held in df_load; 'overwrite' mode replaces any existing data.
(
    df_load.write
    .format('mongo')
    .mode('overwrite')
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quakes')
    .save()
)

In [16]:
# Write the per-year summary in df_quake_freq to the 'quake_freq' collection
# (database 'Quake') in MongoDB; 'overwrite' mode replaces any existing data.
(
    df_quake_freq.write
    .format('mongo')
    .mode('overwrite')
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quake_freq')
    .save()
)

# Section Machine Learning Implementation 

In [23]:
# Load the test data into a dataframe. header=True takes the column names from
# the first row; no schema is inferred, so every column is read as a string.
df_test = spark.read.csv(
    path=r"C:\Users\romai\Dropbox\Data Engineer\Project\query.csv",
    header=True,
)

# Preview the first row of df_test
df_test.take(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [None]:
# Load the training data from MongoDB into a dataframe