In [1]:
sc.install_pypi_package("numpy")
import numpy as np

# Importing spark features
from pyspark.sql.functions import to_timestamp, year, month, dayofmonth, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Needed to add .config since I had timestamps before 1900-01-01T00:00:00Z
spark = SparkSession.builder.appName("Earthquake") \
                            .config("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY") \
                            .config("spark.sql.catalogImplementation", "hive") \
                            .enableHiveSupport() \
                            .getOrCreate()
spark.sparkContext

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1718292993751_0005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting numpy
  Downloading numpy-1.26.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
Installing collected packages: numpy
Successfully installed numpy-1.26.4

<SparkContext master=yarn appName=livy-session-4>

In [2]:
s3_path = "s3a://csc555earthquake-final-project/Earthquake Data/*.csv"

# Combines all csv files (EarthquakeData1.csv - EarthquakeData51.csv) into one data frame
earthquake_df = spark.read.csv(s3_path, header=True, inferSchema=True)
earthquake_df.printSchema()
earthquake_df.show(5, vertical = True, truncate = False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- time: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- nst: integer (nullable = true)
 |-- gap: double (nullable = true)
 |-- dmin: double (nullable = true)
 |-- rms: double (nullable = true)
 |-- net: string (nullable = true)
 |-- id: string (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- place: string (nullable = true)
 |-- type: string (nullable = true)
 |-- horizontalError: double (nullable = true)
 |-- depthError: double (nullable = true)
 |-- magError: double (nullable = true)
 |-- magNst: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- locationSource: string (nullable = true)
 |-- magSource: string (nullable = true)

-RECORD 0----------------------------------------------------
 time            | 2024-06-13 09:48:59.841                   
 latitude        | 6

In [3]:
# Count and Print number of rows to ensure everything is good

row_count = earthquake_df.count()
print(f"The number of rows in the earthquake_df is: {row_count}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The number of rows in the earthquake_df is: 1007810

In [4]:
# Cleaning Dataset: Dropping duplicate rows

cleaned_earthquake_df = earthquake_df.dropDuplicates()
row_count_2 = cleaned_earthquake_df.count()
print(f"The number of rows in the cleaned_earthquake_df is: {row_count_2}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The number of rows in the cleaned_earthquake_df is: 1007810

In [5]:
# Writing cleaned data into new S3 folder, for Amazon Athena

cleaned_data_path = "s3a://csc555earthquake-final-project/cleaned-earthquake-data/"
cleaned_earthquake_df.write.parquet(cleaned_data_path, mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Returning results from storing data in Athena

s3__athena_query_path = "s3://csc555earthquake-final-project/athena-query-results/Unsaved/2024/06/13/8f8cba00-45a0-439f-8fd1-e1254f4c009a.csv"
earthquake_data = spark.read.csv(s3_path, header=True, inferSchema=True)
earthquake_data.show(5, vertical = True, truncate = False)
earthquake_data.count() # Should be equal to 1007810

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------------------------------------------
 time            | 2024-06-13 09:48:59.841                   
 latitude        | 66.5868                                   
 longitude       | -150.7921                                 
 depth           | 4.5                                       
 mag             | 2.7                                       
 magType         | ml                                        
 nst             | NULL                                      
 gap             | NULL                                      
 dmin            | NULL                                      
 rms             | 0.84                                      
 net             | ak                                        
 id              | ak0247l1bkwu                              
 updated         | 2024-06-13 09:51:50.239                   
 place           | 48 km SE of Bettles, Alaska               
 type            | earthquake                                
 horizon

In [7]:
# Change time column to have seperate year, month, and dayofmonth

earthquake_data = earthquake_data.withColumn("year", year("time"))
earthquake_data = earthquake_data.withColumn("month", month("time"))
earthquake_data = earthquake_data.withColumn("dayofmonth", dayofmonth("time"))

earthquake_data.select("time", "year", "month", "dayofmonth").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----+-----+----------+
|                time|year|month|dayofmonth|
+--------------------+----+-----+----------+
|2024-06-13 09:48:...|2024|    6|        13|
|2024-06-13 08:56:...|2024|    6|        13|
|2024-06-13 07:40:...|2024|    6|        13|
|2024-06-13 07:05:...|2024|    6|        13|
|2024-06-13 06:28:...|2024|    6|        13|
+--------------------+----+-----+----------+
only showing top 5 rows

In [8]:
# Create classification names

earthquake_data = earthquake_data.withColumn("category",
    when(earthquake_data.mag < 4.0, "minor")
    .when((earthquake_data.mag >= 4.0) & (earthquake_data.mag < 6.0), "light")
    .when((earthquake_data.mag >= 6.0) & (earthquake_data.mag < 7.0), "moderate")
    .otherwise("strong")
)

earthquake_data.show(3, vertical = True, truncate = False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0-----------------------------------------------
 time            | 2024-06-13 09:48:59.841              
 latitude        | 66.5868                              
 longitude       | -150.7921                            
 depth           | 4.5                                  
 mag             | 2.7                                  
 magType         | ml                                   
 nst             | NULL                                 
 gap             | NULL                                 
 dmin            | NULL                                 
 rms             | 0.84                                 
 net             | ak                                   
 id              | ak0247l1bkwu                         
 updated         | 2024-06-13 09:51:50.239              
 place           | 48 km SE of Bettles, Alaska          
 type            | earthquake                           
 horizontalError | NULL                                 
 depthError      | 0.4         

In [9]:
# Index the label column
indexer = StringIndexer(inputCol="category", outputCol="label")
indexer_model = indexer.fit(earthquake_data)
indexed_data = indexer.fit(earthquake_data).transform(earthquake_data)

feature_columns = ["latitude", "longitude", "depth", "nst", "gap", "dmin", "rms", "year", "month", "dayofmonth"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")
prepared_data = assembler.transform(indexed_data)

# Show a sample of the prepared data
prepared_data.select("features", "label").show(5, truncate = False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------------------------------------------------------------------+-----+
|features                                                                                    |label|
+--------------------------------------------------------------------------------------------+-----+
|[38.8569984436035,-123.012496948242,5.26000022888184,55.0,54.0,0.05301,0.08,2024.0,6.0,13.0]|1.0  |
|[49.5568,153.4488,192.677,72.0,134.0,4.443,0.5,2024.0,6.0,13.0]                             |0.0  |
|[55.485,162.715,49.112,117.0,94.0,3.43,0.9,2024.0,6.0,13.0]                                 |0.0  |
|[-24.2194,179.9965,512.568,33.0,81.0,5.338,0.75,2024.0,6.0,13.0]                            |0.0  |
|[-18.91,-172.7208,10.0,34.0,100.0,2.648,1.64,2024.0,6.0,13.0]                               |0.0  |
+--------------------------------------------------------------------------------------------+-----+
only showing top 5 rows

In [10]:
# Split the data into training and test sets
train_data, test_data = prepared_data.randomSplit([0.8, 0.2], seed=1242002)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Train the Random Forest model
dt = RandomForestClassifier(labelCol="label", featuresCol="features", probabilityCol="probability")
dt_model = dt.fit(train_data)

# Make predictions
predictions = dt_model.transform(test_data)

label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=indexer_model.labels)
converted_predictions = label_converter.transform(predictions)

converted_predictions.select("features", "probability", "prediction", "predictedLabel").show(5, vertical=True, truncate = False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------------------------------------------------------------------------------------
 features       | [19.3721,-64.6825,83.0,3.0,346.0,1.6951,0.35,2018.0,6.0,1.0]                         
 probability    | [0.04584271759964734,0.9538616676077097,2.7520678994681863E-4,2.0408002696123674E-5] 
 prediction     | 1.0                                                                                  
 predictedLabel | minor                                                                                
-RECORD 1----------------------------------------------------------------------------------------------
 features       | [19.4025002,-155.2716675,1.62,13.0,79.0,0.0068,0.15,2018.0,6.0,1.0]                  
 probability    | [0.034568207946584675,0.9651473286467779,2.446227891041416E-4,3.984061753323502E-5]  
 prediction     | 1.0                                                                                  
 predictedLabel | minor                                         

In [12]:
# Probability of Earthquakes at Shinjuku, Tokyo, Japan by June 13, 2028

new_data = spark.createDataFrame([
    (35.6895, 139.6917, 10.0, 30, 90.0, 0.5, 0.1, 2028, 6, 13)
], ["latitude", "longitude", "depth", "nst", "gap", "dmin", "rms", "year", "month", "dayofmonth"])

new_data_prepared = assembler.transform(new_data)
new_predictions = dt_model.transform(new_data_prepared)
new_converted_predictions = label_converter.transform(new_predictions)
new_converted_predictions.select("features", "probability", "predictedLabel").show(vertical=True, truncate=False)

# The most likely category for the biggest earthquake at Shinjuku by June 13, 2028 will be a light one (4.0 and 6.0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------------------------------------------------------------------------------------
 features       | [35.6895,139.6917,10.0,30.0,90.0,0.5,0.1,2028.0,6.0,13.0]                          
 probability    | [0.9179719324878892,0.07687383560140351,0.004361845088357927,7.923868223493101E-4] 
 predictedLabel | light

In [13]:
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")

# Evaluate other metrics
precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision").evaluate(predictions)
recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall").evaluate(predictions)
f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1").evaluate(predictions)
print(f"Precision = {precision}")
print(f"Recall = {recall}")
print(f"F1 Score = {f1}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test Accuracy = 0.9642747237375857
Precision = 0.9632559857810964
Recall = 0.9642747237375856
F1 Score = 0.9625111307856469