In [458]:
from pyspark.sql import SparkSession

# The data describes house sales (property features plus sale price) in Ames, Iowa.
# The task is to predict each property's market value (SalePrice) with a regression model.

In [459]:
# NOTE: despite the name, `sc` is a SparkSession (not a SparkContext).
sc = SparkSession.builder.getOrCreate()
train_path = 'house-prices-advanced-regression-techniques/train.csv'
test_path = 'house-prices-advanced-regression-techniques/test.csv'

# Both files have a header row; column types are inferred from the data.
train_data, test_data = (
    sc.read.option('header', True).option('inferSchema', True).csv(path)
    for path in (train_path, test_path)
)

# First, merge the train and test sets so that both can be preprocessed together.
# The train set's SalePrice values are saved (keyed by Id) in the `target` DataFrame for later model creation.

In [460]:
from pyspark.sql import DataFrame

# Keep the labels (SalePrice keyed by Id) aside and drop them from the training
# features so the train and test frames expose the same set of columns.
target = train_data.select('Id', 'SalePrice')
train_data = train_data.drop('SalePrice')

# unionByName matches columns by name rather than by position, so the stacked
# frame stays correct even if the two CSVs list their columns in different orders.
data = train_data.unionByName(test_data)

# Next, replace the value 'NA' in certain columns: there 'NA' is a meaningful category rather than a missing value, so it is recoded as '0'.
# All the data is explained in the data_description.txt file

In [461]:
from pyspark.sql.functions import regexp_replace

# In these columns 'NA' is a meaningful category (see data_description.txt),
# not a missing value, so it is recoded as the string '0'.
# The pattern is anchored so only cells whose entire value is 'NA' are replaced;
# an unanchored 'NA' would also rewrite any value that merely contains "NA".
for column in [
    'Alley',
    'BsmtQual',
    'BsmtCond',
    'BsmtExposure',
    'BsmtFinType1',
    'BsmtFinType2',
    'FireplaceQu',
    'GarageType',
    'GarageFinish',
    'GarageQual',
    'GarageCond',
    'PoolQC',
    'Fence',
    'MiscFeature'
]:
    data = data.withColumn(column, regexp_replace(column, '^NA$', '0'))

# Next, replace the genuinely missing values ('NA') in numeric columns with each column's mean (truncated to an integer).
# These columns were read as strings because inferSchema saw the literal 'NA' token in them,
# so after the replacement they are cast back to integers.

In [462]:
from pyspark.sql.functions import mean, col
from pyspark.sql.types import IntegerType
# These numeric columns hold the literal token 'NA' for missing values, so they
# arrive as strings. Replace 'NA' with the column's mean (truncated to int) and
# cast the column back to IntegerType.
for column in ['LotFrontage', 'MasVnrArea',
               'BsmtFinSF1', 'BsmtFinSF2',
               'BsmtUnfSF', 'TotalBsmtSF',
               'BsmtFullBath', 'GarageYrBlt',
               'BsmtHalfBath', 'GarageCars']:
    # mean() over a string column casts values to double; 'NA' presumably becomes
    # null and is skipped. The mean is None if the column has no numeric values.
    column_mean = data.select(mean(col(column)).alias('avg')).collect()[0]['avg']
    if column_mean is not None:
        # Anchored so only cells whose entire value is 'NA' are replaced.
        data = data.withColumn(column, regexp_replace(column, '^NA$', str(int(column_mean))))
    # Values that are still not valid integers (e.g. a remaining 'NA') become null here.
    data = data.withColumn(column, col(column).cast(IntegerType()))

# Fill any remaining nulls in integer columns with each column's mean (truncated to an integer).

In [463]:
# DataFrame.na.fill returns a NEW DataFrame; the result must be reassigned or the
# fill has no effect. All integer-column means are computed in a single
# aggregation instead of one Spark job per column.
int_columns = [name for name, dtype in data.dtypes if dtype == 'int']
if int_columns:
    column_means = data.select(
        [mean(col(name)).alias(name) for name in int_columns]
    ).first().asDict()
    # A column with no non-null values has a null mean; leave such columns untouched
    # (int(None) would raise).
    fill_values = {name: int(value) for name, value in column_means.items() if value is not None}
    if fill_values:
        data = data.na.fill(fill_values)

