In [1]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.types import DateType
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("PricePrediction").getOrCreate()

In [8]:
!pip show pyspark

Name: pyspark
Version: 3.0.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.7/dist-packages
Requires: py4j
Required-by: 


In [2]:
# load data
df = spark.read.format('csv').option('header',True).option('multiLine', True).load('AB_NYC_2019.csv')
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



In [3]:
# drop redundant columns and filter outliers
df = df.drop('id', 'host_name', 'host_id')
df = df.filter(df.price<800)
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



In [4]:
# After data loading all columns have string dtype. Transform to correct types
from pyspark.sql.types import DateType
df = df.withColumn('latitude_num', df['latitude'].cast('float')).drop('latitude') \
    .withColumn('longitude_num', df['longitude'].cast('float')).drop('longitude') \
    .withColumn('number_of_reviews_int', df['number_of_reviews'].cast('int')).drop('number_of_reviews') \
    .withColumn('minimum_nights_int', df['minimum_nights'].cast('int')).drop('minimum_nights') \
    .withColumn('reviews_per_month_num', df['reviews_per_month'].cast('float')).drop('reviews_per_month') \
    .withColumn('calculated_host_listings_count_num', df['calculated_host_listings_count'].cast('int')).drop('calculated_host_listings_count') \
    .withColumn('availability_365_int', df['availability_365'].cast('int')).drop('availability_365') \
    .withColumn('last_review_date', df['last_review'].cast(DateType())).drop('last_review') \
    .withColumn('price_num', df['price'].cast('float')).drop('price')
#.withColumn("host_id_int", df['host_id'].cast('int')).drop('host_id') 
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- latitude_num: float (nullable = true)
 |-- longitude_num: float (nullable = true)
 |-- number_of_reviews_int: integer (nullable = true)
 |-- minimum_nights_int: integer (nullable = true)
 |-- reviews_per_month_num: float (nullable = true)
 |-- calculated_host_listings_count_num: integer (nullable = true)
 |-- availability_365_int: integer (nullable = true)
 |-- last_review_date: date (nullable = true)
 |-- price_num: float (nullable = true)



In [5]:
# Categorize string categorical columns into integers
from pyspark.ml.feature import StringIndexer, StringIndexerModel
stringIndexer = StringIndexer(inputCols=["neighbourhood_group", 'neighbourhood', 'room_type'],
                              outputCols=["neighbourhood_group_int", 'neighbourhood_int', 'room_type_int'],
                              stringOrderType="frequencyDesc")
stringIndexer_model = stringIndexer.fit(df)
df = stringIndexer_model.transform(df)
df = df.drop(*["neighbourhood_group", 'neighbourhood', 'room_type'])

In [6]:
# stringIndexerPath = './stringIndexer'
# stringIndexer_model.save(stringIndexerPath)
# stringIndexer_model = StringIndexerModel.load(stringIndexerPath)

Py4JJavaError: An error occurred while calling o158.save.
: java.io.IOException: Path ./stringIndexer already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
# One-hot encode them
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel
ohe = OneHotEncoder(inputCols=["neighbourhood_group_int", 'neighbourhood_int', 'room_type_int'],
                    outputCols=["neighbourhood_group_vec", 'neighbourhood_vec', 'room_type_vec'])
ohe_model = ohe.fit(df)
encoded = ohe_model.transform(df)
df = encoded.drop(*["neighbourhood_group_int", 'neighbourhood_int', 'room_type_int'])

In [None]:
# ohe_model_path = './ohe'
# ohe_model.save(ohe_model_path)
# ohe_model = OneHotEncoderModel.load(ohe_model_path)

In [None]:
# Some feature ingineering
from pyspark.sql.functions import datediff, to_date, lit, length, when
df = df.withColumn("minimum_nights", when(df["minimum_nights_int"] > 30, 30).otherwise(df["minimum_nights_int"])).drop('minimum_nights_int')
df = df.withColumn('name_length', length('name')).drop('name')
df = df.withColumn("days_from_review", 
                   datediff(to_date(lit("2020-01-01")),
                            to_date("last_review_date","yyyy-MM-dd"))).drop('last_review_date')
