In [1]:
import mleap.feature
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.mleap.feature import OneHotEncoder
from mleap.spark.spark_support import SimpleSparkSerializer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression

In [2]:
from pyspark.ml.base import Transformer
import mleap.spark.spark_support

In [4]:
ps = SimpleSparkSerializer()
res = ps.deserializeFromBundle("/tmp/pyspark.lr")
res

pipeline_b19732905ec3

In [6]:
new_tf = PipelineModel.deserializeFromBundle("/tmp/pyspark.lr")

In [7]:
new_tf

PipelineModel_4940b73e72048a730973

In [6]:
df = spark.read.format("com.databricks.spark.avro").load("/tmp/airbnb.avro")


In [7]:
datasetFiltered = df.filter("price >= 50 AND price <= 750 and bathrooms > 0.0")
print(df.count())
print(datasetFiltered.count())

389255
321588


In [8]:
datasetFiltered.registerTempTable("df")

datasetImputed = spark.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        price,
        bathrooms,
        bedrooms,
        room_type,
        host_is_superhost,
        cancellation_policy,
        case when security_deposit is null
            then 0.0
            else security_deposit
        end as security_deposit,
        price_per_bedroom,
        case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as number_of_reviews,
        case when extra_people is null
            then 0.0
            else extra_people
        end as extra_people,
        instant_bookable,
        case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as cleaning_fee,
        case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as review_scores_rating,
        case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as square_feet
    from df
    where bedrooms is not null
""")


datasetImputed.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+
|summary|       square_feet|             price|          bedrooms|         bathrooms|     cleaning_fee|
+-------+------------------+------------------+------------------+------------------+-----------------+
|  count|            321588|            321588|            321588|            321588|           321588|
|   mean| 546.7441757777032|131.54961006007687|1.3352426085550455| 1.199068373198005|37.64188340360959|
| stddev|363.39839582374066| 90.10912788720098|0.8466586601060778|0.4830590051262673|42.64237791484579|
|    min|             104.0|              50.0|               0.0|               0.5|              0.0|
|    max|           32292.0|             750.0|              10.0|               8.0|            700.0|
+-------+------------------+------------------+------------------+------------------+-----------------+



In [9]:
# Most popular cities (original dataset)

spark.sql("""
    select 
        state,
        count(*) as n,
        cast(avg(price) as decimal(12,2)) as avg_price,
        max(price) as max_price
    from df
    group by state
    order by count(*) desc
""").show()

+-------------+-----+---------+---------+
|        state|    n|avg_price|max_price|
+-------------+-----+---------+---------+
|           NY|48362|   146.75|    750.0|
|           CA|44716|   158.76|    750.0|
|Île-de-France|40732|   107.74|    750.0|
|       London|17532|   117.71|    750.0|
|          NSW|14416|   167.96|    750.0|
|       Berlin|13098|    81.01|    650.0|
|Noord-Holland| 8890|   128.56|    750.0|
|          VIC| 8636|   144.49|    750.0|
|North Holland| 7636|   134.60|    700.0|
|           IL| 7544|   141.85|    750.0|
|           ON| 7186|   129.05|    750.0|
|           TX| 6702|   196.59|    750.0|
|           WA| 5858|   132.48|    750.0|
|    Catalonia| 5748|   106.39|    720.0|
|           BC| 5522|   133.14|    750.0|
|           DC| 5476|   136.56|    720.0|
|       Québec| 5116|   104.98|    700.0|
|    Catalunya| 4570|    99.36|    675.0|
|       Veneto| 4486|   131.71|    700.0|
|           OR| 4330|   114.02|    700.0|
+-------------+-----+---------+---

In [10]:
# Most expensive popular cities (original dataset)

spark.sql("""
    select 
        city,
        count(*) as n,
        cast(avg(price) as decimal(12,2)) as avg_price,
        max(price) as max_price
    from df
    group by city
    order by avg(price) desc
