# MLeap.deploy() Demo

To set up Spark 2.0 (required for this demo) in a Jupyter notebook, follow these [instructions](https://github.com/combust-ml/mleap/wiki/Setting-up-a-Spark-2.0-notebook-with-MLeap-and-Toree).

This demo will show you how to:
1. Load the research dataset from S3
2. Construct a feature transformer pipeline using commonly available transformers in Spark
3. Deploy your model to a public model server hosted on the combust.ml cloud using .deploy()

NOTE: To run the actual deploy step you have to either:
1. Get a key from combust.ml - it's easy, just email us!
2. Fire up the combust cloud server on your local machine - also easy, send us an email and we'll send you a docker image.

In [1]:
!pip install --upgrade pip > /dev/null
!pip install findspark > /dev/null
!pip install jip > /dev/null
!pip install mleap > /dev/null

In [3]:
import findspark
findspark.init()

## Background on the Dataset

The dataset for this demo combines the per-city data published [here](http://insideairbnb.com/get-the-data.html). We have merged the individual datasets and the relevant features into this [research dataset](https://s3-us-west-2.amazonaws.com/mleap-demo/datasources/airbnb.avro), stored as Avro.

### Step 0: Load libraries and data


In [4]:
from mleap import pyspark

from pyspark.ml.linalg import Vectors
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

In [5]:
!curl https://s3-us-west-2.amazonaws.com/mleap-demo/datasources/airbnb.avro -o /tmp/airbnb.avro

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  132M  100  132M    0     0  44.6M      0  0:00:02  0:00:02 --:--:-- 44.6M


In [6]:
!hadoop fs -mkdir -p /datasets/airbnb
!hadoop fs -put /tmp/airbnb.avro /datasets/airbnb
!hadoop fs -ls /datasets/airbnb

Found 1 items
-rw-r--r--   2 root hadoop  139027407 2018-11-13 17:33 /datasets/airbnb/airbnb.avro


In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

In [8]:
df = spark.read.format("com.databricks.spark.avro").load("/datasets/airbnb/airbnb.avro")
df.take(1)

[Row(id=u'1949687', name=u'Delectable Victorian Flat for two', space=u'A unique Victorian development of special architectural and historic interest as it combines housing and workshops. This is a thriving artist community, friendly and close to the centre of the city.', price=80.0, bathrooms=1.0, bedrooms=1.0, room_type=u'Entire home/apt', square_feet=None, host_is_superhost=0.0, city=u' London', state=u' London', cancellation_policy=u'moderate', security_deposit=100.0, cleaning_fee=20.0, extra_people=10.0, minimum_nights=3, first_review=u'2014-01-03', instant_bookable=0.0, number_of_reviews=8, review_scores_rating=94.0, price_per_bedroom=80.0)]

In [9]:
datasetFiltered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0")
print(df.count())
print(datasetFiltered.count())

389255
321588


### Step 1: Standardize the data for our demo 

In [10]:
datasetFiltered.createOrReplaceTempView("df")  # registerTempTable is deprecated as of Spark 2.0

datasetImputed = spark.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        price,
        bathrooms,
        bedrooms,
        room_type,
        host_is_superhost,
        cancellation_policy,
        case when security_deposit is null
            then 0.0
            else security_deposit
        end as security_deposit,
        price_per_bedroom,
        case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as number_of_reviews,
        case when extra_people is null
            then 0.0
            else extra_people
        end as extra_people,
        instant_bookable,
        case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as cleaning_fee,
        case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as review_scores_rating,
        case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as square_feet,
        case when bathrooms >= 2
            then 1.0
            else 0.0
        end as n_bathrooms_more_than_two
    from df
    where bedrooms is not null
""")


datasetImputed.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+
|summary|       square_feet|             price|          bedrooms|         bathrooms|     cleaning_fee|
+-------+------------------+------------------+------------------+------------------+-----------------+
|  count|            321588|            321588|            321588|            321588|           321588|
|   mean| 546.7441757777032|131.54961006007687|1.3352426085550455| 1.199068373198005|37.64188340360959|
| stddev|363.39839582373816| 90.10912788720013|0.8466586601060777|0.4830590051262755|42.64237791484603|
|    min|             104.0|              50.0|               0.0|               0.5|              0.0|
|    max|           32292.0|             750.0|              10.0|               8.0|            700.0|
+-------+------------------+------------------+------------------+------------------+-----------------+
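
The `square_feet` rule in the query above is the least obvious of the imputations, so here is a minimal plain-Python sketch of the same `case` logic. The function name `impute_square_feet` is hypothetical and exists only for illustration; the notebook itself performs this step in Spark SQL.

```python
def impute_square_feet(square_feet, bedrooms):
    """Mirror the SQL `case` expression for square_feet.

    Illustrative helper only; it is not part of Spark or MLeap.
    """
    # Trust the reported area only when it is present and above 100.
    if square_feet is not None and square_feet > 100:
        return float(square_feet)
    # Area missing or implausibly small, and no bedrooms: assume 350.
    if bedrooms is None or bedrooms == 0:
        return 350.0
    # Otherwise estimate 380 per bedroom.
    return 380.0 * bedrooms


print(impute_square_feet(None, 1.0))   # missing area, one bedroom
print(impute_square_feet(80.0, 0.0))   # implausibly small area, no bedrooms
print(impute_square_feet(900.0, 2.0))  # plausible reported area is kept
```

The first call corresponds to the London listing shown in Step 0: it has `square_feet=None` and one bedroom, so the rule assigns it 380.0.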



### Step 1.1: Take a look at some summary statistics of the data


In [11]:
# Most popular states (filtered dataset registered as "df")

spark.sql("""
    select 
        state,
        count(*) as n,
        cast(avg(price) as decimal(12,2)) as avg_price,
        max(price) as max_price
    from df
    group by state
    order by count(*) desc
""").show()

+-------------+-----+---------+---------+
|        state|    n|avg_price|max_price|
+-------------+-----+---------+---------+
|           NY|48362|   146.75|    750.0|
|           CA|44716|   158.76|    750.0|
|Île-de-France|40732|   107.74|    750.0|
|       London|17532|   117.71|    750.0|
|          NSW|14416|   167.96|    750.0|
|       Berlin|13098|    81.01|    650.0|
|Noord-Holland| 8890|   128.56|    750.0|
|          VIC| 8636|   144.49|    750.0|
|North Holland| 7636|   134.60|    700.0|
|           IL| 7544|   141.85|    750.0|
|           ON| 7186|   129.05|    750.0|
|           TX| 6702|   196.59|    750.0|
|           WA| 5858|   132.48|    750.0|
|    Catalonia| 5748|   106.39|    720.0|
|           BC| 5522|   133.14|    750.0|
|           DC| 5476|   136.56|    720.0|
|       Québec| 5116|   104.98|    700.0|
|    Catalunya| 4570|    99.36|    675.0|
|       Veneto| 4486|   131.71|    700.0|
|           OR| 4330|   114.02|    700.0|
+-------------+-----+---------+---------+

In [12]:
# Most expensive cities with more than 25 listings (filtered dataset registered as "df")

spark.sql("""
    select 
        city,
        count(*) as n,
        cast(avg(price) as decimal(12,2)) as avg_price,
        max(price) as max_price
    from df
    group by city
    order by avg(price) desc
""").filter("n>25").show()

+-------------------+---+---------+---------+
|               city|  n|avg_price|max_price|
+-------------------+---+---------+---------+
|         Palm Beach| 56|   372.11|    701.0|
|        Watsonville| 78|   307.85|    670.0|
|  Pacific Palisades| 34|   295.18|    695.0|
|             Malibu|302|   280.42|    750.0|
|      Bilgola Beach| 30|   261.13|    601.0|
|      Playa Del Rey| 34|   255.76|    599.0|
|             Avalon| 80|   255.65|    701.0|
|Sydney Olympic Park| 40|   250.55|    520.0|
|           Tamarama|148|   247.45|    750.0|
|           Capitola| 72|   246.50|    650.0|
|    Manhattan Beach|240|   234.23|    700.0|
|       Avalon Beach| 82|   232.98|    620.0|
|            Del Mar| 38|   232.84|    650.0|
|         Birchgrove| 32|   228.63|    601.0|
|          Mona Vale| 52|   227.00|    572.0|
|       Venice Beach| 62|   224.45|    699.0|
|Rancho Palos Verdes| 82|   223.68|    750.0|
|      Darling Point| 60|   221.43|    623.0|
|    North Curl Curl| 26|   220.77

### Step 2: Define continuous and categorical features


In [13]:
# Step 2: Define the continuous and categorical feature columns used by the pipeline
continuous_features = ["bathrooms", "bedrooms", "security_deposit", "cleaning_fee", "extra_people", "number_of_reviews", "square_feet", "review_scores_rating"]

categorical_features = ["room_type", "host_is_superhost", "cancellation_policy", "instant_bookable", "state"]

all_features = continuous_features + categorical_features

In [14]:
dataset_imputed = datasetImputed.persist()

### Step 3: Split data into training and validation 

In [15]:
[training_dataset, validation_dataset] = dataset_imputed.randomSplit([0.7, 0.3])
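
Because the split is random, the two datasets differ from run to run unless a seed is supplied; PySpark's `DataFrame.randomSplit(weights, seed=None)` accepts one as an optional second argument. The following is a plain-Python analogue of the idea, not Spark's implementation: weights are normalized, and each row is assigned to a split by comparing a uniform draw against the cumulative weights. The function name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split using cumulative normalized weights.

    Illustrative analogue of a weighted random split; not Spark code.
    """
    total = float(sum(weights))
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)  # fixed seed makes the assignment repeatable
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, valid = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train), len(valid))
```

The split sizes are only approximately 70/30, which is also true of `randomSplit`.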

### Step 4: Continuous Feature Pipeline

In [16]:
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

# Scale each feature to unit standard deviation without centering it (withMean=False)
continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features",
                                           withStd=True, withMean=False)
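
To make the scaler settings concrete, here is a minimal plain-Python sketch of what `withStd=True, withMean=False` does to a single column: every value is divided by the column's sample standard deviation and nothing is subtracted. The helper name `scale_without_centering` is hypothetical, and returning 0.0 for a zero-variance column is based on the behavior described in the Spark documentation.

```python
import statistics


def scale_without_centering(column):
    """Divide each value by the sample standard deviation (no mean shift).

    Illustrative helper only; Spark applies this per feature inside a vector.
    """
    std = statistics.stdev(column)  # sample (n - 1) standard deviation
    if std == 0:
        # Assumption: a constant feature is mapped to 0.0.
        return [0.0 for _ in column]
    return [value / std for value in column]


print(scale_without_centering([2.0, 4.0, 6.0]))
```

The same arithmetic can be checked against the notebook's data: `bathrooms` has a standard deviation of about 0.483 in the summary table from Step 1, so a listing with 1.0 bathroom scales to roughly 1.0 / 0.483 ≈ 2.07, which matches the first component of `scaled_continuous_features` in the transformed row shown after Step 7.1.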

### Step 5: Categorical Feature Pipeline

In [17]:
categorical_feature_indexers = [StringIndexer(inputCol=x, outputCol="{}_index".format(x)) for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), outputCol="oh_encoder_{}".format(x.getOutputCol())) for x in categorical_feature_indexers]
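
The two stages above can be sketched in plain Python. By default, `StringIndexer` assigns index 0.0 to the most frequent label, and `OneHotEncoder` drops the last category, so a column with three labels becomes a vector of length two. The helper names and the sample values below are hypothetical, and the alphabetical tie-break is an assumption made only to keep the sketch deterministic.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index ordered by descending frequency."""
    counts = Counter(values)
    # Assumption: ties are broken alphabetically to keep the result stable.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """Encode an index as a dense vector, omitting the last category."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[int(index)] = 1.0
    return vector  # the last category is represented by all zeros


room_types = ["Entire home/apt"] * 3 + ["Private room"] * 2 + ["Shared room"]
index_of = fit_string_index(room_types)
for label in sorted(index_of, key=index_of.get):
    print(label, index_of[label], one_hot_drop_last(index_of[label], len(index_of)))
```

This is consistent with the transformed row shown after Step 7.1, where `oh_encoder_room_type_index` is a `SparseVector` of size 2.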


### Step 6: Assemble our features and feature pipeline


In [18]:
estimatorsLr = [continuous_feature_assembler, continuous_feature_scaler] + categorical_feature_indexers + categorical_feature_one_hot_encoders

featurePipeline = Pipeline(stages=estimatorsLr)

sparkFeaturePipelineModel = featurePipeline.fit(dataset_imputed)

print("Finished constructing the pipeline")

Finished constructing the pipeline


### Step 7: Train a Linear Regression Model

In [19]:
# Create our linear regression model

linearRegression = LinearRegression(featuresCol="scaled_continuous_features", labelCol="price", predictionCol="price_prediction", maxIter=10, regParam=0.3, elasticNetParam=0.8)

pipeline_lr = [sparkFeaturePipelineModel] + [linearRegression]

sparkPipelineEstimatorLr = Pipeline(stages = pipeline_lr)

sparkPipelineLr = sparkPipelineEstimatorLr.fit(dataset_imputed)

print("Complete: Training Linear Regression")

Complete: Training Linear Regression


Coefficients: aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features, current: scaled_continuous_features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: price)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxIter: max number of iterations (>= 0). (default: 100, current: 10)
predictionCol: prediction column name. (default: prediction, current: price_prediction)
regParam: regularization parameter (>= 0). (default: 0.0, current: 0.3)
solver: The solver algorithm for optimization. Supported
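
To illustrate how `regParam` and `elasticNetParam` interact, here is a small sketch of the elastic net penalty in the form given in the Spark ML documentation: `regParam * (alpha * L1 + (1 - alpha) / 2 * L2^2)`, where `alpha` is `elasticNetParam`. The function name and the example coefficients are hypothetical; they are not values from the trained model.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Compute the elastic net penalty for a list of coefficients.

    Illustrative helper only; Spark computes this inside its optimizer.
    """
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (
        elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared
    )


example_coefficients = [3.0, -4.0]  # made-up values for illustration
print(elastic_net_penalty(example_coefficients, reg_param=0.3, elastic_net_param=0.8))
print(elastic_net_penalty(example_coefficients, reg_param=0.3, elastic_net_param=1.0))  # pure L1
print(elastic_net_penalty(example_coefficients, reg_param=0.3, elastic_net_param=0.0))  # pure L2
```

With `elasticNetParam=0.8`, as used in this step, the penalty is weighted mostly toward the L1 term, which tends to push some coefficients to exactly zero.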

### Step 7.1: Train a Logistic Regression Model

In [20]:
# Create our logistic regression model

logisticRegression = LogisticRegression(featuresCol="scaled_continuous_features", labelCol="n_bathrooms_more_than_two", predictionCol="n_bathrooms_more_than_two_prediction", maxIter=10)

pipeline_log_r = [sparkFeaturePipelineModel] + [logisticRegression]

sparkPipelineEstimatorLogr = Pipeline(stages = pipeline_log_r)

sparkPipelineLogr = sparkPipelineEstimatorLogr.fit(dataset_imputed)

print("Complete: Training Logistic Regression")

Complete: Training Logistic Regression


In [46]:
dataset_imputed.head()

Row(id=u'1949687', city=u' London', state=u'Other', space=u'A unique Victorian development of special architectural and historic interest as it combines housing and workshops. This is a thriving artist community, friendly and close to the centre of the city.', price=80.0, bathrooms=1.0, bedrooms=1.0, room_type=u'Entire home/apt', host_is_superhost=0.0, cancellation_policy=u'moderate', security_deposit=100.0, price_per_bedroom=80.0, number_of_reviews=Decimal('8.0'), extra_people=10.0, instant_bookable=0.0, cleaning_fee=20.0, review_scores_rating=94.0, square_feet=380.0, n_bathrooms_more_than_two=Decimal('0.0'))

In [50]:
sparkPipelineLogr.transform(dataset_imputed).head()

Row(id=u'1949687', city=u' London', state=u'Other', space=u'A unique Victorian development of special architectural and historic interest as it combines housing and workshops. This is a thriving artist community, friendly and close to the centre of the city.', price=80.0, bathrooms=1.0, bedrooms=1.0, room_type=u'Entire home/apt', host_is_superhost=0.0, cancellation_policy=u'moderate', security_deposit=100.0, price_per_bedroom=80.0, number_of_reviews=Decimal('8.0'), extra_people=10.0, instant_bookable=0.0, cleaning_fee=20.0, review_scores_rating=94.0, square_feet=380.0, n_bathrooms_more_than_two=Decimal('0.0'), unscaled_continuous_features=DenseVector([1.0, 1.0, 100.0, 20.0, 10.0, 8.0, 380.0, 94.0]), scaled_continuous_features=DenseVector([2.0701, 1.1811, 0.409, 0.469, 0.5352, 0.2859, 1.0457, 10.9593]), room_type_index=0.0, host_is_superhost_index=0.0, cancellation_policy_index=1.0, instant_bookable_index=0.0, state_index=0.0, oh_encoder_room_type_index=SparseVector(2, {0: 1.0}), oh_enc

### Step 8: Serialize the model to Bundle.ML

In [23]:
sparkPipelineLr.serializeToBundle("jar:file:/tmp/pyspark.lr.zip", sparkPipelineLr.transform(dataset_imputed))
sparkPipelineLogr.serializeToBundle("jar:file:/tmp/pyspark.logr.zip", dataset=sparkPipelineLogr.transform(dataset_imputed))
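
The `jar:file:` URIs above point at ordinary zip archives on the local filesystem, so a serialized bundle can be inspected with Python's standard `zipfile` module. The sketch below uses two hypothetical helpers, `local_path_from_bundle_uri` and `list_bundle_entries`, and makes no assumption about which entries the bundle contains; it only prints whatever is stored.

```python
import os
import zipfile


def local_path_from_bundle_uri(uri):
    """Strip the 'jar:file:' prefix to get the local path of the archive."""
    prefix = "jar:file:"
    if not uri.startswith(prefix):
        raise ValueError("expected a URI starting with 'jar:file:'")
    return uri[len(prefix):]


def list_bundle_entries(zip_path):
    """Return the sorted entry names stored in a zipped bundle."""
    with zipfile.ZipFile(zip_path) as archive:
        return sorted(archive.namelist())


bundle_path = local_path_from_bundle_uri("jar:file:/tmp/pyspark.lr.zip")
# Only list the contents if the serialization step has already been run.
if os.path.exists(bundle_path):
    for name in list_bundle_entries(bundle_path):
        print(name)
```

Listing the archive is a quick way to confirm that serialization wrote a non-empty bundle before attempting to deserialize or deploy it.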

### Step 9 (Optional): Deserialize from Bundle.ML

In [24]:
sparkPipelineLr = PipelineModel.deserializeFromBundle("jar:file:/tmp/pyspark.lr.zip")

### Step 10 (Optional): Deploy to ModelServer

Python bindings for .deploy() are coming soon. For now, you may have to write a few lines of Scala; a demo can be found [here](https://github.com/combust-ml/mleap-demo/blob/master/lending-club/notebooks/airbnb-price-regression.ipynb).