# Fill NaNs and Nulls
df = df.fillna({'days_from_review': 0,
                'reviews_per_month_num': 0,
                'name_length': 0,
                'calculated_host_listings_count_num': 1,
                'number_of_reviews_int': 0 })
df = df.na.drop()

In [None]:
# df = df.drop("latitude_num",'longitude_num')

In [None]:
df.printSchema()

In [None]:
# from pyspark.sql.functions import isnan, when, count, col, isnull

# print(df.agg({"minimum_nights": "max"}).collect()[0][0])
# df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

In [None]:
df.columns

In [7]:
# create feature column for spark regressor
from pyspark.ml.feature import VectorAssembler
from copy import copy
inputCols = copy(df.columns)
inputCols.remove('price_num')
vectorAssembler = VectorAssembler(inputCols = inputCols,
                                  outputCol = 'features',
                                 )
df = vectorAssembler.transform(df)
df = df.select(['features', 'price_num'])
df.show(5,False)
df.printSchema()

IllegalArgumentException: Data type string of column name is not supported.
Data type date of column last_review_date is not supported.

In [58]:
vectorAssembler_path = './vectorAssembler'
vectorAssembler.save(vectorAssembler_path)
vectorAssembler = vectorAssembler.load(vectorAssembler_path)

In [60]:
# Vectorize feature column
from pyspark.ml.feature import VectorIndexer, VectorIndexerModel
featureIndexer = VectorIndexer(inputCol="features", outputCol="features_vec", maxCategories=230).fit(df)
df = featureIndexer.transform(df)
df = df.select(['features_vec', 'price_num'])
df.select('features_vec').show(5, False)

