<a href="https://colab.research.google.com/github/Achrafech/-Hackathon-GIS-in-the-heart-of-smart-cities-SIGINOV-/blob/main/PysparkDataPipeline_210915_141306.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Configure SparkSession for local mode
spark = SparkSession \
    .builder \
    .master('local[2]') \
    .appName('quake_etl') \
    .getOrCreate()

In [4]:
# Load the dataset
df_load = spark.read.csv(r"/content/database.csv", header=True)
# Preview df_load
df_load.take(1)

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [5]:
# Drop fields we don't need from df_load
lst_dropped_columns = ['Depth Error', 'Time', 'Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap', 'Horizontal Distance','Horizontal Error',
    'Root Mean Square','Source','Location Source','Magnitude Source','Status']

df_load = df_load.drop(*lst_dropped_columns)
# Preview df_load
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows



In [6]:
# Create a year field and add it to the dataframe
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))
# Preview df_load
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [7]:
# Build the quakes frequency dataframe using the year field and counts for each year
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')
# Preview df_quake_freq
df_quake_freq.show(5)

+----+------+
|Year|Counts|
+----+------+
|1990|   196|
|1975|   150|
|1977|   148|
|2003|   187|
|2007|   211|
+----+------+
only showing top 5 rows



In [8]:
# Preview df_load schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [9]:
# Cast some fields from string into numeric types
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

# Preview df_load
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [10]:
# Preview df_load schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [11]:
# Create avg magnitude and max magnitude fields and add to df_quake_freq
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')

In [12]:
# Join df_max, and df_avg to df_quake_freq
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max, ['Year'])
# Preview df_quake_freq
df_quake_freq.show(5)

+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   196|5.858163265306125|          7.6|
|1975|   150| 5.84866666666667|          7.8|
|1977|   148|5.757432432432437|          7.6|
|2003|   187|5.850802139037435|          7.6|
|2007|   211| 5.89099526066351|          8.4|
+----+------+-----------------+-------------+
only showing top 5 rows



In [13]:
# Remove nulls
df_load.dropna()
df_quake_freq.dropna()

DataFrame[Year: int, Counts: bigint, Avg_Magnitude: double, Max_Magnitude: double]

In [14]:
# Preview dataframes
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|      6.0|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake| 80.0|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake| 20.0|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake| 15.0|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake| 15.0|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [15]:
df_quake_freq.show(5)

+----+------+-----------------+-------------+
|Year|Counts|    Avg_Magnitude|Max_Magnitude|
+----+------+-----------------+-------------+
|1990|   196|5.858163265306125|          7.6|
|1975|   150| 5.84866666666667|          7.8|
|1977|   148|5.757432432432437|          7.6|
|2003|   187|5.850802139037435|          7.6|
|2007|   211| 5.89099526066351|          8.4|
+----+------+-----------------+-------------+
only showing top 5 rows



In [17]:
output_path = "/content/quake_data.csv"
df_load.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(output_path)

print(f"DataFrame saved as CSV file at: {output_path}")

DataFrame saved as CSV file at: /content/quake_data.csv


In [19]:
output_path = "/content/quake_freq_data.csv"
df_quake_freq.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(output_path)

print(f"df_quake_freq saved as CSV file at: {output_path}")

df_quake_freq saved as CSV file at: /content/quake_freq_data.csv


In [20]:
"""
Section: Machine Learning with Spark
"""

'\nSection: Machine Learning with Spark\n'

In [21]:
# Load data from the CSV file
file_path = "/content/quake_data.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Split the data into 80% training and 20% testing
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [22]:
# Preview the training and testing DataFrames
print("Training DataFrame:")
train_df.show()

Training DataFrame:
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/01/1967| -15.237| -173.608|Earthquake| 30.0|      6.5|            MW|ISCGEM839215|1967|
|01/01/1967|   -11.2|  165.416|Earthquake| 30.0|      5.9|            MW|ISCGEM839250|1967|
|01/01/1970|   -29.4| -177.169|Earthquake| 35.0|      5.6|            MW|ISCGEM799588|1970|
|01/01/1971|   -4.19|  141.183|Earthquake| 35.0|      6.0|            MW|ISCGEM787816|1971|
|01/01/1972| -17.021|  174.903|Earthquake| 10.0|      6.8|            MW|ISCGEM776618|1972|
|01/01/1975|  -4.932|  129.923|Earthquake| 20.0|      5.7|            MB|  USP00009BY|1975|
|01/01/1976| -28.949| -177.537|Earthquake| 50.0|      5.5|            MB|  USP0000E7J|1976|
|01/01/1976| -28.611| -177.638|Earthquake| 59.0|      6.2|  

In [23]:
print("Testing DataFrame:")
test_df.show()

Testing DataFrame:
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/01/1969|  51.096| -179.392|Earthquake| 45.0|      5.6|            MW|ISCGEM812771|1969|
|01/01/1973| -35.513|  -16.211|Earthquake| 33.0|      6.0|            MS|  USP0000004|1973|
|01/01/1975|  61.909| -149.738|Earthquake| 66.0|      5.9|            MB|  USP00009BD|1975|
|01/01/1977|  -7.885|  109.014|Earthquake|113.0|      5.7|            MB|  USP0000M10|1977|
|01/01/1983| -16.943|  -69.114|Earthquake|172.0|      6.3|            MW|  USP0001RXV|1983|
|01/01/1986|  19.282| -108.386|Earthquake| 10.0|      5.6|            MW|  USP0002PPD|1986|
|01/01/1995|  40.701|  143.549|Earthquake| 15.2|      6.5|           MWB|  USP0006QPV|1995|
|01/01/1997|  -0.127|  123.823|Earthquake|115.4|      5.8|   

