## Import the necessary libraries

In [0]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# importing SparkSession then creating a new SparkSession object to use Spark

## Read the csv file within spark

In [0]:
path = '/FileStore/tables/Bengaluru_House_Data.csv'
data = spark.read.option('header', 'true').csv(path)
data.show(10)

+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|           area_type| availability|            location|     size|society|total_sqft|bath|balcony|price|
+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|Super built-up  Area|       19-Dec|Electronic City P...|    2 BHK|Coomee |      1056|   2|      1|39.07|
|          Plot  Area|Ready To Move|    Chikka Tirupathi|4 Bedroom|Theanmp|      2600|   5|      3|  120|
|      Built-up  Area|Ready To Move|         Uttarahalli|    3 BHK|   null|      1440|   2|      3|   62|
|Super built-up  Area|Ready To Move|  Lingadheeranahalli|    3 BHK|Soiewre|      1521|   3|      1|   95|
|Super built-up  Area|Ready To Move|            Kothanur|    2 BHK|   null|      1200|   2|      1|   51|
|Super built-up  Area|Ready To Move|          Whitefield|    2 BHK|DuenaTa|      1170|   2|      1|   38|
|Super built-up  Area|       18-May|    Old Ai

## Get the Schema of the spark dataframe  using printSchema()

In [0]:
data.printSchema()

root
 |-- area_type: string (nullable = true)
 |-- availability: string (nullable = true)
 |-- location: string (nullable = true)
 |-- size: string (nullable = true)
 |-- society: string (nullable = true)
 |-- total_sqft: string (nullable = true)
 |-- bath: string (nullable = true)
 |-- balcony: string (nullable = true)
 |-- price: string (nullable = true)



## Check the columns in the spark dataframe

In [0]:
data.columns

Out[4]: ['area_type',
 'availability',
 'location',
 'size',
 'society',
 'total_sqft',
 'bath',
 'balcony',
 'price']

## Convert the columns to the appropriate data type 

In [0]:
from pyspark.sql.types import *

df_new = data.withColumn('total_sqft', data['total_sqft'].cast(IntegerType()))
df_new = df_new.withColumn("bath", df_new["bath"].cast(IntegerType()))
df_new = df_new.withColumn("balcony", df_new["balcony"].cast(IntegerType()))
df_new = df_new.withColumn("price", df_new["price"].cast(DoubleType()))

df_new.printSchema()

root
 |-- area_type: string (nullable = true)
 |-- availability: string (nullable = true)
 |-- location: string (nullable = true)
 |-- size: string (nullable = true)
 |-- society: string (nullable = true)
 |-- total_sqft: integer (nullable = true)
 |-- bath: integer (nullable = true)
 |-- balcony: integer (nullable = true)
 |-- price: double (nullable = true)



In [0]:
df_new.groupBy('area_type').count().show()

+--------------------+-----+
|           area_type|count|
+--------------------+-----+
|      Built-up  Area| 2418|
|Super built-up  Area| 8790|
|          Plot  Area| 2025|
|        Carpet  Area|   87|
+--------------------+-----+



**Inference**: Super built up area type has the highest number of values in the `area_type`.

In [0]:
df_new.groupBy('availability').count().show()

+-------------+-----+
| availability|count|
+-------------+-----+
|       20-Nov|    4|
|       20-Jan|   24|
|       16-Oct|    1|
|       20-Mar|    2|
|       21-Jul|    8|
|       20-Feb|    2|
|       19-Jun|   40|
|       18-Mar|  131|
|       21-Jun|   17|
|       19-Oct|   15|
|Ready To Move|10581|
|       19-Jul|   36|
|       19-Aug|   18|
|       21-Sep|    6|
|       20-May|    4|
|       17-Jan|    1|
|       18-Nov|   47|
|       21-Nov|    3|
|       22-Mar|    3|
|       21-Dec|   93|
+-------------+-----+
only showing top 20 rows



**Inference** : Most number of houses are Ready to move.

In [0]:
df_new.groupBy('location').count().show()

+--------------------+-----+
|            location|count|
+--------------------+-----+
|    Kaval Byrasandra|   22|
| 7th Block Jayanagar|    5|
|        Vittal Nagar|    3|
|Jay an agar 4 T B...|    1|
|            Sarjapur|   82|
|ITI Employees Layout|    1|
|         Nehru Nagar|   11|
|     Teachers Colony|    2|
|      Bendiganahalli|    2|
|    Chikka Banaswadi|    3|
|          BTM Layout|   17|
|          Vimanapura|    3|
|             manyata|    1|
|        Bettahalsoor|    3|
|     Ambedkar Colony|    7|
|          Cooke Town|   16|
|2nd Phase Judicia...|   11|
|          SBM Colony|    2|
|      Goraguntepalya|    1|
|      Pragathi Nagar|    4|
+--------------------+-----+
only showing top 20 rows



