In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import isnan, when, count, col

In [None]:
# Start the Spark session used throughout this notebook (or reuse one that is
# already running); the application is registered under the name 'CaliHousing'.
spark = (
    SparkSession.builder
    .appName('CaliHousing')
    .getOrCreate()
)

# **Part 1 - Data Preprocessing**

In [None]:
# Load the housing CSV: the first row supplies the column names, and Spark
# infers each column's type from the data instead of reading everything as text.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/FileStore/tables/1990californiahousing.csv')
)

# Preview the first five rows.
df.show(n=5)

In [None]:
# Summary statistics for the label column before any cleaning.
# In the author's run: 20640 rows in total, mean house value of about $206855.
df.describe('median_house_value').show()

**1.1 Cleaning Null Values**

In [None]:
# Count missing values per column.
# NULL is checked for every column. NaN is only checked for float/double
# columns: calling isnan() on any other type depends on an implicit cast to
# double, which can fail for non-numeric strings (e.g. ocean_proximity) when
# ANSI SQL mode is enabled and is not supported for date/timestamp columns.
float_cols = {name for name, dtype in df.dtypes if dtype in ('float', 'double')}

df.select([
    count(
        when((isnan(c) | col(c).isNull()) if c in float_cols else col(c).isNull(), c)
    ).alias(c)
    for c in df.columns
]).show()  # in the author's run: 207 missing values, all in total_bedrooms

In [None]:
# Drop every row that has a missing value in at least one column -- that is
# what how='any' means (how='all' would only drop rows that are entirely null).
df = df.dropna(how='any')

# Re-check the label column: the row count should have gone down by the number
# of rows that had a missing value (207 in the author's run).
df.describe('median_house_value').show()

**1.2 Encoding Categorical Data**

In [None]:
# Number of rows per ocean_proximity category
# (five distinct categories in the author's run).
(
    df
    .groupBy('ocean_proximity')
    .count()
    .show()
)

In [None]:
# Map each ocean_proximity string to a numeric index, stored in a new column
# named ocean_proximity_cat, so the category can be used as a model input.
# NOTE(review): the index is a single number per category, so a linear model
# will treat it as an ordered magnitude -- consider one-hot encoding it.
encoder = StringIndexer(
    inputCol="ocean_proximity",
    outputCol="ocean_proximity_cat",
)
indexer_model = encoder.fit(df)        # learn the string -> index mapping from df
encoded = indexer_model.transform(df)  # df plus the ocean_proximity_cat column

# Preview: note the new ocean_proximity_cat column that encodes ocean_proximity.
encoded.show(n=5)

**1.3 Creating Features Column**

In [None]:
# The regression estimator takes exactly two inputs: one column holding all the
# features as a single vector, and one label column. Gather the predictors
# (including the indexed ocean_proximity category) that go into that vector.
feature_columns = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
    'ocean_proximity_cat',
]

# outputCol names the single combined feature column the assembler produces.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Apply the assembler to all of the encoded data.
output = assembler.transform(encoded)

# Confirm that the schema now includes the 'features' column.
output.printSchema()

In [None]:
# Look at the first assembled feature vector. In the author's run it is a
# DenseVector containing all nine features that were combined.
output.select('features').take(1)

**1.4 Creating Feature and Label Dataframe**


In [None]:
# Keep only what the linear regression needs: the combined feature vector and
# the label (median_house_value).
final_data = output.select(col('features'), col('median_house_value'))

# Preview the first three feature/label pairs.
final_data.show(n=3)

**1.5 Train Test Split**


In [None]:
# Hold out roughly 20% of the rows for testing and train on the remaining ~80%.
# A fixed seed makes the split -- and therefore every metric computed from it
# further down -- repeatable across runs (given the same input data), instead
# of changing each time the notebook is executed.
train, test = final_data.randomSplit([0.8, 0.2], seed=42)

# Summary statistics of the training split.
train.describe().show()

In [None]:
# Summary statistics of the held-out test split, for comparison with the
# training split.
test_summary = test.describe()
test_summary.show()

# **Part 2 - Creating Model**

In [None]:
# Configure the linear regression estimator: which column holds the feature
# vector, which holds the label, where predictions are written, and the
# maximum number of iterations.
lr = LinearRegression(
    featuresCol='features',
    labelCol='median_house_value',
    predictionCol='prediction',
    maxIter=200,
)

# Fit on the training split only; the test split is kept for evaluation.
lrmodel = lr.fit(train)

# **Part 3 - Evaluating Model**

In [None]:
# Evaluate the fitted model on the held-out test split; the returned summary
# exposes the residuals and error metrics used below.
test_results = lrmodel.evaluate(test)

# Residuals: the difference between each label and its predicted value
# (label - prediction).
test_residuals = test_results.residuals
test_residuals.show()

In [None]:
# Root mean squared error on the test split, in the label's units (dollars).
# RMSE is the square root of the mean squared residual, so large misses weigh
# more than in a plain average error. The author's run reported about
# $71944.62, which is below the label's standard deviation of about
# $116129.73; the exact figure depends on the train/test split.
test_results.rootMeanSquaredError

In [None]:
# Summary statistics of the label over the full cleaned dataset, for comparing
# its standard deviation with the test RMSE (in the author's run the RMSE was
# within one standard deviation).
label_summary = final_data.describe()
label_summary.show()