## Step 0: Set Up Spark

### Configure Spark for Your Notebook
* You may need to Reconnect and/or Restart the Kernel to pick up changes.
* This example uses the local Spark master: `--master local[1]`
* In production, you would use the PipelineIO Spark master: `--master spark://apachespark-master-2-1-0:7077`

In [None]:
import os

master = '--master local[1]'
#master = '--master spark://apachespark-master-2-1-0:7077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=512m'
#packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
packages = ''
jars = '--jars lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files lib/jpmml.py,lib/pio_bundler.py'

os.environ['PYSPARK_SUBMIT_ARGS'] = master \
  + ' ' + conf \
  + ' ' + packages \
  + ' ' + jars \
  + ' ' + py_files \
  + ' ' + 'pyspark-shell'

print(os.environ['PYSPARK_SUBMIT_ARGS'])
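The concatenation above leaves a doubled space whenever one of the option strings (here `packages`) is empty. That is harmless, but a small helper keeps the value tidy. A minimal sketch, assuming a hypothetical helper named `build_submit_args` that is not part of the notebook's libraries:

```python
def build_submit_args(*parts):
    """Join the non-empty option strings and end with 'pyspark-shell'."""
    cleaned = [part.strip() for part in parts if part and part.strip()]
    return ' '.join(cleaned + ['pyspark-shell'])


submit_args = build_submit_args(
    '--master local[1]',
    '--conf spark.cores.max=1 --conf spark.executor.memory=512m',
    '',  # packages is empty in this notebook
    '--jars lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar',
    '--py-files lib/jpmml.py,lib/pio_bundler.py',
)
print(submit_args)
```

The result can be assigned to `os.environ['PYSPARK_SUBMIT_ARGS']` in place of the hand-built string.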

### Import Spark Libraries

In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression

### Create Spark Session
This may take a minute or two.  Please be patient.

In [3]:
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.getOrCreate()

## Step 1: Load Training Data into Spark Cluster

### Read Data from Public S3 Bucket
* AWS credentials are not needed.
* We're asking Spark to infer the schema
* The data has a header
* The file is compressed with `bzip2`, a splittable compression format, so Spark can read it in parallel

In [4]:
df = spark_session.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("airbnb.csv.bz2")
#    .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")


print(df.head())

Row(id=5731498, name='A 2-bdrm house in Plaka of Athens', space='Ideally located a unique house in a very peaceful neighborhood of Plaka, near Acropolis. It is a traditional house in the heart of the historical center of Athens, in Plaka. The kitchen is fully equipped with oven, fridge with freezer. Cutlery, dishes and pans, kettle, espresso coffee maker (espresso capsules are provided), toaster. There is also a vacuum cleaner and a laundry machine. One big closet will make your stay more comfortable. Bed linen, towels and bath amenities are provided. Moreover, the apartment is fully airconditioned. The apartment is very close to a greek traditional tavernas, a pharmacy, banks and public transport.  Airport or any other transport is available upon demand at an additional but very reasonable cost. ', price='120.0', bathrooms='1.0', bedrooms='2.0', room_type='Entire home/apt', square_feet=None, host_is_super_host='0.0', city='Athina', state=None, cancellation_policy='moderate', security_

In [5]:
print(df.count())

198576


### Clean, Filter, and Summarize the Data

In [6]:
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")

# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it.
df_filtered.createOrReplaceTempView("df_filtered")

df_final = spark_session.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        cast(price as double) as price,
        cast(bathrooms as double) as bathrooms,
        cast(bedrooms as double) as bedrooms,
        room_type,
        host_is_super_host,
        cancellation_policy,
        cast(case when security_deposit is null
            then 0.0
            else security_deposit
        end as double) as security_deposit,
        price_per_bedroom,
        cast(case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as double) as number_of_reviews,
        cast(case when extra_people is null
            then 0.0
            else extra_people
        end as double) as extra_people,
        instant_bookable,
        cast(case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as double) as cleaning_fee,
        cast(case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as double) as review_scores_rating,
        cast(case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as double) as square_feet
    from df_filtered
""").persist()

# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it.
df_final.createOrReplaceTempView("df_final")

df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

+-------+-----------------+------------------+------------------+------------------+-----------------+
|summary|      square_feet|             price|          bedrooms|         bathrooms|     cleaning_fee|
+-------+-----------------+------------------+------------------+------------------+-----------------+
|  count|           151864|            151864|            151864|            151864|           151864|
|   mean|545.5920823895063|131.00769109202972|1.3336998893747036|1.1988786019069695|37.25118527103198|
| stddev|363.2346181084825| 89.59372969879449|0.8460907193971746|0.4836515839573332|  42.625502170779|
|    min|            104.0|              50.0|               0.0|               0.5|              0.0|
|    max|          32292.0|             750.0|              10.0|               8.0|            700.0|
+-------+-----------------+------------------+------------------+------------------+-----------------+
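The `square_feet` rule in the query above is the least obvious of the imputations. As a reading aid, here is the same `case` expression written as a plain-Python function. The name `impute_square_feet` is illustrative only, and `None` stands in for SQL `null`:

```python
def impute_square_feet(square_feet, bedrooms):
    """Mirror the SQL case expression used for square_feet."""
    # Trust the reported value only when it is present and above 100.
    if square_feet is not None and square_feet > 100:
        return float(square_feet)
    # Missing or implausible value, and no bedrooms (e.g. a studio): flat 350.
    if bedrooms is None or bedrooms == 0:
        return 350.0
    # Otherwise estimate 380 square feet per bedroom.
    return 380.0 * bedrooms


print(impute_square_feet(None, 2.0))
print(impute_square_feet(50.0, 0.0))
print(impute_square_feet(1200.0, 3.0))
```

Because reported values of 100 or less are replaced, the smallest surviving reported value must exceed 100, which agrees with the `min` of 104.0 in the summary table.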



In [7]:
print(df_final.count())

151864
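The two counts printed so far show how much data the filter removed. A quick check of the arithmetic, using only the numbers reported above:

```python
rows_loaded = 198576  # df.count() before filtering
rows_kept = 151864    # df_final.count() after filtering

rows_dropped = rows_loaded - rows_kept
kept_pct = round(100.0 * rows_kept / rows_loaded, 1)

print(rows_dropped, kept_pct)
```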


In [8]:
print(df_final.schema)

StructType(List(StructField(id,IntegerType,true),StructField(city,StringType,true),StructField(state,StringType,true),StructField(space,StringType,true),StructField(price,DoubleType,true),StructField(bathrooms,DoubleType,true),StructField(bedrooms,DoubleType,true),StructField(room_type,StringType,true),StructField(host_is_super_host,StringType,true),StructField(cancellation_policy,StringType,true),StructField(security_deposit,DoubleType,true),StructField(price_per_bedroom,StringType,true),StructField(number_of_reviews,DoubleType,true),StructField(extra_people,DoubleType,true),StructField(instant_bookable,StringType,true),StructField(cleaning_fee,DoubleType,true),StructField(review_scores_rating,DoubleType,true),StructField(square_feet,DoubleType,true)))


In [9]:
# Most popular states

spark_session.sql("""
    select 
        state,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by state
    order by count(*) desc
""").show()

+------+-----+------------------+---------+
| state|   ct|         avg_price|max_price|
+------+-----+------------------+---------+
| Other|87467|122.00503046863389|    750.0|
|    NY|22899| 145.9446264028997|    750.0|
|    CA|20750|157.40173493975902|    750.0|
|Berlin| 6034|  80.6433543254889|    650.0|
|    IL| 3552|141.46903153153153|    690.0|
|    TX| 3108|195.25611325611325|    750.0|
|    WA| 2700| 131.4962962962963|    750.0|
|    DC| 2590|136.64015444015445|    720.0|
|    OR| 1954|114.02661207778915|    700.0|
|London|  810|108.84444444444445|    600.0|
+------+-----+------------------+---------+



In [10]:
# Most expensive popular cities

spark_session.sql("""
    select 
        city,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by city
    order by avg(price) desc
""").filter("ct > 25").show()

+-------------------+---+------------------+---------+
|               city| ct|         avg_price|max_price|
+-------------------+---+------------------+---------+
|         Palm Beach| 26| 348.7692307692308|    701.0|
|        Watsonville| 38| 313.3157894736842|    670.0|
|             Malibu|136| 280.9852941176471|    750.0|
|             Avalon| 38|262.42105263157896|    701.0|
|           Capitola| 35|             246.4|    650.0|
|           Tamarama| 72|             238.5|    750.0|
|    Manhattan Beach|109|232.10091743119267|    700.0|
|Rancho Palos Verdes| 39|230.02564102564102|    750.0|
|       Avalon Beach| 38|229.60526315789474|    620.0|
|            Newport| 52| 223.8653846153846|    750.0|
|      Darling Point| 29|221.51724137931035|    623.0|
|        Middle Park| 34| 212.7941176470588|    671.0|
|            Balmain| 55|212.56363636363636|    712.0|
|        North Bondi|180|206.68333333333334|    750.0|
|             Bronte|144|203.70833333333334|    750.0|
|        Q

## Step 2: Feature Engineering

In [11]:
continuous_features = ["bathrooms", \
                       "bedrooms", \
                       "security_deposit", \
                       "cleaning_fee", \
                       "extra_people", \
                       "number_of_reviews", \
                       "square_feet", \
                       "review_scores_rating"]

categorical_features = ["room_type", \
                        "host_is_super_host", \
                        "cancellation_policy", \
                        "instant_bookable", \
                        "state"]

### Continuous Features

In [12]:
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
                                           withStd=True, withMean=False)
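With `withStd=True` and `withMean=False`, each feature is divided by its standard deviation but not centered. The following plain-Python sketch shows that arithmetic for a single column. It assumes the sample (n - 1) standard deviation and a 0.0 result for zero-variance columns, which is how the Spark ML documentation describes `StandardScaler`; the function name is illustrative.

```python
import statistics


def scale_without_centering(values):
    """Divide each value by the sample standard deviation; do not subtract the mean."""
    std = statistics.stdev(values)
    if std == 0:
        # A constant column carries no information; emit zeros.
        return [0.0 for _ in values]
    return [value / std for value in values]


print(scale_without_centering([2.0, 4.0, 6.0]))
```

Skipping the mean subtraction keeps zero entries at zero, so sparse vectors remain sparse.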

### Categorical Features

In [13]:
categorical_feature_indexers = [StringIndexer(inputCol=x, \
                                              outputCol="{}_index".format(x)) \
                                for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), \
                                                      outputCol="oh_encoder_{}".format(x.getOutputCol() )) \
                                        for x in categorical_feature_indexers]
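To make the two stages concrete, here is a plain-Python sketch of what they compute for one column. It assumes the default settings: `StringIndexer` assigns index 0 to the most frequent label, and `OneHotEncoder` drops the last category so that it is encoded as all zeros. The helper names and the toy `room_types` list are illustrative.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: index for index, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot encode an index; the last category becomes the all-zero vector."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


room_types = ['Entire home/apt'] * 3 + ['Private room'] * 2 + ['Shared room']
label_to_index = fit_string_index(room_types)
encoded = {label: one_hot_drop_last(index, len(label_to_index))
           for label, index in label_to_index.items()}
print(label_to_index)
print(encoded)
```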

### Assemble Continuous and Categorical Features

In [14]:
feature_cols_lr = [x.getOutputCol() \
                   for x in categorical_feature_one_hot_encoders]
feature_cols_lr.append("scaled_continuous_features")

feature_assembler_lr = VectorAssembler(inputCols=feature_cols_lr, \
                                       outputCol="features_lr")

## Step 3: Train the Pipeline

In [15]:
linear_regression = LinearRegression(featuresCol="features_lr", \
                                     labelCol="price", \
                                     predictionCol="price_prediction", \
                                     maxIter=10, \
                                     regParam=0.3, \
                                     elasticNetParam=0.8)

estimators_lr = \
  [continuous_feature_assembler, continuous_feature_scaler] \
  + categorical_feature_indexers + categorical_feature_one_hot_encoders \
  + [feature_assembler_lr] + [linear_regression]

pipeline = Pipeline(stages=estimators_lr)

pipeline_model = pipeline.fit(df_final)

print(pipeline_model)

PipelineModel_4c6487842fe5f0df9f58
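The `regParam` and `elasticNetParam` settings control an elastic-net penalty that is added to the regression loss. A minimal sketch of that penalty term, following the formula in the Spark ML documentation, `regParam * (elasticNetParam * L1 + (1 - elasticNetParam) / 2 * L2^2)`. The weights below are made up for illustration and are not taken from the fitted model.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty added to the loss: a mix of L1 (lasso) and squared L2 (ridge)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)


# Illustrative weights only.
penalty = elastic_net_penalty([1.0, -2.0], reg_param=0.3, elastic_net_param=0.8)
print(penalty)
```

With `elasticNetParam=0.8` the penalty is mostly L1, which tends to push some coefficients to exactly zero.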


In [16]:
pipeline_model.write().overwrite().save("airbnb.parquet")

In [17]:
type(df_final.schema)

pyspark.sql.types.StructType

In [18]:
import dill as pickle

In [19]:
with open('airbnb.schema', 'wb') as pkl_file:
    pickle.dump(df_final.schema, pkl_file)

In [20]:
!cat airbnb.schema

(binary pickle stream of the `StructType`; not human-readable)

## SERVE FROM HERE

In [27]:
restored_pipeline_model = PipelineModel.read().load("airbnb.parquet")

In [30]:
# A PipelineModel has no `schema` attribute; inspect its fitted stages instead.
print(restored_pipeline_model.stages)

## Step 4:  Export the Pipeline Model

In [33]:
df_final.schema.json()

'{"fields":[{"metadata":{},"name":"id","nullable":true,"type":"integer"},{"metadata":{},"name":"city","nullable":true,"type":"string"},{"metadata":{},"name":"state","nullable":true,"type":"string"},{"metadata":{},"name":"space","nullable":true,"type":"string"},{"metadata":{},"name":"price","nullable":true,"type":"double"},{"metadata":{},"name":"bathrooms","nullable":true,"type":"double"},{"metadata":{},"name":"bedrooms","nullable":true,"type":"double"},{"metadata":{},"name":"room_type","nullable":true,"type":"string"},{"metadata":{},"name":"host_is_super_host","nullable":true,"type":"string"},{"metadata":{},"name":"cancellation_policy","nullable":true,"type":"string"},{"metadata":{},"name":"security_deposit","nullable":true,"type":"double"},{"metadata":{},"name":"price_per_bedroom","nullable":true,"type":"string"},{"metadata":{},"name":"number_of_reviews","nullable":true,"type":"double"},{"metadata":{},"name":"extra_people","nullable":true,"type":"double"},{"metadata":{},"name":"instan

In [40]:
from pyspark.sql.types import _parse_datatype_json_string
schema = _parse_datatype_json_string(df_final.schema.json())
print(schema)

StructType(List(StructField(id,IntegerType,true),StructField(city,StringType,true),StructField(state,StringType,true),StructField(space,StringType,true),StructField(price,DoubleType,true),StructField(bathrooms,DoubleType,true),StructField(bedrooms,DoubleType,true),StructField(room_type,StringType,true),StructField(host_is_super_host,StringType,true),StructField(cancellation_policy,StringType,true),StructField(security_deposit,DoubleType,true),StructField(price_per_bedroom,StringType,true),StructField(number_of_reviews,DoubleType,true),StructField(extra_people,DoubleType,true),StructField(instant_bookable,StringType,true),StructField(cleaning_fee,DoubleType,true),StructField(review_scores_rating,DoubleType,true),StructField(square_feet,DoubleType,true)))
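Because `schema.json()` returns ordinary JSON, the schema can also be inspected without Spark. A small sketch using only the standard library; the string below is a shortened copy of the output above, cut to the first two fields and closed by hand:

```python
import json

# Shortened excerpt of df_final.schema.json().
schema_json = (
    '{"fields":['
    '{"metadata":{},"name":"id","nullable":true,"type":"integer"},'
    '{"metadata":{},"name":"city","nullable":true,"type":"string"}'
    '],"type":"struct"}'
)

schema_dict = json.loads(schema_json)
field_types = {field['name']: field['type'] for field in schema_dict['fields']}
print(field_types)
```

Storing this text form is easier to debug than a pickle, and `StructType.fromJson` can rebuild the schema from the parsed dictionary.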


In [41]:
from pyspark.ml.common import _py2java

schema_as_java = _py2java(spark_session, schema)
print(schema_as_java)

{'_needSerializeAnyField': False, 'names': ['id', 'city', 'state', 'space', 'price', 'bathrooms', 'bedrooms', 'room_type', 'host_is_super_host', 'cancellation_policy', 'security_deposit', 'price_per_bedroom', 'number_of_reviews', 'extra_people', 'instant_bookable', 'cleaning_fee', 'review_scores_rating', 'square_feet'], 'fields': [{'metadata': {}, 'nullable': True, 'dataType': {'__class__': 'pyspark.sql.types.IntegerType'}, 'name': 'id', '__class__': 'pyspark.sql.types.StructField'}, {'metadata': {}, 'nullable': True, 'dataType': {'__class__': 'pyspark.sql.types.StringType'}, 'name': 'city', '__class__': 'pyspark.sql.types.StructField'}, {'metadata': {}, 'nullable': True, 'dataType': {'__class__': 'pyspark.sql.types.StringType'}, 'name': 'state', '__class__': 'pyspark.sql.types.StructField'}, {'metadata': {}, 'nullable': True, 'dataType': {'__class__': 'pyspark.sql.types.StringType'}, 'name': 'space', '__class__': 'pyspark.sql.types.StructField'}, {'metadata': {}, 'nullable': True, 'da

In [58]:
import json

from pyspark.ml import PipelineModel
from pyspark.sql.types import StructType

# StructType.toInternal() returns a plain tuple, so it cannot round-trip a schema.
# Persist the schema as JSON text instead.
with open('model.schema', 'w') as schema_file:
    schema_file.write(df_final.schema.json())

pipeline_model.write().overwrite().save('model.parquet')

## SERVE FROM HERE
with open('model.schema', 'r') as schema_file:
    restored_spark_df_schema_as_json = schema_file.read()

# Python-side schema, rebuilt from the JSON text.
restored_spark_df_schema = StructType.fromJson(json.loads(restored_spark_df_schema_as_json))

# JVM-side schema for the converter. `_jsparkSession` is a private PySpark attribute
# and may change between releases.
restored_spark_df_schema_as_java = spark_session._jsparkSession.parseDataType(restored_spark_df_schema_as_json)

restored_spark_pipeline_model = PipelineModel.read().load('model.parquet')
restored_spark_pipeline_model_as_java = restored_spark_pipeline_model._to_java()

print(spark_session._jvm.org.jpmml.sparkml.ConverterUtil.toPMMLByteArray(restored_spark_df_schema_as_java,
                                                                          restored_spark_pipeline_model_as_java))

In [24]:
import pio_bundler

pio_bundler.bundle(spark_session, df_final, restored_pipeline_model)

Py4JError: An error occurred while calling o358.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:272)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)



In [25]:
from jpmml import toPMMLBytes

model = toPMMLBytes(spark_session, df_final, restored_pipeline_model)

with open('airbnb.model', 'wb') as fh:
    fh.write(model)

In [26]:
!cat airbnb.model

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<PMML xmlns="http://www.dmg.org/PMML-4_3" version="4.3">
	<Header>
		<Application/>
		<Timestamp>2017-05-21T02:37:48Z</Timestamp>
	</Header>
	<DataDictionary>
		<DataField name="bathrooms" optype="continuous" dataType="double"/>
		<DataField name="bedrooms" optype="continuous" dataType="double"/>
		<DataField name="security_deposit" optype="continuous" dataType="double"/>
		<DataField name="cleaning_fee" optype="continuous" dataType="double"/>
		<DataField name="extra_people" optype="continuous" dataType="double"/>
		<DataField name="number_of_reviews" optype="continuous" dataType="double"/>
		<DataField name="square_feet" optype="continuous" dataType="double"/>
		<DataField name="review_scores_rating" optype="continuous" dataType="double"/>
		<DataField name="room_type" optype="categorical" dataType="string">
			<Value value="Entire home/apt"/>
			<Value value="Private room"/>
			<Value value="Shared room"/>
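The exported file is plain XML, so the data dictionary can be checked with the standard library before deploying. A minimal sketch; the `pmml_excerpt` string is a shortened copy of the output above with the tags closed by hand, not the full model:

```python
import xml.etree.ElementTree as ET

PMML_NS = {'pmml': 'http://www.dmg.org/PMML-4_3'}

# Shortened, hand-closed excerpt of airbnb.model.
pmml_excerpt = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<PMML xmlns="http://www.dmg.org/PMML-4_3" version="4.3">
  <DataDictionary>
    <DataField name="bathrooms" optype="continuous" dataType="double"/>
    <DataField name="room_type" optype="categorical" dataType="string">
      <Value value="Entire home/apt"/>
      <Value value="Private room"/>
      <Value value="Shared room"/>
    </DataField>
  </DataDictionary>
</PMML>"""

# Parse from bytes so the encoding declaration is honored.
root = ET.fromstring(pmml_excerpt.encode('utf-8'))

# Map each input field to its list of allowed categorical values (empty if continuous).
fields = {
    field.get('name'): [value.get('value') for value in field.findall('pmml:Value', PMML_NS)]
    for field in root.findall('pmml:DataDictionary/pmml:DataField', PMML_NS)
}
print(fields)
```

To check the real file, read `airbnb.model` in binary mode and pass its bytes to `ET.fromstring`.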
		

## Step 5:  Deploy the Pipeline Model

### Deployment Option 1: PipelineIO CLI

In [None]:
%%bash

pip install -q pio-cli==0.37

Configure CLI for Model Deployment

In [None]:
%%bash

pio init-model --model-server-url=http://prediction-pmml.demo.pipeline.io --model-type=pmml \
    --model-namespace=cli --model-name=airbnb

In [None]:
%%bash

pio deploy --model-version=v0 airbnb.model

### Deployment Option 2:  REST API

In [None]:
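import requests

deploy_url = 'http://prediction-pmml.demo.pipeline.io/api/v1/model/deploy/pmml/rest/airbnb/v0'

# Close the file handle once the upload finishes.
with open('airbnb.model', 'rb') as model_file:
    response = requests.post(deploy_url, files={'file': model_file})

# Raise on HTTP errors rather than reporting success unconditionally.
response.raise_for_status()

print("Success! %s" % response.text)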
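The deploy URL above and the predict URL used in Step 6 appear to share one pattern. The sketch below assumes the path segments mean action, model type, namespace, model name, and version. That reading is inferred from the URLs and the `pio init-model` flags, not from PipelineIO documentation, and `model_url` is a hypothetical helper.

```python
BASE_URL = 'http://prediction-pmml.demo.pipeline.io'


def model_url(action, model_type, namespace, name, version, base_url=BASE_URL):
    """Build a model-server URL; the segment meanings are an assumption."""
    return '{}/api/v1/model/{}/{}/{}/{}/{}'.format(
        base_url, action, model_type, namespace, name, version)


deploy_url_built = model_url('deploy', 'pmml', 'rest', 'airbnb', 'v0')
predict_url_built = model_url('predict', 'pmml', 'rest', 'airbnb', 'v0')
print(deploy_url_built)
print(predict_url_built)
```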

## Step 6:  Predict With Deployed Pipeline Model

### Set Up Prediction Inputs

In [None]:
import json

data = {"bathrooms":2.0, 
        "bedrooms":2.0, 
        "security_deposit":175.00, 
        "cleaning_fee":25.0, 
        "extra_people":1.0, 
        "number_of_reviews": 2.0, 
        "square_feet": 250.0, 
        "review_scores_rating": 2.0, 
        "room_type": "Entire home/apt", 
        "host_is_super_host": "0.0", 
        "cancellation_policy": "flexible", 
        "instant_bookable": "1.0", 
        "state": "CA" }

json_data = json.dumps(data)

with open('airbnb-predict-inputs.json', 'wt') as fh:
    fh.write(json_data)
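A request that omits a feature or misspells a key is an easy mistake to make. Before sending, the input keys can be compared with the feature lists from Step 2. A small sketch, with the lists and the example inputs copied from this notebook; `check_prediction_inputs` is an illustrative helper:

```python
continuous_features = ["bathrooms", "bedrooms", "security_deposit", "cleaning_fee",
                       "extra_people", "number_of_reviews", "square_feet",
                       "review_scores_rating"]
categorical_features = ["room_type", "host_is_super_host", "cancellation_policy",
                        "instant_bookable", "state"]


def check_prediction_inputs(inputs):
    """Return (missing, unexpected) feature names for a prediction request."""
    expected = set(continuous_features) | set(categorical_features)
    missing = sorted(expected - set(inputs))
    unexpected = sorted(set(inputs) - expected)
    return missing, unexpected


example_inputs = {"bathrooms": 2.0, "bedrooms": 2.0, "security_deposit": 175.00,
                  "cleaning_fee": 25.0, "extra_people": 1.0, "number_of_reviews": 2.0,
                  "square_feet": 250.0, "review_scores_rating": 2.0,
                  "room_type": "Entire home/apt", "host_is_super_host": "0.0",
                  "cancellation_policy": "flexible", "instant_bookable": "1.0",
                  "state": "CA"}

print(check_prediction_inputs(example_inputs))
```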

### Predict with CLI
Note: If the first call returns a fallback, run the cell again. This is a known bug.

In [None]:
%%bash

pio predict --model-version=v0 \
            --model-input-filename=airbnb-predict-inputs.json

### Predict with REST

In [None]:
import json

# Note:  You may need to run this twice.
#        A fallback will trigger the first time. (Bug)
predict_url = 'http://prediction-pmml.demo.pipeline.io/api/v1/model/predict/pmml/rest/airbnb/v0'

headers = {'content-type': 'application/json'}

response = requests.post(predict_url, 
                         data=json_data, 
                         headers=headers)

print(response.text)
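Rather than rerunning the cell by hand, the call can be wrapped in a small retry helper. This is only a sketch: how a fallback response looks is not shown in this notebook, so the `is_fallback` predicate is a placeholder that the caller must supply, and the fake call below stands in for `requests.post`.

```python
def call_with_retry(call, is_fallback, attempts=2):
    """Invoke call() until is_fallback(result) is False or attempts run out."""
    result = None
    for _ in range(attempts):
        result = call()
        if not is_fallback(result):
            break
    return result


# Stand-in for the real request: the first reply is a fallback, the second is not.
fake_replies = iter(['FALLBACK', 'OK'])
calls_made = []


def fake_predict():
    calls_made.append(1)
    return next(fake_replies)


result = call_with_retry(fake_predict, is_fallback=lambda reply: reply == 'FALLBACK')
print(result, len(calls_made))
```

In the notebook, `call` would be a function that posts `json_data` to `predict_url` and returns the response.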

## Step 7:  Monitor Model Servers through Dashboards

### Fallbacks and Circuit Breaker [Dashboard](http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22Model%20Servers%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D)

In [None]:
%%html

<iframe width=800 height=600 src="http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22Model%20Servers%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D"></iframe>
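The long `streams` query parameter in the dashboard URL is URL-encoded JSON. Decoding it shows which Turbine stream the dashboard monitors, which helps when pointing the dashboard at a different cluster. A sketch using only the standard library:

```python
import json
from urllib.parse import parse_qs, urlsplit

dashboard_url = (
    'http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html'
    '?streams=%5B%7B%22name%22%3A%22Model%20Servers%22%2C%22stream%22%3A%22'
    'http%3A%2F%2Fturbine.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22'
    '%3A%22%22%2C%22delay%22%3A%22%22%7D%5D'
)

# parse_qs percent-decodes the value; the result is a JSON list of stream configs.
query = parse_qs(urlsplit(dashboard_url).query)
streams = json.loads(query['streams'][0])
print(streams)
```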

### Grafana Prediction Metrics [Dashboard](http://grafana.demo.pipeline.io)

In [None]:
%%html

<iframe width=800 height=600 src="http://grafana.demo.pipeline.io"></iframe>