In [0]:
df_new.groupBy('size').count().show()

+----------+-----+
|      size|count|
+----------+-----+
|    14 BHK|    1|
| 5 Bedroom|  297|
|     7 BHK|   17|
|    16 BHK|    1|
|      null|   16|
| 1 Bedroom|  105|
| 9 Bedroom|   46|
| 3 Bedroom|  547|
|    27 BHK|    1|
|      1 RK|   13|
|10 Bedroom|   12|
|    11 BHK|    2|
|     4 BHK|  591|
|18 Bedroom|    1|
| 6 Bedroom|  191|
|     2 BHK| 5199|
| 4 Bedroom|  826|
| 2 Bedroom|  329|
|     6 BHK|   30|
|     8 BHK|    5|
+----------+-----+
only showing top 20 rows



## Check the correlation between total_sqft and price

In [0]:
from pyspark.sql.functions import corr
df_new.select(corr('total_sqft','price')).show()

+-----------------------+
|corr(total_sqft, price)|
+-----------------------+
|     0.5729032305054705|
+-----------------------+



**Inference**: There is a positive correlation between `total_sqft` and `price`.

## Check the missing values in the dataframe

In [0]:
from pyspark.sql.functions import isnull, when, count, col,isnan
df_new.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_new.columns]).show()

+---------+------------+--------+----+-------+----------+----+-------+-----+
|area_type|availability|location|size|society|total_sqft|bath|balcony|price|
+---------+------------+--------+----+-------+----------+----+-------+-----+
|        0|           0|       1|  16|   5502|       247|  73|    609|    0|
+---------+------------+--------+----+-------+----------+----+-------+-----+



**Inference**: There are missing values. `society` has the highest number of values that are null/nan. Hence we can drop that column, as it has no further significance.

In [0]:
df_new = df_new.drop('society')
df_new.columns

Out[14]: ['area_type',
 'availability',
 'location',
 'size',
 'total_sqft',
 'bath',
 'balcony',
 'price']

**Inference**: The `society` column has been dropped.

In [0]:
print(df_new.count(), len(df_new.columns))

13320 8


## Drop the nan values across the rows

In [0]:
df_c = df_new.na.drop()
print(df_c.count(), len(df_c.columns))

12524 8


**Inference** The nan values have been dropped.

## Convert all the string columns into numeric values using StringIndexer transformer

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

indexers = StringIndexer(inputCols= [ 'area_type', 'availability','location', 'size'], 
                         outputCols=[  'area_type_index', 'availability_index','location_index', 'size_index'],
                        handleInvalid='keep')
df_ = indexers.fit(df_c).transform(df_c)


##  Represent each of the indexed values in one form with a one-hot 
encoded vector using OneHotEncoder

In [0]:
encoder = OneHotEncoder(inputCols=[ 'area_type_index', 'availability_index','location_index', 'size_index'],
                        outputCols=[ 'area_type_vec', 'availability_vec','location_vec', 'size_vec'])
df__ = encoder.fit(df_).transform(df_)

## Assemble all of the input columns into one single vector that would act as the input feature for the model using VectorAssembler

In [0]:
assembler = VectorAssembler(inputCols=['area_type_vec', 'availability_vec','location_vec', 'size_vec', 'total_sqft',
                             'bath','balcony' ], outputCol='features')
df = assembler.transform(df__)


In [0]:
df = df.select('features', 'price')
df.show(10)

