In [1]:
import pyspark   # se importa pyspark
from pyspark.sql import SparkSession # importar una sesion 
from pyspark.sql.types import * # se importa para poder ver o comprobar los tipos utilizados
from pyspark.sql.functions import * # importamos todas las funciones de sql

In [None]:
#spark = SparkSession.builder.getOrCreate() # se crea un dataframe, y se verifica la sesion de spark 
#df = spark.sql("Select 'Spark' As Hello") # query de sql 
#df.show()

In [2]:
# Configuramos la sesión de spark  
spark = SparkSession\
    .builder\
    .master('local[2]')\
    .appName('quake_etl')\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
    .getOrCreate()

In [3]:
# Cargamos el dataset
df_quake = spark.read.csv(r'database.csv', header=True)
# tomamos y leemos la primera parte
df_quake.take(1)

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [4]:
df_quake.show(5)

+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|      Date|    Time|Latitude|Longitude|      Type|Depth|Depth Error|Depth Seismic Stations|Magnitude|Magnitude Type|Magnitude Error|Magnitude Seismic Stations|Azimuthal Gap|Horizontal Distance|Horizontal Error|Root Mean Square|          ID|Source|Location Source|Magnitude Source|   Status|
+----------+--------+--------+---------+----------+-----+-----------+----------------------+---------+--------------+---------------+--------------------------+-------------+-------------------+----------------+----------------+------------+------+---------------+----------------+---------+
|01/02/1965|13:44:18|  19.246|  145.616|Earthquake|131.6|       null|                  null|        6|            MW|       

In [5]:
df_quake.describe()

DataFrame[summary: string, Date: string, Time: string, Latitude: string, Longitude: string, Type: string, Depth: string, Depth Error: string, Depth Seismic Stations: string, Magnitude: string, Magnitude Type: string, Magnitude Error: string, Magnitude Seismic Stations: string, Azimuthal Gap: string, Horizontal Distance: string, Horizontal Error: string, Root Mean Square: string, ID: string, Source: string, Location Source: string, Magnitude Source: string, Status: string]

In [6]:
# quitamos las columnas inecesarios 
quitar = ['Depth Error', 'Time', 'Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap', 'Horizontal Distance','Horizontal Error',
    'Root Mean Square','Source','Location Source','Magnitude Source','Status']

df_quake = df_quake.drop(*quitar)
# mostramos los nuevos datos
df_quake.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows



In [7]:
# Create a new column with only the years
df_quake = df_quake.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
# show eith the new column
df_quake.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [8]:
# Count the frecuency of each year
df_frequency = df_quake.groupBy('Year').count().withColumnRenamed('count', 'Counts')
# show the frecuency
df_frequency.show(8)

+----+------+
|Year|Counts|
+----+------+
|1990|   196|
|1975|   150|
|1977|   148|
|2003|   187|
|2007|   211|
|1974|   147|
|2015|   175|
|2006|   176|
+----+------+
only showing top 8 rows



