In [31]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import desc
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [3]:
df = spark.read.csv('./data/final_data_set.csv',
                   inferSchema=True,
                    header=True
                   )

In [4]:
df = df.drop('_c0')

In [5]:
print((df.count(), len(df.columns)))

(5852, 22)


In [6]:
df.printSchema()

root
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)
 |-- walk_score: integer (nullable = true)
 |-- transit_score: integer (nullable = true)
 |-- pers_crime_score: integer (nullable = true)
 |-- prop_crime_score: integer (nullable = true)
 |-- income: integer (nullable = true)
 |-- renovated_yrs_ago: integer (nullable = true)
 |-- log_price: double (nullable = true)



In [7]:
df.show(3)

+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+
| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|sqft_living15|sqft_lot15|walk_score|transit_score|pers_crime_score|prop_crime_score|income|renovated_yrs_ago|         log_price|
+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+
|221900|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|      60|         1340|      5650|        46|           46|               2|               3| 71524|               60|12.309982108920686|


In [8]:
df.describe(['price']).show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|              5852|
|   mean| 553231.8257006152|
| stddev|247486.31361324288|
|    min|             90000|
|    max|           1570000|
+-------+------------------+



In [9]:
df.describe().toPandas()

Unnamed: 0,summary,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view,condition,...,yr_built,sqft_living15,sqft_lot15,walk_score,transit_score,pers_crime_score,prop_crime_score,income,renovated_yrs_ago,log_price
0,count,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,...,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0,5852.0
1,mean,553231.8257006152,3.126794258373206,1.9082792207792207,1755.2424812030076,5288.316131237184,1.527682843472317,0.0018796992481203,0.2441900205058099,3.4448051948051948,...,61.30587833219412,1647.2812713602186,5202.725563909775,67.68113465481886,52.604750512645246,1.960868079289132,2.099794941900205,81775.06818181818,57.034005468216,13.13247397082676
2,stddev,247486.31361324288,1.071525033061304,0.7756144290599073,721.24214276076,5906.49018430074,0.6112397050160592,0.0433184329838561,0.7471306086749057,0.7036749588932281,...,35.04861598472822,459.8347984633792,6063.99439546608,18.92226538694838,11.10841208248582,0.831366665959762,0.8804568112835992,26074.238176516246,35.831109923801336,0.4261721665779862
3,min,90000.0,0.0,0.0,370.0,520.0,1.0,0.0,0.0,1.0,...,0.0,460.0,651.0,0.0,0.0,1.0,1.0,12269.0,0.0,11.407564949312402
4,max,1570000.0,33.0,7.5,6070.0,219978.0,3.5,1.0,4.0,5.0,...,115.0,5600.0,216928.0,99.0,95.0,4.0,4.0,199542.0,115.0,14.266586177324491


In [10]:
df.show(2)

+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+
| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|sqft_living15|sqft_lot15|walk_score|transit_score|pers_crime_score|prop_crime_score|income|renovated_yrs_ago|         log_price|
+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+
|221900|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|      60|         1340|      5650|        46|           46|               2|               3| 71524|               60|12.309982108920686|


In [11]:
df.registerTempTable('df')
bedrooms = spark.sql(r'''SELECT avg(price), bedrooms FROM df GROUP BY bedrooms''')
bedrooms.show()

+-----------------+--------+
|       avg(price)|bedrooms|
+-----------------+--------+
|335107.0202020202|       1|
|724756.8928571428|       6|
|532000.7742316785|       3|
|707386.0604229607|       5|
|893999.8333333334|       9|
|685722.4777358491|       4|
|         715600.0|       8|
|         685830.0|       7|
|         660000.0|      10|
|         520000.0|      11|
|         640000.0|      33|
|435478.5604699378|       2|
|         691500.0|       0|
+-----------------+--------+



In [12]:
df.select('price', 'bedrooms').show(10)

+------+--------+
| price|bedrooms|
+------+--------+
|221900|       3|
|538000|       3|
|180000|       2|
|662500|       3|
|468000|       2|
|530000|       5|
|650000|       4|
|485000|       4|
|385000|       4|
|937000|       3|
+------+--------+
only showing top 10 rows



In [13]:
df.groupBy('bedrooms').count().show()

+--------+-----+
|bedrooms|count|
+--------+-----+
|       1|   99|
|       6|   84|
|       3| 2538|
|       5|  331|
|       9|    6|
|       4| 1325|
|       8|    5|
|       7|   12|
|      10|    1|
|      11|    1|
|      33|    1|
|       2| 1447|
|       0|    2|
+--------+-----+