""").filter("n>25").show()

+-------------------+---+---------+---------+
|               city|  n|avg_price|max_price|
+-------------------+---+---------+---------+
|         Palm Beach| 56|   372.11|    701.0|
|        Watsonville| 78|   307.85|    670.0|
|  Pacific Palisades| 34|   295.18|    695.0|
|             Malibu|302|   280.42|    750.0|
|      Bilgola Beach| 30|   261.13|    601.0|
|      Playa Del Rey| 34|   255.76|    599.0|
|             Avalon| 80|   255.65|    701.0|
|Sydney Olympic Park| 40|   250.55|    520.0|
|           Tamarama|148|   247.45|    750.0|
|           Capitola| 72|   246.50|    650.0|
|    Manhattan Beach|240|   234.23|    700.0|
|       Avalon Beach| 82|   232.98|    620.0|
|            Del Mar| 38|   232.84|    650.0|
|         Birchgrove| 32|   228.63|    601.0|
|          Mona Vale| 52|   227.00|    572.0|
|       Venice Beach| 62|   224.45|    699.0|
|Rancho Palos Verdes| 82|   223.68|    750.0|
|      Darling Point| 60|   221.43|    623.0|
|    North Curl Curl| 26|   220.77

In [11]:
# Step 2. Create our feature pipeline and train it on the entire dataset
continuous_features = ["bathrooms", "bedrooms", "security_deposit", "cleaning_fee", "extra_people", "number_of_reviews", "square_feet", "review_scores_rating"]

categorical_features = ["room_type", "host_is_superhost", "cancellation_policy", "instant_bookable", "state"]

all_features = continuous_features + categorical_features

In [12]:
dataset_imputed = datasetImputed.persist()

In [13]:
[training_dataset, validation_dataset] = dataset_imputed.randomSplit([0.7, 0.3])

In [14]:
continuous_feature_assembler= VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features",\
                                           withStd=True, withMean=False)

In [15]:
categorical_feature_indexers = [StringIndexer(inputCol=x, outputCol="{}_index".format(x)) for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), outputCol="oh_encoder_{}".format(x.getOutputCol() )) for x in categorical_feature_indexers]


In [16]:
featureColsLr = [x.getOutputCol() for x in categorical_feature_one_hot_encoders]
featureColsLr.append("scaled_continuous_features")

featureAssemblerLr = VectorAssembler(inputCols=featureColsLr, outputCol="features_lr")


#estimatorsLr = [continuous_feature_assembler, continuous_feature_scaler] + categorical_feature_indexers + categorical_feature_one_hot_encoders + [featureAssemblerLr]
estimatorsLr = [continuous_feature_assembler, continuous_feature_scaler] + categorical_feature_indexers+ categorical_feature_one_hot_encoders

featurePipeline = Pipeline(stages=estimatorsLr)

sparkFeaturePipelineModel = featurePipeline.fit(dataset_imputed)

print("Finished constructing the pipeline")

Finished constructing the pipeline


In [17]:
sparkFeaturePipelineModel.transform(dataset_imputed)

DataFrame[id: string, city: string, state: string, space: string, price: double, bathrooms: double, bedrooms: double, room_type: string, host_is_superhost: double, cancellation_policy: string, security_deposit: double, price_per_bedroom: double, number_of_reviews: decimal(11,1), extra_people: double, instant_bookable: double, cleaning_fee: double, review_scores_rating: double, square_feet: double, unscaled_continuous_features: vector, scaled_continuous_features: vector, room_type_index: double, host_is_superhost_index: double, cancellation_policy_index: double, instant_bookable_index: double, state_index: double, oh_encoder_room_type_index: vector, oh_encoder_host_is_superhost_index: vector, oh_encoder_cancellation_policy_index: vector, oh_encoder_instant_bookable_index: vector, oh_encoder_state_index: vector]

In [18]:
# Step 3.2 Create our linear regression model

#linearRegression = LinearRegression(featuresCol="features_lr", labelCol="price", predictionCol="price_prediction", maxIter=10, regParam=0.3, elasticNetParam=0.8)

linearRegression = LinearRegression(featuresCol="scaled_continuous_features", labelCol="price", predictionCol="price_prediction", maxIter=10, regParam=0.3, elasticNetParam=0.8)

pipeline_lr = [sparkFeaturePipelineModel] + [linearRegression]

sparkPipelineEstimatorLr = Pipeline(stages = pipeline_lr)

sparkPipelineLr = sparkPipelineEstimatorLr.fit(dataset_imputed)

print("Complete: Training Linear Regression")

Complete: Training Linear Regression


In [19]:
sparkPipelineLr.serializeToBundle("/tmp/pyspark.lr")

In [22]:
new_tf = PipelineModel.deserializeFromBundle("/tmp/pyspark.lr")

Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
  File "/var/folders/b2/swftcs0s4bvfpqfvn3rm0pzw0000gn/T/kernel-PySpark-72089548-3a6b-4381-a0e3-9c13e364f1ab/pyspark_runner.py", line 180, in <module>
    eval(compiled_code)
  File "<string>", line 1, in <module>
TypeError: deserializeFromBundle() takes exactly 1 argument (2 given)

StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:158)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:158)
scala.Option.foreach(Option.scala:257)
org.apache.toree.interpreter.broker.BrokerState.markFailure(BrokerState.scala:157)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.r