Linear Regression PySpark ML

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Build (or reuse) the Spark session that every later cell relies on.
# Runs in local mode using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .appName("LinearRegressionExample")
    .master("local[*]")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .getOrCreate()
)

# Display the session summary as the cell output
spark

In [3]:
# Load the CSV: the first row is treated as the header and column types are inferred
csv_path = "/content/Diabetes.csv"
df = spark.read.csv(csv_path, inferSchema=True, header=True)

# Preview the rows, then report the row count and the number of partitions
df.show()
print(df.count())
print(df.rdd.getNumPartitions())

+---+-----------+-----------+-----------+------------+------+-----------+-----------------+---------------------------+
| id|   hrv_mean|    hrv_std|hrv_entropy|  experiment|  Task|Temperature|Thermal sensation|Personal Thermal Assessment|
+---+-----------+-----------+-----------+------------+------+-----------+-----------------+---------------------------+
|  1|831.6831683|252.2799922|4.572689459|Experiment 8|Typing|         27|    slightly warm|                    neutral|
|  1|834.5588235|252.6982997|4.582561852|Experiment 8|Typing|         27|    slightly warm|                    neutral|
|  1|834.5588235|252.6982997|4.582561852|Experiment 8|Typing|         27|    slightly warm|                    neutral|
|  1|834.4678218| 253.944537|4.572285578|Experiment 8|Typing|         27|    slightly warm|                    neutral|
|  1|834.4678218| 253.944537|4.572285578|Experiment 8|Typing|         27|    slightly warm|                    neutral|
|  1|834.4678218| 253.944537|4.572285578

In [None]:
# Redistribute the DataFrame across 4 partitions and confirm the new count
df = df.repartition(numPartitions=4)
df.rdd.getNumPartitions()


StringIndexer

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the string column 'sex' as a numeric index column named 'gender'
indexer = StringIndexer(inputCol='sex', outputCol='gender')
indexer_model = indexer.fit(df)

# NOTE(review): df is rebound here, so re-running this cell without reloading
# df will likely fail because the 'gender' column already exists.
df = indexer_model.transform(df)

df.show(10)

VectorAssembler

