In [6]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
# Import necessary libraries

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
appName= "hive_pyspark"
master= "local"

In [8]:
# Creating spark session
spark = SparkSession.builder.master(master).appName(appName).enableHiveSupport().getOrCreate()

In [9]:
# Show DataBases
df = spark.sql("show databases")
df.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [10]:
# Converting csv file to hive table
datafile = spark.read.csv("/content/boston.csv",header=True)
datafile.show(5)
datafile.write.saveAsTable("boston")

+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM| ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+---+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632| 18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|  0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|  0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|  0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|  0|2.18000006

In [11]:
# DataFrame Created by fetching data from  botson hive table

df1 = spark.sql("select * from boston")
df1.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|  18|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|         24|
|0.027310001|   0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729|   0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001|   0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999|   0|2.

In [12]:
# Casting all Columns to double

from pyspark.sql.functions import col

table = spark.sql("select * from boston")

df2 = table.select([col(c).cast("double") for c in table.columns])

In [13]:
# Schema  of dataframe

df2.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [22]:
lst=[]
for i in df2.columns:
  if(i!="MV"):
    lst.append(i)
lst    

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PT',
 'B',
 'LSTAT']

In [24]:
for i in lst:
  print(i," - ",df2.stat.corr(i,"MV"))

CRIM  -  -0.3883046116575089
ZN  -  0.360445344637529
INDUS  -  -0.48372517128143366
CHAS  -  0.1752601777529185
NOX  -  -0.4273207763683772
RM  -  0.6953599371272672
AGE  -  -0.3769545671428867
DIS  -  0.24992873873512172
RAD  -  -0.38162623156691683
TAX  -  -0.46853593528654536
PT  -  -0.5077867038116086
B  -  0.3334608226834165
LSTAT  -  -0.7376627294671615


In [41]:
# converting DataFrame to two columns Features and label


from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = lst , outputCol = 'features')
final_df = vectorAssembler.transform(df2)
final_df = final_df.select(['features', 'MV'])
final_df.show(10)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
|[0.029850001,0.0,...|28.70000076|
|[0.088289998,12.5...|22.89999962|
|[0.144549996,12.5...|27.10000038|
|[0.211239994,12.5...|       16.5|
|[0.170039997,12.5...|18.89999962|
+--------------------+-----------+
only showing top 10 rows



In [42]:
# Split into train and test DataFrame

splits = final_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [43]:
# Linear Regression

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.05844061053507809,0.014940706036129004,0.0,2.3287934953346277,-8.81646890734017,5.158946418063817,-0.0026279605774610776,-0.6420914892199767,0.0,-0.0008224174316863815,-0.8473106388320544,0.0058730367613063755,-0.39411021346720065]
Intercept: 16.319146516485393


In [45]:
# RMSE and R2 Score of LinearRegression (Train)
train_score = lr_model.summary
print("RMSE: %f" % train_score.rootMeanSquaredError)
print("r2: %f" % train_score.r2)

RMSE: 4.816827
r2: 0.730505


In [46]:
# RMSE and R2 Score of LinearRegression (Test)
test_score = lr_model.evaluate(test_df)
print("RMSE: %f" % test_score.rootMeanSquaredError)
print("r2: %f" % test_score.r2)

RMSE: 5.153306
r2: 0.668451
