In [None]:

file_path = "dbfs:/FileStore/shared_uploads/fadliazhar66@gmail.com/data/regression/"

# Reading all Parquet files in a directory
df = spark.read.parquet(file_path)

# Displaying the schema of the dataframe
df.printSchema()

# Displaying some rows of data
df.show()


root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)

+--------------+-----+
|      features|label|
+--------------+-----+
|[3.0,10.1,3.0]|  2.0|
| [2.0,1.1,1.0]|  1.0|
|[1.0,0.1,-1.0]|  0.0|
|[1.0,0.1,-1.0]|  0.0|
| [2.0,4.1,1.0]|  2.0|
+--------------+-----+



In [None]:
# Build a linear regression estimator with pyspark.ml's `LinearRegression` class.
# `setMaxIter(10)` caps the optimizer at 10 iterations, `setRegParam(0.3)` sets the
# regularization strength, and `setElasticNetParam(0.8)` sets the elastic-net mix
# (0 = pure L2/ridge, 1 = pure L1/lasso). `explainParams()` returns a description of
# every parameter with its default and current value. `fit(df)` trains on `df` and
# returns the fitted model, stored in `lrModel`.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
print(lr.explainParams())
lrModel = lr.fit(df)
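
The two regularization settings in this cell combine into a single elastic-net penalty. The sketch below computes that penalty in plain Python, assuming the form given in the Spark ML documentation: regParam * (alpha * L1 + (1 - alpha) / 2 * L2^2), where alpha is `elasticNetParam`. The helper name and the coefficient values are hypothetical and are not taken from the fitted model.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Penalty term added to the squared-error loss (hypothetical helper)."""
    l1 = sum(abs(w) for w in coefficients)    # L1 norm
    l2_sq = sum(w * w for w in coefficients)  # squared L2 norm
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_sq)

# regParam and elasticNetParam match the cell; the coefficients are made up.
example_coefficients = [1.0, -2.0]
penalty = elastic_net_penalty(example_coefficients,
                              reg_param=0.3, elastic_net_param=0.8)
print(penalty)
```

With `elastic_net_param=0.8`, most of the penalty comes from the L1 term, which tends to push small coefficients to exactly zero.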

In [None]:
# `lrModel.summary` holds training-set diagnostics for the fitted model:
#   - `residuals`: a DataFrame of label minus prediction for each training row
#   - `totalIterations`: number of optimizer iterations actually run
#   - `objectiveHistory`: objective (loss plus regularization) value at each iteration
#   - `rootMeanSquaredError`: RMSE on the training data (lower is better)
#   - `r2`: fraction of the label variance explained by the model on the training data
summary = lrModel.summary
summary.residuals.show()
print(summary.totalIterations)
print(summary.objectiveHistory)
print(summary.rootMeanSquaredError)
print(summary.r2)


+--------------------+
|           residuals|
+--------------------+
| 0.12805046585610147|
| -0.1446826926157201|
|-0.41903832622420606|
|-0.41903832622420606|
|  0.8547088792080306|
+--------------------+

5
[0.5000000000000001, 0.43152958103627864, 0.313233593388102, 0.312256926665541, 0.30915060819830303, 0.30915058933480266]
0.47308424392175985
0.720239122691221
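
As a sanity check on how the summary values relate, the sketch below recomputes RMSE and R-squared in plain Python from the residuals and labels printed above. It assumes R-squared is computed as 1 - SS_res / SS_tot; the results should agree with the printed summary values up to floating-point rounding.

```python
import math

# Residuals and labels copied from the outputs shown above.
residuals = [0.12805046585610147, -0.1446826926157201, -0.41903832622420606,
             -0.41903832622420606, 0.8547088792080306]
labels = [2.0, 1.0, 0.0, 0.0, 2.0]

n = len(residuals)
ss_res = sum(r * r for r in residuals)               # residual sum of squares
mean_label = sum(labels) / n
ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares

rmse = math.sqrt(ss_res / n)
r2 = 1.0 - ss_res / ss_tot

print(rmse)
print(r2)
```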


In [None]:
# Build a generalized linear regression (GLR) estimator with pyspark.ml's
# `GeneralizedLinearRegression` class. The settings are: Gaussian error distribution
# (`setFamily("gaussian")`), identity link (`setLink("identity")`), at most 10 iterations
# (`setMaxIter(10)`), regularization strength 0.3 (`setRegParam(0.3)`), and an extra output
# column named "linkOut" that receives the linear predictor (`setLinkPredictionCol("linkOut")`).
# `explainParams()` returns the parameter descriptions printed below; `fit(df)` returns the
# trained model, stored in `glrModel`.
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression()\
 .setFamily("gaussian")\
 .setLink("identity")\
 .setMaxIter(10)\
 .setRegParam(0.3)\
 .setLinkPredictionCol("linkOut")
print(glr.explainParams())
glrModel = glr.fit(df)

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
family: The name of family which is a description of the error distribution to be used in the model. Supported options: gaussian (default), binomial, poisson, gamma and tweedie. (default: gaussian, current: gaussian)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
link: The name of link function which provides the relationship between the linear predictor and the mean of the distribution function. Supported options: identity, log, inverse, logit, probit, cloglog and sqrt. (current: identity)
linkPower: The index in the power link function. Only applicable to the Tweedie family. (undefined)
linkPredictionCol: link prediction (linear predictor) column name (current: linkOut)
maxIter: max number of iterations (>= 0). (default: 25, current: 10)
offsetCol: The offset column name. If this is not set or em
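
The `link` parameter listed above relates the linear predictor to the mean of the response. As a rough illustration in plain Python (not Spark's implementation), the sketch below applies the inverse of three of the supported links to a hypothetical linear-predictor value.

```python
import math

# Inverse link functions: map the linear predictor eta to the mean mu.
inverse_links = {
    "identity": lambda eta: eta,
    "log": lambda eta: math.exp(eta),
    "logit": lambda eta: 1.0 / (1.0 + math.exp(-eta)),
}

eta = 0.0  # hypothetical linear predictor value
means = {name: inverse(eta) for name, inverse in inverse_links.items()}
print(means)
```

With the identity link used in this cell, the mean equals the linear predictor, so the "linkOut" column should carry the same values as "prediction".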

In [None]:
# Build a decision tree regressor with pyspark.ml's `DecisionTreeRegressor` class, keeping
# every parameter at its default. `explainParams()` returns the parameter descriptions
# printed below; `fit(df)` trains the tree on `df` and returns the model, stored in
# `dtrModel`. The tree predicts the continuous `label` column from the `features` vector.
from pyspark.ml.regression import DecisionTreeRegressor
dtr = DecisionTreeRegressor()
print(dtr.explainParams())
dtrModel = dtr.fit(df)

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: variance (default: variance)
labelCol: label column name. (default: label)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for discretizing continuous features.  Must be >
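
The `impurity` parameter above lists variance as the only criterion for regression trees. The sketch below shows, in plain Python, how the variance reduction of one candidate split can be computed. It uses the first feature and the label of each row shown by `df.show()`. The threshold and helper names are hypothetical, and Spark's actual split search (binning, minimum instances per node, and so on) is not reproduced here.

```python
def variance(values):
    """Population variance; 0.0 for an empty list."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values) / len(values)

def variance_reduction(feature, labels, threshold):
    """Impurity decrease from splitting rows on feature <= threshold."""
    left = [y for x, y in zip(feature, labels) if x <= threshold]
    right = [y for x, y in zip(feature, labels) if x > threshold]
    n = len(labels)
    weighted = len(left) / n * variance(left) + len(right) / n * variance(right)
    return variance(labels) - weighted

# First feature and label of each row in df; the threshold is hypothetical.
first_feature = [3.0, 2.0, 1.0, 1.0, 2.0]
labels = [2.0, 1.0, 0.0, 0.0, 2.0]
gain = variance_reduction(first_feature, labels, threshold=1.5)
print(gain)
```

A larger reduction means the split separates the labels into more homogeneous groups.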

In [None]:
# Train two tree ensembles from pyspark.ml with default parameters. `RandomForestRegressor`
# builds a random forest and `GBTRegressor` builds gradient-boosted trees. For each estimator,
# `explainParams()` returns the parameter descriptions and `fit(df)` trains on the same
# DataFrame; the fitted models are stored in `rfModel` and `gbtModel`. Both predict the
# continuous `label` column from the `features` vector.
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor

# Random forest with default parameters
rf = RandomForestRegressor()
print(rf.explainParams())
rfModel = rf.fit(df)

# Gradient-boosted trees with default parameters
gbt = GBTRegressor()
print(gbt.explainParams())
gbtModel = gbt.fit(df)

bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur
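
The two ensembles differ in how they combine their trees. The plain-Python sketch below illustrates the idea under stated assumptions: a random forest regressor averages the per-tree predictions, while gradient-boosted trees return a weighted sum of them. The per-tree outputs and the weights are hypothetical values chosen for illustration and are not read from `rfModel` or `gbtModel`.

```python
def forest_predict(tree_predictions):
    """Random forest regression: average the per-tree predictions."""
    return sum(tree_predictions) / len(tree_predictions)

def gbt_predict(tree_predictions, tree_weights):
    """Gradient-boosted trees: weighted sum of the per-tree predictions."""
    return sum(w * p for w, p in zip(tree_weights, tree_predictions))

# Hypothetical per-tree outputs for a single row.
rf_value = forest_predict([1.0, 2.0, 3.0])

# Assumed weights: 1.0 for the first tree, then a step size of 0.1 for later trees.
gbt_value = gbt_predict([1.0, 0.5, -0.2], [1.0, 0.1, 0.1])

print(rf_value, gbt_value)
```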

In [None]:
# Tune the GLR regularization strength with 2-fold cross-validation. The GLR estimator
# (Gaussian family, identity link) is wrapped in a single-stage `Pipeline`.
# `ParamGridBuilder` builds a grid over `regParam` with the values 0, 0.5, and 1, and
# `RegressionEvaluator` scores each candidate by RMSE between the "prediction" and "label"
# columns. `CrossValidator` fits every grid point on each fold, averages the metric, and
# refits the best setting on all of `df`; the result is stored in `model`.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
glr = GeneralizedLinearRegression().setFamily("gaussian").setLink("identity")
pipeline = Pipeline().setStages([glr])
# Use float literals so the grid values match regParam's floating-point type
params = ParamGridBuilder().addGrid(glr.regParam, [0.0, 0.5, 1.0]).build()
evaluator = RegressionEvaluator()\
 .setMetricName("rmse")\
 .setPredictionCol("prediction")\
 .setLabelCol("label")
cv = CrossValidator()\
 .setEstimator(pipeline)\
 .setEvaluator(evaluator)\
 .setEstimatorParamMaps(params)\
 .setNumFolds(2)
model = cv.fit(df)
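
To make the grid-search loop concrete, the sketch below runs k-fold cross-validation over the same three `regParam` values in plain Python. It is a toy under stated assumptions: `fit_shrunken_mean` is a hypothetical stand-in for the estimator (it is not GLR), and folds are assigned by row index, whereas Spark's `CrossValidator` assigns rows to folds randomly.

```python
import math

def rmse(predictions, labels):
    """Root mean squared error between two equal-length lists."""
    return math.sqrt(sum((p - y) ** 2 for p, y in zip(predictions, labels)) / len(labels))

def fit_shrunken_mean(train_labels, reg_param):
    """Toy stand-in for an estimator: the training mean shrunk toward 0."""
    mean = sum(train_labels) / len(train_labels)
    return mean / (1.0 + reg_param)

def cross_validate(labels, grid, num_folds):
    """Return the average validation RMSE for each grid value."""
    avg_metrics = []
    for reg_param in grid:
        fold_scores = []
        for fold in range(num_folds):
            # Deterministic fold assignment by row index (an assumption of this sketch).
            train = [y for i, y in enumerate(labels) if i % num_folds != fold]
            valid = [y for i, y in enumerate(labels) if i % num_folds == fold]
            prediction = fit_shrunken_mean(train, reg_param)
            fold_scores.append(rmse([prediction] * len(valid), valid))
        avg_metrics.append(sum(fold_scores) / num_folds)
    return avg_metrics

labels = [2.0, 1.0, 0.0, 0.0, 2.0]  # label column shown by df.show()
grid = [0.0, 0.5, 1.0]              # same regParam values as the cell
avg_metrics = cross_validate(labels, grid, num_folds=2)
best_reg_param = grid[avg_metrics.index(min(avg_metrics))]  # lower RMSE is better
print(avg_metrics, best_reg_param)
```

With 3 grid values and 2 folds, the loop trains 6 candidate models; the winner here reflects only the toy model and says nothing about which `regParam` Spark selected.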

In [None]:
# Score the cross-validated model with `RegressionMetrics` from the RDD-based
# `pyspark.mllib.evaluation` package. `model.transform(df)` adds a "prediction" column,
# `select` keeps "prediction" and "label", and `.rdd.map(...)` turns each row into a
# (prediction, label) tuple of floats, the pair order `RegressionMetrics` expects.
# The metrics printed are MSE, RMSE, R-squared, MAE, and explained variance. Because
# `df` is also the training data, these are in-sample scores, not estimates of
# performance on unseen data.

from pyspark.mllib.evaluation import RegressionMetrics
out = model.transform(df)\
 .select("prediction", "label").rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = RegressionMetrics(out)
print("MSE: " + str(metrics.meanSquaredError))
print("RMSE: " + str(metrics.rootMeanSquaredError))
print("R-squared: " + str(metrics.r2))
print("MAE: " + str(metrics.meanAbsoluteError))
print("Explained variance: " + str(metrics.explainedVariance))



MSE: 0.15705521472392636
RMSE: 0.39630192369445594
R-squared: 0.803680981595092
MAE: 0.3141104294478528
Explained variance: 0.6429447852760728
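
The printed metrics are tied together by simple identities, which the plain-Python sketch below checks using only the values above and the five labels shown by `df.show()`. It assumes R-squared is computed as 1 - MSE / Var(label) with the population variance. In this run the label variance also splits into MSE plus explained variance; that additive split is typical of an unregularized least-squares fit with an intercept and is not guaranteed in general.

```python
import math

# Values copied from the printed output above.
mse = 0.15705521472392636
explained_variance = 0.6429447852760728
labels = [2.0, 1.0, 0.0, 0.0, 2.0]  # label column shown by df.show()

n = len(labels)
mean_label = sum(labels) / n
label_variance = sum((y - mean_label) ** 2 for y in labels) / n  # population variance

rmse = math.sqrt(mse)            # RMSE is the square root of MSE
r2 = 1.0 - mse / label_variance  # R-squared as 1 - MSE / Var(label)
gap = label_variance - (mse + explained_variance)

print(rmse, r2, gap)
```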
