**Create entry points to spark**

In [1]:
!pip install pyspark
import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 53.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=8cea8d5573172b6b912f6a5e740721172e5e037a241cb98b4355979f15329a49
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Advertising').getOrCreate()

# Linear regression without cross-valiation

**importing data**

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
ad = spark.read.csv('/content/drive/MyDrive/0.MKCE/Datasets/Advertising.csv', 
                    header=True, 
                    inferSchema=True)
ad.show(5)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



# Transform data structure

In [6]:
from pyspark.ml.linalg import Vectors

ad_df = ad.rdd.map(lambda x: [Vectors.dense(x[0:3]), 
                              x[-1]]).toDF(['features', 
                                            'label'])

ad_df.show(5)

+----------------+-----+
|        features|label|
+----------------+-----+
|[1.0,230.1,37.8]| 22.1|
| [2.0,44.5,39.3]| 10.4|
| [3.0,17.2,45.9]|  9.3|
|[4.0,151.5,41.3]| 18.5|
|[5.0,180.8,10.8]| 12.9|
+----------------+-----+
only showing top 5 rows



# Build linear regression model

In [7]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', 
                      labelCol = 'label')

In [8]:
lr_model = lr.fit(ad_df)

In [9]:
pred = lr_model.transform(ad_df)
pred.show(5)

+----------------+-----+------------------+
|        features|label|        prediction|
+----------------+-----+------------------+
|[1.0,230.1,37.8]| 22.1| 20.60502920567592|
| [2.0,44.5,39.3]| 10.4|12.392493270268307|
| [3.0,17.2,45.9]|  9.3|12.381882445789143|
|[4.0,151.5,41.3]| 18.5|  17.6636423673972|
|[5.0,180.8,10.8]| 12.9|13.277141826096058|
+----------------+-----+------------------+
only showing top 5 rows



In [10]:
from pyspark.ml.evaluation import RegressionEvaluator 

evaluator = RegressionEvaluator(predictionCol='prediction', 
                                labelCol='label')

evaluator.setMetricName('r2').evaluate(pred)

0.8972276882820611

# Linear regression with cross-validation

# Training and test datasets

In [11]:
training, test = ad_df.randomSplit([0.8, 0.2], 
                                   seed=123)

Build cross-validation model

In [12]:
##=====build cross valiation model======

# estimator
lr = LinearRegression(featuresCol = 'features', 
                      labelCol = 'label')

# parameter grid
from pyspark.ml.tuning import ParamGridBuilder

param_grid = ParamGridBuilder().\
    addGrid(lr.regParam, [0, 0.5, 1]).\
    addGrid(lr.elasticNetParam, [0, 0.5, 1]).\
    build()
    
# evaluator
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol='label', 
                                metricName='r2')

# cross-validation model
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=lr, 
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator, 
                    numFolds=4)

Fit cross-validation model

In [13]:
cv_model = cv.fit(training)

# Prediction

In [14]:
pred_training_cv = cv_model.transform(training)

pred_test_cv = cv_model.transform(test)



# Evaluation

In [15]:
evaluator.setMetricName('r2').evaluate(pred_training_cv)

0.8840866981253278

In [16]:
evaluator.setMetricName('r2').evaluate(pred_test_cv)


0.9067601533925276

# Intercept and coefficients

In [17]:
print('Intercept: ', cv_model.bestModel.intercept, "\n",
     'coefficients: ', cv_model.bestModel.coefficients)

Intercept:  4.0638796100866585 
 coefficients:  [-0.0021775996416919206,0.042199302330198106,0.16825850988073168]


# Get parameter values from the best model

**Parameters can be extracted by calling the java property.**

In [18]:
print('best regParam: ' + str(cv_model.bestModel._java_obj.getRegParam()) + "\n" +
     'best ElasticNetParam:' + str(cv_model.bestModel._java_obj.getElasticNetParam()))

best regParam: 0.5
best ElasticNetParam:0.0