In [24]:
# Select fields we will use and discard fields we don't need
df_test_clean = test_df['Date', 'Latitude', 'Longitude', 'Magnitude', 'Depth']
# Preview df_test_clean
df_test_clean.show(5)

+----------+--------+---------+---------+-----+
|      Date|Latitude|Longitude|Magnitude|Depth|
+----------+--------+---------+---------+-----+
|01/01/1969|  51.096| -179.392|      5.6| 45.0|
|01/01/1973| -35.513|  -16.211|      6.0| 33.0|
|01/01/1975|  61.909| -149.738|      5.9| 66.0|
|01/01/1977|  -7.885|  109.014|      5.7|113.0|
|01/01/1983| -16.943|  -69.114|      6.3|172.0|
+----------+--------+---------+---------+-----+
only showing top 5 rows



In [25]:
# Preview Schema
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [26]:
# Cast some string fields into numeric fields
df_test_clean = df_test_clean.withColumn('Latitude', df_test_clean['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_test_clean['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_test_clean['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_test_clean['Magnitude'].cast(DoubleType()))

In [27]:
df_test_clean.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Depth: double (nullable = true)



In [28]:
# Create training and testing dataframes
df_testing = df_test_clean['Latitude', 'Longitude', 'Magnitude', 'Depth']
df_training = train_df['Latitude', 'Longitude', 'Magnitude', 'Depth']

In [29]:
# Preview df_training
df_training.show(5)

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
| -15.237| -173.608|      6.5| 30.0|
|   -11.2|  165.416|      5.9| 30.0|
|   -29.4| -177.169|      5.6| 35.0|
|   -4.19|  141.183|      6.0| 35.0|
| -17.021|  174.903|      6.8| 10.0|
+--------+---------+---------+-----+
only showing top 5 rows



In [30]:
# Preview df_testing
df_testing.show(5)

+--------+---------+---------+-----+
|Latitude|Longitude|Magnitude|Depth|
+--------+---------+---------+-----+
|  51.096| -179.392|      5.6| 45.0|
| -35.513|  -16.211|      6.0| 33.0|
|  61.909| -149.738|      5.9| 66.0|
|  -7.885|  109.014|      5.7|113.0|
| -16.943|  -69.114|      6.3|172.0|
+--------+---------+---------+-----+
only showing top 5 rows



In [31]:
# Drop records with null values from our dataframes
df_testing = df_testing.dropna()
df_training = df_training.dropna()

In [32]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [33]:
# Select features to parse into our model and then create the feature vector
assembler = VectorAssembler(inputCols=['Latitude', 'Longitude', 'Depth'], outputCol='features')

# Create the Model
model_reg = RandomForestRegressor(featuresCol='features', labelCol='Magnitude')

# Chain the assembler with the model in a pipeline
pipeline = Pipeline(stages=[assembler, model_reg])

# Train the Model
model = pipeline.fit(df_training)

# Make the prediction
pred_results = model.transform(df_testing)

In [34]:
# Preview pred_results dataframe
pred_results.show(5)

+--------+---------+---------+-----+--------------------+-----------------+
|Latitude|Longitude|Magnitude|Depth|            features|       prediction|
+--------+---------+---------+-----+--------------------+-----------------+
|  51.096| -179.392|      5.6| 45.0|[51.096,-179.392,...|5.832700238333824|
| -35.513|  -16.211|      6.0| 33.0|[-35.513,-16.211,...|5.829089472289315|
|  61.909| -149.738|      5.9| 66.0|[61.909,-149.738,...|5.860084330908842|
|  -7.885|  109.014|      5.7|113.0|[-7.885,109.014,1...|5.851281509322248|
| -16.943|  -69.114|      6.3|172.0|[-16.943,-69.114,...|5.876063892064326|
+--------+---------+---------+-----+--------------------+-----------------+
only showing top 5 rows



In [35]:
# Evaluate the model
# rmse should be less than 0.5 for the model to be useful
evaluator = RegressionEvaluator(labelCol='Magnitude', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred_results)
print('Root Mean Squared Error (RMSE) on test data = %g' % rmse)

Root Mean Squared Error (RMSE) on test data = 0.417558


In [36]:
"""
Create the prediction dataset
"""

'\nCreate the prediction dataset\n'

In [37]:
# Create the prediction dataset
df_pred_results = pred_results['Latitude', 'Longitude', 'prediction']

# Rename the prediction field
df_pred_results = df_pred_results.withColumnRenamed('prediction', 'Pred_Magnitude')

# Add more columns to our prediction dataset
df_pred_results = df_pred_results.withColumn('Year', lit(2017))\
    .withColumn('RMSE', lit(rmse))

# Preview df_pred_results
df_pred_results.show(5)

+--------+---------+-----------------+----+------------------+
|Latitude|Longitude|   Pred_Magnitude|Year|              RMSE|
+--------+---------+-----------------+----+------------------+
|  51.096| -179.392|5.832700238333824|2017|0.4175577273202706|
| -35.513|  -16.211|5.829089472289315|2017|0.4175577273202706|
|  61.909| -149.738|5.860084330908842|2017|0.4175577273202706|
|  -7.885|  109.014|5.851281509322248|2017|0.4175577273202706|
| -16.943|  -69.114|5.876063892064326|2017|0.4175577273202706|
+--------+---------+-----------------+----+------------------+
only showing top 5 rows



In [38]:
# Save the prediction results DataFrame to a CSV file
output_path = "/content/pred_results.csv"
df_pred_results.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(output_path)

print(f"Prediction results saved as CSV file at: {output_path}")


Prediction results saved as CSV file at: /content/pred_results.csv
