In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Import necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.mllib.stat import Statistics
# Settings consumed when the SparkSession is built: application name and master URL.
appName, master = "hive_pyspark", "local"

In [None]:
# Create (or reuse) a Hive-enabled SparkSession configured from `master` and `appName`.
spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# List the databases visible to this Hive-enabled session.
df = spark.sql("show databases")
df.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [None]:
# Load the Boston housing CSV (first line is the header) and persist it as the Hive table "boston".
# No schema inference is requested here; the columns are cast to double in a later cell.
datafile = spark.read.csv("/content/boston.csv",header=True)
datafile.show(5)
# mode("overwrite") replaces the table left behind by a previous run; with the default
# save mode a second execution of this cell fails because "boston" already exists.
datafile.write.mode("overwrite").saveAsTable("boston")

+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM| ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632| 18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|  0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|  0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|  0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|  0|2.18000006

In [None]:
# Read the "boston" Hive table back into a DataFrame and preview its rows.
boston_query = "select * from boston"
df = spark.sql(boston_query)
df.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|  18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|   0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|   0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|   0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|   0|2.

In [None]:
# Convert every column of the "boston" table to double precision.
from pyspark.sql.functions import col

table = spark.sql("select * from boston")
as_double = [col(name).cast("double") for name in table.columns]
df = table.select(*as_double)

In [None]:
# Print the schema (column names, data types, nullability) of the casted DataFrame.
df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [None]:
# Display the list of column names of the casted DataFrame.
df.columns

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PT',
 'B',
 'LSTAT',
 'MV']

In [None]:
# Report the dataset dimensions: number of rows, then number of columns.
row_count = df.count()
column_count = len(df.columns)
print(row_count, column_count)

506 14


In [None]:
# Correlation of every feature column with the target column MV.
# Deliberately no `from pyspark.sql.functions import *` here: this cell needs nothing
# from that module, and the wildcard rebinds Python builtins such as sum, min, max,
# abs and round to Spark column functions for the rest of the notebook.
for feature in df.columns:
  if feature != 'MV':
    print(feature, "vs MV", df.stat.corr(feature, 'MV'))

CRIM vs MV -0.3883046116575089
ZN vs MV 0.360445344637529
INDUS vs MV -0.48372517128143366
CHAS vs MV 0.1752601777529185
NOX vs MV -0.4273207763683772
RM vs MV 0.6953599371272672
AGE vs MV -0.3769545671428867
DIS vs MV 0.24992873873512172
RAD vs MV -0.38162623156691683
TAX vs MV -0.46853593528654536
PT vs MV -0.5077867038116086
B vs MV 0.3334608226834165
LSTAT vs MV -0.7376627294671615


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Pack the 13 predictor columns (every column except the label MV) into a single
# vector column named 'features', the input form used by the model below.
feature_columns = [
    'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
    'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
output = assembler.transform(df)

# Preview the assembled vectors next to the label.
output.select('features', 'MV').show(5)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
+--------------------+-----------+
only showing top 5 rows



In [None]:
# Keep only the assembled feature vector and the label column MV.
final_data=output.select('features','MV')
# 70/30 train/test split; the fixed seed makes the partition (and every metric
# computed from it below) repeatable across runs instead of changing each time.
train_data,test_data=final_data.randomSplit([0.7,0.3],seed=42)
# Summary statistics of the training split.
train_data.describe().show()

+-------+------------------+
|summary|                MV|
+-------+------------------+
|  count|               349|
|   mean|22.676217837392553|
| stddev| 9.196703081653078|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [None]:
# Summary statistics of the test split, for comparison with the training split.
test_data.describe().show()

+-------+-----------------+
|summary|               MV|
+-------+-----------------+
|  count|              157|
|   mean|22.21401270178344|
| stddev|9.219387669617026|
|    min|              5.0|
|    max|             50.0|
+-------+-----------------+



In [None]:
from pyspark.ml.regression import LinearRegression

# LRmodel is the unfitted estimator; fitting it on the training split returns the
# trained model, which is stored in `training`.
LRmodel = LinearRegression(labelCol='MV', featuresCol='features')
training = LRmodel.fit(train_data)

In [None]:
# Keep only the feature vectors of the test split (the label column MV is dropped).
data = test_data.select('features')

In [None]:
# Apply the fitted model to the test features; the result carries a 'prediction' column.
predictions = training.transform(data)

In [None]:
# Show the first rows of the test split together with their actual MV values.
test_data.show()

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.01096,55.0,2.2...|       22.0|
|[0.01311,90.0,1.2...|35.40000153|
|[0.0136,75.0,4.0,...|18.89999962|
|[0.01439,60.0,2.9...|29.10000038|
|[0.01501,90.0,1.2...|       50.0|
|[0.0187,85.0,4.15...|23.10000038|
|[0.02177,82.5,2.0...|42.29999924|
|[0.02899,40.0,1.2...|26.60000038|
|[0.031129999,0.0,...|       17.5|
|[0.033059999,0.0,...|20.60000038|
|[0.03548,80.0,3.6...|20.89999962|
|[0.035840001,80.0...|       23.5|
|[0.036150001,80.0...|27.89999962|
|[0.03768,80.0,1.5...|34.59999847|
|[0.040109999,80.0...|33.29999924|
|[0.043370001,21.0...|       20.5|
|[0.04417,70.0,2.2...|24.79999924|
|[0.04462,25.0,4.8...|23.89999962|
|[0.045899998,52.5...|22.29999924|
+--------------------+-----------+
only showing top 20 rows



In [None]:
# Show the first rows of the model's predictions for the test features.
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.00632,18.0,2.3...| 30.64954201260531|
|[0.01096,55.0,2.2...| 27.63945317798794|
|[0.01311,90.0,1.2...|31.179826334096525|
|[0.0136,75.0,4.0,...|15.760746983349105|
|[0.01439,60.0,2.9...|31.830746531224875|
|[0.01501,90.0,1.2...| 45.11363531591966|
|[0.0187,85.0,4.15...|25.409066864608896|
|[0.02177,82.5,2.0...| 37.07958727310088|
|[0.02899,40.0,1.2...|22.082847986903484|
|[0.031129999,0.0,...|16.457553703395345|
|[0.033059999,0.0,...| 22.06974614342972|
|[0.03548,80.0,3.6...|21.907115668667203|
|[0.035840001,80.0...|30.688532333501918|
|[0.036150001,80.0...| 32.19356624986632|
|[0.03768,80.0,1.5...|35.025298999651206|
|[0.040109999,80.0...|  36.4566486325125|
|[0.043370001,21.0...|24.352143511776603|
|[0.04417,70.0,2.2...|31.152890732693677|
|[0.04462,25.0,4.8...|27.547444165111383|
|[0.045899998,52.5...|27.487850803665914|
+--------------------+------------

In [None]:
# R-squared of the fitted model on the training and test splits.
train_res = training.evaluate(train_data)
test_res = training.evaluate(test_data)
print('Train Score (R2) :', train_res.r2)
print('Test Score (R2) :', test_res.r2)

Train Score (R2) : 0.7399772475767292
Test Score (R2) : 0.7287776303572815


In [None]:
# Root-mean-squared error of the fitted model on the training and test splits.
train_res = training.evaluate(train_data)
train_rmse = train_res.rootMeanSquaredError
print('Train Score (RMSE) :', train_rmse)

test_res = training.evaluate(test_data)
test_rmse = test_res.rootMeanSquaredError
print('Test Score (RMSE) :', test_rmse)

Train Score (RMSE) : 4.682898550861255
Test Score (RMSE) : 4.786050791032814
