In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# configuring spark session
spark = SparkSession\
    .builder\
    .master('local[2]')\
    .appName('quake_etl')\
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
    .getOrCreate()

# SECTION 1: Data processing with Pyspark and MongoDB
### Data Extraction 

In [3]:
# Load the dataset
df_load =spark.read.csv(r"C:\Users\user\Desktop\DataScience\EarthQuake\database.csv", header=True)
# preview df_load
df_load.take

<bound method DataFrame.take of DataFrame[Date: string, Time: string, Latitude: string, Longitude: string, Type: string, Depth: string, Depth Error: string, Depth Seismic Stations: string, Magnitude: string, Magnitude Type: string, Magnitude Error: string, Magnitude Seismic Stations: string, Azimuthal Gap: string, Horizontal Distance: string, Horizontal Error: string, Root Mean Square: string, ID: string, Source: string, Location Source: string, Magnitude Source: string, Status: string]>

In [4]:
# Droping field we dont need from the dataframe
lst_dropped_columns = ['Depth Error','Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap',
                      'Horizontal Distance','Horizontal Error','Root Mean Square','Source','Location Source',
                       'Magnitude Source','Status','Time']
df_load = df_load.drop(*lst_dropped_columns)
# preview df_load
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows



In [5]:
# creating a year field and adding to the dataframe
df_load =df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
# Prreview df_load
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [6]:
# Build the quake frequency dataframe using the year field and counts for each year
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')
df_quake_freq.show(5)

+----+------+
|Year|Counts|
+----+------+
|1990|   196|
|1975|   150|
|1977|   148|
|2003|   187|
|2007|   211|
+----+------+
only showing top 5 rows



## Data Transformation

In [7]:
# preview df_load schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [8]:
# Cast some field from string into numeric types
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))\

# Preview dataframe again
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [9]:
# Preview df_load Schema again
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [10]:
# Creating average magnitude and maximum magnitude for the quake frequency
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)','Max_Magnitude')
df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)','Avg_Magnitude')

df_avg.show(5)

+----+-----------------+
|Year|    Avg_Magnitude|
+----+-----------------+
|1990|5.858163265306125|
|1975| 5.84866666666667|
|1977|5.757432432432437|
|2003|5.850802139037435|
|2007| 5.89099526066351|
+----+-----------------+
only showing top 5 rows



In [11]:
# Join df_max and df_avg to df_quake_freq
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max,['Year'])
# Preview df_quake_freq
df_quake_freq.show(5)

+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   196|5.858163265306125|          7.6|
|1975|   150| 5.84866666666667|          7.8|
|1977|   148|5.757432432432437|          7.6|
|2003|   187|5.850802139037435|          7.6|
|2007|   211| 5.89099526066351|          8.4|
+----+------+-----------------+-------------+
only showing top 5 rows



In [12]:
# Remove nulls
df_load.dropna()
df_quake_freq.dropna()

DataFrame[Year: int, Counts: bigint, Avg_Magnitude: double, Max_Magnitude: double]

## Data Loading into MongoDB

In [15]:
# Build the tables/collections in mongodb
# Write df_load to mongodb
df_load.write.format('mongo')\
     .mode('overwrite')\
     .option('spark.mongodb.output.uri','mongodb://127.0.0.1:27017/Quake.quakes').save()

In [16]:
# write df_quake_freq to mongodb
df_quake_freq.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri','mongodb://127.0.0.1:27017/Quake.quake_freq').save()

# SECTION 2:Machine Learning with pyspark and MongoDB

## Data Pre-processing and building the machine learning model

