In [1]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

### Below command should be run from command window before starting Spark session here to be able to succusfully connect to mongo db

pyspark --conf 'spark.mongodb.input.uri=mongodb+srv://username:password@cluster0.iax__.mongodb.net/quake.quakes?readPreference=primaryPreferred' --conf 'spark.mongodb.output.uri=mongodb+srv://username:password@cluster0.iax__.mongodb.net/quake.quakes' --packages org.mongodb.spark:mongo-spark-connector_2.12:2.4.1

In [2]:
spark = SparkSession \
    .builder \
    .master('local[3]') \
    .appName('quake_etl') \
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:2.4.1') \
    .getOrCreate()
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [3]:
#Load dataset
df_load = spark.read.csv(r"database.csv",header=True)
df_load.toPandas()

Unnamed: 0,Date,Time,Latitude,Longitude,Type,Depth,Depth Error,Depth Seismic Stations,Magnitude,Magnitude Type,...,Magnitude Seismic Stations,Azimuthal Gap,Horizontal Distance,Horizontal Error,Root Mean Square,ID,Source,Location Source,Magnitude Source,Status
0,01/02/1965,13:44:18,19.246,145.616,Earthquake,131.6,,,6,MW,...,,,,,,ISCGEM860706,ISCGEM,ISCGEM,ISCGEM,Automatic
1,01/04/1965,11:29:49,1.863,127.352,Earthquake,80,,,5.8,MW,...,,,,,,ISCGEM860737,ISCGEM,ISCGEM,ISCGEM,Automatic
2,01/05/1965,18:05:58,-20.579,-173.972,Earthquake,20,,,6.2,MW,...,,,,,,ISCGEM860762,ISCGEM,ISCGEM,ISCGEM,Automatic
3,01/08/1965,18:49:43,-59.076,-23.557,Earthquake,15,,,5.8,MW,...,,,,,,ISCGEM860856,ISCGEM,ISCGEM,ISCGEM,Automatic
4,01/09/1965,13:32:50,11.938,126.427,Earthquake,15,,,5.8,MW,...,,,,,,ISCGEM860890,ISCGEM,ISCGEM,ISCGEM,Automatic
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
23407,12/28/2016,08:22:12,38.3917,-118.8941,Earthquake,12.3,1.2,40,5.6,ML,...,18,42.47,0.12,,0.1898,NN00570710,NN,NN,NN,Reviewed
23408,12/28/2016,09:13:47,38.3777,-118.8957,Earthquake,8.8,2,33,5.5,ML,...,18,48.58,0.129,,0.2187,NN00570744,NN,NN,NN,Reviewed
23409,12/28/2016,12:38:51,36.9179,140.4262,Earthquake,10,1.8,,5.9,MWW,...,,91,0.992,4.8,1.52,US10007NAF,US,US,US,Reviewed
23410,12/29/2016,22:30:19,-9.0283,118.6639,Earthquake,79,1.8,,6.3,MWW,...,,26,3.553,6,1.43,US10007NL0,US,US,US,Reviewed


In [4]:
# Drop fileds we dont need from df_load
list_dropped_columns = ['Depth Error', 'Time', 'Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap', 'Horizontal Distance','Horizontal Error',
    'Root Mean Square','Source','Location Source','Magnitude Source','Status']

df_load = df_load.drop(*list_dropped_columns)
df_load.toPandas()

Unnamed: 0,Date,Latitude,Longitude,Type,Depth,Magnitude,Magnitude Type,ID
0,01/02/1965,19.246,145.616,Earthquake,131.6,6,MW,ISCGEM860706
1,01/04/1965,1.863,127.352,Earthquake,80,5.8,MW,ISCGEM860737
2,01/05/1965,-20.579,-173.972,Earthquake,20,6.2,MW,ISCGEM860762
3,01/08/1965,-59.076,-23.557,Earthquake,15,5.8,MW,ISCGEM860856
4,01/09/1965,11.938,126.427,Earthquake,15,5.8,MW,ISCGEM860890
...,...,...,...,...,...,...,...,...
23407,12/28/2016,38.3917,-118.8941,Earthquake,12.3,5.6,ML,NN00570710
23408,12/28/2016,38.3777,-118.8957,Earthquake,8.8,5.5,ML,NN00570744
23409,12/28/2016,36.9179,140.4262,Earthquake,10,5.9,MWW,US10007NAF
23410,12/29/2016,-9.0283,118.6639,Earthquake,79,6.3,MWW,US10007NL0


In [5]:
#Create year field and add it to the dataframe
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
df_load.show(3)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 3 rows



In [6]:
#Build the quakes frequency dataframe using the year field and counts for each year
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')
#Preview df_quake_freq
df_quake_freq.show(5)

+----+------+
|Year|Counts|
+----+------+
|1990|   196|
|1975|   150|
|1977|   148|
|2003|   187|
|2007|   211|
+----+------+
only showing top 5 rows



