In [150]:
import csv
import pandas as pd
from pyspark import SparkContext
from pyspark.sql.functions import avg, when, concat, col, lit
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, VectorIndexer, StringIndexer, PCA
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [138]:
spark = SparkSession \
    .builder \
    .appName("HousingPrice") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

train = spark.read.csv("./train.csv", header=True, inferSchema=True)
test = spark.read.csv("./test.csv", header=True, inferSchema=True)

In [139]:
print len(train.columns), 'columns'
print train.count(), 'rows'
print len(test.columns), 'columns'
print test.count(), 'rows'

81 columns
1460 rows
80 columns
1459 rows


In [140]:
# add a SalePrice column to test set and fill it with 0, as only df with same dimensions can union
# create a dataframe "unioned" containing both training and test set, including SalePrice column

test = test.withColumn('SalePrice', lit(0))
unioned = train.union(test)

In [141]:
print len(unioned.columns), 'columns'
print unioned.count(), 'rows'

81 columns
2919 rows


In [142]:
# add year+(0.01*month) as universal time format for comparing definitive time sequence

unioned = unioned.withColumn('month_and_year', concat(col("YrSold"), lit(""), col("MoSold")/100))
unioned = unioned.withColumn('month_and_year', unioned.month_and_year.cast('int'))

print len(unioned.columns), 'columns'
print unioned.count(), 'rows'

82 columns
2919 rows


In [143]:
# since the datatype of columns of numeric values would appear as string if NA value exist
# convert value type of columns ['LotFrontage', 'GarageYrBlt', 'MasVnrArea'] to integer
# replace NA values in LotFrontage with the average vaule of the column
# replace NA values in GarageYrBlt to 0

avg_LotFrontage = unioned.agg(avg(unioned['LotFrontage'])).collect()[0][0]
unioned = unioned.withColumn('LotFrontage', unioned.LotFrontage.cast('int'))
unioned = unioned.withColumn('MasVnrArea', unioned.MasVnrArea.cast('int'))
unioned = unioned.withColumn('GarageYrBlt', unioned.GarageYrBlt.cast('int'))
unioned = unioned.dropna(subset='MasVnrArea')
unioned = unioned.na.fill({'GarageYrBlt': 0, 'LotFrontage': avg_LotFrontage})

unioned = unioned.withColumn('BsmtFinSF1', unioned.BsmtFinSF1.cast('int'))
unioned = unioned.withColumn('BsmtFinSF2', unioned.BsmtFinSF2.cast('int'))
unioned = unioned.withColumn('BsmtUnfSF', unioned.BsmtUnfSF.cast('int'))
unioned = unioned.withColumn('TotalBsmtSF', unioned.TotalBsmtSF.cast('int'))
unioned = unioned.withColumn('BsmtFullBath', unioned.BsmtFullBath.cast('int'))
unioned = unioned.withColumn('BsmtHalfBath', unioned.BsmtHalfBath.cast('int'))
unioned = unioned.withColumn('GarageCars', unioned.GarageCars.cast('int'))
unioned = unioned.withColumn('GarageArea', unioned.GarageArea.cast('int'))

unioned = unioned.dropna(how='any', subset=['BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 
                                      'BsmtFullBath', 'BsmtHalfBath', 'GarageCars', 'GarageArea'])

print len(unioned.columns), 'columns'
print unioned.count(), 'rows'

82 columns
2893 rows


In [144]:
categorical_columns = ['MSSubClass', 'MSZoning', 'Street', 'Alley', 'LotShape', 'LandContour',
                       'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 
                       'Condition2', 'BldgType', 'HouseStyle', 'RoofStyle', 'RoofMatl', 
                       'Exterior1st', 'Exterior2nd', 'MasVnrType', 'ExterQual', 'ExterCond', 
                       'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 
                       'BsmtFinType2', 'Heating', 'HeatingQC', 'CentralAir', 'Electrical', 
                       'KitchenQual', 'Functional', 'FireplaceQu', 'GarageType', 'GarageFinish', 
                       'GarageQual', 'GarageCond', 'PavedDrive', 'PoolQC', 'Fence', 'MiscFeature', 
                       'SaleType', 'SaleCondition']

In [145]:
for column in categorical_columns:
    
    stringIndexer = StringIndexer(inputCol=column, outputCol=column+'Index')
    model_unioned = stringIndexer.fit(unioned)
    indexed_unioned = model_unioned.transform(unioned)

    encoder = OneHotEncoder(dropLast=False, inputCol=column+'Index', outputCol=column+'Vec')
    unioned = encoder.transform(indexed_unioned)
    unioned = unioned.drop(column+'Index')
    #unioned.select(column+'Vec').show(1)

