In [34]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [35]:
# Import necessary libraries

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
appName= "hive_pyspark"
master= "local"

In [36]:
# Creating spark session
spark = SparkSession.builder.master(master).appName(appName).enableHiveSupport().getOrCreate()

In [37]:
# Show DataBases
df = spark.sql("show databases")
df.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [39]:
# Converting csv file to hive table
datafile = spark.read.csv("/content/boston.csv",header=True)
datafile.show(5)
datafile.write.saveAsTable("boston")

+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM| ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632| 18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|  0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|  0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|  0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|  0|2.18000006

In [40]:
# DataFrame Created by fetching data from  botson hive table

df1 = spark.sql("select * from boston")
df1.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|  18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|   0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|   0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|   0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|   0|2.

In [41]:
# Casting all Columns to double

from pyspark.sql.functions import col

table = spark.sql("select * from boston")

df2 = table.select([col(c).cast("double") for c in table.columns])

In [42]:
# Schema  of dataframe

df2.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [43]:
# correlation between dependent and independent variables
import six
for i in df2.columns:
    if not( isinstance(df2.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to MV for ", i, df2.stat.corr('MV',i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


In [44]:
# converting DataFrame to two columns Features and label

cols = df2.columns
cols.remove('MV')

from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = cols , outputCol = 'features')
final_df = vectorAssembler.transform(df2)
final_df = final_df.select(['features', 'MV'])
final_df.show(3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



In [45]:
# Split into train and test DataFrame

splits = final_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [46]:
# Linear Regression

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.05001065399421923,0.02481814438396697,-0.00037041471764136214,1.4405822731141704,-5.3079020891678566,3.9686620748408936,-0.0017157396994151944,-0.7267199341127945,0.0,-0.0007104884340947846,-0.7870692158228544,0.008811313651531748,-0.527476590907259]
Intercept: 21.436608712575563


In [47]:
# RMSE and R2 Score of LinearRegression 

print("RMSE: %f" % lr_model.summary.rootMeanSquaredError)
print("r2: %f" % lr_model.summary.r2)

RMSE: 4.920404
r2: 0.721252


In [48]:
# RMSE and R2 Score of LinearRegression Test Data

lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|27.861699054448017|       22.0|[0.01096,55.0,2.2...|
|30.994619991362548|29.10000038|[0.01439,60.0,2.9...|
| 31.75761591307047|32.90000153|[0.01778,95.0,1.4...|
| 26.64124351247619|23.10000038|[0.0187,85.0,4.15...|
|30.897381153060497|34.70000076|[0.02729,0.0,7.07...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.704794


In [49]:
# RMSE of Test Data
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 4.84002


In [50]:
print("numIterations: %d" % lr_model.summary.totalIterations)
print("objectiveHistory: %s" % str(lr_model.summary.objectiveHistory))
lr_model.summary.residuals.show()

numIterations: 10
objectiveHistory: [0.49999999999999956, 0.4311028521971911, 0.22931548508091384, 0.2084563040484514, 0.18204050442924605, 0.17869861532186226, 0.17714326401783656, 0.17635111235244696, 0.17612115819272461, 0.17598231474678722, 0.17584372075759416]




+--------------------+
|           residuals|
+--------------------+
|  -6.654227018764537|
|  0.7779829064207142|
|  1.0231688470129328|
|   4.756295549672501|
|  1.0638134543301767|
|  11.333122163759647|
|-0.37858727970709083|
|  -3.534989815533283|
|    8.92102558295268|
|   8.428120029388928|
|   3.280035007295254|
|   7.524533984120989|
| -1.0360834286996088|
|  10.113076020641614|
| -1.8775048045233937|
|  5.3762679845228405|
|-0.49139838708166295|
|   -9.54266613434065|
|  -4.236302367094314|
| -2.2973999143630977|
+--------------------+
only showing top 20 rows



In [51]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","MV","features").show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|27.861699054448017|       22.0|[0.01096,55.0,2.2...|
|30.994619991362548|29.10000038|[0.01439,60.0,2.9...|
| 31.75761591307047|32.90000153|[0.01778,95.0,1.4...|
| 26.64124351247619|23.10000038|[0.0187,85.0,4.15...|
|30.897381153060497|34.70000076|[0.02729,0.0,7.07...|
|25.175239059001438|21.60000038|[0.027310001,0.0,...|
|29.859460150767248|30.79999924|[0.027629999,75.0...|
|24.590329909321785|19.39999962|[0.03466,35.0,6.0...|
|23.395909542617805|20.89999962|[0.03548,80.0,3.6...|
| 36.64022126044747|45.40000153|[0.035780001,20.0...|
|25.621336679501923|24.79999924|[0.036589999,25.0...|
|26.313624951962012|23.20000076|[0.038710002,52.5...|
| 28.05985274492076|       28.0|[0.041129999,25.0...|
|26.268679078387528|24.79999924|[0.042970002,52.5...|
|  23.9331381434123|       20.5|[0.043370001,21.0...|
|   23.897096220296|19.79999