In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

In [3]:
spark = SparkSession.builder.appName('MLPipeline').getOrCreate()

In [4]:
df = spark.read.csv('data.csv', inferSchema=True, header=True)

df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- statezip: string (nullable = true)
 |-- country: string (nullable = true)



In [5]:
df.show()

+-------------------+---------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+----------------+--------+-------+
|               date|    price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|sqft_above|sqft_basement|yr_built|yr_renovated|              street|            city|statezip|country|
+-------------------+---------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+----------------+--------+-------+
|2014-05-02 00:00:00| 313000.0|     3.0|      1.5|       1340|    7912|   1.5|         0|   0|        3|      1340|            0|    1955|        2005|18810 Densmore Ave N|       Shoreline|WA 98133|    USA|
|2014-05-02 00:00:00|2384000.0|     5.0|      2.5|       3650|    9050|   2.0|         0|   4|        5|      3370|          280|    1921|           0|     709 W Blaine St|

In [6]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------+--------+-------+
|summary|            price|          bedrooms|         bathrooms|       sqft_living|          sqft_lot|            floors|          waterfront|               view|         condition|        sqft_above|     sqft_basement|         yr_built|     yr_renovated|           street|        city|statezip|country|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------+--------+-------+
|  count|             4600|              4600|              4600|              4600| 

In [7]:
summary_stats = df.summary()

In [13]:
summary_stats.columns

['summary',
 'price',
 'bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'yr_renovated',
 'street',
 'city',
 'statezip',
 'country']

In [27]:
from pyspark.sql.functions import format_number, format_string
summary_stats.select(summary_stats['summary'], summary_stats['price'].cast('float'), 
                     summary_stats['bedrooms'].cast('float'), summary_stats['bathrooms'].cast('float'),
                    summary_stats['sqft_living'].cast('float'), summary_stats['sqft_lot'].cast('float'),
                    summary_stats['sqft_basement'].cast('float'), summary_stats['view'].cast('float'),
                    summary_stats['sqft_above'].cast('float'), summary_stats['condition'].cast('float'),
                    summary_stats['yr_renovated'].cast('integer')).show()

+-------+---------+---------+----------+-----------+---------+-------------+----------+----------+----------+------------+
|summary|    price| bedrooms| bathrooms|sqft_living| sqft_lot|sqft_basement|      view|sqft_above| condition|yr_renovated|
+-------+---------+---------+----------+-----------+---------+-------------+----------+----------+----------+------------+
|  count|   4600.0|   4600.0|    4600.0|     4600.0|   4600.0|       4600.0|    4600.0|    4600.0|    4600.0|        4600|
|   mean| 551963.0|3.4008696| 2.1608152|   2139.347|14852.516|     312.0815|0.24065217| 1827.2654|  3.451739|         808|
| stddev| 563834.7|0.9088481|0.78378105|   963.2069|35884.438|    464.13724| 0.7784047|   862.169|0.67722976|         979|
|    min|      0.0|      0.0|       0.0|      370.0|    638.0|          0.0|       0.0|     370.0|       1.0|           0|
|    25%| 322500.0|      3.0|      1.75|     1460.0|   5000.0|          0.0|       0.0|    1190.0|       3.0|           0|
|    50%|460886.

In [29]:
# drop missing values
df = df.na.drop()

In [34]:
from pyspark.sql.functions import when

df = df.withColumn('bigger_house', when(df['sqft_lot'] > 14852.516, 1).otherwise(0))

In [31]:
df.groupBy('view').count().show()

+----+-----+
|view|count|
+----+-----+
|   1|   69|
|   3|  116|
|   4|   70|
|   2|  205|
|   0| 4140|
+----+-----+



In [32]:
df.groupBy('bedrooms').count().show()

+--------+-----+
|bedrooms|count|
+--------+-----+
|     8.0|    2|
|     0.0|    2|
|     7.0|   14|
|     1.0|   38|
|     4.0| 1531|
|     3.0| 2032|
|     2.0|  566|
|     6.0|   61|
|     5.0|  353|
|     9.0|    1|
+--------+-----+



In [35]:
df.groupBy('bigger_house').count().show()

+------------+-----+
|bigger_house|count|
+------------+-----+
|           1|  752|
|           0| 3848|
+------------+-----+



In [42]:
df.select('bigger_house', 'bedrooms', 'bathrooms', 'view', 'price').groupBy(['bigger_house'])\
        .agg({'price': 'mean'}).show()