In [7]:
#Print schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [8]:
# Transform schema
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType())) \
                 .withColumn('Longitude', df_load['Longitude'].cast(DoubleType())) \
                 .withColumn('Depth', df_load['Depth'].cast(DoubleType())) \
                 .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [9]:
#Create average magnitude and maximum magniuted fields and add to df_quake_freq
df_max = df_load.groupBy('year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_avg = df_load.groupBy('year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')

df_max.show(3)
df_avg.show(3)

+----+-------------+
|year|Max_Magnitude|
+----+-------------+
|1990|          7.6|
|1975|          7.8|
|1977|          7.6|
+----+-------------+
only showing top 3 rows

+----+-----------------+
|year|    Avg_Magnitude|
+----+-----------------+
|1990|5.858163265306125|
|1975| 5.84866666666667|
|1977|5.757432432432437|
+----+-----------------+
only showing top 3 rows



In [10]:
#Join df_max, df_avg and df_quake_freq
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max, ['Year'])

df_quake_freq.show(3)

+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   196|5.858163265306125|          7.6|
|1975|   150| 5.84866666666667|          7.8|
|1977|   148|5.757432432432437|          7.6|
+----+------+-----------------+-------------+
only showing top 3 rows



In [11]:
#Remove null values
df_load.dropna()
df_quake_freq.dropna()

DataFrame[Year: int, Counts: bigint, Avg_Magnitude: double, Max_Magnitude: double]

In [12]:
#Preview dataframe
df_load.show(5)
df_quake_freq.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows

+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   

In [13]:
#Build the tables and collections in mongodb
#Write df_load to mongodb
df_load.write.format('mongo') \
    .mode('overwrite') \
    .option('spark.mongodb.output.uri', 'mongodb+srv://username:password@cluster0.abcd.mongodb.net/quake.quakes') \
    .save()

In [14]:
#Write df_quake_freq to mongodb
df_quake_freq.write.format('mongo') \
    .mode('overwrite') \
    .option('spark.mongodb.output.uri', 'mongodb+srv://username:password@cluster0.abcd.mongodb.net/quake.quake_freq') \
    .save()

In [15]:
"""
Section: Machine Learning with Spark
"""

'\nSection: Machine Learning with Spark\n'

In [16]:
#Load the test data file into a dataframe
df_test = spark.read.csv('query.csv', header=True)
df_test.limit(5).toPandas()

Unnamed: 0,time,latitude,longitude,depth,mag,magType,nst,gap,dmin,rms,...,updated,place,type,horizontalError,depthError,magError,magNst,status,locationSource,magSource
0,2017-01-02T00:13:06.300Z,-36.0365,51.9288,10.0,5.7,mwb,,26,14.685,1.37,...,2017-03-27T23:53:17.040Z,Southwest Indian Ridge,earthquake,10.3,1.7,0.068,21.0,reviewed,us,us
1,2017-01-02T13:13:48.710Z,-4.895,-76.3675,106.0,5.9,mww,,31,3.002,0.82,...,2020-01-02T23:56:16.978Z,"37km E of Barranca, Peru",earthquake,7.1,1.9,,,reviewed,us,us
2,2017-01-02T13:14:02.830Z,-23.2513,179.2383,551.62,6.3,mww,,36,5.59,0.88,...,2017-03-27T23:53:18.040Z,South of the Fiji Islands,earthquake,9.9,3.3,0.05,38.0,reviewed,us,us
3,2017-01-03T09:09:02.080Z,24.0151,92.0177,32.0,5.7,mww,,20,1.375,0.86,...,2020-01-02T23:56:29.214Z,"20km ENE of Ambasa, India",earthquake,6.7,1.8,0.071,19.0,reviewed,us,us
4,2017-01-03T21:19:07.540Z,-43.3527,-74.5017,10.26,5.5,mww,,96,0.656,0.91,...,2020-01-02T23:56:57.327Z,"76km WSW of Puerto Quellon, Chile",earthquake,4.1,1.7,0.071,19.0,reviewed,us,us


In [17]:
#Load the training data from mongodb into a dataframe
df_train = spark.read.format('mongo') \
    .option('spark.mongodb.input.uri', 'mongodb+srv://username:password@cluster0.abcd.mongodb.net/quake.quakes') \
    .load()

df_train.limit(5).toPandas()