+---------------------------------------------------------------------------------------------------------------------------------------------+
|features_vec                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|(237,[0,1,2,3,4,5,7,61,232,234,235,236],[40.647491455078125,-73.97236633300781,9.0,0.20999999344348907,5.0,365.0,1.0,1.0,1.0,0.0,34.0,439.0])|
|(237,[0,1,2,3,4,5,6,20,231,234,235,236],[40.75362014770508,-73.98377227783203,45.0,0.3799999952316284,1.0,355.0,1.0,1.0,1.0,0.0,21.0,225.0]) |
|(237,[0,1,4,5,6,13,232,234,235],[40.80902099609375,-73.94190216064453,0.0,365.0,1.0,1.0,1.0,2.0,35.0])                                       |
|(237,[0,1,2,3,4,5,7,30,231,234,235,236],[40.68513870239258,-73.95976257324219,270.0,4.639999866485596,0.0,194.0,1.0,1.0,1.0,0.0,31.0,18

In [62]:
# split the data into train and test 
splits = df.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

In [63]:
# fit linear regression
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features_vec', labelCol='price_num')
lr_model = lr.fit(train_df)
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 79.387118
r2: 0.421572


In [64]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","price_num","features_vec").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price_num",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+---------+--------------------+
|        prediction|price_num|        features_vec|
+------------------+---------+--------------------+
|177.57065077970765|    149.0|(237,[0,1,2,3,4,5...|
| 216.3503272705202|    300.0|(237,[0,1,2,3,4,5...|
|200.38307160902332|    104.0|(237,[0,1,2,3,4,5...|
|166.23831671215885|    299.0|(237,[0,1,2,3,4,5...|
|171.80952899411204|    200.0|(237,[0,1,2,3,4,5...|
+------------------+---------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.410451


In [65]:
model_path = '/opt/workspace/lr_model'
lr_model.save(model_path)

In [None]:
# Random forest
from pyspark.ml.evaluation import RegressionEvaluator
rfc = RandomForestRegressor(featuresCol = 'features_vec', labelCol='price_num', #numTrees=20,
                           maxDepth=30, maxBins=250)
rfc_model = rfc.fit(train_df)
rfc_predictions = rfc_model.transform(test_df)
rfc_predictions.select("prediction","price_num","features_vec").show(5)
rfc_evaluator = RegressionEvaluator(labelCol="price_num", predictionCol="prediction", metricName="r2")
print("R Squared (R2) on test data = %g" % rfc_evaluator.evaluate(rfc_predictions))

In [20]:
# from pyspark.ml.regression import RandomForestRegressionModel, LinearRegressionModel
# model_path = '/opt/workspace/lr_model'
# lr_model.save(model_path)
# model2 = LinearRegressionModel.load(model_path)

In [44]:
!rm -rf stringIndexer

In [24]:
model_path = '/opt/workspace/lr_model'
lr_model = LinearRegressionModel.load(model_path)
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction").show(5).

+------------------+
|        prediction|
+------------------+
| 158.4032963124846|
| 165.0361241964565|
|189.95015759149828|
|150.68205976478203|
|198.97464480284543|
+------------------+
only showing top 5 rows



In [17]:
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
schema = StructType([])
request = sqlContext.createDataFrame(sc.emptyRDD(), schema)
request = request\
.withColumn("category",F.lit('nation'))\
.withColumn("category_id",F.lit('nation'))\
.withColumn("bucket",F.lit(bucket))\
.withColumn("prop_count",F.lit(prop_count))\
.withColumn("event_count",F.lit(event_count))\
.withColumn("accum_prop_count",F.lit(accum_prop_count))\
.withColumn("accum_event_count",F.lit(accum_event_count))

In [None]:
from pyspark.ml.feature import OneHotEncoderModel, VectorAssembler, StringIndexerModel, VectorIndexerModel
vectorAssembler_path = './vectorAssembler'
ohe_model_path = './ohe'
stringIndexerPath = './stringIndexer'
featureIndexer_path = './featureIndexer'

feature_model = VectorIndexerModel.load(featureIndexer_path)
vectorAssembler = vectorAssembler.load(vectorAssembler_path)
ohe_model = OneHotEncoderModel.load(ohe_model_path)
stringIndexer_model = StringIndexerModel.load(stringIndexerPath)

In [None]:
df = stringIndexer_model.transform(df)
df = df.drop(*["neighbourhood_group", 'neighbourhood', 'room_type'])
df = ohe_model.transform(df)
df = df.drop(*["neighbourhood_group_int", 'neighbourhood_int', 'room_type_int'])

In [None]:
from pyspark.sql.functions import datediff, to_date, lit, length, when
df = df.withColumn("minimum_nights", when(df["minimum_nights_int"] > 30, 30).otherwise(df["minimum_nights_int"])).drop('minimum_nights_int')
df = df.withColumn('name_length', length('name')).drop('name')
df = df.fillna({'days_from_review': 0,
                'reviews_per_month_num': 0,
                'name_length': 0,
                'calculated_host_listings_count_num': 1,
                'number_of_reviews_int': 0 })

In [None]:
df = vectorAssembler.transform(df)
df = df.select(['features', 'price_num'])
df = featureIndexer.transform(df)
df = df.select(['features_vec', 'price_num'])

In [7]:
!pip install pyspark==3.0.*

Collecting pyspark==3.0.*
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 27 kB/s s eta 0:00:01                    | 8.4 MB 12.9 MB/s eta 0:00:1614.8 MB 12.9 MB/s eta 0:00:15█▊                             | 17.7 MB 12.9 MB/s eta 0:00:15 MB 12.9 MB/s eta 0:00:15 MB 12.9 MB/s eta 0:00:14                       | 26.6 MB 12.9 MB/s eta 0:00:14    | 30.3 MB 12.9 MB/s eta 0:00:14         | 34.1 MB 12.9 MB/s eta 0:00:14                        | 36.8 MB 12.9 MB/s eta 0:00:1300:13 eta 0:00:13                | 49.5 MB 12.9 MB/s eta 0:00:12ta 0:00:12ta 0:00:12[K     |█████████▍                      | 59.7 MB 60.2 MB/s eta 0:00:03              | 63.5 MB 60.2 MB/s eta 0:00:03    |██████████▋                     | 67.9 MB 60.2 MB/s eta 0:00:03 MB 60.2 MB/s eta 0:00:03      | 75.2 MB 60.2 MB/s eta 0:00:03██▍                   | 78.8 MB 60.2 MB/s eta 0:00:03.2 MB/s eta 0:00:030.2 MB/s eta 0:00:02�██████████                  | 89.9 MB 60.2 MB/s eta 0:00: