# Price Prediction model using PySpark

Assuming I am working with Terabytes of Data, Pandas cannot handle such a large dataset due to lack of multiprocessing support, memory and speed.

It is a tough decision to choose in between Dask and PySpark. However, In addition to other differences, PySpark is an all-in-one ecosystem which can handle the aggressive requirements with its MLlib, Structured data processing API.

In [1]:
from pyspark.context import SparkContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import (OneHotEncoderEstimator, StringIndexer,
                                VectorAssembler)
from pyspark.ml.linalg import Vectors
from pyspark.ml.pipeline import Estimator, Transformer
from pyspark.ml.regression import GBTRegressionModel, GBTRegressor
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, skewness
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator

print ("Successfully imported Libraries")

Successfully imported Libraries


### Creating Spark Session

In [2]:
spark = SparkSession.builder.appName('pricing_model').getOrCreate()
spark

### Import Data

In a situation where we are working with big data, the connection will be to S3 bucket or cloud storage, depending on the storage infrasturucture we want decide to use.

In this case, i will read the data from my local

In [3]:
DATA_PATH = 'listings_processed.parquet'

def import_data(DATA_PATH):
    raw_data = spark.read.parquet(DATA_PATH)
    return raw_data
raw_data = import_data(DATA_PATH)

### Data Exploration

#### skewness
In statistics, skewness is a measure of the asymmetry of the probability distribution of a real-valued random variable about its mean. in other words, it is the degree of distortion from the normal distribution. The skewness value can be positive or negative, or undefined. Unsurprisingly, our data is positvely skewed 


In [4]:
raw_data.select(skewness('price')).show()

+------------------+
|   skewness(price)|
+------------------+
|23.313894822887782|
+------------------+



#### Summary statistics
This is just to help us get the distribution and iunderstanding of our target variable

In [5]:
raw_data.select('price').describe().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|             20114|
|   mean|146.12210400715918|
| stddev|120.79440354929586|
|    min|               0.0|
|    max|            8000.0|
+-------+------------------+



In [6]:
raw_data.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+
|summary|      square_feet|             price|          bedrooms|         bathrooms|      cleaning_fee|
+-------+-----------------+------------------+------------------+------------------+------------------+
|  count|              423|             20114|             20107|             20105|             16490|
|   mean| 514.113475177305|146.12210400715918|1.4492962649823444|1.1294951504600845| 38.91455427531837|
| stddev|531.6553745894398|120.79440354929586|0.8953236140779876| 0.365187604471003|23.016842492553195|
|    min|                0|               0.0|                 0|               0.0|               0.0|
|    max|             3229|            8000.0|                12|              15.0|             531.0|
+-------+-----------------+------------------+------------------+------------------+------------------+



### Data Cleaning

#### Removing Outliers

97% of our data falls between price range of 50 and 750, in order to get unbiased model, I will remove outliers

In [7]:
dataset_filtered = raw_data.filter("price >= 50 AND price <= 750 and bathrooms > 0.0")

### Dealing with Missing Values

Here, i did the following

- Convert all the boolean variable to binary response (0,1)
- Fill the missing values in the following columns with Zero
    - security_deposit
    - extra_people
    - cleaning_fee
    - All the review score columns
    The assumption here is that: When these variable are missing, they are not applicable to the landloard

- Calculation for Square feet missing values: The Assumption here is that When the square feet is less than or equals 100 and bedroom is 0, the square feet will be 350 and else the square feet will be 380 multiply by the number of bedrooms


In [8]:
dataset_filtered.registerTempTable("data")