In [14]:
# Divide one column by another, save result as new column (save as new df to make permanent)
df.withColumn('price_per_sq_ft', df['price'] / df['sqft_living']).show(5)

+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+------------------+
| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|sqft_living15|sqft_lot15|walk_score|transit_score|pers_crime_score|prop_crime_score|income|renovated_yrs_ago|         log_price|   price_per_sq_ft|
+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+------------------+
|221900|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|      60|         1340|      5650|        46|           46|               2|     

In [15]:
# Create new column with lambda function - if condition of home lower than 4. Again, did not save this to our dataframe (just for my reference)
condition_udf = udf(lambda condition: True if condition < 4 else False)
df.withColumn('new_column', condition_udf(df['condition'])).show(5)

+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+----------+
| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|sqft_living15|sqft_lot15|walk_score|transit_score|pers_crime_score|prop_crime_score|income|renovated_yrs_ago|         log_price|new_column|
+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+----------+
|221900|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|      60|         1340|      5650|        46|           46|               2|               3| 71524|     

In [16]:
df.groupBy('bathrooms').count().sort(desc('count')).show(10)

+---------+-----+
|bathrooms|count|
+---------+-----+
|      1.0| 1629|
|     1.75|  837|
|      2.5|  779|
|      2.0|  667|
|      1.5|  538|
|     2.25|  430|
|      3.0|  252|
|     2.75|  242|
|      3.5|  181|
|     3.25|  176|
+---------+-----+
only showing top 10 rows



### Train-test split

In [17]:
X = df.select(['bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'grade',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'sqft_living15',
 'sqft_lot15',
 'walk_score',
 'transit_score',
 'pers_crime_score',
 'prop_crime_score',
 'income',
 'renovated_yrs_ago'])

y = df.select(['log_price'])

In [18]:
train_df, test_df = df.randomSplit([0.7, 0.3], seed=41)

In [19]:
print((train_df.count(), len(test_df.columns)))

(4086, 22)


In [20]:
col_names = ['bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'grade',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'sqft_living15',
 'sqft_lot15',
 'walk_score',
 'transit_score',
 'pers_crime_score',
 'prop_crime_score',
 'income',
 'renovated_yrs_ago']

vectorAssembler = VectorAssembler(inputCols=col_names, outputCol='features')
df_vec = vectorAssembler.transform(df)
df_vec = df_vec.select(['features', 'log_price'])
df_vec.show(2)