Unnamed: 0,Date,Depth,ID,Latitude,Longitude,Magnitude,Magnitude Type,Type,Year,_id
0,01/02/1965,131.6,ISCGEM860706,19.246,145.616,6.0,MW,Earthquake,1965,"(605cf0be4aabd41e3de7a888,)"
1,01/04/1965,80.0,ISCGEM860737,1.863,127.352,5.8,MW,Earthquake,1965,"(605cf0be4aabd41e3de7a889,)"
2,01/05/1965,20.0,ISCGEM860762,-20.579,-173.972,6.2,MW,Earthquake,1965,"(605cf0be4aabd41e3de7a88a,)"
3,01/08/1965,15.0,ISCGEM860856,-59.076,-23.557,5.8,MW,Earthquake,1965,"(605cf0be4aabd41e3de7a88b,)"
4,01/09/1965,15.0,ISCGEM860890,11.938,126.427,5.8,MW,Earthquake,1965,"(605cf0be4aabd41e3de7a88c,)"


In [18]:
#Clean test data and discard unnecessary fields
df_test_clean = df_test['time', 'latitude', 'longitude', 'mag', 'depth']

df_test_clean.limit(5).toPandas()

Unnamed: 0,time,latitude,longitude,mag,depth
0,2017-01-02T00:13:06.300Z,-36.0365,51.9288,5.7,10.0
1,2017-01-02T13:13:48.710Z,-4.895,-76.3675,5.9,106.0
2,2017-01-02T13:14:02.830Z,-23.2513,179.2383,6.3,551.62
3,2017-01-03T09:09:02.080Z,24.0151,92.0177,5.7,32.0
4,2017-01-03T21:19:07.540Z,-43.3527,-74.5017,5.5,10.26


In [19]:
# Rename fields
df_test_clean = df_test_clean.withColumnRenamed('time', 'Date') \
        .withColumnRenamed('latitude', 'Latitude') \
        .withColumnRenamed('longitude', 'Longitude') \
        .withColumnRenamed('mag', 'Magnitude') \
        .withColumnRenamed('depth', 'Depth')

df_test_clean.limit(3).toPandas()

Unnamed: 0,Date,Latitude,Longitude,Magnitude,Depth
0,2017-01-02T00:13:06.300Z,-36.0365,51.9288,5.7,10.0
1,2017-01-02T13:13:48.710Z,-4.895,-76.3675,5.9,106.0
2,2017-01-02T13:14:02.830Z,-23.2513,179.2383,6.3,551.62


In [20]:
# Preview Schema
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Depth: string (nullable = true)



In [21]:
#Adjust schema
df_test_clean = df_test_clean.withColumn('Latitude', df_test_clean['Latitude'].cast(DoubleType())) \
    .withColumn('Longitude', df_test_clean['Longitude'].cast(DoubleType())) \
    .withColumn('Depth', df_test_clean['Depth'].cast(DoubleType())) \
    .withColumn('Magnitude', df_test_clean['Magnitude'].cast(DoubleType()))

df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [22]:
#Create training and testing dataframes
df_testing = df_test_clean['Latitude', 'Longitude', 'Magnitude', 'Depth']
df_training = df_train['Latitude', 'Longitude', 'Magnitude', 'Depth']

In [23]:
#Preview
df_testing.show(3)
df_training.show(3)

+--------+---------+---------+------+
|Latitude|Longitude|Magnitude| Depth|
+--------+---------+---------+------+
|-36.0365|  51.9288|      5.7|  10.0|
|  -4.895| -76.3675|      5.9| 106.0|
|-23.2513| 179.2383|      6.3|551.62|
+--------+---------+---------+------+
only showing top 3 rows

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
+--------+---------+---------+-----+
only showing top 3 rows



In [24]:
#Drop records with null values from our dataframes
df_testing = df_testing.dropna()
df_training = df_training.dropna()

In [25]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [26]:
#Select features to parse into our model and then create feature vector
assembler = VectorAssembler(inputCols=['Latitude', 'Longitude', 'Depth'],
                            outputCol='features')

#Create the model
model_reg = RandomForestRegressor(featuresCol='features', labelCol='Magnitude')

#Chain the assembler with the model in a pipeline
pipeline = Pipeline(stages=[assembler, model_reg])

#Train the model
model = pipeline.fit(df_training)

#Make prediction
pred_results = model.transform(df_testing)

In [27]:
pred_results.show(10)

