In [1]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second, col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PolynomialExpansion

In [2]:
spark = SparkSession.builder \
.master("spark://10.129.5.252:7077") \
.appName("USGS-ML") \
.getOrCreate()

In [3]:
df = spark.read.option("delimiter", ";").csv("RAW1.csv", header=True)

In [4]:
df = df.withColumn("year", year("time")) \
    .withColumn("month", month("time")) \
    .withColumn("day", dayofmonth("time")) \
    .withColumn("hour", hour("time")) \
    .withColumn("minute", minute("time")) \
    .withColumn("second", second("time")) \
    .withColumn("depth", col("depth").cast("double")) \
    .withColumn("mag", col("mag").cast("double")) \
    .withColumn("latitude", col("latitude").cast("double")) \
    .withColumn("longitude", col("longitude").cast("double"))
df.printSchema()

root
 |-- time: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- nst: string (nullable = true)
 |-- gap: string (nullable = true)
 |-- dmin: string (nullable = true)
 |-- rms: string (nullable = true)
 |-- net: string (nullable = true)
 |-- id: string (nullable = true)
 |-- updated: string (nullable = true)
 |-- place: string (nullable = true)
 |-- type: string (nullable = true)
 |-- horizontalError: string (nullable = true)
 |-- depthError: string (nullable = true)
 |-- magError: string (nullable = true)
 |-- magNst: string (nullable = true)
 |-- status: string (nullable = true)
 |-- locationSource: string (nullable = true)
 |-- magSource: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- 

In [5]:
df = df.filter((col("depth").isNotNull()) & (col("mag").isNotNull()))

In [6]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [7]:
# Select features and target variable
feature_cols = ['latitude', 'longitude', 'depth']  
target_col = 'mag'

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol='xx')
data = assembler.transform(train_data).select('xx', target_col)

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=10)

poly_expansion = PolynomialExpansion(inputCol='xx', outputCol='xx_poly', degree=2)

# Feature Scaling
scaler = StandardScaler(inputCol='xx_poly', outputCol='features', withStd=True, withMean=True)

# Linear Regression model
lr = LinearRegression(featuresCol='features', labelCol=target_col)

# Pipeline
pipeline = Pipeline(stages=[poly_expansion, scaler, lr])

# Hyperparameter grid for regularization strength (alpha)
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
    .build()

# Cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='rmse'),
                          numFolds=2)

# Train the model
cv_model = crossval.fit(train_data)

# Make predictions on the test data
predictions = cv_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) after optimization:", rmse)

Root Mean Squared Error (RMSE) after optimization: 0.4015175335524103


In [8]:
from pyspark.ml.regression import GeneralizedLinearRegression, IsotonicRegression, LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# features and target column
feature_cols = ['latitude', 'longitude', 'depth', 'year', 'month']
target_col = 'mag'

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol='xx')
data = assembler.transform(df).select('xx', target_col)

train_data, test_data = data.randomSplit([0.8, 0.2], seed=10)

#regression models
models = [
    LinearRegression(featuresCol='xx', labelCol=target_col),
    GeneralizedLinearRegression(featuresCol='xx', labelCol=target_col),
    IsotonicRegression(featuresCol='xx', labelCol=target_col),
    DecisionTreeRegressor(featuresCol='xx', labelCol=target_col),
    RandomForestRegressor(featuresCol='xx', labelCol=target_col),
    GBTRegressor(featuresCol='xx', labelCol=target_col)
]

# Train and evaluate each model
for model in models:
    # Train the model
    model_name = model.__class__.__name__
    print(f"Training {model_name}...")
    model = model.fit(train_data)
    
    # Make predictions on the test data
    predictions = model.transform(test_data)
    
    # Evaluate the model
    evaluator = RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='rmse')
    rmse = evaluator.evaluate(predictions)
    
    # Print the RMSE
    print(f"{model_name} RMSE: {rmse}")

Training LinearRegression...
LinearRegression RMSE: 0.3956456412135186
Training GeneralizedLinearRegression...
GeneralizedLinearRegression RMSE: 0.3956456412135186
Training IsotonicRegression...
IsotonicRegression RMSE: 0.39906875309894774
Training DecisionTreeRegressor...
DecisionTreeRegressor RMSE: 0.3903016692979934
Training RandomForestRegressor...
RandomForestRegressor RMSE: 0.39041635465500457
Training GBTRegressor...
GBTRegressor RMSE: 0.385958922169134


In [9]:
gbt = GBTRegressor(featuresCol='xx', labelCol=target_col)

# Define the parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10, 15]) \
    .addGrid(gbt.maxIter, [10, 20, 30]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='rmse')

# Create CrossValidator
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters.
cv_model = crossval.fit(train_data)

# Make predictions on test data
predictions = cv_model.transform(test_data)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) after cross-validation:", rmse)

In [None]:
predictions = cv_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) after optimization:", rmse)

# Now, let's use the test data split from the original dataset to make predictions and evaluate each trained model

# Make predictions using each trained model on the original test data split
for model in models:
    model_name = model.__class__.__name__
    print(f"\n{model_name} Predictions:")
    
    # Make predictions
    predictions = model.transform(test_data)
    
    # Evaluate the predictions
    evaluator = RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='rmse')
    rmse = evaluator.evaluate(predictions)
    
    # Print RMSE
    print(f"RMSE: {rmse}")