In [140]:
import pyspark
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [141]:
spark = SparkSession.builder.appName("PysparkmlModel").getOrCreate()

In [142]:
mlDF = spark.read.csv('C:/Users/ramna/DataScience/DataSet/tableauModelReport.csv',header=True)

In [143]:
from pyspark.sql.types import IntegerType
mlDF1 = mlDF.withColumn("PricePerSqft", mlDF["PricePerSqft"].cast(IntegerType())) .withColumn("SqftSize", mlDF["SqftSize"].cast(IntegerType())).withColumn("price", mlDF["price"].cast(IntegerType())).withColumn("bath", mlDF["bath"].cast(IntegerType()))

In [144]:
mlDF1.describe().show(5)

+-------+-------+------------------+------------------+------------------+------------------+--------------+--------------------+
|summary|BhkSize|      PricePerSqft|              bath|             price|          SqftSize|locationofHome|            AreaType|
+-------+-------+------------------+------------------+------------------+------------------+--------------+--------------------+
|  count|  12492|             12463|             12436|             12508|             12463|         12507|               12508|
|   mean|   null| 8081.927866484795|2.7169507880347377|115.07083466581388| 1575.026077188478|          null|                null|
| stddev|   null|109834.03851704008|1.3673681873810917|152.78077561858225|1271.0420912760098|          null|                null|
|    min|  10BHK|               267|                 1|                 8|                 1|        Anekal|      Built-up  Area|
|    max|   9BHK|          12000000|                40|              3600|             522

In [145]:
mlDF1.printSchema()

root
 |-- BhkSize: string (nullable = true)
 |-- PricePerSqft: integer (nullable = true)
 |-- bath: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- SqftSize: integer (nullable = true)
 |-- locationofHome: string (nullable = true)
 |-- AreaType: string (nullable = true)



In [155]:
from pyspark.sql.functions import corr
mlDF1.select(corr('price','SqftSize')).show(3)

+---------------------+
|corr(price, SqftSize)|
+---------------------+
|   0.5727580283915608|
+---------------------+



In [161]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['SqftSize'], outputCol='features')
mlDF2 = assembler.setHandleInvalid("skip").transform(mlDF1)

In [167]:
train,test = mlDF2.randomSplit([0.7, 0.3])

In [168]:
train.show(2)

+--------+-----+
|features|price|
+--------+-----+
|   [1.0]|  120|
|   [5.0]|  115|
+--------+-----+
only showing top 2 rows



In [169]:
test.show(2)

+--------+-----+
|features|price|
+--------+-----+
| [250.0]|   40|
| [284.0]|    8|
+--------+-----+
only showing top 2 rows



In [162]:
mlDF2 = mlDF2.select(['features','price'])

In [163]:
linearReg = LinearRegression(maxIter=10, labelCol='price') 

In [164]:
lnrModel = linearReg.fit(mlDF2)

In [165]:
print(f'Intercept: {lnrModel.intercept}\nCoefficient: {lnrModel.coefficients.values}')

Intercept: 6.447608656996459
Coefficient: [0.06889671]


In [171]:
trainSummary = lnrModel.summary
print("RMSE: %f" % trainSummary.rootMeanSquaredError)
print("\nr2: %f" % trainSummary.r2)


RMSE: 125.325005

r2: 0.328052


In [172]:
from  pyspark.sql.functions import abs
predictions = lnrModel.transform(test)
x =((predictions['price']-predictions['prediction'])/predictions['price'])*100
predictions = predictions.withColumn('Accuracy',abs(x))
predictions.select("prediction","price","Accuracy","features").show(5)

+------------------+-----+------------------+--------+
|        prediction|price|          Accuracy|features|
+------------------+-----+------------------+--------+
|23.671786660412906|   40|40.820533348967736| [250.0]|
|26.014274868877543|    8|225.17843586096927| [284.0]|
| 26.84103541304153|   22| 22.00470642291605| [296.0]|
|30.561457861779484|   13|135.08813739830373| [350.0]|
|31.250424981916144|   16|  95.3151561369759| [360.0]|
+------------------+-----+------------------+--------+
only showing top 5 rows



In [173]:
from pyspark.ml.evaluation import RegressionEvaluator
pred_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % pred_evaluator.evaluate(predictions))

R Squared (R2) on test data = 0.437989


In [175]:
r2 = trainSummary.r2
n = mlDF2.count()
p = len(mlDF2.columns)
adjusted_r2 = 1-(1-r2)*(n-1)/(n-p-1)

In [177]:
lnrModel.summary.rootMeanSquaredError

125.32500489914895

In [93]:
lnrsmry = lnrModel.summary

In [97]:
print(f'Explained Variance: {lnrsmry.explainedVariance}\nR Squared: {lnrsmry.r2}')
print(f'Std. Error: {lnrsmry.coefficientStandardErrors}\nRoot Mean Squared Err: {lnrsmry.rootMeanSquaredError}')
print(f'Mean Absolute Err: {lnrsmry.meanAbsoluteError}\nMean Squared Err: {lnrsmry.meanSquaredError}')
print(f'P-value: {lnrsmry.pValues}')
lnrsmry.residuals.show(5)
print(f'Num Iterations: {lnrsmry.totalIterations}\nObjective History: {lnrsmry.objectiveHistory}')

Explained Variance: 7667.998337884259
R Squared: 0.3280517590869928
Std. Error: [0.0008833214437298061, 1.7877442980548]
Root Mean Squared Err: 125.32500489914895
Mean Absolute Err: 52.128559566502204
Mean Squared Err: 15706.356852971709
P-value: [0.0, 0.0003114950045708387]
+-------------------+
|          residuals|
+-------------------+
|-14.458169875445279|
| -15.73774742670733|
|-24.123663073395406|
| -50.09021239319043|
| -37.65887395667521|
+-------------------+
only showing top 5 rows

Num Iterations: 1
Objective History: [0.0]


In [98]:
lnrsmry.predictions.show(5)

+--------+-----+------------------+
|features|price|        prediction|
+--------+-----+------------------+
|[1350.0]| 85.0| 99.45816987544528|
|[1296.0]| 80.0| 95.73774742670733|
|[1200.0]| 65.0|  89.1236630733954|
|[1185.0]| 38.0| 88.09021239319043|
|[1440.0]| 68.0|105.65887395667521|
+--------+-----+------------------+
only showing top 5 rows

