In [18]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import datetime


spark = SparkSession.builder.appName("lr_model_spark").getOrCreate()

## REMARK: with real data on disk we would load it instead of generating it. A
# LIBSVM-format file can be read with the DataFrame reader, which yields "label" and
# "features" columns directly (so the VectorAssembler step below would be unnecessary):
# training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# https://spark.apache.org/docs/latest/ml-datasource.html#libsvm-data-source
# (The Spark 1.5 docs used the RDD-based MLUtils.loadLibSVMFile(sc, ...).toDF(), which
# needs a SparkContext `sc` that this notebook never defines.)

# number of feature columns -- how wide (and how intensive) do we want the job?
num_features = 4

# Number of rows (samples) to generate. spark.range() creates ROWS, not columns, so the
# row count must be chosen independently of num_features. Tying it to num_features
# (num_features + 3 = 7 rows) left the 70/30 split with ~2 test rows and a real chance
# of an empty test set.
num_rows = 100

# One row per sample; "id" is just the row index produced by spark.range().
data = spark.range(0, num_rows).select("id")

# Feature columns feature1..featureN, each drawn from rand() (uniform in [0.0, 1.0))
# with its own fixed seed.
for i in range(num_features):
    seed = i  # keep track of seed values for reproducibility
    data = data.withColumn(f"feature{i+1}", rand(seed=seed))

feature_columns = [f"feature{i+1}" for i in range(num_features)]

# Target with no noise: label = 1*feature1 + 2*feature2 + ... + N*featureN.
# Builtin sum() works on Columns because 0 + Column dispatches to Column.__radd__.
data = data.withColumn(
    "label",
    sum(data[col] * weight for weight, col in enumerate(feature_columns, start=1)),
)

# Pack the individual feature columns into the single vector column Spark ML expects.
# https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Training and test sets. A fixed seed makes the split repeatable across runs (for the
# same data and partitioning), consistent with the seeded feature columns.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Linear regression with elastic-net regularization:
#   regParam        -- overall regularization strength (>= 0)
#   elasticNetParam -- L1/L2 mixing in [0, 1]: 0.0 = pure L2 (ridge), 1.0 = pure L1 (lasso)
# With regParam=0.3 the fitted coefficients are shrunk toward 0, so expect them to
# differ from the true 1..N used to build the label.
# https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression
lr = LinearRegression(featuresCol='features', labelCol='label', regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)

# Print the fitted coefficients and intercept.
# NOTE: use `coefficients`. The old Spark 1.x `weights` attribute no longer exists on
# LinearRegressionModel; accessing it raises AttributeError.
print("Weights: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

# Apply the fitted model to the held-out rows; the output carries a "prediction" column.
predictions = lr_model.transform(test_data)

# RMSE = sqrt(mean((label - prediction)^2)) over the test rows. It gives the typical size
# of a prediction error in the label's own units (lower is better). It matches the
# standard deviation of the residuals only when the residuals average out to ~0.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html
# https://statisticsbyjim.com/regression/root-mean-square-error-rmse/
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data =", rmse)


# Store the model by passing a file path. The timestamp suffix (e.g. lr_model20240131_235959)
# keeps repeated runs from colliding, because save() refuses an existing path unless
# overwrite is requested.
model_file_path = "lr_model" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
lr_model.save(model_file_path)

# Shut down the Spark session (and its underlying SparkContext) now that we are done.
spark.stop()


AttributeError: 'LinearRegressionModel' object has no attribute 'weights'