In [17]:
#load the test data file into a dataframe
df_test = spark.read.csv(r"C:\Users\user\Desktop\DataScience\EarthQuake\query.csv", header=True)
# preview df_load
df_test.take(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [19]:
# load the training data from mongo into a dataframe
df_train = spark.read.format('mongo')\
    .option('spark.mongodb.input.uri','mongodb://127.0.0.1:27017/Quake.quakes').load()

# preview train dataset
df_train.show(5)

+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|{634595629fd41d24...|
|01/04/1965| 80.0|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|{634595629fd41d24...|
|01/05/1965| 20.0|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|{634595629fd41d24...|
|01/08/1965| 15.0|ISCGEM860856| -59.076|  -23.557|      5.8|            MW|Earthquake|1965|{634595629fd41d24...|
|01/09/1965| 15.0|ISCGEM860890|  11.938|  126.427|      5.8|            MW|Earthquake|1965|{634595629fd41d24...|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+-----

In [20]:
# Cleaning of testing data and remove fields we dont need
df_test_clean = df_test['time','latitude','longitude','mag','depth']
df_test_clean.show(5)

+--------------------+--------+---------+---+------+
|                time|latitude|longitude|mag| depth|
+--------------------+--------+---------+---+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|5.5| 10.26|
+--------------------+--------+---------+---+------+
only showing top 5 rows



In [21]:
# Renaming the test dataset field to match our training data
df_test_clean = df_test_clean.withColumnRenamed('time','Date')\
    .withColumnRenamed('latitude','Latitude')\
    .withColumnRenamed('longitude','Longitude')\
    .withColumnRenamed('mag','Magnitude')\
    .withColumnRenamed('depth','Depth')\

# previewing the dataframe again
df_test_clean.show(5)

+--------------------+--------+---------+---------+------+
|                Date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
+--------------------+--------+---------+---------+------+
only showing top 5 rows



In [22]:
# preview schema of test dataset
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Depth: string (nullable = true)



In [24]:
# changing the data type of some variables into numerical data type
df_test_clean = df_test_clean.withColumn('Latitude', df_test_clean['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_test_clean['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_test_clean['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_test_clean['Magnitude'].cast(DoubleType()))


In [25]:
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [28]:
# create training and testing dataframes
df_testing = df_test_clean['Latitude','Longitude','Magnitude', 'Depth']
df_training = df_train['Latitude','Longitude','Magnitude', 'Depth']

In [32]:
# preview df_training 
df_training.show(5)
df_training.count()

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
| -59.076|  -23.557|      5.8| 15.0|
|  11.938|  126.427|      5.8| 15.0|
+--------+---------+---------+-----+
only showing top 5 rows



23412

In [31]:
# preview df_testing
df_testing.show(5)
df_testing.count()

+--------+---------+---------+------+
|Latitude|Longitude|Magnitude| Depth|
+--------+---------+---------+------+
|-36.0365|  51.9288|      5.7|  10.0|
|  -4.895| -76.3675|      5.9| 106.0|
|-23.2513| 179.2383|      6.3|551.62|
| 24.0151|  92.0177|      5.7|  32.0|
|-43.3527| -74.5017|      5.5| 10.26|
+--------+---------+---------+------+
only showing top 5 rows



397

In [33]:
# Drop records with null values from our dataframes
df_training = df_training.dropna()
df_testing = df_testing.dropna()

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [35]:
# select features to parse into our model and then create the feature vector
assembler = VectorAssembler(inputCols=['Latitude','Longitude','Depth'], outputCol='features')

# create the model
model_reg = RandomForestRegressor(featuresCol='features', labelCol='Magnitude')

# chain the assembler with the model in a pipeline
pipeline = Pipeline(stages=[assembler, model_reg])

#Train the model
model = pipeline.fit(df_training)

# Make the prediction
pred_results = model.transform(df_testing)

In [36]:
# Preview pred_results dataframe
pred_results.show(5)

+--------+---------+---------+------+--------------------+-----------------+
|Latitude|Longitude|Magnitude| Depth|            features|       prediction|
+--------+---------+---------+------+--------------------+-----------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...|5.827504771162459|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...|5.894648370207244|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...|5.898069295971082|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...|5.888635235631126|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...|5.966163600421247|
+--------+---------+---------+------+--------------------+-----------------+
only showing top 5 rows



In [37]:
# Evaluation of the model
# rmse should be less than 0.5 for the model to be useful
evaluator = RegressionEvaluator(labelCol='Magnitude', predictionCol='prediction', metricName ='rmse')
rmse = evaluator.evaluate(pred_results)
print('Root Mean Square (RMSE) on test data = %g' % rmse)

Root Mean Square (RMSE) on test data = 0.403477


## Prediction Dataset Creation

In [38]:
# Create the prediction dataset
df_pred_results = pred_results['Latitude','Longitude', 'prediction']

#Rename the prediction field
df_pred_results = df_pred_results.withColumnRenamed('prediction', 'Pred_Magnitude')

# Add more columns to our prediction dataset
df_pred_results = df_pred_results.withColumn('Year', lit(2017))\
    .withColumn('RMSE', lit(rmse))
# Preview df_pred_results
df_pred_results.show(5)

+--------+---------+-----------------+----+------------------+
|Latitude|Longitude|   Pred_Magnitude|Year|              RMSE|
+--------+---------+-----------------+----+------------------+
|-36.0365|  51.9288|5.827504771162459|2017|0.4034772789700646|
|  -4.895| -76.3675|5.894648370207244|2017|0.4034772789700646|
|-23.2513| 179.2383|5.898069295971082|2017|0.4034772789700646|
| 24.0151|  92.0177|5.888635235631126|2017|0.4034772789700646|
|-43.3527| -74.5017|5.966163600421247|2017|0.4034772789700646|
+--------+---------+-----------------+----+------------------+
only showing top 5 rows



In [40]:
# Load the prediction dataset into mongo db
# write df_pred_results
df_pred_results.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.pred_results').save()