In [4]:
#----------------------------------------------EXTRACTION-------------------------------------
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

#Configure spark session
spark = SparkSession.builder.master('local[2]').appName('quake_etl').config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1').getOrCreate()

# Load the dataset "database.csv"
df_load = spark.read.csv(r"/Users/Elimane/SPARK/data/database.csv", header=True)
#Preview df_load
df_load.show(2)

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       null|                  null|        6|            MW|       

In [10]:
#Drop fields we don't need from df_load
lst_dropped_columns = ['Depth Error', 'Time', 'Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap', 'Horizontal Distance','Horizontal Error',
    'Root Mean Square','Source','Location Source','Magnitude Source','Status']
df_load = df_load.drop(*lst_dropped_columns)
#Preview df_load
df_load.show(5)


+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows



In [11]:
# Create year fields and add it to dataframe
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
#Preview df_load
#df_load = df_load.drop('Day','')
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [12]:
#Build the quake frequency dataframe using the year field
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Count')
#Preview df_quake_freq
df_quake_freq.show(5)

[Stage 4:>                                                          (0 + 1) / 1]

+----+-----+
|Year|Count|
+----+-----+
|1990|  196|
|1975|  150|
|1977|  148|
|2003|  187|
|2007|  211|
+----+-----+
only showing top 5 rows



                                                                                

In [13]:
#----------------------------------------------TRANSFORMATION-------------------------------------

In [14]:
# Cast some fields from string into numeric types
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

# Preview df_load
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [15]:
# Preview df_load schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [16]:
# Create avg magnitude and max magnitude fields and add to df_quake_freq
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')

In [17]:
# Join df_max, and df_avg to df_quake_freq
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max, ['Year'])
# Preview df_quake_freq
df_quake_freq.show(5)

                                                                                

+----+-----+-----------------+-------------+
|Year|Count|    Avg_Magnitude|Max_Magnitude|
+----+-----+-----------------+-------------+
|1990|  196|5.858163265306125|          7.6|
|1975|  150| 5.84866666666667|          7.8|
|1977|  148|5.757432432432437|          7.6|
|2003|  187|5.850802139037435|          7.6|
|2007|  211| 5.89099526066351|          8.4|
+----+-----+-----------------+-------------+
only showing top 5 rows



In [18]:
# Remove nulls
df_load.dropna()
df_quake_freq.dropna()

DataFrame[Year: int, Count: bigint, Avg_Magnitude: double, Max_Magnitude: double]

In [19]:
# Preview dataframes
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [20]:
df_quake_freq.show(5)

                                                                                

+----+-----+-----------------+-------------+
|Year|Count|    Avg_Magnitude|Max_Magnitude|
+----+-----+-----------------+-------------+
|1990|  196|5.858163265306125|          7.6|
|1975|  150| 5.84866666666667|          7.8|
|1977|  148|5.757432432432437|          7.6|
|2003|  187|5.850802139037435|          7.6|
|2007|  211| 5.89099526066351|          8.4|
+----+-----+-----------------+-------------+
only showing top 5 rows



In [21]:
#----------------------------------------------LOADING-------------------------------------

In [22]:
# Build the tables or collections
# Write df_load to mongodb
df_load.write.format('mongo').mode('overwrite').option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quakes').save()

                                                                                

In [23]:
# Write df_quake_freq to mongodb
df_quake_freq.write.format('mongo').mode('overwrite').option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quake_freq').save()

                                                                                

In [24]:
"""

Section Machine Learning Part

"""

'\n\nSection Machine Learning Part\n\n'

In [25]:
#Load the test data into a dataframe
df_test = spark.read.csv(r"/Users/Elimane/SPARK/data/query.csv", header=True)

# Preview df_test
df_test.take(2)


[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us'),
 Row(time='2017-01-02T13:13:48.710Z', latitude='-4.895', longitude='-76.3675', depth='106', mag='5.9', magType='mww', nst=None, gap='31', dmin='3.002', rms='0.82', net='us', id='us10007p7n', updated='2020-01-02T23:56:16.978Z', place='37km E of Barranca, Peru', type='earthquake', horizontalError='7.1', depthError='1.9', magError=None, magNst=None, status='reviewed', locationSource='us', magSource='us')]

In [26]:
# Load training data from mongo
df_train = spark.read.format('mongo').option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quakes').load()

# Preview df_train
df_train.show(5)

                                                                                