+--------------------+------------------+
|            features|         log_price|
+--------------------+------------------+
|[3.0,1.0,1180.0,5...|12.309982108920686|
|[3.0,2.25,2570.0,...|13.195613839143922|
+--------------------+------------------+
only showing top 2 rows



In [21]:
train_df_vec, test_df_vec = df_vec.randomSplit([0.7, 0.3], seed=41)

### Fit the Linear Model

In [22]:
lr = LinearRegression(featuresCol='features', labelCol='log_price', standardization=False)
lr_model = lr.fit(train_df_vec)
print("Coefficients: " + str(lr_model.coefficients))
print()
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0014120365331601508,0.01676982191138472,9.356553304619151e-05,1.4971286256994199e-06,-0.016368237950986873,0.4807758661816554,0.04724522202278337,0.0616827699313226,0.13431494839992134,0.00013030611296370933,5.5419912317765475e-05,0.00231676823846281,0.00012779473372970143,-5.29752396847563e-07,0.00544690741122347,0.0027578469879139538,-0.0615313975238129,-0.031683471618067255,3.3823416719886758e-06,-0.0007167967870149108]

Intercept: 10.622309741315698


In [23]:
def sort_coefficients(model):
    """Returns a given linear model's sorted coefficients."""
    return sorted(list(zip(lr_model.coefficients, col_names)), reverse=True)

sort_coefficients(lr_model)

[(0.4807758661816554, 'waterfront'),
 (0.13431494839992134, 'grade'),
 (0.0616827699313226, 'condition'),
 (0.04724522202278337, 'view'),
 (0.01676982191138472, 'bathrooms'),
 (0.00544690741122347, 'walk_score'),
 (0.0027578469879139538, 'transit_score'),
 (0.00231676823846281, 'yr_built'),
 (0.0014120365331601508, 'bedrooms'),
 (0.00013030611296370933, 'sqft_above'),
 (0.00012779473372970143, 'sqft_living15'),
 (9.356553304619151e-05, 'sqft_living'),
 (5.5419912317765475e-05, 'sqft_basement'),
 (3.3823416719886758e-06, 'income'),
 (1.4971286256994199e-06, 'sqft_lot'),
 (-5.29752396847563e-07, 'sqft_lot15'),
 (-0.0007167967870149108, 'renovated_yrs_ago'),
 (-0.016368237950986873, 'floors'),
 (-0.031683471618067255, 'prop_crime_score'),
 (-0.0615313975238129, 'pers_crime_score')]

In [24]:
def training_summary(model):
    """Prints out R^2 and RMSE and returns trainingSummary given a model."""
    training_summary = model.summary
    print(f'R^2: {round(training_summary.r2, 4)}')
    print(f'RMSE: {round(training_summary.rootMeanSquaredError, 4)}')
    return training_summary

training_summary(lr_model)

R^2: 0.7463
RMSE: 0.2126


<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x107ac6198>

In [25]:
lr_predictions = lr_model.transform(test_df_vec)
lr_predictions.select("prediction","log_price","features").show(5)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="log_price",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+------------------+--------------------+
|        prediction|         log_price|            features|
+------------------+------------------+--------------------+
|12.698925408373261|12.449018824140563|[1.0,0.5,880.0,16...|
| 12.65886953365868|12.577636201962656|[1.0,0.75,740.0,1...|
|12.657726789110075|12.768541502448002|[1.0,0.75,930.0,6...|
|13.026544779377419|12.708664464369061|[1.0,1.0,580.0,17...|
|12.708244742655083|12.216022976383341|[1.0,1.0,590.0,83...|
+------------------+------------------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.753261


### Create Pipeline

In [26]:
df.show(2)

+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+
| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|sqft_living15|sqft_lot15|walk_score|transit_score|pers_crime_score|prop_crime_score|income|renovated_yrs_ago|         log_price|
+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+
|221900|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|      60|         1340|      5650|        46|           46|               2|               3| 71524|               60|12.309982108920686|


In [27]:
# Create a new column to calculate year home was renovated
df.withColumn('renovated_year', 2019 - df['renovated_yrs_ago']).show(3)

+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+--------------+
| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|sqft_living15|sqft_lot15|walk_score|transit_score|pers_crime_score|prop_crime_score|income|renovated_yrs_ago|         log_price|renovated_year|
+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+-------------+----------+----------+-------------+----------------+----------------+------+-----------------+------------------+--------------+
|221900|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|      60|         1340|      5650|        46|           46|               2|               3|

In [28]:
new = spark.sql(r'''
SELECT price, bedrooms
FROM df
WHERE bedrooms >= 4''')
new.show(5)

+------+--------+
| price|bedrooms|
+------+--------+
|530000|       5|
|650000|       4|
|485000|       4|
|385000|       4|
|687500|       4|
+------+--------+
only showing top 5 rows



In [45]:
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

In [40]:
z.mean

DenseVector([3.1268, 1.9083, 1755.2425, 5288.3161, 1.5277, 0.0019, 0.2442, 3.4448, 7.37, 1403.9357, 351.3067, 61.3059, 1647.2813, 5202.7256, 67.6811, 52.6048, 1.9609, 2.0998, 81775.0682, 57.034])

In [41]:
z.std

DenseVector([1.0715, 0.7756, 721.2421, 5906.4902, 0.6112, 0.0433, 0.7471, 0.7037, 0.9391, 553.3933, 414.061, 35.0486, 459.8348, 6063.9944, 18.9223, 11.1084, 0.8314, 0.8805, 26074.2382, 35.8311])

#### Create a pipeline

In [33]:
pipeline = Pipeline(stages=[vectorAssembler, scaler, lr]) 

# fit and transform
lr_model2 = pipeline.fit(train_df)
lr_pred2 = lr_model2.transform(test_df)

# the results are the same for GBT as the second table above
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="log_price", metricName="r2")
print("R Squared (R^2) on test data = %g" % lr_evaluator.evaluate(lr_pred2))
lr_pred2.select("prediction", "log_price", "features").show(5)

R Squared (R^2) on test data = 0.74923
+------------------+------------------+--------------------+
|        prediction|         log_price|            features|
+------------------+------------------+--------------------+
|12.474578371044746|11.580584113444043|[3.0,1.5,910.0,51...|
|12.686302284486954|11.736069016284437|[3.0,1.0,1230.0,4...|
| 12.59558726473031| 11.79810440720389|[5.0,2.0,1430.0,5...|
|12.625001600151219|11.918390573078392|[4.0,2.0,1460.0,7...|
|12.373305287825216|11.925035115797062|[2.0,1.0,720.0,42...|
+------------------+------------------+--------------------+
only showing top 5 rows