In [9]:
df_quake.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [10]:
# Change the types of the columns for the correct type
df_quake = df_quake.withColumn('Latitude', df_quake['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_quake['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_quake['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_quake['Magnitude'].cast(DoubleType()))

# show
df_quake.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [11]:
# corrobore the types of the columns again
df_quake.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [12]:
# Find the max Magnitude 
df_max = df_quake.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_max.show(8)

+----+-------------+
|Year|Max_Magnitude|
+----+-------------+
|1990|          7.6|
|1975|          7.8|
|1977|          7.6|
|2003|          7.6|
|2007|          8.4|
|1974|          7.6|
|2015|          7.5|
|2006|          8.0|
+----+-------------+
only showing top 8 rows



In [13]:
# Find the average of magnitude
df_avg = df_quake.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')
df_avg.show(8)

+----+-----------------+
|Year|    Avg_Magnitude|
+----+-----------------+
|1990|5.858163265306125|
|1975| 5.84866666666667|
|1977|5.757432432432437|
|2003|5.850802139037435|
|2007| 5.89099526066351|
|1974|5.890476190476194|
|2015|5.842857142857147|
|2006|5.838068181818182|
+----+-----------------+
only showing top 8 rows



In [14]:
# Join the df_frecuency with df_avg and df_max
df_frequency = df_frequency.join(df_avg, ['Year']).join(df_max, ['Year'])
# show the new Df
df_frequency.show(8)

+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   196|5.858163265306125|          7.6|
|1975|   150| 5.84866666666667|          7.8|
|1977|   148|5.757432432432437|          7.6|
|2003|   187|5.850802139037435|          7.6|
|2007|   211| 5.89099526066351|          8.4|
|1974|   147|5.890476190476194|          7.6|
|2015|   175|5.842857142857147|          7.5|
|2006|   176|5.838068181818182|          8.0|
+----+------+-----------------+-------------+
only showing top 8 rows



In [15]:
# Show the count null values in df_quake 
df_quake.select([count(when(col(c).isNull(), c)).alias(c) for c in df_quake.columns]).show()

+----+--------+---------+----+-----+---------+--------------+---+-----+
|Date|Latitude|Longitude|Type|Depth|Magnitude|Magnitude Type| ID| Year|
+----+--------+---------+----+-----+---------+--------------+---+-----+
|   0|       0|        0|   0|    0|        0|             3|  0|14211|
+----+--------+---------+----+-----+---------+--------------+---+-----+



In [16]:
# view the shape of df_quake
(df_quake.count(), len(df_quake.columns))

(23412, 9)

In [17]:
# show the count null values in df_frequency
df_frequency.select([count(when(col(c).isNull(), c)).alias(c) for c in df_frequency.columns]).show()

+----+------+-------------+-------------+
|Year|Counts|Avg_Magnitude|Max_Magnitude|
+----+------+-------------+-------------+
|   0|     0|            0|            0|
+----+------+-------------+-------------+



In [18]:
# view the shape of df_frequency
(df_frequency.count(), len(df_frequency.columns))

(52, 4)

In [19]:
# Remove the null values 
df_quake.dropna()
df_frequency.dropna()

DataFrame[Year: int, Counts: bigint, Avg_Magnitude: double, Max_Magnitude: double]

### Show in other DF drop of the null values

In [None]:
# Remove the null values 
new_quake = df_quake.dropna()
new_frequency = df_frequency.dropna()

In [None]:
# view the shape of df_quake
(new_quake.count(), len(new_quake.columns))

In [None]:
# view the shape of df_frequency
(new_frequency.count(), len(new_frequency.columns))

In [None]:
# view of dataframes
df_quake.show(8)

In [None]:
df_frequency.show(8)

### Connect with Mongodb

In [20]:
# For built the collections in mongodb, we need to write df_quake to mongodb
df_quake.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quakes').save()

In [21]:
# Write df_frequency to mongodb
df_frequency.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quake_freq').save()

### Data pre-processing

In [22]:
# Read the test data
df_test = spark.read.csv(r'query.csv', header=True)
df_test.take(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [23]:
# show the other dataset
df_test.show(8)

+--------------------+--------+---------+------+---+-------+----+---+------+----+---+----------+--------------------+--------------------+----------+---------------+----------+--------+------+--------+--------------+---------+
|                time|latitude|longitude| depth|mag|magType| nst|gap|  dmin| rms|net|        id|             updated|               place|      type|horizontalError|depthError|magError|magNst|  status|locationSource|magSource|
+--------------------+--------+---------+------+---+-------+----+---+------+----+---+----------+--------------------+--------------------+----------+---------------+----------+--------+------+--------+--------------+---------+
|2017-01-02T00:13:...|-36.0365|  51.9288|    10|5.7|    mwb|null| 26|14.685|1.37| us|us10007p5d|2017-03-27T23:53:...|Southwest Indian ...|earthquake|           10.3|       1.7|   0.068|    21|reviewed|            us|       us|
|2017-01-02T13:13:...|  -4.895| -76.3675|   106|5.9|    mww|null| 31| 3.002|0.82| us|us10007

In [24]:
(df_test.count(), len(df_test.columns))

(397, 22)

In [25]:
# Load the training data of mongodb
df_train = spark.read.format('mongo')\
    .option('spark.mongodb.input.uri', 'mongodb://127.0.0.1:27017/Quake.quakes').load()

# Preview the data training
df_train.show(5)

+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|[5f242e152a31516a...|
|01/04/1965| 80.0|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|[5f242e152a31516a...|
|01/05/1965| 20.0|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|[5f242e152a31516a...|
|01/08/1965| 15.0|ISCGEM860856| -59.076|  -23.557|      5.8|            MW|Earthquake|1965|[5f242e152a31516a...|
|01/09/1965| 15.0|ISCGEM860890|  11.938|  126.427|      5.8|            MW|Earthquake|1965|[5f242e152a31516a...|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+-----

In [26]:
# Select fields we will use and discard fields we don't need
df_test_clean = df_test['time', 'latitude', 'longitude', 'mag', 'depth']
df_test_clean.show(5)

+--------------------+--------+---------+---+------+
|                time|latitude|longitude|mag| depth|
+--------------------+--------+---------+---+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|5.5| 10.26|
+--------------------+--------+---------+---+------+
only showing top 5 rows



In [27]:
# Rename fields
df_test_clean = df_test_clean.withColumnRenamed('time', 'Date')\
    .withColumnRenamed('latitude', 'Latitude')\
    .withColumnRenamed('longitude', 'Longitude')\
    .withColumnRenamed('mag', 'Magnitude')\
    .withColumnRenamed('depth', 'Depth')

df_test_clean.show(5)

+--------------------+--------+---------+---------+------+
|                Date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
+--------------------+--------+---------+---------+------+
only showing top 5 rows



In [28]:
# whatch the schema
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Depth: string (nullable = true)



In [29]:
# Use of "cast" for change the types of string to double 
df_test_clean = df_test_clean.withColumn('Latitude', df_test_clean['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_test_clean['Longitude'].cast(DoubleType()))\
    .withColumn('Magnitude', df_test_clean['Magnitude'].cast(DoubleType()))\
    .withColumn('Depth', df_test_clean['Depth'].cast(DoubleType()))

In [30]:
# corroborate
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [31]:
# Create training and testing dataframes
df_training = df_train['Latitude', 'Longitude', 'Magnitude', 'Depth'] # 1965 to 2016

df_testing = df_test_clean['Latitude', 'Longitude', 'Magnitude', 'Depth']

In [32]:
# see the training df
df_training.show(8)

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
| -59.076|  -23.557|      5.8| 15.0|
|  11.938|  126.427|      5.8| 15.0|
| -13.405|  166.629|      6.7| 35.0|
|  27.357|   87.867|      5.9| 20.0|
| -13.309|  166.212|      6.0| 35.0|
+--------+---------+---------+-----+
only showing top 8 rows



In [33]:
# see the testing df
df_testing.show(8)

+--------+---------+---------+------+
|Latitude|Longitude|Magnitude| Depth|
+--------+---------+---------+------+
|-36.0365|  51.9288|      5.7|  10.0|
|  -4.895| -76.3675|      5.9| 106.0|
|-23.2513| 179.2383|      6.3|551.62|
| 24.0151|  92.0177|      5.7|  32.0|
|-43.3527| -74.5017|      5.5| 10.26|
|-19.3733| 176.0518|      6.9|  12.0|
|-19.3977| 175.9532|      5.7|  10.0|
|-19.1207| 176.1875|      6.0|  10.0|
+--------+---------+---------+------+
only showing top 8 rows



In [None]:
#df_training.select([count(when(col(c).isNull(), c)).alias(c) for c in df_training.columns]).show()

In [None]:
#df_training.select([count(when(isnan(c), c)).alias(c) for c in df_training.columns]).show()

In [None]:
#df_testing.select([count(when(col(c).isNull(), c)).alias(c) for c in df_testing.columns]).show()

In [None]:
#df_testing.select([count(when(isnan(c), c)).alias(c) for c in df_testing.columns]).show()

In [34]:
# Drop the records with null values from our dataframe 
df_training = df_training.dropna()
df_testing = df_testing.dropna()

In [35]:
df_training.show(5)

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  19.246|  145.616|      6.0|131.6|
|   1.863|  127.352|      5.8| 80.0|
| -20.579| -173.972|      6.2| 20.0|
| -59.076|  -23.557|      5.8| 15.0|
|  11.938|  126.427|      5.8| 15.0|
+--------+---------+---------+-----+
only showing top 5 rows



In [36]:
(df_training.count(), len(df_training.columns))

(23412, 4)

In [37]:
df_testing.show(5)

+--------+---------+---------+------+
|Latitude|Longitude|Magnitude| Depth|
+--------+---------+---------+------+
|-36.0365|  51.9288|      5.7|  10.0|
|  -4.895| -76.3675|      5.9| 106.0|
|-23.2513| 179.2383|      6.3|551.62|
| 24.0151|  92.0177|      5.7|  32.0|
|-43.3527| -74.5017|      5.5| 10.26|
+--------+---------+---------+------+
only showing top 5 rows



In [38]:
(df_testing.count(), len(df_testing.columns))

(397, 4)

In [39]:
# Import the libraries for make the model 
from pyspark.ml import Pipeline # allows us to maintain the data flow of all the relevant transformations that are required to reach the end result
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator                                   

In [40]:
# Select the features to analyze into our model and then create the feature vector 
assembler = VectorAssembler(inputCols=['Latitude', 'Longitude', 'Depth'], outputCol='features')

# Create the model // labelcol it is for predict the column magnitude (dependent variable)
# the features are that the model need for make the predictions 
model_reg = RandomForestRegressor(featuresCol='features', 
                                  labelCol='Magnitude',
                                  seed=42) # Default is predictionCol = "prediction"

# Chain the assembler with the model in a pipeline 
pipeline = Pipeline(stages=[assembler, model_reg])

# Train the model // we fit the pipeline with the df_training
model = pipeline.fit(df_training)  # To keep our model trained

# Make the prediction 
pred_results = model.transform(df_testing)

In [41]:
# Preview pred_results dataframe
pred_results.show(8)

+--------+---------+---------+------+--------------------+-----------------+
|Latitude|Longitude|Magnitude| Depth|            features|       prediction|
+--------+---------+---------+------+--------------------+-----------------+
|-36.0365|  51.9288|      5.7|  10.0|[-36.0365,51.9288...|5.834359794866099|
|  -4.895| -76.3675|      5.9| 106.0|[-4.895,-76.3675,...|5.874717259402566|
|-23.2513| 179.2383|      6.3|551.62|[-23.2513,179.238...|5.906461434446237|
| 24.0151|  92.0177|      5.7|  32.0|[24.0151,92.0177,...|5.894743761504126|
|-43.3527| -74.5017|      5.5| 10.26|[-43.3527,-74.501...|5.940326268704404|
|-19.3733| 176.0518|      6.9|  12.0|[-19.3733,176.051...|5.982007726977924|
|-19.3977| 175.9532|      5.7|  10.0|[-19.3977,175.953...| 5.89185886831292|
|-19.1207| 176.1875|      6.0|  10.0|[-19.1207,176.187...| 5.89185886831292|
+--------+---------+---------+------+--------------------+-----------------+
only showing top 8 rows



In [None]:
#(pred_results.count(), len(pred_results.columns))

### Evaluate the model

In [47]:
# Evaluate the model
# rmse (root mean square area) should be < 0.5 for the model to be useful
evaluator = RegressionEvaluator(labelCol='Magnitude', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred_results)
print(f'Root Mean Squared Error (RMSE) on test data = {rmse}')

Root Mean Squared Error (RMSE) on test data = 0.4011923484762858


### Create the Dataset with the predictions

In [48]:
# Create the prediction dataset
df_pred_results = pred_results['Latitude', 'Longitude', 'prediction']

# Rename the prediction field
df_pred_results = df_pred_results.withColumnRenamed('prediction', 'Pred_Magnitude')

# Add more columns to our prediction dataset
df_pred_results = df_pred_results.withColumn('Year', lit(2017))\
    .withColumn('RMSE', lit(rmse))

# see the dataset wih the predictions
df_pred_results.show(8)

+--------+---------+-----------------+----+------------------+
|Latitude|Longitude|   Pred_Magnitude|Year|              RMSE|
+--------+---------+-----------------+----+------------------+
|-36.0365|  51.9288|5.834359794866099|2017|0.4011923484762858|
|  -4.895| -76.3675|5.874717259402566|2017|0.4011923484762858|
|-23.2513| 179.2383|5.906461434446237|2017|0.4011923484762858|
| 24.0151|  92.0177|5.894743761504126|2017|0.4011923484762858|
|-43.3527| -74.5017|5.940326268704404|2017|0.4011923484762858|
|-19.3733| 176.0518|5.982007726977924|2017|0.4011923484762858|
|-19.3977| 175.9532| 5.89185886831292|2017|0.4011923484762858|
|-19.1207| 176.1875| 5.89185886831292|2017|0.4011923484762858|
+--------+---------+-----------------+----+------------------+
only showing top 8 rows



### Load the prediction dataset into mongodb

In [49]:
# Write df_pred_results
df_pred_results.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.pred_results').save() 

### Data visualization

In [50]:
import pandas as pd
# Library for show the visualization in the notebook and in other file
from bokeh.io import output_notebook, output_file
# For access more advanced capabilities in our graphs
from bokeh.plotting import figure, show, ColumnDataSource
# For pop ups when we over certain section of our crops
from bokeh.models.tools import HoverTool
import math
from math import pi
# For manage diferents colors
from bokeh.palettes import Category20c
# For Create a DataSpec dict to generate a CumSum expression for a ColumnDataSource.
from bokeh.transform import cumsum
# For make geop map 
from bokeh.plotting import figure, output_file, show
from bokeh.tile_providers import CARTODBPOSITRON, get_provider
# For create themes in dashboards 
from bokeh.themes import built_in_themes
from bokeh.io import curdoc
# For work with mongo
from pymongo import MongoClient

### For the visualization we focus to use python

In [51]:
# Create a custom read function to read data from mongodb into a dataframe
def read_mongo(host='127.0.0.1', port=27017, username=None, password=None, db='Quake', collection='pred_results'):
    
    mongo_uri = f'mongodb://{host}:{port}/{db}.{collection}'
    
    # Connect to mongodb
    conn = MongoClient(mongo_uri)
    db = conn[db]
    
    # Select all records from the collection
    cursor = db[collection].find()
    
    # Create the dataframe
    df = pd.DataFrame(list(cursor))
    
    # Delete the _id field
    del df['_id']
    
    return df

In [52]:
# Load the datasets from mongodb
df_quakes = read_mongo(collection='quakes')
df_quake_freq = read_mongo(collection='quake_freq')
df_quake_pred = read_mongo(collection='pred_results')

In [94]:
df_quakes.to_csv('Quakes.csv')

In [96]:
df_quake_freq.to_csv('Quakes_freq.csv')

In [97]:
df_quake_pred.to_csv('Quakes_with_pred_results.csv')

In [53]:
df_quakes = df_quakes.sort_values('Year')
df_quakes.head(3)

Unnamed: 0,Date,Latitude,Longitude,Type,Depth,Magnitude,Magnitude Type,ID,Year
0,01/02/1965,19.246,145.616,Earthquake,131.6,6.0,MW,ISCGEM860706,1965.0
169,06/12/1965,44.167,149.87,Earthquake,35.0,5.6,MW,ISCGEM856064,1965.0
170,06/12/1965,43.775,149.448,Earthquake,38.0,5.7,MW,ISCGEM856066,1965.0


In [54]:
df_quake_freq = df_quake_freq.sort_values('Year')
df_quake_freq.head(5)

Unnamed: 0,Year,Counts,Avg_Magnitude,Max_Magnitude
27,1965,156,6.009615,8.7
17,1966,98,6.060714,7.7
21,1967,103,5.962136,7.2
13,1968,106,6.070755,7.6
22,1969,114,6.015789,7.5


In [55]:
df_quake_pred = df_quake_pred.sort_values('Year')
df_quake_pred.head(3)

Unnamed: 0,Latitude,Longitude,Pred_Magnitude,Year,RMSE
0,-36.0365,51.9288,5.83436,2017,0.401192
269,-5.0845,147.827,5.890987,2017,0.401192
268,-18.7854,169.0946,5.905093,2017,0.401192


In [56]:
# We create one dataframe that content only the quakes of 2016
df_quakes_2016 = df_quakes[df_quakes['Year'] == 2016]
df_quakes_2016.head()

Unnamed: 0,Date,Latitude,Longitude,Type,Depth,Magnitude,Magnitude Type,ID,Year
23280,10/04/2016,27.8519,141.9442,Earthquake,35.0,5.7,MWW,USD0007JX3,2016.0
23241,09/01/2016,-37.3176,178.7148,Earthquake,10.0,5.5,MB,US10006JFZ,2016.0
23242,09/02/2016,-55.3295,-30.9179,Earthquake,8.0,5.5,MWB,US10006JJ6,2016.0
23243,09/02/2016,-19.5002,-173.7059,Earthquake,10.0,5.5,MWB,US10006JMT,2016.0
23244,09/03/2016,40.320833,-125.687667,Earthquake,28.57,5.64,MW,NC72689331,2016.0


In [57]:
# Show plots embedded in jupyter notebook
output_notebook()

In [58]:
# Create custom style function to style our plots
def style(p):
    # Title
    p.title.align='center'
    p.title.text_font_size='20pt'
    p.title.text_font='serif'
    
    # Axis titles
    p.xaxis.axis_label_text_font_size='14pt'
    p.xaxis.axis_label_text_font_style='bold'
    p.yaxis.axis_label_text_font_size='14pt'
    p.yaxis.axis_label_text_font_style='bold'
    
    # Tick labels
    p.xaxis.major_label_text_font_size='12pt'
    p.yaxis.major_label_text_font_size='12pt'
    
    # Plot the legend in the top left corner
    p.legend.location='top_left'
    
    return p

In [59]:
# Create one function for the Geo Map plot
def plotMap():
    lat = df_quakes_2016['Latitude'].values.tolist()
    lon = df_quakes_2016['Longitude'].values.tolist()
    
    pred_lat = df_quake_pred['Latitude'].values.tolist()
    pred_lon = df_quake_pred['Longitude'].values.tolist()
    
    lst_lat = []
    lst_lon = []
    lst_pred_lat = []
    lst_pred_lon = []
    
    # Convert Lat and Long values into merc_projection format
    for i in range(len(lon)):
        r_major = 6378137.000
        x = r_major * math.radians(lon[i])
        scale = x/lon[i]
        y = 180.0/math.pi * math.log(math.tan(math.pi/4.0 +
            lat[i] * (math.pi/180.0)/2.0)) * scale
        
        lst_lon.append(x)
        lst_lat.append(y)
        
    # Convert predicted lat and long values into merc_projection format
    for j in range(len(pred_lon)):
        r_major = 6378137.000
        x = r_major * math.radians(pred_lon[j])
        scale = x/pred_lon[j]
        y = 180.0/math.pi * math.log(math.tan(math.pi/4.0 +
            pred_lat[j] * (math.pi/180.0)/2.0)) * scale
        
        lst_pred_lon.append(x)
        lst_pred_lat.append(y)
    
    
    df_quakes_2016['coords_x'] = lst_lat
    df_quakes_2016['coords_y'] = lst_lon
    df_quake_pred['coords_x'] = lst_pred_lat
    df_quake_pred['coords_y'] = lst_pred_lon
    
    # Scale the circles
    df_quakes_2016['Mag_Size'] = df_quakes_2016['Magnitude'] * 4
    df_quake_pred['Mag_Size'] = df_quake_pred['Pred_Magnitude'] * 4
    
    # create datasources for our ColumnDataSource object
    lats = df_quakes_2016['coords_x'].tolist()
    longs = df_quakes_2016['coords_y'].tolist()
    mags = df_quakes_2016['Magnitude'].tolist()
    years = df_quakes_2016['Year'].tolist()
    mag_size = df_quakes_2016['Mag_Size'].tolist()
    
    pred_lats = df_quake_pred['coords_x'].tolist()
    pred_longs = df_quake_pred['coords_y'].tolist()
    pred_mags = df_quake_pred['Pred_Magnitude'].tolist()
    pred_year = df_quake_pred['Year'].tolist()
    pred_mag_size = df_quake_pred['Mag_Size'].tolist()
    
    # Create column datasource
    cds = ColumnDataSource(
        data=dict(
            lat=lats,
            lon=longs,
            mag=mags,
            year=years,
            mag_s=mag_size
        )
    )
    
    pred_cds = ColumnDataSource(
        data=dict(
            pred_lat=pred_lats,
            pred_long=pred_longs,
            pred_mag=pred_mags,
            year=pred_year,
            pred_mag_s=pred_mag_size
        )
    )
    
    # Tooltips
    TOOLTIPS = [
        ("Year", " @year"),
        ("Magnitude", " @mag"),
        ("Predicted Magnitude", " @pred_mag")
    ]
    
    tile_provider = get_provider(CARTODBPOSITRON)

    
    # Create figure
    p = figure(title='Earthquake Map',
              plot_width=2300, plot_height=450,
              x_range=(-2000000, 6000000),
              y_range=(-1000000, 7000000),
              tooltips=TOOLTIPS)
    
    p.circle(x='lon', y='lat', size='mag_s', fill_color='#cc0000', fill_alpha=0.7,
            source=cds, legend_label='Quakes 2016')
    
    # Add circles for our predicted earthquakes
    p.circle(x='pred_long', y='pred_lat', size='pred_mag_s', fill_color='#ccff33', fill_alpha=7.0,
            source=pred_cds, legend_label='Predicted Quakes 2017')
    
    # p.add_tile(CARTODBPOSITRON)
    p.add_tile(tile_provider)
    
    # Style the map plot
    p.title.align='center'
    p.title.text_font_size='20pt'
    p.title.text_font='serif'
    
    # Legend
    p.legend.location='bottom_right'
    p.legend.background_fill_color='black'
    p.legend.background_fill_alpha=0.8
    p.legend.click_policy='hide'
    p.legend.label_text_color='white'
    p.xaxis.visible=False
    p.yaxis.visible=False
    p.axis.axis_label=None
    p.axis.visible=False
    p.grid.grid_line_color=None
    
    
    #show(p)
    
    return p
    
    
# plotMap()

In [60]:
# Create the Bar Chart
def plotBar():
    # Load the datasource 
    cds = ColumnDataSource(data=dict(
        yrs = df_quake_freq['Year'].values.tolist(),
        numQuakes = df_quake_freq['Counts'].values.tolist()
    ))
    
    # Tooltip
    TOOLTIPS = [
        ('Year', ' @yrs'),
        ('Number of earthquakes', ' @numQuakes')
    ]
    
    # Create a figure
    barChart = figure(title='Frequency of Earthquakes by Year',
                     plot_height=400,
                     plot_width=1150,
                     x_axis_label='Years',
                     y_axis_label='Number of Occurances',
                     x_minor_ticks=2,
                     y_range=(0, df_quake_freq['Counts'].max() + 100),
                     toolbar_location=None,
                     tooltips=TOOLTIPS)
    
    # Create a vertical bar 
    barChart.vbar(x='yrs', bottom=0, top='numQuakes',
                 color='#cc0000', width=0.75,
                 legend_label='Year', source=cds)
    
    # Style the bar chart
    barChart = style(barChart)
    
    #show(barChart)
    
    return barChart

#plotBar()

In [61]:
df_quake_freq.head(5)

Unnamed: 0,Year,Counts,Avg_Magnitude,Max_Magnitude
27,1965,156,6.009615,8.7
17,1966,98,6.060714,7.7
21,1967,103,5.962136,7.2
13,1968,106,6.070755,7.6
22,1969,114,6.015789,7.5


In [62]:
# Create a magnitude plot
def plotMagnitude():
    # Load the datasource
    cds = ColumnDataSource(data=dict(
        yrs = df_quake_freq['Year'].values.tolist(),
        avg_mag = df_quake_freq['Avg_Magnitude'].round(1).values.tolist(),
        max_mag = df_quake_freq['Max_Magnitude'].values.tolist()
    ))
    
    # Tooltip
    TOOLTIPS = [
        ('Year', ' @yrs'),
        ('Average Magnitude', ' @avg_mag'),
        ('Maximum Magnitude', ' @max_mag')
    ]
    
    # Create the figure
    mp = figure(title='Maximum and Average Magnitude by Year',
               plot_width=1150, plot_height=400,
               x_axis_label='Years',
               y_axis_label='Magnitude',
               x_minor_ticks=2,
               y_range=(5, df_quake_freq['Max_Magnitude'].max() + 1),
               toolbar_location=None,
               tooltips=TOOLTIPS)
    
    # Max Magnitude
    mp.line(x='yrs', y='max_mag', color='#cc0000', line_width=2, legend_label='Max Magnitude', source=cds)
    mp.circle(x='yrs', y='max_mag', color='#cc0000', size=8, fill_color='#cc0000', source=cds)
    
    # Average Magnitude 
    mp.line(x='yrs', y='avg_mag', color='yellow', line_width=2, legend_label='Avg Magnitude', source=cds)
    mp.circle(x='yrs', y='avg_mag', color='yellow', size=8, fill_color='yellow', source=cds)
    
    mp = style(mp)
    
    # show(mp)
    
    return mp

# plotMagnitude()  

In [65]:
# Build the grid plot
from bokeh.layouts import gridplot

# Make the grid
grid = gridplot([[plotMap()], [plotBar()], [plotMagnitude()]])

# Shor the grid
show(grid)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [64]:
# Display the visuals directly in the browser
output_file('dashboard.html')
# Change to a dark theme
curdoc().theme = 'dark_minimal'