+--------------------+-----+
|            features|price|
+--------------------+-----+
|(1371,[0,9,96,134...|39.07|
|(1371,[2,4,261,13...|120.0|
|(1371,[1,4,88,134...| 62.0|
|(1371,[0,4,208,13...| 95.0|
|(1371,[0,4,122,13...| 51.0|
|(1371,[0,4,82,134...| 38.0|
|(1371,[0,4,91,134...|63.25|
|(1371,[0,16,82,13...| 70.0|
|(1371,[2,4,82,134...|295.0|
|(1371,[0,4,92,134...| 38.0|
+--------------------+-----+
only showing top 10 rows



## Select only the features column as input and the price column as output for 
training the linear regression model

In [0]:
model_df= df.select('features','price')

In [0]:
print((model_df.count(), len(model_df.columns)))

(12524, 2)


## Split the dataframe into train and test sets in order to train and evaluate


In [0]:
train_df,test_df=model_df.randomSplit([0.75,0.25])
print((train_df.count(), len(train_df.columns)))
print((test_df.count(), len(test_df.columns)))

(9370, 2)
(3154, 2)


**Inference** : We split the dataframe in 75/25 ratio and train our model on 75% of the dataset

## Build a Linear Regression model, then print the coefficients and intercept

In [0]:
from pyspark.ml.regression import LinearRegression
lin_reg = LinearRegression(labelCol='price')


In [0]:
lr_model = lin_reg.fit(train_df)

In [0]:
print(lr_model.coefficients)

[-13.145347392161165,-14.723308138784237,41.70604960995152,-4.539807486308372,-3.2715690421337476,-6.321260209849993,-5.789912345108837,6.3325244127959115,-6.152369232503143,-6.744298227219199,-13.63826787311768,16.938822406974136,-10.296023373765907,3.6904718781280907,6.949128726354645,2.5847454893988666,-4.591308133013084,8.470606731438112,-12.966490971265605,182.9304070872814,-25.295859081077843,57.45908093354914,-0.9241247923546961,11.015409553415815,-6.299204273265504,1.2893897609837746,-18.5107210944927,-5.277137403554312,37.10852062998193,-1.8565601515099828,6.523989190789846,-3.320564915198755,3.9336566367881347,1.6610153584197698,-2.3949100495323252,-12.920849560737746,27.509184360789167,12.076762563392911,11.937052999214693,1.797591194244539,135.12055453371366,-0.8084076298821646,21.85257275328394,-15.695854718977694,-9.336185389291023,-22.144832103468207,-0.20844453027386417,25.235332582501588,27.955188810913267,-8.905009801178258,34.698307802666335,13.55154739469145,0.0,16.

In [0]:
print(lr_model.intercept)

-11.45986049402116


In [0]:
train_pred = lr_model.evaluate(train_df)
print(train_pred.r2)

0.6888338423511803


In [0]:
test_res = lr_model.evaluate(test_df)
print(test_res.r2)

0.3682640289952497


In [0]:
print(test_res.rootMeanSquaredError)

107.28691388158236


In [0]:
test_res = lr_model.evaluate(test_df).predictions
test_res.show(10, False)

+--------------------------------------------------------------------+-----+------------------+
|features                                                            |price|prediction        |
+--------------------------------------------------------------------+-----+------------------+
|(1371,[0,4,82,1341,1368,1369,1370],[1.0,1.0,1.0,1.0,920.0,2.0,1.0]) |63.0 |47.9832743396163  |
|(1371,[0,4,82,1341,1368,1369,1370],[1.0,1.0,1.0,1.0,925.0,2.0,1.0]) |35.0 |48.259046701160926|
|(1371,[0,4,82,1341,1368,1369,1370],[1.0,1.0,1.0,1.0,1015.0,2.0,1.0])|45.0 |53.22294920896432 |
|(1371,[0,4,82,1341,1368,1369,1370],[1.0,1.0,1.0,1.0,1050.0,2.0,3.0])|43.0 |61.1010391401626  |
|(1371,[0,4,82,1341,1368,1369,1370],[1.0,1.0,1.0,1.0,1100.0,2.0,2.0])|52.0 |60.88492105541599 |
|(1371,[0,4,82,1341,1368,1369,1370],[1.0,1.0,1.0,1.0,1102.0,2.0,1.0])|48.0 |58.02138829984092 |
|(1371,[0,4,82,1341,1368,1369,1370],[1.0,1.0,1.0,1.0,1105.0,2.0,1.0])|35.4 |58.186851716767706|
|(1371,[0,4,82,1341,1368,1369,1370],[1.0

In [0]:
test_res.select(['price','prediction']).show(10,False)

+-----+------------------+
|price|prediction        |
+-----+------------------+
|63.0 |47.9832743396163  |
|35.0 |48.259046701160926|
|45.0 |53.22294920896432 |
|43.0 |61.1010391401626  |
|52.0 |60.88492105541599 |
|48.0 |58.02138829984092 |
|35.4 |58.186851716767706|
|41.36|58.186851716767706|
|39.99|61.160693416960626|
|35.22|58.4074696060034  |
+-----+------------------+
only showing top 10 rows



<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=4235e2a8-072b-477c-9e24-db5cfe03e525' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>