# Connect to Hive

In [2]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = "team23"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

sc = spark.sparkContext

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib64/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [2]:
spark

# list Hive databases

In [3]:
print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES;").show()

[Database(name='default', description='Default Hive database', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/apps/hive/warehouse'), Database(name='root_db', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/root/root_db'), Database(name='team0_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team0/project/hive/warehouse'), Database(name='team12_hive_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team12/project/hive/warehouse'), Database(name='team13_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team13/project/hive/warehouse'), Database(name='team14_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team14/project/hive/warehouse'), Database(name='team15_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team15/project/hive/warehouse'), Database(name='team16_projectdb', description

In [4]:
spark.sql("USE team23_projectdb;")
spark.sql("SHOW TABLES;").show()

+----------------+----------------+-----------+
|       namespace|       tableName|isTemporary|
+----------------+----------------+-----------+
|team23_projectdb|airbnb_part_buck|      false|
|team23_projectdb|      q1_results|      false|
|team23_projectdb|      q2_results|      false|
|team23_projectdb|      q3_results|      false|
|team23_projectdb|      q4_results|      false|
|team23_projectdb|      q5_results|      false|
+----------------+----------------+-----------+



# Specify the input and output features

In [5]:
# We will use the following features
# Excluded 'thumbnail_url' and 'id' because it has no valuable information to extract
# Exclude? host-related attributes since they contain little information about listing itself
# Excluded 'first_review', 'host_response_rate', 'last_review' 'review_scores_rating'  because of large amount of Null values
features = ['property_type', 'room_type', 'amenities', 'accommodates', 'bathrooms', 'bed_type', 'cancellation_policy',\
            'cleaning_fee', 'city', 'description', 'host_has_profile_pic', 'host_identity_verified','host_since',\
            'instant_bookable', 'latitude', 'longitude', 'name', 'neighbourhood', 'number_of_reviews', 'zipcode', 'beds', 'bedrooms']

# The output/target of our model
label = 'log_price'

# Read hive tables

In [6]:
# make display fancy
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [7]:
airbnb = spark.read.format("avro").table('team23_projectdb.airbnb_part_buck')
airbnb

id,log_price,property_type,room_type,amenities,accommodates,bathrooms,bed_type,cancellation_policy,cleaning_fee,city,description,first_review,host_has_profile_pic,host_identity_verified,host_response_rate,host_since,instant_bookable,last_review,latitude,longitude,name,neighbourhood,number_of_reviews,review_scores_rating,thumbnail_url,zipcode,beds,bedrooms
10381948,4.454347296253506,Apartment,Entire home/apt,"{Internet,""Wirele...",4,1.0,Real Bed,moderate,True,NYC,Unfurnished 3rd f...,2017-07-04,True,True,100.0,2013-11-27,False,2017-07-04,40.64538447381832,-73.95399805001759,Flatbush Townhous...,Flatbush,1,100.0,https://a0.muscac...,11226.0,1.0,3
1454258,6.163314804034643,House,Entire home/apt,"{TV,""Cable TV"",In...",6,3.0,Real Bed,strict,True,SF,Large family hom...,2011-06-15,True,True,90.0,2011-06-02,False,2014-07-24,37.7181254491016,-122.40485476143424,Large sunny famil...,Visitacion Valley,31,91.0,https://a0.muscac...,94134.0,4.0,3
4387007,5.6167710976665735,House,Entire home/apt,"{TV,""Cable TV"",In...",11,2.0,Real Bed,strict,True,LA,Located just 2 mi...,2015-02-09,True,True,100.0,2014-09-27,False,2017-04-14,33.793134469297684,-118.20185410325244,Convention Center...,,57,92.0,https://a0.muscac...,90806.0,4.0,3
13539571,5.988961416889862,Apartment,Entire home/apt,"{TV,""Cable TV"",In...",9,2.5,Real Bed,strict,True,NYC,Luxury and modern...,2016-03-28,True,True,100.0,2012-11-16,False,2017-09-10,40.73758260955226,-74.00553133706515,NYC Dreaming Luxu...,West Village,34,90.0,https://a0.muscac...,10014.0,5.0,3
11225342,5.700443573390688,House,Entire home/apt,"{TV,""Cable TV"",In...",6,2.0,Real Bed,strict,True,DC,Chic private rowh...,2014-04-28,True,True,100.0,2014-01-21,False,2017-04-23,38.90133326787189,-76.98488420278608,Chic 3BR Rowhouse...,Trinidad,48,87.0,,20002.0,3.0,3
3384720,5.003946305945459,House,Entire home/apt,"{TV,""Cable TV"",In...",7,2.5,Real Bed,strict,True,LA,Presenting the Se...,2017-01-24,True,True,100.0,2012-08-06,False,2017-04-07,33.972498276982506,-118.3712900225893,Sparkling new 3/2...,,12,93.0,https://a0.muscac...,90045.0,5.0,3
13278515,6.214608098422191,Apartment,Entire home/apt,"{TV,""Cable TV"",In...",8,2.0,Real Bed,strict,False,NYC,You are renting a...,2017-02-11,True,True,100.0,2013-07-31,True,2017-02-11,40.815879954255536,-73.93984152930561,HUGE BEAUTIFUL 3 ...,Harlem,1,80.0,https://a0.muscac...,10037.0,3.0,3
17380802,6.476972362889682,Apartment,Entire home/apt,"{TV,""Cable TV"",In...",8,2.0,Real Bed,strict,True,NYC,This is an extrem...,2016-05-23,True,True,90.0,2014-04-21,False,2017-09-13,40.78586739584127,-73.94763737232486,Family Friendly 3...,East Harlem,27,93.0,https://a0.muscac...,10128.0,5.0,3
18566816,4.867534450455582,House,Entire home/apt,"{""Wireless Intern...",4,1.0,Real Bed,moderate,True,LA,(Shared home with...,2016-08-14,True,True,,2015-06-02,False,2016-08-27,34.10872996857901,-118.1794665487113,"Comfortable, 3br ...",Hermon,2,70.0,https://a0.muscac...,90042.0,3.0,3
18176798,5.272999558563747,House,Entire home/apt,"{TV,""Cable TV"",In...",5,2.0,Real Bed,strict,True,LA,Beautiful fully u...,2016-10-04,True,True,100.0,2014-06-12,False,2017-04-24,33.75906350605493,-118.13844496573584,Beautiful home in...,Long Beach,21,100.0,https://a0.muscac...,90803.0,4.0,3


In [8]:
# calculate uniques for a column (unused)
def uniques(df, col):
    return list(map(lambda x: x[col], airbnb.select(col).distinct().collect()))

# count null values across columns
def cnull(df):
    return {col:df.filter(df[col].isNull()).count() for col in df.columns}

display(cnull(airbnb))
airbnb.printSchema()

{'id': 0,
 'log_price': 0,
 'property_type': 0,
 'room_type': 0,
 'amenities': 0,
 'accommodates': 0,
 'bathrooms': 200,
 'bed_type': 0,
 'cancellation_policy': 0,
 'cleaning_fee': 0,
 'city': 0,
 'description': 0,
 'first_review': 15864,
 'host_has_profile_pic': 188,
 'host_identity_verified': 188,
 'host_response_rate': 18299,
 'host_since': 188,
 'instant_bookable': 0,
 'last_review': 15827,
 'latitude': 0,
 'longitude': 0,
 'name': 0,
 'neighbourhood': 6872,
 'number_of_reviews': 0,
 'review_scores_rating': 16722,
 'thumbnail_url': 8216,
 'zipcode': 966,
 'beds': 131,
 'bedrooms': 91}

root
 |-- id: integer (nullable = true)
 |-- log_price: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bathrooms: float (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- cleaning_fee: boolean (nullable = true)
 |-- city: string (nullable = true)
 |-- description: string (nullable = true)
 |-- first_review: date (nullable = true)
 |-- host_has_profile_pic: boolean (nullable = true)
 |-- host_identity_verified: boolean (nullable = true)
 |-- host_response_rate: float (nullable = true)
 |-- host_since: date (nullable = true)
 |-- instant_bookable: boolean (nullable = true)
 |-- last_review: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 

# Feature selection

In [9]:
import pyspark.sql.functions as F
# drop rows with missing values
df2 = airbnb.select(features + [label]).na.drop()
df2 = df2.withColumnRenamed('log_price', 'label')
df2

property_type,room_type,amenities,accommodates,bathrooms,bed_type,cancellation_policy,cleaning_fee,city,description,host_has_profile_pic,host_identity_verified,host_since,instant_bookable,latitude,longitude,name,neighbourhood,number_of_reviews,zipcode,beds,bedrooms,label
Apartment,Entire home/apt,"{Internet,""Wirele...",4,1.0,Real Bed,moderate,True,NYC,Unfurnished 3rd f...,True,True,2013-11-27,False,40.64538447381832,-73.95399805001759,Flatbush Townhous...,Flatbush,1,11226.0,1.0,3,4.454347296253506
House,Entire home/apt,"{TV,""Cable TV"",In...",6,3.0,Real Bed,strict,True,SF,Large family hom...,True,True,2011-06-02,False,37.7181254491016,-122.40485476143424,Large sunny famil...,Visitacion Valley,31,94134.0,4.0,3,6.163314804034643
Apartment,Entire home/apt,"{TV,""Cable TV"",In...",9,2.5,Real Bed,strict,True,NYC,Luxury and modern...,True,True,2012-11-16,False,40.73758260955226,-74.00553133706515,NYC Dreaming Luxu...,West Village,34,10014.0,5.0,3,5.988961416889862
House,Entire home/apt,"{TV,""Cable TV"",In...",6,2.0,Real Bed,strict,True,DC,Chic private rowh...,True,True,2014-01-21,False,38.90133326787189,-76.98488420278608,Chic 3BR Rowhouse...,Trinidad,48,20002.0,3.0,3,5.700443573390688
Apartment,Entire home/apt,"{TV,""Cable TV"",In...",8,2.0,Real Bed,strict,False,NYC,You are renting a...,True,True,2013-07-31,True,40.815879954255536,-73.93984152930561,HUGE BEAUTIFUL 3 ...,Harlem,1,10037.0,3.0,3,6.214608098422191
Apartment,Entire home/apt,"{TV,""Cable TV"",In...",8,2.0,Real Bed,strict,True,NYC,This is an extrem...,True,True,2014-04-21,False,40.78586739584127,-73.94763737232486,Family Friendly 3...,East Harlem,27,10128.0,5.0,3,6.476972362889682
House,Entire home/apt,"{""Wireless Intern...",4,1.0,Real Bed,moderate,True,LA,(Shared home with...,True,True,2015-06-02,False,34.10872996857901,-118.1794665487113,"Comfortable, 3br ...",Hermon,2,90042.0,3.0,3,4.867534450455582
House,Entire home/apt,"{TV,""Cable TV"",In...",5,2.0,Real Bed,strict,True,LA,Beautiful fully u...,True,True,2014-06-12,False,33.75906350605493,-118.13844496573584,Beautiful home in...,Long Beach,21,90803.0,4.0,3,5.272999558563747
House,Entire home/apt,"{TV,""Wireless Int...",8,2.0,Real Bed,strict,True,LA,Within walking di...,True,True,2016-05-03,False,34.05597780443645,-118.1188218344203,Charming house 3b...,Monterey Park,12,91755.0,5.0,3,5.3706380281276624
House,Entire home/apt,"{TV,""Cable TV"",""W...",6,2.0,Real Bed,strict,True,DC,Fully Restored Fo...,True,True,2012-11-08,False,38.88354324515544,-77.00085622063551,Home In The Shado...,Capitol Hill,8,20003.0,4.0,3,5.476463551931511


In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col

categoricalCols = ['property_type', 'room_type', 'bed_type', 'cancellation_policy', 'city', 'neighbourhood', 'zipcode']
textCols = ['name', 'description']
dateCols = ['host_since']
booleanCols = ['cleaning_fee', 'host_has_profile_pic', 'host_identity_verified', 'instant_bookable']
numericalCols = ['accommodates', 'bathrooms', 'number_of_reviews', 'beds', 'bedrooms']
geoCols = [['latitude', 'longitude']]
jsonCols = ['amenities']

# cast all boolean and numerical columns to the same type
numericalCols += booleanCols
for c in numericalCols:
    df2 = df2.withColumn(c, df2[c].cast('float'))
df2

property_type,room_type,amenities,accommodates,bathrooms,bed_type,cancellation_policy,cleaning_fee,city,description,host_has_profile_pic,host_identity_verified,host_since,instant_bookable,latitude,longitude,name,neighbourhood,number_of_reviews,zipcode,beds,bedrooms,label
Loft,Entire home/apt,"{Internet,""Wirele...",16.0,2.0,Real Bed,strict,1.0,NYC,Welcome Home! Hea...,1.0,1.0,2014-03-01,0.0,40.70858224218985,-73.95301425964071,2 Penthouse Lofts...,Williamsburg,44.0,11211.0,10.0,7.0,6.507277712385013
Townhouse,Entire home/apt,"{TV,""Cable TV"",In...",16.0,4.0,Real Bed,strict,1.0,NYC,Newly renovated a...,1.0,1.0,2014-06-10,1.0,40.71496144556247,-73.9459327654385,JUST LISTED 7bd/4...,Williamsburg,7.0,11211.0,9.0,7.0,6.796823718274855
House,Entire home/apt,"{TV,Internet,""Wir...",12.0,6.0,Real Bed,moderate,1.0,LA,Our house is clos...,1.0,1.0,2016-05-17,0.0,33.97361865838191,-118.41135788062414,Spacious with Vie...,Westchester/Playa...,9.0,90045.0,6.0,7.0,6.429719478039137
Villa,Entire home/apt,"{TV,""Cable TV"",In...",10.0,8.0,Real Bed,strict,0.0,LA,Our beautiful Est...,1.0,1.0,2014-08-01,0.0,34.167797954394736,-118.50355353313894,SUMMER VACATION V...,Encino,1.0,91316.0,9.0,7.0,7.495541943884256
Other,Entire home/apt,"{TV,""Cable TV"",In...",16.0,4.0,Real Bed,moderate,1.0,LA,Great for busines...,1.0,1.0,2014-02-09,0.0,34.0056580274334,-118.43327536192491,West LA Compound ...,Mar Vista,5.0,90066.0,10.0,7.0,6.762729506931878
Apartment,Entire home/apt,"{TV,Internet,""Wir...",12.0,5.0,Real Bed,strict,1.0,LA,Rent our entire b...,1.0,1.0,2014-04-14,0.0,33.99078924403168,-118.47600494529516,7BR/4.5BA---Large...,Venice,20.0,90291.0,10.0,7.0,6.826545223556594
House,Entire home/apt,"{""Cable TV"",Inter...",16.0,5.0,Real Bed,strict,1.0,NYC,6 bedrooms total ...,1.0,1.0,2011-11-01,0.0,40.59192978891966,-74.06476426213608,Wedding guests ac...,South Beach,1.0,10305.0,15.0,7.0,6.437751649736401
House,Entire home/apt,"{TV,""Cable TV"",In...",16.0,4.0,Real Bed,strict,1.0,NYC,"Huge 7 bedrooms ,...",1.0,0.0,2014-08-20,0.0,40.68542277116447,-73.95183895659102,New York I Love Y...,Bedford-Stuyvesant,87.0,11216.0,11.0,7.0,6.956545443151567
House,Entire home/apt,"{TV,""Cable TV"",In...",16.0,2.0,Real Bed,strict,1.0,LA,Luxurious 4 bedro...,1.0,0.0,2015-06-01,0.0,34.09580113492421,-118.348376862567,W Hollywood Dream,Hollywood,99.0,90046.0,10.0,7.0,5.857933154483459
Condominium,Entire home/apt,"{TV,Internet,""Wir...",16.0,4.0,Real Bed,strict,1.0,Boston,Beautifully redes...,1.0,0.0,2017-09-07,0.0,42.32916951239184,-71.09288095270476,Spacious 3-Floor ...,Roxbury,0.0,2119.0,13.0,7.0,5.703782474656202


# Feature extraction

In [11]:
# Custom transformer for date features in YYYY-MM-DD format
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
from pyspark.ml.linalg import Vectors, VectorUDT
import pyspark.sql.functions as F
import math
    
class YMDTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):  
    @keyword_only
    def __init__(self, inputCol: str = "input", outputCol: str = "output"):
        super(YMDTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.set_params(**kwargs)
    
    @keyword_only
    def set_params(self, inputCol: str = "input", outputCol: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

  
    def _transform(self, df: DataFrame):
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        
        DAY_PERIOD = 31
        MONTH_PERIOD = 12
        
        # split the data and cast to float
        input_col = F.split(df[input_col], '-').cast("array<float>")
        
        # apply transform to day
        d_sin = F.sin(2 * math.pi * F.element_at(input_col, 3) / DAY_PERIOD)
        d_cos = F.cos(2 * math.pi * F.element_at(input_col, 3) / DAY_PERIOD)
        
        # apply transform to month
        m_sin = F.sin(2 * math.pi * F.element_at(input_col, 2) / MONTH_PERIOD)
        m_cos = F.cos(2 * math.pi * F.element_at(input_col, 2) / MONTH_PERIOD)
        
        # year remains as is
        y = F.element_at(input_col, 1)
        
        # pack everything into a vector for VectorAssembler
        atov = F.udf(lambda l: Vectors.dense(l), VectorUDT())
        res = F.array(d_sin, d_cos, m_sin, m_cos, y)
        res = atov(res)
        
        return df.withColumn(output_col, res)

ymd_transformers = [ YMDTransformer(inputCol=c, outputCol="{0}_transformed".format(c)) for c in dateCols ]

In [12]:
# Custom transformer for date features in YYYY-MM-DD format
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
from pyspark.ml.linalg import Vectors, VectorUDT
import pyspark.sql.functions as F
import math
    
class ECEFTransformer(Transformer, HasInputCols, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):  
    @keyword_only
    def __init__(self, inputCols: str = "input", outputCol: str = "output"):
        super(ECEFTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.set_params(**kwargs)
    
    @keyword_only
    def set_params(self, inputCols: str = "input", outputCol: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

  
    def _transform(self, df: DataFrame):
        input_cols = self.getInputCols()
        output_col = self.getOutputCol()
        
        a = 6378137.0;               # WGS-84 semi-major axis
        e2 = 6.6943799901377997e-3;  # WGS-84 first eccentricity squared
        
        lat = df[input_cols[0]]
        lon = df[input_cols[1]]
        
        n = a / F.sqrt(1 - e2 * F.sin(lat) * F.sin(lat));
        x = n * F.cos(lat) * F.cos(lon);    # ECEF x
        y = n * F.cos(lat) * F.sin(lon);    # ECEF y
        z = (n * (1 - e2 ))* F.sin(lat);    # ECEF z
        
        # pack everything into a vector for VectorAssembler
        atov = F.udf(lambda l: Vectors.dense(l), VectorUDT())
        res = F.array(x, y, z)
        res = atov(res)
        
        return df.withColumn(output_col, res)

ecef_transformers = [ ECEFTransformer(inputCols=c, outputCol="{0}_transformed".format(c)) for c in geoCols ]

In [27]:
from pyspark.ml.feature import PCA

# Tokenize textual features by words
tokenizers = [ RegexTokenizer(inputCol=c, outputCol="{0}_tokens".format(c), pattern=" ") for c in textCols ]
tokenizers += [ RegexTokenizer(inputCol=c, outputCol="{0}_tokens".format(c), pattern="[\",{}]+") for c in jsonCols ]

# Vectorize them
vectorizers = [ Word2Vec(vectorSize=50, seed=42, minCount=1, inputCol=tokenizer.getOutputCol(), outputCol="{0}_vectorized".format(tokenizer.getOutputCol())) for tokenizer in tokenizers ]

# Create String indexer to assign index for the string fields where each unique string will get a unique index
# String Indexer is required as an input for One-Hot Encoder 
# We set the case as `skip` for any string out of the input strings
indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ]

# Encode the strings using One Hot encoding
# default setting: dropLast=True ==> For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]

# This will concatenate the input cols into a single column.
assembler = VectorAssembler(inputCols= \
                            [encoder.getOutputCol() for encoder in encoders] +\
                            [vectorizer.getOutputCol() for vectorizer in vectorizers] +\
                            [ymd.getOutputCol() for ymd in ymd_transformers] +\
                            [ecef.getOutputCol() for ecef in ecef_transformers] +\
                            numericalCols, outputCol= "features")

# Apply PCA to reduce dimetionalty and reduce computation time
pca = PCA(k=200, inputCol='features', outputCol='components')

# You can create a pipeline to use only a single fit and transform on the data.
pipeline = Pipeline(stages=ymd_transformers + ecef_transformers + tokenizers + vectorizers + indexers + encoders + [assembler] + [pca])


# Fit the pipeline ==> This will call the fit functions for all transformers if exist
model = pipeline.fit(df2)
# Fit the pipeline ==> This will call the transform functions for all transformers
data = model.transform(df2)

display(data)

# We delete all features and keep only the features and label columns
transformed = data.select(["components", "label"])
transformed = transformed.withColumnRenamed('components', 'features')


from pyspark.ml.feature import VectorIndexer

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4
# distinct values are treated as continuous.
#featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=1000).fit(data)
#transformed = featureIndexer.transform(data)

# Display the output Spark DataFrame
display(transformed)

property_type,room_type,amenities,accommodates,bathrooms,bed_type,cancellation_policy,cleaning_fee,city,description,host_has_profile_pic,host_identity_verified,host_since,instant_bookable,latitude,longitude,name,neighbourhood,number_of_reviews,zipcode,beds,bedrooms,label,host_since_transformed,"['latitude', 'longitude']_transformed",name_tokens,description_tokens,amenities_tokens,name_tokens_vectorized,description_tokens_vectorized,amenities_tokens_vectorized,property_type_indexed,room_type_indexed,bed_type_indexed,cancellation_policy_indexed,city_indexed,neighbourhood_indexed,zipcode_indexed,property_type_indexed_encoded,room_type_indexed_encoded,bed_type_indexed_encoded,cancellation_policy_indexed_encoded,city_indexed_encoded,neighbourhood_indexed_encoded,zipcode_indexed_encoded,features,components
Loft,Entire home/apt,"{Internet,""Wirele...",16.0,2.0,Real Bed,strict,1.0,NYC,Welcome Home! Hea...,1.0,1.0,2014-03-01,0.0,40.70858224218985,-73.95301425964071,2 Penthouse Lofts...,Williamsburg,44.0,11211.0,10.0,7.0,6.507277712385013,[0.20129852008866...,[-791989.69041115...,"[2, penthouse, lo...","[welcome, home!, ...","[internet, wirele...",[-0.0423881512833...,[0.11991416706521...,[0.04649122953414...,4.0,0.0,0.0,0.0,0.0,0.0,0.0,"(31,[4],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[0],[1.0])","(617,[0],[1.0])","(709,[0],[1.0])","(1539,[4,31,33,37...",[3921718.14534187...
Townhouse,Entire home/apt,"{TV,""Cable TV"",In...",16.0,4.0,Real Bed,strict,1.0,NYC,Newly renovated a...,1.0,1.0,2014-06-10,1.0,40.71496144556247,-73.9459327654385,JUST LISTED 7bd/4...,Williamsburg,7.0,11211.0,9.0,7.0,6.796823718274855,[0.89780453957074...,[-748161.61146844...,"[just, listed, 7b...","[newly, renovated...","[tv, cable tv, in...",[-1.8260947295597...,[0.03021800236009...,[0.07850961913836...,3.0,0.0,0.0,0.0,0.0,0.0,0.0,"(31,[3],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[0],[1.0])","(617,[0],[1.0])","(709,[0],[1.0])","(1539,[3,31,33,37...",[3900146.47848771...
House,Entire home/apt,"{TV,Internet,""Wir...",12.0,6.0,Real Bed,moderate,1.0,LA,Our house is clos...,1.0,1.0,2016-05-17,0.0,33.97361865838191,-118.41135788062414,Spacious with Vie...,Westchester/Playa...,9.0,90045.0,6.0,7.0,6.429719478039137,[-0.2993631229733...,[-3014926.0629762...,"[spacious, with, ...","[our, house, is, ...","[tv, internet, wi...",[-0.0133022523950...,[0.06996398760184...,[-0.0779974117227...,1.0,0.0,0.0,2.0,1.0,80.0,183.0,"(31,[1],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[2],[1.0])","(5,[1],[1.0])","(617,[80],[1.0])","(709,[183],[1.0])","(1539,[1,31,33,39...",[4825476.62661572...
Villa,Entire home/apt,"{TV,""Cable TV"",In...",10.0,8.0,Real Bed,strict,0.0,LA,Our beautiful Est...,1.0,1.0,2014-08-01,0.0,34.167797954394736,-118.50355353313894,SUMMER VACATION V...,Encino,1.0,91316.0,9.0,7.0,7.495541943884256,[0.20129852008866...,[-3774707.8528800...,"[summer, vacation...","[our, beautiful, ...","[tv, cable tv, in...",[0.17738503217697...,[0.05599493139155...,[-0.1705998602238...,11.0,0.0,0.0,0.0,1.0,179.0,277.0,"(31,[11],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[1],[1.0])","(617,[179],[1.0])","(709,[277],[1.0])","(1539,[11,31,33,3...",[3887210.89930795...
Other,Entire home/apt,"{TV,""Cable TV"",In...",16.0,4.0,Real Bed,moderate,1.0,LA,Great for busines...,1.0,1.0,2014-02-09,0.0,34.0056580274334,-118.43327536192491,West LA Compound ...,Mar Vista,5.0,90066.0,10.0,7.0,6.762729506931878,[0.96807711886620...,[-3174376.8049815...,"[west, la, compou...","[great, for, busi...","[tv, cable tv, in...",[0.01113446812087...,[0.03177414992988...,[-0.0665556309887...,5.0,0.0,0.0,2.0,1.0,47.0,36.0,"(31,[5],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[2],[1.0])","(5,[1],[1.0])","(617,[47],[1.0])","(709,[36],[1.0])","(1539,[5,31,33,39...",[4671115.91595961...
Apartment,Entire home/apt,"{TV,Internet,""Wir...",12.0,5.0,Real Bed,strict,1.0,LA,Rent our entire b...,1.0,1.0,2014-04-14,0.0,33.99078924403168,-118.47600494529516,7BR/4.5BA---Large...,Venice,20.0,90291.0,10.0,7.0,6.826545223556594,[0.29936312297335...,[-3329019.0518974...,[7br/4.5ba---larg...,"[rent, our, entir...","[tv, internet, wi...",[0.03382659827669...,[0.07593776972045...,[-0.0605223029851...,0.0,0.0,0.0,0.0,1.0,8.0,2.0,"(31,[0],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[1],[1.0])","(617,[8],[1.0])","(709,[2],[1.0])","(1539,[0,31,33,37...",[4620673.86958238...
House,Entire home/apt,"{""Cable TV"",Inter...",16.0,5.0,Real Bed,strict,1.0,NYC,6 bedrooms total ...,1.0,1.0,2011-11-01,0.0,40.59192978891966,-74.06476426213608,Wedding guests ac...,South Beach,1.0,10305.0,15.0,7.0,6.437751649736401,[0.20129852008866...,[-1453725.5138234...,"[wedding, guests,...","[6, bedrooms, tot...","[cable tv, intern...",[0.01901713025290...,[0.03585164933303...,[-0.1589858303112...,1.0,0.0,0.0,0.0,0.0,147.0,370.0,"(31,[1],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[0],[1.0])","(617,[147],[1.0])","(709,[370],[1.0])","(1539,[1,31,33,37...",[4292922.80498946...
House,Entire home/apt,"{TV,""Cable TV"",In...",16.0,4.0,Real Bed,strict,1.0,NYC,"Huge 7 bedrooms ,...",1.0,0.0,2014-08-20,0.0,40.68542277116447,-73.95183895659102,New York I Love Y...,Bedford-Stuyvesant,87.0,11216.0,11.0,7.0,6.956545443151567,[-0.7907757369376...,[-782008.41925998...,"[new, york, i, lo...","[huge, 7, bedroom...","[tv, cable tv, in...",[-4.7254695424011...,[-0.1364727475559...,[-1.8537812866270...,1.0,0.0,0.0,0.0,0.0,1.0,19.0,"(31,[1],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[0],[1.0])","(617,[1],[1.0])","(709,[19],[1.0])","(1539,[1,31,33,37...",[4033758.86158228...
House,Entire home/apt,"{TV,""Cable TV"",In...",16.0,2.0,Real Bed,strict,1.0,LA,Luxurious 4 bedro...,1.0,0.0,2015-06-01,0.0,34.09580113492421,-118.348376862567,W Hollywood Dream,Hollywood,99.0,90046.0,10.0,7.0,5.857933154483459,[0.20129852008866...,[-2931201.2307163...,"[w, hollywood, dr...","[luxurious, 4, be...","[tv, cable tv, in...",[0.13825953503449...,[0.07409816060979...,[-0.1152639814342...,1.0,0.0,0.0,0.0,1.0,6.0,6.0,"(31,[1],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[1],[1.0])","(617,[6],[1.0])","(709,[6],[1.0])","(1539,[1,31,33,37...",[4549773.02132050...
Condominium,Entire home/apt,"{TV,Internet,""Wir...",16.0,4.0,Real Bed,strict,1.0,Boston,Beautifully redes...,1.0,0.0,2017-09-07,0.0,42.32916951239184,-71.09288095270476,Spacious 3-Floor ...,Roxbury,0.0,2119.0,13.0,7.0,5.703782474656202,[0.98846832432811...,[208353.497414018...,"[spacious, 3-floo...","[beautifully, red...","[tv, internet, wi...",[0.03610078289057...,[0.12238188929820...,[0.39891987561713...,2.0,0.0,0.0,0.0,4.0,94.0,158.0,"(31,[2],[1.0])","(2,[0],[1.0])","(4,[0],[1.0])","(4,[0],[1.0])","(5,[4],[1.0])","(617,[94],[1.0])","(709,[158],[1.0])","(1539,[2,31,33,37...",[-5514980.9567123...


features,label
[4216889.54765810...,4.454347296253506
[-404103.25870228...,6.163314804034643
[3706716.19225232...,5.988961416889862
[6168033.18712171...,5.700443573390688
[3400674.87152927...,6.214608098422191
[3545277.91198766...,6.476972362889682
[4852668.75137374...,4.867534450455582
[5834389.12451625...,5.272999558563747
[5138755.54652896...,5.3706380281276624
[6183080.42069642...,5.476463551931511


# Split the dataset

In [28]:
#  split the data into 60% training and 40% test (it is not stratified)
(train_data, test_data) = transformed.randomSplit([0.6, 0.4], seed = 10)

In [29]:
# replaced coalesce(1) with repartition(1) to fix OoM issue
def run(command):
    import os
    return os.popen(command).read()

train_data.select("features", "label")\
    .repartition(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

test_data.select("features", "label")\
    .repartition(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

''

# First model

## Build a model

In [30]:
from pyspark.ml.regression import LinearRegression
# Create Linear Regression Model
lr = LinearRegression()

# Fit the data to the pipeline stages
model_lr = lr.fit(train_data)

## Predict for test data

In [31]:
predictions = model_lr.transform(test_data)
predictions

features,label,prediction
[4092002.94095381...,5.703782474656202,6.609680783409566
[-5598619.2167411...,4.663439094112067,4.567990083934333
[-5516777.1140283...,5.068904202220232,4.760195424020349
[-5509897.6597192...,4.84418708645859,5.084511207748029
[-5507316.1699486...,4.859812404361672,5.354349962143585
[-5486422.9653420...,5.105945473900579,5.332780828305253
[-5484119.1422463...,5.298317366548036,4.96465056951555
[-5483563.1568445...,5.272999558563747,5.0843308370725
[-5483261.7356413...,4.382026634673881,4.977633761743668
[-5480665.5947066...,4.941642422609304,4.5361501685313


## Evaluate the model

In [32]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator1_rmse.evaluate(predictions)
r2 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))
print("R^2 on test data = {}".format(r2))

Root Mean Squared Error (RMSE) on test data = 0.4142084050540102
R^2 on test data = 0.6573771287986057


## Hyperparameter optimization

In [33]:
model_lr.params

[Param(parent='LinearRegression_8c16f6fe6f4a', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LinearRegression_8c16f6fe6f4a', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LinearRegression_8c16f6fe6f4a', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'),
 Param(parent='LinearRegression_8c16f6fe6f4a', name='featuresCol', doc='features column name.'),
 Param(parent='LinearRegression_8c16f6fe6f4a', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LinearRegression_8c16f6fe6f4a', name='labelCol', doc='label column name.'),
 Param(parent='LinearRegression_8c16f6fe6f4a', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'),
 Param(parent='LinearRegression_8c16f6fe6f4a', name='m

In [36]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


grid = ParamGridBuilder()
grid = grid.addGrid(
                    model_lr.aggregationDepth, [2, 3, 4])\
                    .addGrid(model_lr.regParam, np.logspace(1e-3,1e-1)
                    )\
                    .build()

cv = CrossValidator(estimator = lr, 
                    estimatorParamMaps = grid, 
                    evaluator = evaluator1_rmse,
                    parallelism = 5,
                    numFolds=2)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

LinearRegressionModel: uid=LinearRegression_8c16f6fe6f4a, numFeatures=200

## Best model 1


In [37]:
from pprint import pprint
model1 = bestModel
pprint(model1.extractParamMap())

{Param(parent='LinearRegression_8c16f6fe6f4a', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LinearRegression_8c16f6fe6f4a', name='maxBlockSizeInMB', doc='maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0.'): 0.0,
 Param(parent='LinearRegression_8c16f6fe6f4a', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_8c16f6fe6f4a', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredError',
 Param(parent='LinearRegression_8c16f6fe6f4a', name='featuresCol', doc='features column nam

## Save the model to HDFS

In [38]:
model1.write().overwrite().save("project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 models/model1")

''

## Predict for test data using best model1

In [39]:
predictions = model1.transform(test_data)
predictions.show()

+--------------------+------------------+------------------+
|            features|             label|        prediction|
+--------------------+------------------+------------------+
|[-5523165.9650641...| 6.284134161070802|  5.27120250710247|
|[-5393038.8320565...| 5.886104031450156| 5.244698573415519|
|[-4679810.7749005...| 5.857933154483459| 5.237130931730095|
|[-4630082.3504710...| 5.579729825986222| 5.200419586227103|
|[-4620077.9808390...|5.5174528964647065| 5.291853860233713|
|[-4561480.8234697...| 6.396929655216146|5.2922197352126235|
|[-451373.79478320...|6.7900972355139055| 5.600734495954131|
|[-325396.03081678...| 6.907755278982138| 5.375813622726128|
|[-305443.43101485...| 6.163314804034643| 5.355330076374893|
|[-236543.99034439...| 6.212606095751518|5.3767963482511085|
|[-208017.94062362...| 7.244227515603349| 5.533447989092201|
|[-197571.92144895...| 7.090076835776093|5.4316750203713315|
|[3224395.32075969...| 5.521460917862246| 5.056450068522624|
|[3338447.99353148...| 3

In [40]:
predictions.select("label", "prediction")\
    .repartition(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model1_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

''

## Evaluate the best model1

In [41]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse1 = evaluator1_rmse.evaluate(predictions)
r21 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse1))
print("R^2 on test data = {}".format(r21))

Root Mean Squared Error (RMSE) on test data = 0.5358906391132227
R^2 on test data = 0.4288600024400767


# Second model

## Build a model

In [42]:
from pyspark.ml.regression import GBTRegressor

# Create Linear Regression Model
gbt = GBTRegressor()

# Fit the data to the pipeline stages
model_gbt = gbt.fit(train_data)

## Predict for test data

In [43]:
predictions = model_gbt.transform(test_data)
predictions.show()

+--------------------+------------------+------------------+
|            features|             label|        prediction|
+--------------------+------------------+------------------+
|[-5523611.7212095...| 5.703782474656202|5.8057687547071755|
|[-254136.61278797...| 6.173786103901937| 6.060227357130425|
|[3733466.24061386...| 7.170119543449628| 6.417290068619777|
|[3882454.45059727...| 5.910796644040527| 5.524609594051106|
|[4038950.30573646...| 7.506591780070841|6.7448110895546645|
|[4955889.36152668...|5.6594822157596205| 6.040936109546295|
|[4115770.35759027...| 6.922643891475888|6.5693232039236475|
|[4150211.00937351...|6.3543700407973525| 5.659284619094498|
|[4483907.05002948...| 6.907755278982138| 6.237497493649422|
|[5471987.17468020...|6.1675164908883415| 6.069312263637892|
|[6106895.27768601...| 5.337538079701318| 5.617621084751361|
|[6129178.02453418...| 6.618738983517219| 6.718529204718719|
|[-295834.60538753...| 6.684611727667928| 6.084722910696591|
|[-292426.35351912...|7.

## Evaluate the model

In [44]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 0.4373428289003199
R^2 on test data = 0.6312340497763067


## Hyperparameter optimization

In [45]:
model_gbt.params

[Param(parent='GBTRegressor_a06be0548f05', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='GBTRegressor_a06be0548f05', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='GBTRegressor_a06be0548f05', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'a

In [46]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


grid = ParamGridBuilder()
grid = grid.addGrid(model_gbt.maxDepth, [2, 5]).addGrid(model_gbt.lossType, ['squared', 'absolute']).build()

cv = CrossValidator(estimator = gbt, 
                    estimatorParamMaps = grid, 
                    evaluator = evaluator2_rmse,
                    parallelism = 5,
                    numFolds=2)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

GBTRegressionModel: uid=GBTRegressor_a06be0548f05, numTrees=20, numFeatures=200

## Best model 2


In [47]:
from pprint import pprint
model2 = bestModel
pprint(model2.extractParamMap())

{Param(parent='GBTRegressor_a06be0548f05', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 'n' (when n is in the range (0, 1.0], use n * number of features. When n is in the range (1, number of features), use n features). default = 'auto'"): 'all',
 Param(parent='GBTRegressor_a06be0548f05', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1,
 Param(parent='GBTRegressor_a06be0548f05', name='maxDepth', doc='Maximum depth of the tree.

## Save the model to HDFS

In [48]:
model2.write().overwrite().save("project/models/model2")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

''

## Predict for test data using best model2

In [50]:
predictions = model2.transform(test_data)
predictions.show()

+--------------------+------------------+------------------+
|            features|             label|        prediction|
+--------------------+------------------+------------------+
|[3683073.50203780...| 6.902742737158593| 6.739651836835824|
|[4038950.30573646...| 7.506591780070841| 6.797082014255193|
|[4955889.36152668...|5.6594822157596205| 5.506779268102148|
|[3686470.67574270...| 6.745236349484363| 5.692495510158493|
|[4115770.35759027...| 6.922643891475888| 6.615959432371952|
|[5471987.17468020...|6.1675164908883415| 5.679520933665446|
|[-5555179.3625704...|5.6167710976665735| 5.888261122837999|
|[4349103.27884350...| 6.214608098422191| 6.347087894129916|
|[6114522.00728311...| 5.501258210544727| 6.642786485635996|
|[-4267877.7361769...| 6.684611727667928| 5.414385113226421|
|[-4077398.0068237...|5.7071102647488745|5.5401765765942885|
|[-647730.25797904...|6.5352412710136605| 6.029274362620962|
|[4979127.88724229...| 6.214608098422191| 5.937287559867565|
|[-4568590.4857054...| 6

In [49]:
predictions.select("label", "prediction")\
    .repartition(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model2_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

''

## Evaluate the best model2

In [51]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 0.43410677637287337
R^2 on test data = 0.6377512835770616


# Compare best models

In [52]:
models = [[str(model1),rmse1, r21], [str(model2),rmse2, r22]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

+-------------------------------------------------------------------------------+-------------------+------------------+
|model                                                                          |RMSE               |R2                |
+-------------------------------------------------------------------------------+-------------------+------------------+
|LinearRegressionModel: uid=LinearRegression_8c16f6fe6f4a, numFeatures=200      |0.5358906391132227 |0.4288600024400767|
|GBTRegressionModel: uid=GBTRegressor_a06be0548f05, numTrees=20, numFeatures=200|0.43410677637287337|0.6377512835770616|
+-------------------------------------------------------------------------------+-------------------+------------------+



In [53]:
df.repartition(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

''