In [None]:
# Columns that are packed into the single 'features' vector column
feature_columns = ["age", "gender", "bmi", "bp", "tc", "ldl", "hdl", "tch", "ltg", "glu"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Assemble the vector and keep only the model inputs and the label
data = assembler.transform(df).select('features', 'progression')
data.show(5, truncate=False)

StandardScaler

In [None]:
# With the default parameters used here, StandardScaler rescales each feature to
# unit standard deviation (withStd=True) and does not center it (withMean=False).
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
#scaler = MinMaxScaler(inputCol = 'features', outputCol = 'scaledFeatures')

# fit() learns the scaling statistics from the data; transform() then applies them.
# NOTE(review): the scaler is fitted on the full dataset before the train/test
# split, so test-set statistics influence the scaling -- consider fitting on the
# training split only.
scaler_model = scaler.fit(data)

# Keep only the scaled features and the label
data = scaler_model.transform(data).select('scaledFeatures', 'progression')
data.show(5, truncate=False)

In [None]:
# 70/30 train/test split; the fixed seed makes the split reproducible
train_data, test_data = data.randomSplit(weights=[0.7, 0.3], seed=42)
train_data.show(5, truncate=False)

Apply linear regression model

In [None]:
# Plain linear regression on the scaled features, with no explicit regularization
lr = LinearRegression(
    featuresCol='scaledFeatures',
    labelCol='progression',
    predictionCol='prediction',
)

# Train on the training split
lr_model = lr.fit(train_data)

# Score the held-out test split
lr_predictions = lr_model.transform(test_data)

In [None]:
# Score the test split with the fitted linear model.
# The original assigned this result to a stray name (`ions`, a truncated
# `lr_predictions`), so the value was never used; bind it to the intended name.
lr_predictions = lr_model.transform(test_data)

# Preview predictions next to the true labels
lr_predictions.select('prediction', 'progression').show(10, truncate = False)

In [None]:
# Pull the fitted coefficients and the intercept off the linear model
coefficients, intercept = lr_model.coefficients, lr_model.intercept

print(coefficients)
print(intercept)

Apply Lasso and Ridge regression models

In [None]:
# Settings shared by both regularized models; regParam is the lambda to tune
regularized_kwargs = dict(
    labelCol='progression',
    featuresCol='scaledFeatures',
    predictionCol='prediction',
    regParam=0.15,
)

# Lasso: elasticNetParam = 1.0 selects a pure L1 penalty
lasso = LinearRegression(elasticNetParam=1.0, **regularized_kwargs)
lasso_model = lasso.fit(train_data)
lasso_predictions = lasso_model.transform(test_data)

# Ridge: elasticNetParam = 0.0 selects a pure L2 penalty
ridge = LinearRegression(elasticNetParam=0.0, **regularized_kwargs)
ridge_model = ridge.fit(train_data)
ridge_predictions = ridge_model.transform(test_data)

In [None]:
# Preview the Lasso predictions next to the true labels
lasso_predictions.select(['prediction', 'progression']).show(n=10, truncate=False)

Evaluate the models and visualize the results

In [None]:
# One evaluator per metric, all comparing 'prediction' against 'progression'
evaluator_mse, evaluator_rmse, evaluator_r2 = (
    RegressionEvaluator(labelCol='progression', predictionCol='prediction', metricName=metric)
    for metric in ('mse', 'rmse', 'r2')
)

# Prediction frames in reporting order: linear regression, Lasso, Ridge
prediction_sets = (lr_predictions, lasso_predictions, ridge_predictions)

# MSE per model
mse1, mse2, mse3 = (evaluator_mse.evaluate(preds) for preds in prediction_sets)

# RMSE per model
rmse1, rmse2, rmse3 = (evaluator_rmse.evaluate(preds) for preds in prediction_sets)

# R-squared per model
r2_score1, r2_score2, r2_score3 = (evaluator_r2.evaluate(preds) for preds in prediction_sets)

In [None]:
# Report MSE, RMSE and R^2 for each fitted model, one line per model
metric_rows = (
    ('Regression', mse1, rmse1, r2_score1),
    ('Lasso', mse2, rmse2, r2_score2),
    ('Ridge', mse3, rmse3, r2_score3),
)
for model_label, mse_value, rmse_value, r2_value in metric_rows:
    print(model_label + ' - MSE: ', mse_value, ', RMSE: ', rmse_value, ', R^2: ', r2_value)

In [None]:

# plot
import matplotlib.pyplot as plt
import numpy as np

# Metric values per model, in the order: linear regression, Lasso, Ridge
mse = [mse1, mse2, mse3]
rmse = [rmse1, rmse2, rmse3]
r2_score = [r2_score1, r2_score2, r2_score3]

model_names = ['Regression', 'Lasso', 'Ridge']
positions = np.arange(len(model_names))
bar_width = 0.5

# MSE, RMSE and R^2 live on very different scales, so drawing them on one
# shared y-axis makes the smaller metrics unreadable. Give each metric its own
# panel with an independent y-axis instead.
metric_series = [('MSE', mse), ('RMSE', rmse), ('R2_Score', r2_score)]
fig, axes = plt.subplots(1, len(metric_series), figsize=(12, 4))

for ax, (metric_label, values) in zip(axes, metric_series):
    ax.bar(positions, values, width=bar_width)
    ax.set_xticks(positions)
    ax.set_xticklabels(model_names)
    ax.set_xlabel('Model')
    ax.set_ylabel(metric_label)
    ax.set_title(metric_label)

# overall title; tight_layout keeps the panels from overlapping
fig.suptitle('Comparison of Regression Metrics')
fig.tight_layout()
plt.show()