In [146]:
for column in categorical_columns:
    unioned = unioned.drop(column)

In [147]:
print len(unioned.columns), 'columns'
print unioned.count(), 'rows'

82 columns
2893 rows


In [148]:
columns_without_SalePrice = unioned.columns
columns_without_SalePrice.remove('SalePrice')
'SalePrice' in columns_without_SalePrice

False

In [149]:
vecAssembler = VectorAssembler(inputCols=columns_without_SalePrice, outputCol="features")
unioned = vecAssembler.transform(unioned)

In [154]:
len(unioned.columns)

83

In [185]:
pca = PCA(k=15, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(unioned)
unioned = model.transform(unioned)
model.explainedVariance

DenseVector([0.9613, 0.011, 0.0077, 0.0053, 0.0049, 0.0042, 0.003, 0.0008, 0.0005, 0.0004, 0.0003, 0.0002, 0.0001, 0.0001, 0.0])

In [181]:
unioned.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- LotFrontage: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- MasVnrArea: integer (nullable = true)
 |-- BsmtFinSF1: integer (nullable = true)
 |-- BsmtFinSF2: integer (nullable = true)
 |-- BsmtUnfSF: integer (nullable = true)
 |-- TotalBsmtSF: integer (nullable = true)
 |-- 1stFlrSF: integer (nullable = true)
 |-- 2ndFlrSF: integer (nullable = true)
 |-- LowQualFinSF: integer (nullable = true)
 |-- GrLivArea: integer (nullable = true)
 |-- BsmtFullBath: integer (nullable = true)
 |-- BsmtHalfBath: integer (nullable = true)
 |-- FullBath: integer (nullable = true)
 |-- HalfBath: integer (nullable = true)
 |-- BedroomAbvGr: integer (nullable = true)
 |-- KitchenAbvGr: integer (nullable = true)
 |-- TotRmsAbvGrd: integer (nullable = true)
 |-- Firep

In [16]:
unioned.select('Id', 'LotFrontage', 'LotArea').show(1480)

+----+-----------+-------+
|  Id|LotFrontage|LotArea|
+----+-----------+-------+
|   1|         65|   8450|
|   2|         80|   9600|
|   3|         68|  11250|
|   4|         60|   9550|
|   5|         84|  14260|
|   6|         85|  14115|
|   7|         75|  10084|
|   8|         69|  10382|
|   9|         51|   6120|
|  10|         50|   7420|
|  11|         70|  11200|
|  12|         85|  11924|
|  13|         69|  12968|
|  14|         91|  10652|
|  15|         69|  10920|
|  16|         51|   6120|
|  17|         69|  11241|
|  18|         72|  10791|
|  19|         66|  13695|
|  20|         70|   7560|
|  21|        101|  14215|
|  22|         57|   7449|
|  23|         75|   9742|
|  24|         44|   4224|
|  25|         69|   8246|
|  26|        110|  14230|
|  27|         60|   7200|
|  28|         98|  11478|
|  29|         47|  16321|
|  30|         60|   6324|
|  31|         50|   8500|
|  32|         69|   8544|
|  33|         85|  11049|
|  34|         70|  10552|
|

In [186]:
unioned_pd = unioned.toPandas()

In [187]:
print unioned_pd.shape

(2893, 84)


In [188]:
train_pd = unioned_pd[unioned_pd.Id <= 1460]
test_pd = unioned_pd[unioned_pd.Id > 1460]

In [189]:
print train_pd.shape
print test_pd.shape

(1452, 84)
(1441, 84)


In [190]:
train = spark.createDataFrame(train_pd)
test = spark.createDataFrame(test_pd)

In [191]:
print 'training set', len(train.columns), 'columns', '&', train.count(), 'rows'
print 'test set', len(test.columns), 'columns', '&', test.count(), 'rows'

training set 84 columns & 1452 rows
test set 84 columns & 1441 rows


In [23]:
train.printSchema()

root
 |-- Id: long (nullable = true)
 |-- LotFrontage: long (nullable = true)
 |-- LotArea: long (nullable = true)
 |-- OverallQual: long (nullable = true)
 |-- OverallCond: long (nullable = true)
 |-- YearBuilt: long (nullable = true)
 |-- YearRemodAdd: long (nullable = true)
 |-- MasVnrArea: long (nullable = true)
 |-- BsmtFinSF1: long (nullable = true)
 |-- BsmtFinSF2: long (nullable = true)
 |-- BsmtUnfSF: long (nullable = true)
 |-- TotalBsmtSF: long (nullable = true)
 |-- 1stFlrSF: long (nullable = true)
 |-- 2ndFlrSF: long (nullable = true)
 |-- LowQualFinSF: long (nullable = true)
 |-- GrLivArea: long (nullable = true)
 |-- BsmtFullBath: long (nullable = true)
 |-- BsmtHalfBath: long (nullable = true)
 |-- FullBath: long (nullable = true)
 |-- HalfBath: long (nullable = true)
 |-- BedroomAbvGr: long (nullable = true)
 |-- KitchenAbvGr: long (nullable = true)
 |-- TotRmsAbvGrd: long (nullable = true)
 |-- Fireplaces: long (nullable = true)
 |-- GarageYrBlt: long (nullable = true

In [24]:
test.printSchema()

root
 |-- Id: long (nullable = true)
 |-- LotFrontage: long (nullable = true)
 |-- LotArea: long (nullable = true)
 |-- OverallQual: long (nullable = true)
 |-- OverallCond: long (nullable = true)
 |-- YearBuilt: long (nullable = true)
 |-- YearRemodAdd: long (nullable = true)
 |-- MasVnrArea: long (nullable = true)
 |-- BsmtFinSF1: long (nullable = true)
 |-- BsmtFinSF2: long (nullable = true)
 |-- BsmtUnfSF: long (nullable = true)
 |-- TotalBsmtSF: long (nullable = true)
 |-- 1stFlrSF: long (nullable = true)
 |-- 2ndFlrSF: long (nullable = true)
 |-- LowQualFinSF: long (nullable = true)
 |-- GrLivArea: long (nullable = true)
 |-- BsmtFullBath: long (nullable = true)
 |-- BsmtHalfBath: long (nullable = true)
 |-- FullBath: long (nullable = true)
 |-- HalfBath: long (nullable = true)
 |-- BedroomAbvGr: long (nullable = true)
 |-- KitchenAbvGr: long (nullable = true)
 |-- TotRmsAbvGrd: long (nullable = true)
 |-- Fireplaces: long (nullable = true)
 |-- GarageYrBlt: long (nullable = true

In [192]:
train_copy = train
test_copy = test

In [242]:
#train = train_copy
#test = test_copy

In [193]:
print 'training set', len(train.columns), 'columns', '&', train.count(), 'rows'
print 'test set', len(test.columns), 'columns', '&', test.count(), 'rows'

training set 84 columns & 1452 rows
test set 84 columns & 1441 rows


In [194]:
train.select('pcaFeatures').show(5)
test.select('pcaFeatures').show(5)

+--------------------+
|         pcaFeatures|
+--------------------+
|[8524.21489708970...|
|[9679.25679373173...|
|[11324.1981858672...|
|[9618.71659182817...|
|[14351.6759761731...|
+--------------------+
only showing top 5 rows

+--------------------+
|         pcaFeatures|
+--------------------+
|[11671.1999432521...|
|[14403.1066482846...|
|[13895.7523445438...|
|[10043.6973642875...|
|[5073.42355043749...|
+--------------------+
only showing top 5 rows



In [195]:
rf = RandomForestRegressor(labelCol="SalePrice", featuresCol="pcaFeatures", \
                           featureSubsetStrategy='auto', maxDepth=30)
model = rf.fit(train)
predictions = model.transform(test)

In [196]:
predictions.select('Id', 'prediction').show(1441)

+----+----------+
|  Id|prediction|
+----+----------+
|1461|  154178.9|
|1462|  216924.1|
|1463|  189380.0|
|1464|  177212.5|
|1465| 175918.75|
|1466| 188668.75|
|1467|  158567.5|
|1468|  178517.3|
|1469|  182432.5|
|1470|  146660.0|
|1471|  223147.8|
|1472|  105490.0|
|1473| 114479.15|
|1474| 155559.75|
|1475|  100950.0|
|1476|  314623.9|
|1477| 247011.75|
|1478| 253707.55|
|1479|  271066.5|
|1480| 448867.15|
|1481| 298121.55|
|1482|  194230.1|
|1483| 177352.35|
|1484|  158961.5|
|1485|  155782.5|
|1486| 180061.25|
|1487|  328907.5|
|1488|  242298.6|
|1489|  188511.3|
|1490| 233861.95|
|1491|  193798.5|
|1492|  119260.0|
|1493|  186017.5|
|1494|  257320.1|
|1495|  274220.2|
|1496|  252668.9|
|1497|  182065.4|
|1498|  164290.8|
|1499| 142211.65|
|1500|  159769.9|
|1501|  173780.0|
|1502|  149808.3|
|1503| 266083.35|
|1504|  205423.9|
|1505| 234638.45|
|1506| 247306.45|
|1507|  274266.4|
|1508|  218791.5|
|1509|  167007.5|
|1510| 169681.65|
|1511|  170195.0|
|1512|  207488.6|
|1513|  17

In [197]:
avg_predicted_SalePrice = predictions.agg(avg(predictions['prediction'])).collect()[0][0]
print avg_predicted_SalePrice

183779.538508


In [198]:
predict_pd = predictions.select('Id', 'prediction').toPandas()

In [35]:
set(predict_pd.loc[:, ['Id']]['Id'].values)

{1461,
 1462,
 1463,
 1464,
 1465,
 1466,
 1467,
 1468,
 1469,
 1470,
 1471,
 1472,
 1473,
 1474,
 1475,
 1476,
 1477,
 1478,
 1479,
 1480,
 1481,
 1482,
 1483,
 1484,
 1485,
 1486,
 1487,
 1488,
 1489,
 1490,
 1491,
 1492,
 1493,
 1494,
 1495,
 1496,
 1497,
 1498,
 1499,
 1500,
 1501,
 1502,
 1503,
 1504,
 1505,
 1506,
 1507,
 1508,
 1509,
 1510,
 1511,
 1512,
 1513,
 1514,
 1515,
 1516,
 1517,
 1518,
 1519,
 1520,
 1521,
 1522,
 1523,
 1524,
 1525,
 1526,
 1527,
 1528,
 1529,
 1530,
 1531,
 1532,
 1533,
 1534,
 1535,
 1536,
 1537,
 1538,
 1539,
 1540,
 1541,
 1542,
 1543,
 1544,
 1545,
 1546,
 1547,
 1548,
 1549,
 1550,
 1551,
 1552,
 1553,
 1554,
 1555,
 1556,
 1557,
 1558,
 1559,
 1560,
 1561,
 1562,
 1563,
 1564,
 1565,
 1566,
 1567,
 1568,
 1569,
 1570,
 1571,
 1572,
 1573,
 1574,
 1575,
 1576,
 1577,
 1578,
 1579,
 1580,
 1581,
 1582,
 1583,
 1584,
 1585,
 1586,
 1587,
 1588,
 1589,
 1590,
 1591,
 1592,
 1593,
 1594,
 1595,
 1596,
 1597,
 1598,
 1599,
 1600,
 1601,
 1602,
 1603,

In [199]:
submit = []

for index in range(1461, 1461+1459):
    if index in set(predict_pd.loc[:, ['Id']]['Id'].values):
        price = predict_pd[predict_pd.Id == index]['prediction'].values[0]
    else:
        price = avg_predicted_SalePrice
    submit.append([index, price])

In [200]:
print submit[0]
print submit[1458]

[1461, 154178.89999999999]
[2919, 252402.0]


In [201]:
with open("submission.csv", "wb") as f:
    writer = csv.writer(f)
    writer.writerow(['Id', 'SalePrice'])
    writer.writerows(submit)

In [None]:
# Complete

In [80]:
# side attempts

model.featureImportances

SparseVector(328, {0: 0.0064, 1: 0.0103, 2: 0.0164, 3: 0.2726, 4: 0.0067, 5: 0.0218, 6: 0.0067, 7: 0.0039, 8: 0.0166, 9: 0.0005, 10: 0.0048, 11: 0.0269, 12: 0.0385, 13: 0.0124, 14: 0.0001, 15: 0.0718, 16: 0.002, 17: 0.0018, 18: 0.0291, 19: 0.0015, 20: 0.0046, 21: 0.001, 22: 0.0088, 23: 0.0097, 24: 0.0191, 25: 0.1313, 26: 0.0233, 27: 0.0074, 28: 0.0046, 29: 0.0005, 30: 0.0001, 31: 0.0003, 32: 0.0064, 33: 0.0, 34: 0.0044, 35: 0.0021, 36: 0.0007, 37: 0.0003, 38: 0.0035, 39: 0.0004, 40: 0.0001, 41: 0.0003, 42: 0.0005, 43: 0.0005, 44: 0.0001, 45: 0.0, 46: 0.0, 47: 0.0, 48: 0.0, 49: 0.0, 51: 0.0, 53: 0.0021, 54: 0.0017, 55: 0.0001, 56: 0.0, 57: 0.0001, 61: 0.0001, 62: 0.0001, 63: 0.0001, 64: 0.0005, 65: 0.0002, 66: 0.0, 68: 0.0006, 69: 0.0, 70: 0.0016, 71: 0.0004, 75: 0.0007, 76: 0.0001, 77: 0.0002, 78: 0.0001, 79: 0.0, 80: 0.0004, 81: 0.0005, 82: 0.0003, 83: 0.0001, 84: 0.0003, 85: 0.0008, 86: 0.0015, 87: 0.0, 88: 0.0001, 89: 0.0001, 90: 0.0001, 91: 0.0004, 92: 0.0001, 93: 0.0001, 94: 0.0, 