+--------+---------+---------+------+--------------------+------------------+
|Latitude|Longitude|Magnitude| Depth|            features|        prediction|
+--------+---------+---------+------+--------------------+------------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...| 5.836898542663325|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...|5.8701380781481145|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...|5.8928803372072025|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...| 5.871676176957084|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...| 5.966993794316648|
|-19.3733| 176.0518|      6.9|  12.0|[-19.3733,176.051...| 5.997559794464392|
|-19.3977| 175.9532|      5.7|  10.0|[-19.3977,175.953...| 5.895835635198435|
|-19.1207| 176.1875|      6.0|  10.0|[-19.1207,176.187...| 5.895835635198435|
|-18.9749| 176.2872|      5.5| 19.36|[-18.9749,176.287...|5.9984488797122255|
|-17.8694| 167.1235|      5.6|  22.1|[-17.8694,167.123...| 6.029

In [28]:
#Model evaluation
#RMSE should be less than 0.5 for the model to be useful
evaluator = RegressionEvaluator(labelCol='Magnitude', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred_results)
print('Root Mean Square Error on test data = {}'.format(rmse))

Root Mean Square Error on test data = 0.402277773675387


In [29]:
"""
Create the prediction dataset
"""

'\nCreate the prediction dataset\n'

In [30]:
#Create the predcition dataset
df_pred_results = pred_results['Latitude', 'Longitude', 'prediction']

#Rename prediction column
df_pred_results = df_pred_results.withColumnRenamed('prediction', 'Pred_Magnitude')

#Add more columns to our prediction dataframe
df_pred_results = df_pred_results.withColumn('Year', lit(2017)) \
    .withColumn('RMSE', lit(rmse))

#Preview df_pred_results
df_pred_results.show(3)

+--------+---------+------------------+----+-----------------+
|Latitude|Longitude|    Pred_Magnitude|Year|             RMSE|
+--------+---------+------------------+----+-----------------+
|-36.0365|  51.9288| 5.836898542663325|2017|0.402277773675387|
|  -4.895| -76.3675|5.8701380781481145|2017|0.402277773675387|
|-23.2513| 179.2383|5.8928803372072025|2017|0.402277773675387|
+--------+---------+------------------+----+-----------------+
only showing top 3 rows



In [31]:
#Load prediction dataframe into mongodb
#Write df_pred_results
df_pred_results.write.format('mongo') \
    .mode('overwrite') \
    .option('spark.mongodb.output.uri', 'mongodb+srv://username:password@cluster0.abcd.mongodb.net/quake.pred_results') \
    .save()

In [32]:
"""
Section Data Visualization
"""

'\nSection Data Visualization\n'

In [49]:
import pandas as pd
from bokeh.io import output_notebook, output_file
from bokeh.plotting import figure, show, ColumnDataSource
from bokeh.models.tools import HoverTool
import math
from math import pi
from bokeh.palettes import Category20c
from bokeh.transform import cumsum
from bokeh.tile_providers import CARTODBPOSITRON
from bokeh.themes import built_in_themes
from bokeh.io import curdoc
from pymongo import MongoClient

In [34]:
#Create custom read function to read data from mongodb into a dataframe
def read_mongo(username, password, db, collection):
    
    mongo_uri = 'mongodb+srv://{}:{}@cluster0.abcd.mongodb.net/{}.{}'.format(username, password, db, collection)
    
    #Connect to mongodb
    
    conn = MongoClient(mongo_uri)
    db = conn[db]
    
    #Select all records from the collection
    
    cursor = db[collection].find()
    
    #Create the dataframe 
    
    df = pd.DataFrame(list(cursor))
    
    #Delete the _id field
    
    del df['_id']
    
    return df

In [35]:
#Load the datasets from mongodb
df_quakes = read_mongo(username='username', password='password', db='quake', collection="quakes")
df_quake_freq = read_mongo(username='username', password='password', db='quake', collection="quake_freq")
df_quake_pred = read_mongo(username='username', password='password', db='quake', collection="pred_results")

In [36]:
df_quakes.head()

Unnamed: 0,Date,Latitude,Longitude,Type,Depth,Magnitude,Magnitude Type,ID,Year
0,01/02/1965,19.246,145.616,Earthquake,131.6,6.0,MW,ISCGEM860706,1965.0
1,01/04/1965,1.863,127.352,Earthquake,80.0,5.8,MW,ISCGEM860737,1965.0
2,01/05/1965,-20.579,-173.972,Earthquake,20.0,6.2,MW,ISCGEM860762,1965.0
3,01/08/1965,-59.076,-23.557,Earthquake,15.0,5.8,MW,ISCGEM860856,1965.0
4,01/09/1965,11.938,126.427,Earthquake,15.0,5.8,MW,ISCGEM860890,1965.0


In [37]:
df_quakes_2016 = df_quakes[df_quakes['Year'] == 2016]

df_quakes_2016.head()

Unnamed: 0,Date,Latitude,Longitude,Type,Depth,Magnitude,Magnitude Type,ID,Year
22943,01/01/2016,-50.5575,139.4489,Earthquake,10.0,6.3,MWW,US10004ANT,2016.0
22944,01/01/2016,-28.6278,-177.281,Earthquake,34.0,5.8,MWW,US10004AQY,2016.0
22945,01/02/2016,44.8069,129.9406,Earthquake,585.47,5.8,MWW,US10004ATB,2016.0
22946,01/03/2016,24.8036,93.6505,Earthquake,55.0,6.7,MWW,US10004B2N,2016.0
22947,01/05/2016,30.6132,132.7337,Earthquake,4.71,5.8,MWW,US10004BEN,2016.0
