In [1]:
# Read the house data CSV into a DataFrame: first line is the header,
# column types are inferred, and fields are comma separated.
houseDataSetPath = "/FileStore/tables/t4ismsl81493682860925/final_data_set.csv"
csvReader = (sqlContext.read.format("csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .option("delimiter", ','))
houseDataSetDf = csvReader.load(houseDataSetPath)

# Render the loaded DataFrame in the notebook
display(houseDataSetDf)

In [2]:
# Summary statistics for the data set, flipped so that each column becomes a row
houseDataSetDf.describe().toPandas().T

In [3]:
import pprint
from pyspark.mllib.regression import LabeledPoint

# Create an instance of pretty print for output formatting
# (used below to print a sample of the generated label points)
pp = pprint.PrettyPrinter(indent=1)

# Remove all rows that have NULL as a value for the x variables of interest
def removeRowsThatContainNull(row):
  """Filter predicate: tell whether a row has a value for every column of interest.

  Despite the name, this does not remove anything itself; it is meant to be
  passed to a filter, which keeps the rows for which it returns True.

  A value counts as missing when it is None (a real null in the data) or when
  its string form is the literal text "NULL".

  Args:
    row: a mapping-like record indexable by column name.

  Returns:
    True when none of the checked columns is missing, False otherwise.
  """
  headers = ("zestimate_amount", "total_approximate_size", "total_appraisal_value",
             "valuation_range_high", "valuation_range_low", "bedrooms", "bathrooms",
             "finished_sqft", "year_built")
  for header in headers:
    value = row[header]
    # None must be caught explicitly: str(None) is "None", not "NULL", and a
    # None slipping through would later fail when converted with float().
    if value is None or str(value) == "NULL":
      return False
  return True

# Keep only single family home types
def keepOnlySingleFamily(row):
  """Filter predicate: True only when the row's "usecode" is "SingleFamily".

  Args:
    row: a mapping-like record indexable by column name.

  Returns:
    True if the string form of row["usecode"] equals "SingleFamily", else False.
  """
  useCode = str(row["usecode"])
  return useCode == "SingleFamily"

# Create a list of values to be used by the label point
def getLabelPointValues(row, referenceYear=2017):
  """Build the feature list for one row, in a fixed order.

  The returned list is, by position:
    0: total_approximate_size
    1: finished_sqft
    2: bedrooms
    3: bathrooms
    4: age of the house (referenceYear - year_built)

  The plotting cells index features by these positions, so the order must
  not change.

  Args:
    row: a mapping-like record indexable by column name; every column used
      here must be convertible with float().
    referenceYear: year the age is measured from. Defaults to 2017, the value
      that was previously hard-coded.

  Returns:
    A list of five floats.
  """
  directHeaders = ("total_approximate_size", "finished_sqft", "bedrooms", "bathrooms")
  values = [float(row[header]) for header in directHeaders]
  # The build year is turned into an age so that larger means older
  values.append(referenceYear - float(row["year_built"]))
  return values

# Filter out any rows that contain NULL as a value, keep only single family homes,
# and load the data into an rdd of label points (label = zestimate_amount).
# The rdd is cached because it is reused by many actions (count, top, collect,
# randomSplit); without caching each of them would recompute the filters and map.
houseDataSetLabelPoints = (houseDataSetDf.rdd
                           .filter(removeRowsThatContainNull)
                           .filter(keepOnlySingleFamily)
                           .map(lambda row: LabeledPoint(float(row["zestimate_amount"]), getLabelPointValues(row)))
                           .cache())

print("Number of records", houseDataSetLabelPoints.count())

# Print out the five label points with the highest label.
# An explicit key is required: LabeledPoint defines no ordering of its own, so
# a bare top(5) is arbitrary on Python 2 and raises TypeError on Python 3.
pp.pprint(houseDataSetLabelPoints.top(5, key=lambda point: point.label))

In [4]:
import matplotlib.pyplot
import pylab

# Order the observations by Zillow estimate so the plotted curve never decreases
sortedPoints = houseDataSetLabelPoints.sortBy(lambda point: point.label).collect()
y = [point.label for point in sortedPoints]
# 1-based rank of each observation along the x axis
x = list(range(1, len(y) + 1))

# Draw on the upper of two stacked subplot slots
axes = matplotlib.pyplot.subplot(211)
axes.plot(x, y)
axes.set_xlim([0, 1300])
axes.set_xlabel('Observation')
axes.set_ylim([0, 520000])
axes.set_ylabel('Zillow Estimate Amount')
axes.set_title('Zillow Estimate Amount By Observation')
display(matplotlib.pyplot.show())

In [5]:
import matplotlib.pyplot
import pylab

# Feature index 1 is finished_sqft (see getLabelPointValues)
collectedPoints = houseDataSetLabelPoints.collect()
x = [point.features[1] for point in collectedPoints]
y = [point.label for point in collectedPoints]

# Start from a clean figure, then draw on the upper of two stacked subplot slots
matplotlib.pyplot.clf()
axes = matplotlib.pyplot.subplot(211)
axes.scatter(x, y)
axes.set_xlim([0, 5000])
axes.set_xlabel('Finished Square Footage')
axes.set_ylim([0, 520000])
axes.set_ylabel('Zillow Estimate Amount')
axes.set_title('Zillow Estimate Amount By Square Footage Of House')
display(matplotlib.pyplot.show())

In [6]:
import matplotlib.pyplot
import pylab

# Feature index 4 is the age of the house (see getLabelPointValues)
collectedPoints = houseDataSetLabelPoints.collect()
x = [point.features[4] for point in collectedPoints]
y = [point.label for point in collectedPoints]

# Start from a clean figure, then draw on the upper of two stacked subplot slots
matplotlib.pyplot.clf()
axes = matplotlib.pyplot.subplot(211)
axes.scatter(x, y)
axes.set_xlim([0, 100])
axes.set_xlabel('Age')
axes.set_ylim([0, 520000])
axes.set_ylabel('Zillow Estimate Amount')
axes.set_title('Zillow Estimate Amount By Age')
display(matplotlib.pyplot.show())

In [7]:
import matplotlib.pyplot
import pylab

# Feature index 3 is bathrooms (see getLabelPointValues)
collectedPoints = houseDataSetLabelPoints.collect()
x = [point.features[3] for point in collectedPoints]
y = [point.label for point in collectedPoints]

# Start from a clean figure, then draw on the upper of two stacked subplot slots
matplotlib.pyplot.clf()
axes = matplotlib.pyplot.subplot(211)
axes.scatter(x, y)
axes.set_xlim([0, 5])
axes.set_xlabel('Bathroom Count')
axes.set_ylim([0, 520000])
axes.set_ylabel('Zillow Estimate Amount')
axes.set_title('Zillow Estimate Amount By Bathroom Count')
display(matplotlib.pyplot.show())

In [8]:
import matplotlib.pyplot
import pylab

# Feature index 2 is bedrooms (see getLabelPointValues)
collectedPoints = houseDataSetLabelPoints.collect()
x = [point.features[2] for point in collectedPoints]
y = [point.label for point in collectedPoints]

# Start from a clean figure, then draw on the upper of two stacked subplot slots
matplotlib.pyplot.clf()
axes = matplotlib.pyplot.subplot(211)
axes.scatter(x, y)
axes.set_xlim([0, 7])
axes.set_xlabel('Bedroom Count')
axes.set_ylim([0, 520000])
axes.set_ylabel('Zillow Estimate Amount')
axes.set_title('Zillow Estimate Amount By Bedroom Count')
display(matplotlib.pyplot.show())

In [9]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

print("Number of records", houseDataSetLabelPoints.count())

# Split the House Data Set Label Points into training (70%) and testing (30%) sets.
# A fixed seed keeps the split, and therefore the metrics printed below,
# reproducible from one run to the next.
(trainingData, testData) = houseDataSetLabelPoints.randomSplit([0.7, 0.3], seed=42)

# Build the model
model = LinearRegressionWithSGD.train(trainingData, iterations=100, step=0.000001)

# Pair every test prediction with its true label: (prediction, label)
predictionsAndLabels = testData.map(lambda p: (float(model.predict(p.features)), p.label))
metrics = RegressionMetrics(predictionsAndLabels)

# Print output
print("Training Data Set Count", str(trainingData.count()))
print("Testing Data Set Count", str(testData.count()))
print(model)
print("R-Squared", metrics.r2)
print("Mean Squared Error", metrics.meanSquaredError)
print("Root Mean Squared Error", metrics.rootMeanSquaredError)
print("Mean Absolute Error", metrics.meanAbsoluteError)
print("Explained Variance", metrics.explainedVariance)

In [10]:
# Bring the (prediction, label) pairs back to the driver and render them
collectedPredictionsAndLabels = predictionsAndLabels.collect()
display(collectedPredictionsAndLabels)

In [11]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

# Split the data into training and test sets (30% held out for testing).
# A fixed seed keeps the split reproducible from one run to the next.
(trainingData, testData) = houseDataSetLabelPoints.randomSplit([0.7, 0.3], seed=42)

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
#  The seed makes the trained forest reproducible as well.
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    numTrees=3, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=4, maxBins=32,
                                    seed=42)

# Evaluate model on test instances and compute test error.
# Predictions are computed on the whole RDD of features and zipped back with the labels.
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
# Each element is a (label, prediction) pair. It is indexed rather than unpacked
# in the lambda signature, because tuple parameters are a syntax error on Python 3.
squaredErrors = labelsAndPredictions.map(lambda pair: (pair[0] - pair[1]) * (pair[0] - pair[1]))
testMSE = squaredErrors.sum() / float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())