<a href="https://colab.research.google.com/github/aakashr30/Linear-Regression-In-Pyspark-Adversting-Analysis-/blob/main/linear_regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Create entry points to Spark

In [None]:
!pip install pyspark
import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 29 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=ce96d8b5821965aaffec6170882dccfc28d04692b4492e067f4284c57fe85eba
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
# Disabled alternative, kept for reference only: explicit creation of a
# SparkContext on a local master, and of a SparkSession with an app name and
# a sample config option. Nothing in this cell executes; the notebook creates
# its session in a later cell through SparkSession.builder.
#from pyspark import SparkContext
#sc = SparkContext(master = 'local')

#from pyspark.sql import SparkSession
#spark = SparkSession.builder \
#          .appName("Python Spark SQL basic example") \
#          .config("spark.some.config.option", "some-value") \
#          .getOrCreate()

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Obtain the SparkSession used by every later cell. `getOrCreate()` hands back
# an already-running session if there is one, otherwise it starts a new one
# under the application name 'Advertising'.
session_builder = SparkSession.builder.appName('Advertising')
spark = session_builder.getOrCreate()

# Linear regression without cross-validation

## Import data

In [None]:
from google.colab import drive

# Mount Google Drive inside the Colab VM so the dataset can be read by path.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


In [None]:
# Load the advertising dataset from the mounted Drive. The first CSV row is
# used as the header and column types are inferred from the data.
advertising_csv = '/content/drive/MyDrive/0.MKCE/Datasets/Advertising.csv'
ad = spark.read.csv(advertising_csv, header=True, inferSchema=True)

# Preview the first five rows.
ad.show(5)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



## Transform data structure

In [None]:
from pyspark.ml.linalg import Vectors

# Predictors are the three advertising channels; the target is `sales`.
# Columns are selected by name rather than by position: the CSV's first column
# (`_c0`) is only a row counter, and a positional slice `x[0:3]` would feed
# that counter to the model as a feature while leaving `newspaper` out.
feature_cols = ['TV', 'radio', 'newspaper']

# Reshape each row into the (features vector, label) pair that pyspark.ml
# estimators expect.
ad_df = ad.rdd.map(
    lambda row: [Vectors.dense([row[col] for col in feature_cols]),
                 row['sales']]
).toDF(['features', 'label'])

ad_df.show(5)

+----------------+-----+
|        features|label|
+----------------+-----+
|[1.0,230.1,37.8]| 22.1|
| [2.0,44.5,39.3]| 10.4|
| [3.0,17.2,45.9]|  9.3|
|[4.0,151.5,41.3]| 18.5|
|[5.0,180.8,10.8]| 12.9|
+----------------+-----+
only showing top 5 rows



## Build linear regression model

In [None]:
from pyspark.ml.regression import LinearRegression

# Unfitted linear-regression estimator with default hyperparameters. It reads
# its inputs from the 'features' column and its target from the 'label' column.
lr = LinearRegression(labelCol='label', featuresCol='features')

## Fit the model

In [None]:
# Fit the estimator on the full (features, label) DataFrame; the result is the
# fitted model used for prediction below.
lr_model = lr.fit(ad_df)

## Prediction

In [None]:
# Score the same data the model was fitted on (in-sample predictions).
pred = lr_model.transform(ad_df)
# Preview the first five scored rows.
pred.show(5)

## Model evaluation

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator that compares the 'prediction' column against the 'label' column.
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction')

# Switch the metric to R squared, then score the in-sample predictions.
# The evaluate() call is the cell's last expression, so its value is displayed.
evaluator.setMetricName('r2')
evaluator.evaluate(pred)

## Compare results with R
The comparison below shows that the linear regression analyses from pyspark and R obtained very close results.

```{r}
# intercept and coefficients from R
advertise = read.csv('data/Advertising.csv', header = TRUE)
lr_ad = lm(Sales~., data = advertise)
lr_ad$coefficients

 (Intercept)           TV        Radio    Newspaper 
 2.938889369  0.045764645  0.188530017 -0.001037493
 
# intercept and coefficients from pyspark
lr_model.intercept

2.9388893694594134

lr_model.coefficients

DenseVector([0.0458, 0.1885, -0.001])

# R squared from R
summary(lr_ad)$r.squared

0.8972106

# R squared from pyspark
evaluator.evaluate(pred, {evaluator.metricName: "r2"})

0.897210638178952

```

# Linear regression with cross-validation

## Training and test datasets

In [None]:
# Random 80/20 split into training and test sets; the fixed seed keeps the
# split the same across runs.
split_weights = [0.8, 0.2]
training, test = ad_df.randomSplit(split_weights, seed=123)

## Build cross-validation model

In [None]:
##===== build cross-validation model =====

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# estimator: same linear-regression setup as in the first part of the notebook
lr = LinearRegression(featuresCol='features',
                      labelCol='label')

# parameter grid: 3 x 3 = 9 combinations of regularisation strength
# (regParam) and elastic-net mixing (elasticNetParam)
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0, 0.5, 1])
    .addGrid(lr.elasticNetParam, [0, 0.5, 1])
    .build()
)

# evaluator: candidates are ranked by R squared on the held-out fold
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol='label',
                                metricName='r2')

# cross-validation model: 4 folds over the grid above.
# `seed` pins the random assignment of rows to folds, so the selected
# hyperparameters are reproducible from run to run (the train/test split is
# already seeded with the same value).
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=4,
                    seed=123)

## Fit cross-validation model

In [None]:
# Run the cross-validated grid search on the training split only; the test
# split is kept aside for the evaluation cells below.
cv_model = cv.fit(training)

## Prediction

In [None]:
# Score both splits with the cross-validated model: first the training data,
# then the held-out test data.
pred_training_cv, pred_test_cv = (
    cv_model.transform(split) for split in (training, test)
)

## Evaluation

In [None]:
# R squared on the training split (data the model was fitted on)
evaluator.setMetricName('r2')
evaluator.evaluate(pred_training_cv)

In [None]:
# R squared on the held-out test split
evaluator.setMetricName('r2')
evaluator.evaluate(pred_test_cv)

## Intercept and coefficients

In [None]:
# Report the fitted intercept and coefficient vector of the model that
# cross-validation selected.
best_lr_model = cv_model.bestModel
print('Intercept: ', best_lr_model.intercept, "\n",
      'coefficients: ', best_lr_model.coefficients)

## Get parameter values from the best model

The hyperparameter values chosen by cross-validation can be read from the best model through its underlying Java object (`_java_obj`).

In [None]:
# Read the selected hyperparameters from the JVM object that backs the best
# model.
# NOTE(review): `_java_obj` is a private attribute; recent PySpark releases
# appear to expose getRegParam()/getElasticNetParam() on the fitted model
# itself. Confirm for the installed version and prefer the public getters.
best_java_model = cv_model.bestModel._java_obj
best_reg_param = best_java_model.getRegParam()
best_elastic_net_param = best_java_model.getElasticNetParam()
print('best regParam: ' + str(best_reg_param) + "\n" +
      'best ElasticNetParam:' + str(best_elastic_net_param))