### Step 0: Load libraries and data


In [22]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression

In [23]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [24]:
df = spark.read.format("csv") \
  .option("inferSchema", "true").option("header", "true") \
  .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")

print(df.take(10))

[Row(id=5731498, name='A 2-bdrm house in Plaka of Athens', space='Ideally located a unique house in a very peaceful neighborhood of Plaka, near Acropolis. It is a traditional house in the heart of the historical center of Athens, in Plaka. The kitchen is fully equipped with oven, fridge with freezer. Cutlery, dishes and pans, kettle, espresso coffee maker (espresso capsules are provided), toaster. There is also a vacuum cleaner and a laundry machine. One big closet will make your stay more comfortable. Bed linen, towels and bath amenities are provided. Moreover, the apartment is fully airconditioned. The apartment is very close to a greek traditional tavernas, a pharmacy, banks and public transport.  Airport or any other transport is available upon demand at an additional but very reasonable cost. ', price='120.0', bathrooms='1.0', bedrooms='2.0', room_type='Entire home/apt', square_feet=None, host_is_super_host='0.0', city='Athina', state=None, cancellation_policy='moderate', security

In [25]:
from pyspark.sql.types import FloatType

df = df.withColumn("priceAsDouble", df["price"].cast("double"))

datasetFiltered = df.filter("priceAsDouble >= 50 AND priceAsDouble <= 750 and bathrooms > 0.0")
print(df.count())
print(datasetFiltered.count())

198454
152219


### Step 1: Standardize the data for our demo 

In [26]:
datasetFiltered.registerTempTable("df")

datasetImputed = spark.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        priceAsDouble,
        bathrooms,
        bedrooms,
        room_type,
        host_is_super_host,
        cancellation_policy,
        case when security_deposit is null
            then 0.0
            else security_deposit
        end as security_deposit,
        price_per_bedroom,
        case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as number_of_reviews,
        case when extra_people is null
            then 0.0
            else extra_people
        end as extra_people,
        instant_bookable,
        case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as cleaning_fee,
        case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as review_scores_rating,
        case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as square_feet
    from df
    where bedrooms is not null
""")


datasetImputed.select("square_feet", "priceAsDouble", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

+-------+------------------+
|summary|     priceAsDouble|
+-------+------------------+
|  count|            151770|
|   mean|130.99831982605258|
| stddev| 89.57902021660226|
|    min|              50.0|
|    max|             750.0|
+-------+------------------+



### Step 1.1: Take a look at some summary statistics of the data


In [27]:
# Most popular cities (original dataset)

spark.sql("""
    select 
        state,
        count(*) as n,
        avg(priceAsDouble) as avg_price,
        max(priceAsDouble) as max_price
    from df
    group by state
    order by count(*) desc
""").show()

+-------------+-----+------------------+---------+
|        state|    n|         avg_price|max_price|
+-------------+-----+------------------+---------+
|           NY|22955| 146.0189065563058|    750.0|
|           CA|20766|157.32211306944043|    750.0|
|Île-de-France|18858|107.44060876020787|    750.0|
|         null| 9301|116.13869476400387|    750.0|
|          NSW| 7214|167.28306071527587|    750.0|
|       Berlin| 6056| 80.59081902245707|    650.0|
|Noord-Holland| 4311|127.82161911389468|    750.0|
|          VIC| 4286| 143.5459636024265|    750.0|
|           IL| 3563|141.61829918607916|    690.0|
|North Holland| 3558|   133.59724564362|    700.0|
|           ON| 3380|128.96065088757396|    750.0|
|           TX| 3108| 195.0749678249678|    750.0|
|           WA| 2704|131.45007396449705|    750.0|
|           DC| 2607|136.58841580360567|    720.0|
|           BC| 2577| 132.9813736903376|    750.0|
|    Catalonia| 2551|107.19090552724421|    720.0|
|       Québec| 2505|105.414770

In [28]:
# Most expensive popular cities (original dataset)

spark.sql("""
    select 
        city,
        count(*) as n,
        avg(priceAsDouble) as avg_price,
        max(priceAsDouble) as max_price
    from df
    group by city
    order by avg(priceAsDouble) desc
""").filter("n>25").show()

+-------------------+---+------------------+---------+
|               city|  n|         avg_price|max_price|
+-------------------+---+------------------+---------+
|         Palm Beach| 26| 348.7692307692308|    701.0|
|        Watsonville| 38| 313.3157894736842|    670.0|
|             Malibu|136| 280.9852941176471|    750.0|
|             Avalon| 39| 259.5128205128205|    701.0|
|           Capitola| 35|             246.4|    650.0|
|           Tamarama| 72|             238.5|    750.0|
|    Manhattan Beach|109|232.10091743119267|    700.0|
|Rancho Palos Verdes| 39|230.02564102564102|    750.0|
|       Avalon Beach| 38|229.60526315789474|    620.0|
|            Newport| 52| 223.8653846153846|    750.0|
|      Darling Point| 29|221.51724137931035|    623.0|
|        Middle Park| 34| 212.7941176470588|    671.0|
|            Balmain| 55|212.56363636363636|    712.0|
|        North Bondi|180|206.68333333333334|    750.0|
|             Bronte|144|203.70833333333334|    750.0|
|        Q

### Step 2: Define continous and categorical features


In [29]:
# Step 2. Create our feature pipeline and train it on the entire dataset
continuous_features = ["bathrooms", "bedrooms", "security_deposit", "cleaning_fee", "extra_people", "number_of_reviews", "square_feet", "review_scores_rating"]

categorical_features = ["room_type", "host_is_super_host", "cancellation_policy", "instant_bookable", "state"]

#all_features = continuous_features + categorical_features

In [30]:
dataset_imputed = datasetImputed.persist()

### Step 3: Split data into training and validation 

In [31]:
[training_dataset, validation_dataset] = dataset_imputed.randomSplit([0.7, 0.3])

### Step 4: Continous Feature Pipeline

In [32]:
#continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

#continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
#                                           withStd=True, withMean=False)

### Step 5: Categorical Feature Pipeline

In [33]:
categorical_feature_indexers = [StringIndexer(inputCol=x, outputCol="{}_index".format(x)) for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), outputCol="oh_encoder_{}".format(x.getOutputCol() )) for x in categorical_feature_indexers]

### Step 6: Assemble our features and feature pipeline


Note that we have slightly different feature pipelines for LR and RF. This is done purely for demonstration purposes, whereas your actual models should scale continuous features for the RF model as well.

In [34]:
featureColsLr = [x.getOutputCol() for x in categorical_feature_one_hot_encoders]
#featureColsLr.append("scaled_continuous_features")

featureAssemblerLr = VectorAssembler(inputCols=featureColsLr, outputCol="features_lr")

In [35]:
#[continuous_feature_assembler, continuous_feature_scaler] 
estimatorsLr = \
  categorical_feature_indexers + categorical_feature_one_hot_encoders + [featureAssemblerLr]

featurePipeline = Pipeline(stages=estimatorsLr)

sparkFeaturePipelineModel = featurePipeline.fit(dataset_imputed)

print(sparkFeaturePipelineModel)

PipelineModel_42b186a54e09f3ac6378


### Step 7: Train a Linear Regression Model

In [39]:
# Step 3.2 Create our linear regression model

linearRegression = LinearRegression(featuresCol="features_lr", labelCol="priceAsDouble", predictionCol="price_prediction", maxIter=10, regParam=0.3, elasticNetParam=0.8)

#linearRegression = LinearRegression(featuresCol="scaled_continuous_features", labelCol="price", predictionCol="price_prediction", maxIter=10, regParam=0.3, elasticNetParam=0.8)

pipeline_lr = [featurePipeline, linearRegression]

sparkPipelineEstimatorLr = Pipeline(stages=pipeline_lr)

sparkPipelineLr = sparkPipelineEstimatorLr.fit(dataset_imputed)

In [42]:
print(sparkPipelineLr)

PipelineModel_4056bec6f32e66152b53


In [43]:
from jpmml import toPMMLBytes

pmmlBytes = toPMMLBytes(spark, dataset_imputed, sparkPipelineLr)

pmmlBytes.decode("utf-8")

IllegalArgumentException: 'Transformer class org.apache.spark.ml.PipelineModel is not supported'