In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 64.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=17d91e7c5be5c0d3ae3284fd857c8ef5ff83a8fd9828b00a2f7a1fb9863bc14e
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session used by every later cell (or reuse one that is
# already running under the same builder settings).
spark = (
    SparkSession.builder
    .appName('Questions')
    .getOrCreate()
)

In [4]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

# **1.** Read the given CSV file into a Hive table
## Perform the following tasks using PySpark

## 2. Read the data from the Hive table as a Spark DataFrame

In [6]:
# Load the Boston housing CSV: the first row supplies the column names and
# column types are inferred from the values instead of defaulting to string.
data = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('/content/boston.csv')
)
data.show(5)

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001| 0.0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999| 0.0|2.

## 3. Get the correlation between the dependent and independent variables

In [7]:
# Print the correlation between the target column MV and every non-string
# column of `data`.
#
# Column types come from the DataFrame schema (data.dtypes yields
# (column name, type name) pairs in column order), so no Spark job is launched
# just to inspect the first row of each column, and a string column whose
# first value happens to be null is still skipped correctly.
for col_name, col_type in data.dtypes:
    if col_type != 'string':
        print( "Correlation to MV for ", col_name, data.stat.corr('MV', col_name))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


In [8]:
from pyspark.ml.feature import VectorAssembler

# Assemble the thirteen predictor columns into one vector column named
# 'features'. The target column MV is deliberately not part of the inputs.
vectorAssembler = VectorAssembler(
    inputCols=[
        'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
        'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT',
    ],
    outputCol='features',
)


In [9]:
# Apply the assembler to the loaded data; the result carries the assembler's
# 'features' output column alongside the original columns.
vhouse_df = vectorAssembler.transform(data)

In [11]:
# Keep only what the models need: the assembled feature vector and the target.
vhouse_df = vhouse_df.select('features', 'MV')
vhouse_df.show(n=3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



## 4. Build a linear regression model to predict house prices

In [12]:
# 70/30 train/test split. A fixed seed is passed so that re-running the
# notebook selects the same rows and the metrics reported below are
# reproducible; without it every run produces a different split.
SPLIT_SEED = 42
splits = vhouse_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train_df, test_df = splits

In [13]:
from pyspark.ml.regression import LinearRegression

# Fit a regularised linear regression of MV on the assembled feature vector
# (at most 10 iterations, regParam=0.3, elasticNetParam=0.8), then report the
# fitted parameters.
lr = LinearRegression(
    featuresCol='features',
    labelCol='MV',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)

fitted_coefficients = lr_model.coefficients
fitted_intercept = lr_model.intercept
print("Coefficients: " + str(fitted_coefficients))
print("Intercept: " + str(fitted_intercept))

Coefficients: [-0.007903775587140805,0.01073788093124137,-0.007186843552412929,1.979217890324662,-6.237654838894437,4.2770957357925505,0.0,-0.6217534450049654,0.0,0.0,-0.8831886324952014,0.005753997826681718,-0.6077237761500195]
Intercept: 23.24246112480068


## 5. Evaluate the linear regression model by getting the RMSE and R-squared values

In [14]:
# Fit quality on the TRAINING data, taken from the model's own summary object.
trainingSummary = lr_model.summary
train_rmse = trainingSummary.rootMeanSquaredError
train_r2 = trainingSummary.r2
print("RMSE: %f" % train_rmse)
print("r2: %f" % train_r2)

RMSE: 5.007570
r2: 0.726474


In [15]:
# Summary statistics (count, mean, stddev, min, max) of the training target MV,
# useful as a scale reference when reading the RMSE values.
train_df.describe().show()

+-------+------------------+
|summary|                MV|
+-------+------------------+
|  count|               361|
|   mean|22.597229964110802|
| stddev| 9.588050635682276|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out test rows with the fitted linear model and preview the
# predictions next to the true MV values.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "MV", "features").show(5)

# R-squared of the predictions on the test data.
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="MV",
    metricName="r2",
)
test_r2 = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = %g" % test_r2)

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
| 31.38667619429801|       24.0|[0.00632,18.0,2.3...|
| 28.20396528417091|       22.0|[0.01096,55.0,2.2...|
|30.829703073981793|35.40000153|[0.01311,90.0,1.2...|
| 39.38154610188929|       50.0|[0.01381,80.0,0.4...|
| 32.24377294115004|31.60000038|[0.01432,100.0,1....|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.670419


In [17]:
# Evaluate the linear model on the test split and report its RMSE.
test_result = lr_model.evaluate(test_df)
test_rmse = test_result.rootMeanSquaredError
print("Root Mean Squared Error (RMSE) on test data = %g" % test_rmse)

Root Mean Squared Error (RMSE) on test data = 4.67514


In [18]:
# Training diagnostics from the model summary: the number of iterations run,
# the objective value recorded at each step, and the training residuals.
iteration_count = trainingSummary.totalIterations
objective_trace = trainingSummary.objectiveHistory
print("numIterations: %d" % iteration_count)
print("objectiveHistory: %s" % str(objective_trace))
trainingSummary.residuals.show()

numIterations: 10
objectiveHistory: [0.5, 0.431705373757803, 0.23159383442447737, 0.2084557833847855, 0.17784943893076097, 0.1737457924079901, 0.17274542528740153, 0.1715364747720391, 0.17122945848759194, 0.17111183937441043, 0.1710329096042634]
+--------------------+
|           residuals|
+--------------------+
|  0.7470938546778001|
|0.009665815115333487|
|  2.1518303366220444|
| -2.3711034878181287|
|  -3.659163959047092|
|   7.749567984390531|
|   8.043578554087084|
|   2.763395139128928|
|  0.9106866599170829|
| -0.6159184014613395|
|   9.658186666968348|
| -1.9785735577361336|
|   4.722623890846222|
| -0.9102540877629828|
| -10.517634140318517|
|  -4.338318139805644|
|  2.7009465599460114|
|  1.1053039891362246|
| -2.2587716128617252|
| 0.13605473790860856|
+--------------------+
only showing top 20 rows



In [19]:
# Score the test split with the linear model again and show a longer listing
# of predicted versus actual MV.
predictions = lr_model.transform(test_df)
prediction_view = predictions.select("prediction", "MV", "features")
prediction_view.show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
| 31.38667619429801|       24.0|[0.00632,18.0,2.3...|
| 28.20396528417091|       22.0|[0.01096,55.0,2.2...|
|30.829703073981793|35.40000153|[0.01311,90.0,1.2...|
| 39.38154610188929|       50.0|[0.01381,80.0,0.4...|
| 32.24377294115004|31.60000038|[0.01432,100.0,1....|
|26.595798375381804|23.10000038|[0.0187,85.0,4.15...|
|26.418148891492514|       33.0|[0.019509999,17.5...|
| 25.64930339693282|21.60000038|[0.027310001,0.0,...|
| 20.40928624447486|       17.5|[0.031129999,0.0,...|
| 30.49997897616508|33.40000153|[0.032370001,0.0,...|
|28.749409996096716|24.10000038|[0.034449998,82.5...|
|24.909299632952173|19.39999962|[0.03466,35.0,6.0...|
| 24.74055528109508|22.89999962|[0.03551,25.0,4.8...|
|29.612965577396395|       23.5|[0.035840001,80.0...|
| 25.83407760604879|24.79999924|[0.036589999,25.0...|
|21.667386040068422|21.10000

In [20]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'MV')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="MV", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.89641