data_sql = spark.sql("""
    select
        host_id,
        price,
        bathrooms,
        bedrooms,
        room_type,
        property_type,
        case when host_is_superhost = True
            then 1.0
            else 0.0
        end as host_is_superhost,
        accommodates,
        cancellation_policy,
        minimum_nights,
        maximum_nights,
        availability_30,
        availability_60,
        availability_90,
        availability_365,
        case when security_deposit is null
            then 0.0
            else security_deposit
        end as security_deposit,
        case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as number_of_reviews,
        case when extra_people is null
            then 0.0
            else extra_people
        end as extra_people,
        case when instant_bookable = True
            then 1.0
            else 0.0
        end as instant_bookable,
        case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as cleaning_fee,
        case when review_scores_rating is null
            then 0.0
            else review_scores_rating
        end as review_scores_rating,
        case when review_scores_accuracy is null
            then 0.0
            else review_scores_accuracy
        end as review_scores_accuracy,
        case when review_scores_cleanliness is null
            then 0.0
            else review_scores_cleanliness
        end as review_scores_cleanliness,
        case when review_scores_checkin is null
            then 0.0
            else review_scores_checkin
        end as review_scores_checkin,
        case when review_scores_communication is null
            then 0.0
            else review_scores_communication
        end as review_scores_communication,
        case when review_scores_location is null
            then 0.0
            else review_scores_location
        end as review_scores_location,
        case when review_scores_value is null
            then 0.0
            else review_scores_value
        end as review_scores_value,
        case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as square_feet,
        case when bathrooms >= 2
            then 1.0
            else 0.0
        end as n_bathrooms_more_than_two,
        case when amenity_wifi = True
            then 1.0
            else 0.0
        end as amenity_wifi,
        case when amenity_heating = True
            then 1.0
            else 0.0
        end as amenity_heating,
        case when amenity_essentials = True
            then 1.0
            else 0.0
        end as amenity_essentials,
        case when amenity_kitchen = True
            then 1.0
            else 0.0
        end as amenity_kitchen,
        case when amenity_tv = True
            then 1.0
            else 0.0
        end as amenity_tv,
        case when amenity_smoke_detector = True
            then 1.0
            else 0.0
        end as amenity_smoke_detector,
        case when amenity_washer = True
            then 1.0
            else 0.0
        end as amenity_washer,
        case when amenity_hangers = True
            then 1.0
            else 0.0
        end as amenity_hangers,
        case when amenity_laptop_friendly_workspace = True
            then 1.0
            else 0.0
        end as amenity_laptop_friendly_workspace,
        case when amenity_iron = True
            then 1.0
            else 0.0
        end as amenity_iron,
        case when amenity_shampoo = True
            then 1.0
            else 0.0
        end as amenity_shampoo,
        case when amenity_hair_dryer = True
            then 1.0
            else 0.0
        end as amenity_hair_dryer,
        case when amenity_family_kid_friendly = True
            then 1.0
            else 0.0
        end as amenity_family_kid_friendly,
        case when amenity_dryer = True
            then 1.0
            else 0.0
        end as amenity_dryer,
        case when amenity_fire_extinguisher = True
            then 1.0
            else 0.0
        end as amenity_fire_extinguisher,
        case when amenity_hot_water = True
            then 1.0
            else 0.0
        end as amenity_hot_water,
        case when amenity_internet = True
            then 1.0
            else 0.0
        end as amenity_internet,
        case when amenity_cable_tv = True
            then 1.0
            else 0.0
        end as amenity_cable_tv,
        case when amenity_carbon_monoxide_detector = True
            then 1.0
            else 0.0
        end as amenity_carbon_monoxide_detector,
        case when amenity_first_aid_kit = True
            then 1.0
            else 0.0
        end as amenity_first_aid_kit,
        case when amenity_host_greets_you = True
            then 1.0
            else 0.0
        end as amenity_host_greets_you,
        case when amenity_translation_missing_en_hosting_amenity_50 = True
            then 1.0
            else 0.0
        end as amenity_translation_missing_en_hosting_amenity_50,
        case when amenity_private_entrance = True
            then 1.0
            else 0.0
        end as amenity_private_entrance,
        case when amenity_bed_linens = True
            then 1.0
            else 0.0
        end as amenity_bed_linens,
        case when amenity_refrigerator = True
            then 1.0
            else 0.0
        end as amenity_refrigerator
    from data
    where bedrooms is not null
""")

In [9]:
data_sql = data_sql.persist()

