In [2]:
import findspark
findspark.init()

In [3]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [5]:
# Create (or reuse) a Spark session running locally on two cores.
# The MongoDB Spark connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('quake_etl_ml')
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')
    .getOrCreate()
)

In [6]:
# ML: data pre-processing

# Read the test-set CSV into a DataFrame, treating the first row as the header.
# No schema is inferred here, so every column is loaded as a string.
df_test = spark.read.option('header', True).csv(r'query.csv')

# Peek at the first record
df_test.take(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [7]:
# Read the training set from the local MongoDB instance
# (database `Quake`, collection `quakes`) into a DataFrame.
df_train = (
    spark.read
    .format('mongo')
    .option('spark.mongodb.input.uri', 'mongodb://127.0.0.1:27017/Quake.quakes')
    .load()
)

# Peek at the first five rows
df_train.show(n=5)

+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|[6001d3cd4f44ba5b...|
|01/04/1965| 80.0|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|[6001d3cd4f44ba5b...|
|01/05/1965| 20.0|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|[6001d3cd4f44ba5b...|
|01/08/1965| 15.0|ISCGEM860856| -59.076|  -23.557|      5.8|            MW|Earthquake|1965|[6001d3cd4f44ba5b...|
|01/09/1965| 15.0|ISCGEM860890|  11.938|  126.427|      5.8|            MW|Earthquake|1965|[6001d3cd4f44ba5b...|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+-----

In [8]:
# Keep only the columns we need from the test set, then rename
# `time` -> `date` and `mag` -> `Magnitude`.
df_test_clean = (
    df_test
    .select('time', 'Latitude', 'Longitude', 'mag', 'Depth')
    .withColumnRenamed('time', 'date')
    .withColumnRenamed('mag', 'Magnitude')
)

# Peek at the first five rows
df_test_clean.show(n=5)

+--------------------+--------+---------+---------+------+
|                date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
+--------------------+--------+---------+---------+------+
only showing top 5 rows



In [9]:
# Schema before conversion: every field is still a string
df_test_clean.printSchema()

# Cast the numeric fields from string to double; `date` is left as a string.
df_test_clean = (
    df_test_clean
    .withColumn('Latitude', col('Latitude').cast('double'))
    .withColumn('Longitude', col('Longitude').cast('double'))
    .withColumn('Depth', col('Depth').cast('double'))
    .withColumn('Magnitude', col('Magnitude').cast('double'))
)

# Schema after conversion
df_test_clean.printSchema()

root
 |-- date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Depth: string (nullable = true)

root
 |-- date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [10]:
# Build the testing and training frames from the same four columns:
# three model inputs (Latitude, Longitude, Depth) plus the label (Magnitude).
df_testing = df_test_clean.select('Latitude', 'Longitude', 'Magnitude', 'Depth')
df_training = df_train.select('Latitude', 'Longitude', 'Magnitude', 'Depth')

In [11]:
# Discard every row that contains a null in any of the selected columns
df_testing = df_testing.na.drop()
df_training = df_training.na.drop()

In [12]:
# Build the ML model

# Assemble the input columns into the single vector column the regressor reads.
assembler = VectorAssembler(inputCols=['Latitude','Longitude','Depth'],outputCol='features')

# Random-forest regressor predicting Magnitude from the assembled features.
# A fixed seed makes training reproducible: without it every run can yield
# different trees, and therefore different predictions and a different RMSE.
model_reg = RandomForestRegressor(featuresCol='features',labelCol='Magnitude',seed=42)

# Chain the assembler and the regressor so they are fitted and applied together
pipeline = Pipeline(stages=[assembler,model_reg])

# Train the pipeline on the training set
model = pipeline.fit(df_training)

# Apply the fitted pipeline to the test set; adds `features` and `prediction` columns
pred_resulte = model.transform(df_testing)


In [13]:
# Peek at the first five rows of the prediction DataFrame
pred_resulte.show(n=5)


+--------+---------+---------+------+--------------------+------------------+
|Latitude|Longitude|Magnitude| Depth|            features|        prediction|
+--------+---------+---------+------+--------------------+------------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...| 5.833306409270847|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...| 5.877663364698073|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...| 5.918379334619855|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...|5.8978373572569085|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...| 5.872559885848837|
+--------+---------+---------+------+--------------------+------------------+
only showing top 5 rows



In [14]:
# Evaluate the model: compare the predicted magnitudes with the actual ones.
# An RMSE below 0.5 is the bar for treating the model as useful.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='Magnitude',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(pred_resulte)
print('Root mean squared Error(RMSE) on test data = %g' % (rmse,))

Root mean squared Error(RMSE) on test data = 0.401871


In [15]:
# Create the prediction dataset: keep the coordinates and the prediction,
# rename `prediction` to `Pred_Magnitude`, then attach two constant columns:
# the year (as the string '2017') and the RMSE computed for this run.
df_pred_resulte = (
    pred_resulte
    .select('Latitude', 'Longitude', 'prediction')
    .withColumnRenamed('prediction', 'Pred_Magnitude')
    .withColumn('Year', lit('2017'))
    .withColumn('rmse', lit(rmse))
)

# Preview
df_pred_resulte.show()

+--------+---------+------------------+----+------------------+
|Latitude|Longitude|    Pred_Magnitude|Year|              rmse|
+--------+---------+------------------+----+------------------+
|-36.0365|  51.9288| 5.833306409270847|2017|0.4018710001835658|
|  -4.895| -76.3675| 5.877663364698073|2017|0.4018710001835658|
|-23.2513| 179.2383| 5.918379334619855|2017|0.4018710001835658|
| 24.0151|  92.0177|5.8978373572569085|2017|0.4018710001835658|
|-43.3527| -74.5017| 5.872559885848837|2017|0.4018710001835658|
|-19.3733| 176.0518| 5.966636348806326|2017|0.4018710001835658|
|-19.3977| 175.9532| 5.884481024542788|2017|0.4018710001835658|
|-19.1207| 176.1875| 5.884481024542788|2017|0.4018710001835658|
|-18.9749| 176.2872| 5.964495990390334|2017|0.4018710001835658|
|-17.8694| 167.1235| 6.005895726135375|2017|0.4018710001835658|
|-18.7942| 176.2567| 5.889364924849649|2017|0.4018710001835658|
|-22.3176|  -67.795|  5.89005740404204|2017|0.4018710001835658|
| -6.2269| 147.4769| 5.858057952384796|2

In [16]:
# Write the prediction dataset to MongoDB (database `Quake`, collection
# `pred_results`). Overwrite mode is used, so re-running replaces earlier output.
(
    df_pred_resulte.write
    .format('mongo')
    .mode('overwrite')
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.pred_results')
    .save()
)