+------------+-----------------+
|bigger_house|       avg(price)|
+------------+-----------------+
|           1|686039.8739938992|
|           0|525760.8528413132|
+------------+-----------------+



In [68]:
a = df.groupBy('bathrooms').mean('price').orderBy('avg(price)', desc=True).withColumnRenamed('avg(price)', 'avg_price')

a.select('bathrooms', format_number('avg_price', 2).alias('avg_price')).show()

+---------+------------+
|bathrooms|   avg_price|
+---------+------------+
|     0.75|  293,955.94|
|      1.0|  332,620.63|
|      1.5|  411,044.28|
|     1.75|  459,549.65|
|     1.25|  461,150.00|
|      2.0|  526,857.97|
|     2.25|  537,178.43|
|     5.75|  540,000.00|
|      2.5|  572,705.21|
|     2.75|  643,424.57|
|      3.0|  705,563.58|
|     3.75|  901,380.30|
|      3.5|  903,002.61|
|     3.25|  944,241.29|
|      5.0|  946,171.83|
|      4.0|  959,036.09|
|      0.0|1,195,324.00|
|      4.5|1,379,316.63|
|     4.25|1,396,876.35|
|     6.25|1,444,000.00|
+---------+------------+
only showing top 20 rows



In [70]:
print(df.columns)