In [10]:
processed_data = data_sql.na.drop()

To make prediction, diffrent datatypes require different preprocessing techniques, so here, i get the column for:
- Categorical Variables
- Numberical Variable which consist of Double and Decimal datatypes


In [11]:
cat_cols = [f.name for f in processed_data.schema.fields if isinstance(f.dataType, StringType)]
num_cols = [f.name for f in processed_data.schema.fields if isinstance(f.dataType, IntegerType)]
decimal_cols = [f.name for f in processed_data.schema.fields if isinstance(f.dataType, DecimalType)]
double_cols = [f.name for f in processed_data.schema.fields if isinstance(f.dataType, DoubleType)]

In [12]:
num_features = num_cols + decimal_cols + double_cols

### Data Preprocessing for Pipeline

Here, i apply the following methods
- String Indexer - This is used in Machine Learning algorithm to identify column as categorical variable, it converts the categorical column to numeric data and still keeping the categorical context.
- One Hot Encodeing - This is a representation of categorical variables as binary vectors. The categorical values be mapped to integer values. 
- Vector Assembling - The last step in the Pipeline, this is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

In [13]:
stages = []

for x in cat_cols:
    cat_string_indexer = StringIndexer(inputCol = x, outputCol = x + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[cat_string_indexer.getOutputCol()], outputCols=[x + "encode"])
    stages += [cat_string_indexer, encoder]

assembler_inputs = [c + "encode" for c in cat_cols] + num_features
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages += [assembler]                              

### The Spark Pipeline

The Spark Pipeline is a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. 

My pipeline is in the following sequence StringIndexer > OneHotEncoderEstimator > VectorAssember

In [14]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(processed_data)
df = pipelineModel.transform(processed_data)

#### Train and Validation set split

In [15]:
[training_data, validation_data] = df.randomSplit([0.7, 0.3])

## Modelling 

### Training the model

In [16]:
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'price', maxIter=10)
model = gbt.fit(training_data)

### Get Prediction

In [17]:
gbt_predictions = model.transform(validation_data)
gbt_predictions.select('prediction', 'price', 'features').show(30)

+------------------+-----+--------------------+
|        prediction|price|            features|
+------------------+-----+--------------------+
| 78.12178682457298| 78.0|(87,[0,2,33,36,37...|
| 98.58590164618633|100.0|(87,[0,2,33,36,37...|
| 125.6894673684008|125.0|(87,[0,2,32,36,37...|
| 78.28748129822688| 80.0|(87,[1,4,32,36,37...|
| 74.41588097277969| 75.0|(87,[0,2,33,36,37...|
|  97.2833287091994| 95.0|(87,[0,2,32,36,37...|
| 98.58590164618633|100.0|(87,[0,2,32,36,37...|
| 98.58590164618633|100.0|(87,[0,2,33,36,37...|
|164.39502869446346|160.0|(87,[0,8,33,36,37...|
|118.88180112091761|120.0|(87,[1,2,32,36,37...|
|227.40137868950112|225.0|(87,[0,2,33,36,37...|
| 64.41760033855675| 65.0|(87,[1,2,33,36,37...|
|130.34811644810253|130.0|(87,[0,2,32,36,37...|
|118.88180112091761|120.0|(87,[0,2,32,36,37...|
| 87.14351857949916| 85.0|(87,[0,2,33,36,37...|
|150.79300417376956|149.0|(87,[0,2,33,36,37...|
|194.87018811810051|199.0|(87,[0,2,32,36,37...|
|118.88180112091761|115.0|(87,[0,2,33,36

In [18]:
#model.save("model") 

### Evaluation of the Model

In [19]:
evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")

In [20]:
rmse = evaluator.evaluate(gbt_predictions)

In [21]:
print("Root Mean Squared Error (RMSE) on validation data = %g" % rmse)

Root Mean Squared Error (RMSE) on validation data = 13.8992


After tunning the parameters, the best RMSE i got was 13.8992

Check the readme for the steps in building the API and server for scalability and reliability