+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|{61d09a8de8153646...|
|01/04/1965| 80.0|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|{61d09a8de8153646...|
|01/05/1965| 20.0|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|{61d09a8de8153646...|
|01/08/1965| 15.0|ISCGEM860856| -59.076|  -23.557|      5.8|            MW|Earthquake|1965|{61d09a8de8153646...|
|01/09/1965| 15.0|ISCGEM860890|  11.938|  126.427|      5.8|            MW|Earthquake|1965|{61d09a8de8153646...|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+-----

In [27]:
# Select field we will use and discard fields we don't need
df_test_cleaned = df_test['time','latitude','longitude','mag','depth']

# Preview df_test_cleaned
df_test_cleaned.show(2)

+--------------------+--------+---------+---+-----+
|                time|latitude|longitude|mag|depth|
+--------------------+--------+---------+---+-----+
|2017-01-02T00:13:...|-36.0365|  51.9288|5.7|   10|
|2017-01-02T13:13:...|  -4.895| -76.3675|5.9|  106|
+--------------------+--------+---------+---+-----+
only showing top 2 rows



In [28]:
# Rename fields
df_test_cleaned = df_test_cleaned.withColumnRenamed('time', 'Date')\
    .withColumnRenamed('latitude', 'Latitude')\
    .withColumnRenamed('longitude', 'Longitude')\
    .withColumnRenamed('mag', 'Magnitude')\
    .withColumnRenamed('depth', 'Depth')\

# Preview df_test_cleaned
df_test_cleaned.show(2)

+--------------------+--------+---------+---------+-----+
|                Date|Latitude|Longitude|Magnitude|Depth|
+--------------------+--------+---------+---------+-----+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|   10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|  106|
+--------------------+--------+---------+---------+-----+
only showing top 2 rows



In [29]:
# Preview sSchema
df_test_cleaned.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Depth: string (nullable = true)



In [30]:
# Cast some string fields into numeric fields
df_test_cleaned = df_test_cleaned.withColumn('Latitude', df_test_cleaned['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_test_cleaned['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_test_cleaned['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_test_cleaned['Magnitude'].cast(DoubleType()))


# Preview df_test_cleaned Schema
df_test_cleaned.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [31]:
# Create testing and training dataframes
df_testing = df_test_cleaned['Latitude','Longitude','Magnitude','Depth']
df_training = df_train['Latitude','Longitude','Magnitude','Depth']

df_testing = df_testing.withColumn('Latitude', df_testing['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_testing['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_testing['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_testing['Magnitude'].cast(DoubleType()))

df_training = df_training.withColumn('Latitude', df_training['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_training['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_training['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_training['Magnitude'].cast(DoubleType()))

In [32]:
# Preview df_training
df_training.show(5)

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
| -59.076|  -23.557|      5.8| 15.0|
|  11.938|  126.427|      5.8| 15.0|
+--------+---------+---------+-----+
only showing top 5 rows



In [33]:
# Preview df_testing
df_testing.show(5)

+--------+---------+---------+------+
|Latitude|Longitude|Magnitude| Depth|
+--------+---------+---------+------+
|-36.0365|  51.9288|      5.7|  10.0|
|  -4.895| -76.3675|      5.9| 106.0|
|-23.2513| 179.2383|      6.3|551.62|
| 24.0151|  92.0177|      5.7|  32.0|
|-43.3527| -74.5017|      5.5| 10.26|
+--------+---------+---------+------+
only showing top 5 rows



In [34]:
# Drop records with null values from our dataframes
df_testing = df_testing.dropna()
df_training = df_training.dropna()
df_training.printSchema()
df_testing.printSchema()

root
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)

root
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [35]:
#----------------------------------------------ML MODELS------------------------------------------------------------

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator


In [36]:

# Select features to parse into our model and then create the feature vector
assembler = VectorAssembler(inputCols=['Latitude','Longitude','Depth'], outputCol='features')

# Create the model
model_reg = RandomForestRegressor(featuresCol='features', labelCol='Magnitude')


# Chain the assembler with the model in a pipeline
pipeline = Pipeline(stages=[assembler, model_reg])

# Train the model
model = pipeline.fit(df_training)

# Make the prediction
pred_results = model.transform(df_testing)


                                                                                

In [37]:
# Preview pred_results dataframe
pred_results.show(5)

+--------+---------+---------+------+--------------------+------------------+
|Latitude|Longitude|Magnitude| Depth|            features|        prediction|
+--------+---------+---------+------+--------------------+------------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...| 5.850136724800972|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...| 5.869637793167875|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...| 5.896591561887881|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...| 5.931774818629581|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...|5.9204458792780486|
+--------+---------+---------+------+--------------------+------------------+
only showing top 5 rows



In [38]:
# Evaluate the model
# rmse should be less than 0.5 for the model to be useful
evaluator = RegressionEvaluator(labelCol='Magnitude', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred_results)
print('Root Mean Squared Error (RMSE) on test data = %g ' %rmse)

Root Mean Squared Error (RMSE) on test data = 0.403077 


In [39]:
"""

Create the prediction dataset
"""

'\n\nCreate the prediction dataset\n'

In [40]:
# Create the prediction dataset
df_pred_results = pred_results['Latitude','Longitude','prediction']

# Rename the prediction field
df_pred_results = df_pred_results.withColumnRenamed('prediction','Pred_Magnitude')

# Add more columns to our prediction dataset
df_pred_results = df_pred_results.withColumn('Year', lit(2017)).withColumn('RMSE', lit(rmse))

# Preview df_pred_results
df_pred_results.show(5)

+--------+---------+------------------+----+-------------------+
|Latitude|Longitude|    Pred_Magnitude|Year|               RMSE|
+--------+---------+------------------+----+-------------------+
|-36.0365|  51.9288| 5.850136724800972|2017|0.40307689171713734|
|  -4.895| -76.3675| 5.869637793167875|2017|0.40307689171713734|
|-23.2513| 179.2383| 5.896591561887881|2017|0.40307689171713734|
| 24.0151|  92.0177| 5.931774818629581|2017|0.40307689171713734|
|-43.3527| -74.5017|5.9204458792780486|2017|0.40307689171713734|
+--------+---------+------------------+----+-------------------+
only showing top 5 rows



In [41]:
# Load the prediction dataset into mongodb
# Write df_pred_results
df_pred_results.write.format('mongo').mode('overwrite').option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.df_pred_results').save()