# Now we aggregate to find out the count of missing values

In [464]:
from pyspark.sql.functions import count, when, isnan
# For every column, count the rows whose value is NaN or null.
null_or_nan_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(null_or_nan_counts).toPandas()

Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,...,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


# Now let's perform 3 aggregations with the preprocessed data
# Aggregations:
# - Find the average construction year of the properties "YearBuilt"
# - Find the mean lot area "LotArea"
# - Find the skewness of the total basement area in square feet "TotalBsmtSF"

In [465]:
from pyspark.sql.functions import avg, mean, skewness
from time import time
t0 = time()

# Each (label, aggregate) pair runs as its own single-value query.
scalar_aggregates = [
    ("Average Year of property on the market: ", avg("YearBuilt")),
    ("Mean Lot Area: ", mean("LotArea")),
    ("Skewness of Total square feet of basement area: ", skewness("TotalBsmtSF")),
]
for label, aggregate in scalar_aggregates:
    result = data.select(aggregate).collect()[0][0]
    print(label + str(result))

elapsed = time() - t0
print("Dataframe aggregations performed in " + str(elapsed) + " seconds")

Average Year of property on the market: 1971.3127783487496
Mean Lot Area: 10168.11408016444
Skewness of Total square feet of basement area: 1.1624855612191176
Dataframe aggregations performed in 0.3689737319946289 seconds


# Now let's perform 2 different aggregations with the preprocessed data using groupBy
# Aggregations:
# - Find the max lot area "LotArea" for each value of "MSZoning" (the general zoning classification of the sale)
# - Find the total street frontage "LotFrontage" (linear feet of street connected to the property) for each "Neighborhood" (physical locations within Ames city limits)

In [466]:
# Aggregate only the column each question needs, instead of computing max()/sum()
# over every numeric column and then discarding all but one. The generated
# column names (max(LotArea), sum(LotFrontage)) are the same as before.
from pyspark.sql.functions import max as spark_max, sum as spark_sum

t0 = time()
data.groupBy('MSZoning').agg(spark_max('LotArea')).show()
data.groupBy('Neighborhood').agg(spark_sum('LotFrontage')).show()

print("SQL dataframe aggregations with groupby performed in " + str(time() - t0) + " seconds")

+--------+------------+
|MSZoning|max(LotArea)|
+--------+------------+
| C (all)|       18000|
|      RH|       12155|
|      FV|       13162|
|      RL|      215245|
|      RM|       33120|
|      NA|       56600|
+--------+------------+

+------------+----------------+
|Neighborhood|sum(LotFrontage)|
+------------+----------------+
|     Veenker|            1704|
|     BrkSide|            6197|
|     NPkVill|             729|
|     NridgHt|           13929|
|     NoRidge|            6121|
|      NWAmes|           10103|
|     OldTown|           14837|
|     Gilbert|           11963|
|     Somerst|           11837|
|     Crawfor|            7186|
|       NAmes|           32896|
|      IDOTRR|            5829|
|     Edwards|           13014|
|      Sawyer|           10963|
|     StoneBr|            3205|
|     CollgCr|           18937|
|       SWISU|            2875|
|     MeadowV|            1121|
|      Timber|            5661|
|     Blmngtn|            1490|
+------------+---------

In [467]:
from pyspark.sql import SQLContext

# `sc` is a SparkSession (created with SparkSession.builder.getOrCreate()), and
# SparkSession.sql() is the supported SQL entry point. SQLContext is deprecated,
# and its constructor expects a SparkContext rather than a session. Expose the
# session under the name the following cells use.
sqlContext = sc

data.createOrReplaceTempView("sql_table")



In [468]:
t0 = time()

# Run each GROUP BY query to completion; the collected rows are only needed for timing.
for query in (
    '''SELECT MAX(LotArea), MSZoning FROM sql_table GROUP BY MSZoning''',
    '''SELECT SUM(LotFrontage), Neighborhood FROM sql_table GROUP BY Neighborhood''',
):
    sqlContext.sql(query).collect()

elapsed = time() - t0
print("SQL aggregations with groupby performed in " + str(elapsed) + " seconds")

SQL aggregations with groupby performed in 0.5405409336090088 seconds