['date', 'price', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated', 'street', 'city', 'statezip', 'country', 'bigger_house']


In [72]:
# get the correlation
from pyspark.ml.stat import Correlation
import pandas as pd

x = df.columns[1:13]
corr_df = pd.DataFrame()

for i in x:
    corr = []
    for j in x:
        corr.append(round(df.stat.corr(i, j), 2))
        corr_df = pd.concat([corr_df, pd.Series(corr)], axis=1)
        
# corr_df.columns = x
# corr_df.insert(0, '', x)
# corr_df.set_index('')
corr_df

Unnamed: 0,0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,...,0.10,0.11,0.12,0.13,0.14,0.15,0.16,0.17,0.18,0.19
0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,...,0.02,0.02,0.02,0.02,0.02,0.02,0.02,0.02,0.02,0.02
1,,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,...,0.14,0.14,0.14,0.14,0.14,0.14,0.14,0.14,0.14,0.14
2,,,0.33,0.33,0.33,0.33,0.33,0.33,0.33,0.33,...,0.46,0.46,0.46,0.46,0.46,0.46,0.46,0.46,0.46,0.46
3,,,,0.43,0.43,0.43,0.43,0.43,0.43,0.43,...,,0.29,0.29,0.29,0.29,0.29,0.29,0.29,0.29,0.29
4,,,,,0.05,0.05,0.05,0.05,0.05,0.05,...,,,0.05,0.05,0.05,0.05,0.05,0.05,0.05,0.05
5,,,,,,0.15,0.15,0.15,0.15,0.15,...,,,,0.47,0.47,0.47,0.47,0.47,0.47,0.47
6,,,,,,,0.14,0.14,0.14,0.14,...,,,,,-0.02,-0.02,-0.02,-0.02,-0.02,-0.02
7,,,,,,,,0.23,0.23,0.23,...,,,,,,-0.06,-0.06,-0.06,-0.06,-0.06
8,,,,,,,,,0.03,0.03,...,,,,,,,-0.4,-0.4,-0.4,-0.4
9,,,,,,,,,,0.37,...,,,,,,,,0.41,0.41,0.41


In [75]:
df.corr('price', 'bathrooms')

0.3271099182877348

In [76]:
df.corr('price', 'bedrooms')

0.20033628937567646

In [77]:
df.corr('price', 'condition')

0.03491453732641387

In [79]:
feature_cols = df.columns[2: 13]
print(feature_cols)

['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'sqft_above', 'sqft_basement', 'yr_built']


In [83]:
for i in feature_cols:
    print(f'price and {i} corr score:: {round(df.corr("price", i),2)}')

price and bedrooms corr score:: 0.2
price and bathrooms corr score:: 0.33
price and sqft_living corr score:: 0.43
price and sqft_lot corr score:: 0.05
price and floors corr score:: 0.15
price and waterfront corr score:: 0.14
price and view corr score:: 0.23
price and condition corr score:: 0.03
price and sqft_above corr score:: 0.37
price and sqft_basement corr score:: 0.21
price and yr_built corr score:: 0.02


In [80]:
# feature Enginerring
from pyspark.sql.functions import pow
df = df.withColumn('all_rooms', df['bedrooms']+df['bathrooms'])
df = df.withColumn('bedrooms^2', pow(df['bedrooms'], 2))
df = df.withColumn('bathrooms^2', pow(df['bathrooms'], 2))

In [84]:
df = df.withColumn('sqft_living^2', pow(df['sqft_living'], 2))
df = df.withColumn('sqft_above^2', pow(df['sqft_above'], 2))

### Machice Learning Pipeline

The ML library here with PySpark is not yet on the same level as what scikit-learn provides but it will serve the purpose for now

In [85]:
df.columns

['date',
 'price',
 'bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'yr_renovated',
 'street',
 'city',
 'statezip',
 'country',
 'bigger_house',
 'all_rooms',
 'bedrooms^2',
 'bathrooms^2',
 'sqft_living^2',
 'sqft_above^2']

In [86]:
col_2_drop = ['price', 'street', 'city', 'date', 'statezip', 'country', 'condition', 'bedrooms', 'bathrooms']

In [88]:
train, test = df.randomSplit([.8, 0.2])

In [92]:
def regression_metrics(prediction):
    from pyspark.ml.evaluation import RegressionEvaluator
    evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(prediction)
    print("RMSE on test data = %g" % rmse)

In [98]:
from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.pipeline import Pipeline

# specify the stages
vec_assembler = VectorAssembler(inputCols=[i for i in df.columns if i not in col_2_drop], outputCol='features')
normalizer = Normalizer(inputCol='features', outputCol='features_norm')
lr = LinearRegression(featuresCol='features', labelCol='price')

# build the actual pipeline
pipe = Pipeline(stages=[vec_assembler, normalizer, lr])

# fit the data using the pipeline
model = pipe.fit(train)

In [99]:
prediction = model.transform(train)
regression_metrics(prediction)

RMSE on test data = 501676


In [100]:
regression_metrics(model.transform(test))

RMSE on test data = 482527


In [102]:
prediction.select(['price', 'features', 'features_norm', 'prediction']).show()

+-------------+--------------------+--------------------+------------------+
|        price|            features|       features_norm|        prediction|
+-------------+--------------------+--------------------+------------------+
|237227.857143|[2200.0,9397.0,2....|[3.21411809905429...| 535086.4179687053|
|     242500.0|[1200.0,9720.0,1....|[5.89248460208566...| 330298.6412654491|
|     260000.0|[1480.0,8625.0,1....|[4.77772794115911...|360717.66457646433|
|     275000.0|[1180.0,10277.0,1...|[5.99234043111752...| 270814.0957511896|
|     284000.0|[1800.0,23103.0,1...|[3.92831973159204...|452885.00970491115|
|     285000.0|[2090.0,10834.0,1...|[4.40596531231741...|484920.15456246585|
|     287200.0|[1850.0,19966.0,1...|[5.10638872021590...|419386.25289527234|
|     295000.0|[1630.0,1368.0,2....|[5.22192343453728...| 374119.9623330943|
|     300000.0|[2540.0,5050.0,2....|[2.78388425584094...|  614520.316567895|
|     313000.0|[1340.0,7912.0,1....|[5.27688598584067...| 394333.4855443053|

Since Performance is Generally Poor, I'll try scikit learn algorithms

Specifically GradientBoostingRegressor PS: pyspark also provide this but i just want to experiment. You never know what you'll find right??

In [104]:
from sklearn.ensemble import GradientBoostingRegressor
gbr = GradientBoostingRegressor(n_estimators=400, learning_rate=0.1)

After Much Exploration, Scikit Learn Algorithms don't accept DenseVector which pyspark returns hence back to pyspark :)

In [123]:
from pyspark.ml.regression import GBTRegressor
gbr = GBTRegressor(featuresCol='features_norm', labelCol='price')

# build the actual pipeline
pipe = Pipeline(stages=[vec_assembler, normalizer, gbr])

# fit the data using the pipeline
model = pipe.fit(train)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:58251)
Traceback (most recent call last):
  File "C:\Users\ABUTON\Desktop\spark\spark-2.4.4-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\ABUTON\Desktop\spark\spark-2.4.4-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:58251)

In [120]:
prediction = model.transform(train)
regression_metrics(prediction)

RMSE on test data = 230273


Much Better Now still can improve though