In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
appName= "hive_pyspark"
master= "local"

In [3]:
spark = SparkSession.builder \
	.master(master).appName(appName).enableHiveSupport().getOrCreate()

In [4]:
df=spark.sql("show databases")
df.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



Read the given CSV file in a Hive 

In [6]:
data=spark.read.csv("boston.csv",header=True)
data.show(5)
data.write.saveAsTable("bostontable")

+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM| ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632| 18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|  0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|  0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|  0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|  0|2.18000006

In [7]:
df=spark.sql("select * from bostontable")
df.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|  18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|   0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|   0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|   0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|   0|2.

Read the data from Hive table as spark dataframe

In [8]:
for col in df.dtypes:
    print(col[0]+" , "+col[1])

CRIM , string
ZN , string
INDUS , string
CHAS , string
NOX , string
RM , string
AGE , string
DIS , string
RAD , string
TAX , string
PT , string
B , string
LSTAT , string
MV , string


In [9]:
# Changing the Data Type of the columns to double
from pyspark.sql.functions import col
df = df.select([col(column).cast('double') for column in df.columns])

In [10]:
#Get All column names and it's types
for col in df.dtypes:
    print(col[0]+" , "+col[1])

CRIM , double
ZN , double
INDUS , double
CHAS , double
NOX , double
RM , double
AGE , double
DIS , double
RAD , double
TAX , double
PT , double
B , double
LSTAT , double
MV , double


In [11]:
# convert to vector column first
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

In [12]:
df.dtypes

[('CRIM', 'double'),
 ('ZN', 'double'),
 ('INDUS', 'double'),
 ('CHAS', 'double'),
 ('NOX', 'double'),
 ('RM', 'double'),
 ('AGE', 'double'),
 ('DIS', 'double'),
 ('RAD', 'double'),
 ('TAX', 'double'),
 ('PT', 'double'),
 ('B', 'double'),
 ('LSTAT', 'double'),
 ('MV', 'double')]

Get the correlation between dependent and independent variables

In [13]:
# Printing the correlation of the IDV columns & the DV column using stst function
for col in df.dtypes:
    print('Correlation of',col[0],'&', 'MV =',df.stat.corr(col[0],'MV'))

Correlation of CRIM & MV = -0.3883046116575089
Correlation of ZN & MV = 0.360445344637529
Correlation of INDUS & MV = -0.48372517128143366
Correlation of CHAS & MV = 0.1752601777529185
Correlation of NOX & MV = -0.4273207763683772
Correlation of RM & MV = 0.6953599371272672
Correlation of AGE & MV = -0.3769545671428867
Correlation of DIS & MV = 0.24992873873512172
Correlation of RAD & MV = -0.38162623156691683
Correlation of TAX & MV = -0.46853593528654536
Correlation of PT & MV = -0.5077867038116086
Correlation of B & MV = 0.3334608226834165
Correlation of LSTAT & MV = -0.7376627294671615
Correlation of MV & MV = 1.0


Build a linear regression model to predict house price

In [14]:
# Using Vector assembler to transform each row into a vector
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(df)
vhouse_df = vhouse_df.select(['features', 'MV'])
vhouse_df.show(3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



In [15]:
# Spliting the dataset into training and testing datasets
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [16]:
# Building a Linear Regression Model
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
# fitting the Linear Regression Model
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.04201970185640318,0.010714446902985515,0.0,3.527307541785578,-6.154107696720758,3.744732915921346,0.0,-0.7949301600555968,1.6514759252508173e-05,0.0,-0.6038896071882205,0.00807411560529177,-0.6133120748633886]
Intercept: 21.252590516584764


 Evaluate the Linear Regression model by getting the RMSE and R-squared values

In [18]:
# Printing the RMSE & R2 Values of the model
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.942463
r2: 0.712588


In [19]:
# Comparing the Predicted and Visually
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MV","features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|30.887727384418568|32.70000076|[0.01301,35.0,1.5...|
| 37.17739866483629|       50.0|[0.01381,80.0,0.4...|
|26.862183623765166|       24.5|[0.01501,80.0,2.0...|
| 41.30965014507718|       50.0|[0.01501,90.0,1.2...|
|34.278166476248074|       44.0|[0.01538,90.0,3.7...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.718054


In [20]:
# Print RMSE for Test Data
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 4.8343


In [21]:
# Printing Residuals
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 10
objectiveHistory: [0.5, 0.4360833233915557, 0.2456996700095724, 0.21979719554757443, 0.18756120953301644, 0.18472744250440873, 0.18296106060619488, 0.1818328272487555, 0.18156765173078268, 0.18148517583503665, 0.18139770279345838]




+-------------------+
|          residuals|
+-------------------+
| -6.415456560079502|
|  2.578240227390701|
| -4.703407442581863|
|  5.994175152628522|
|  1.751667626385128|
| 1.2683534463333501|
| -1.047674784288514|
|  4.773939574998693|
| -2.496434095035024|
| 0.4187864624213411|
|  11.76447638222767|
|-0.7569485357052272|
|  6.797331066477113|
| 0.4827290392985546|
| -9.314123121044815|
| 3.4265200538693925|
|-3.7113406916915075|
| 1.4822046231585588|
| 0.9258887378655913|
|  2.313511646778995|
+-------------------+
only showing top 20 rows



In [22]:
# Checking the predictions
predictions = lr_model.transform(test_df)
predictions.select("prediction","MV","features").show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|30.887727384418568|32.70000076|[0.01301,35.0,1.5...|
| 37.17739866483629|       50.0|[0.01381,80.0,0.4...|
|26.862183623765166|       24.5|[0.01501,80.0,2.0...|
| 41.30965014507718|       50.0|[0.01501,90.0,1.2...|
|34.278166476248074|       44.0|[0.01538,90.0,3.7...|
| 30.53193238363042|32.90000153|[0.01778,95.0,1.4...|
| 25.15517608924158|       33.0|[0.019509999,17.5...|
|27.553002326120435|23.89999962|[0.025429999,55.0...|
|27.440903488901053|       25.0|[0.028750001,28.0...|
|29.140539663115263|31.20000076|[0.03049,55.0,3.7...|
|32.359105544550346|34.90000153|[0.03359,75.0,2.9...|
|21.984220705408532|20.89999962|[0.03548,80.0,3.6...|
|24.772159853274097|22.89999962|[0.03551,25.0,4.8...|
|29.066484227817412|27.89999962|[0.036150001,80.0...|
|25.812714878172343|24.79999924|[0.036589999,25.0...|
|23.382658275036444|20.70000