### Apartment price regression testing...


In [1]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.sql import SparkSession
from pyspark.mllib.regression import LabeledPoint


def loadMongoRDD(spark, db, collection, host="10.4.41.48"):
    '''
    Read a MongoDB collection through the Spark connector and return it as a cached RDD.

    Parameters
    ----------
    spark : SparkSession
        Session used to issue the read (it needs the "mongo" data source available).
    db : str
        Name of the MongoDB database.
    collection : str
        Name of the collection inside ``db``.
    host : str, optional
        Address of the MongoDB server. Defaults to the address this notebook
        was written against, so existing calls behave exactly as before.

    Returns
    -------
    RDD
        The ``.rdd`` view of the loaded DataFrame, marked for caching.
    '''
    uri = f"mongodb://{host}/{db}.{collection}"

    # cache() only marks the RDD; nothing is read until the first action runs.
    dataRDD = (
        spark.read.format("mongo")
        .option('uri', uri)
        .load()
        .rdd
        .cache()
    )

    return dataRDD


# Build (or reuse) a local session that pulls in the MongoDB Spark connector.
session_builder = SparkSession.builder.master("local[*]").appName("myApp")
session_builder = session_builder.config(
    'spark.jars.packages',
    'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1',
)
spark = session_builder.getOrCreate()

# Load the formatted listings. take(1) forces a first read; only the final
# count() is echoed as the cell output.
rdd = loadMongoRDD(spark, db='formatted', collection='data')
rdd.take(1)
rdd.count()

22/06/17 00:44:23 WARN Utils: Your hostname, m1Mac.local resolves to a loopback address: 127.0.0.1; using 192.168.0.159 instead (on interface en0)
22/06/17 00:44:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/efwerr/.ivy2/cache
The jars for the packages stored in: /Users/efwerr/.ivy2/jars
:: loading settings :: url = jar:file:/Users/efwerr/miniforge3/envs/bdm_env/lib/python3.10/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ebdf3d3c-a128-43a4-8426-d8c81f5f8a53;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 163ms :: artifacts dl 3ms
	

3905

### Feature preparation with the PySpark DataFrame API (a shortcut instead of pure RDD operations)

In [113]:
# Target column (Price) followed by the raw predictors kept for the model.
selected_columns = [
    'Price',
    'Floor',
    'Bedrooms',
    'Rooms',
    'Size',
    'PropertyType',
    'District Code',
    'Neighborhood Code',
]
raw_features_df = rdd.toDF().select(*selected_columns)

# A string fill value replaces nulls in the string-typed columns only.
idealistaDF = raw_features_df.fillna('unavailable')
idealistaDF.printSchema()

root
 |-- Price: double (nullable = true)
 |-- Floor: string (nullable = false)
 |-- Bedrooms: long (nullable = true)
 |-- Rooms: long (nullable = true)
 |-- Size: double (nullable = true)
 |-- PropertyType: string (nullable = false)
 |-- District Code: string (nullable = false)
 |-- Neighborhood Code: string (nullable = false)



In [114]:
#   ##  import the required libraries
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def one_hot_encode(df, column):
    '''
    Append one 0/1 integer indicator column per distinct value of ``column``.

    Each new column is named ``<column>_<value>`` and is added in the order
    the distinct values are collected. The comparison is a native, null-safe
    Spark expression, so no Python UDF (and no closure over a loop variable)
    is involved.

    Parameters
    ----------
    df : DataFrame
        Input frame; it is not modified.
    column : str
        Name of the categorical column to expand.

    Returns
    -------
    DataFrame
        ``df`` with the indicator columns appended.
    '''
    # The distinct values are collected to the driver; fine for low-cardinality columns.
    distinct_values = df.select(column)\
                        .distinct()\
                        .rdd\
                        .flatMap(lambda x: x).collect()

    for distinct_value in distinct_values:
        # eqNullSafe never yields NULL, so the cast always produces 0 or 1.
        indicator = col(column).eqNullSafe(distinct_value).cast(IntegerType())
        df = df.withColumn(f"{column}_{distinct_value}", indicator)

    return df


# Expand both categorical columns; the order (Floor first, then PropertyType)
# fixes the position of the indicator columns in the schema.
idealistaDF = one_hot_encode(idealistaDF, "Floor")
idealistaDF = one_hot_encode(idealistaDF, "PropertyType")

idealistaDF.printSchema()



root
 |-- Price: double (nullable = true)
 |-- Floor: string (nullable = false)
 |-- Bedrooms: long (nullable = true)
 |-- Rooms: long (nullable = true)
 |-- Size: double (nullable = true)
 |-- PropertyType: string (nullable = false)
 |-- District Code: string (nullable = false)
 |-- Neighborhood Code: string (nullable = false)
 |-- Floor_7: integer (nullable = true)
 |-- Floor_en: integer (nullable = true)
 |-- Floor_-1: integer (nullable = true)
 |-- Floor_11: integer (nullable = true)
 |-- Floor_3: integer (nullable = true)
 |-- Floor_8: integer (nullable = true)
 |-- Floor_st: integer (nullable = true)
 |-- Floor_16: integer (nullable = true)
 |-- Floor_43: integer (nullable = true)
 |-- Floor_5: integer (nullable = true)
 |-- Floor_6: integer (nullable = true)
 |-- Floor_9: integer (nullable = true)
 |-- Floor_bj: integer (nullable = true)
 |-- Floor_1: integer (nullable = true)
 |-- Floor_10: integer (nullable = true)
 |-- Floor_4: integer (nullable = true)
 |-- Floor_12: integer

                                                                                

In [115]:
# Total width of the frame: the selected columns plus every generated indicator column.
n_columns = len(idealistaDF.columns)
n_columns

34

In [20]:
# Frequency of each 'Floor' value across the loaded listings.
floor_pairs = rdd.map(lambda row: (row['Floor'], 1))
floorRDD = floor_pairs.reduceByKey(lambda left, right: left + right)
floorRDD.take(100)

[('7', 100),
 ('5', 293),
 ('2', 525),
 (None, 507),
 ('3', 500),
 ('bj', 389),
 ('4', 360),
 ('1', 779),
 ('en', 150),
 ('6', 143),
 ('st', 6),
 ('8', 90),
 ('10', 24),
 ('9', 25),
 ('16', 1),
 ('11', 4),
 ('13', 4),
 ('43', 1),
 ('-1', 2),
 ('12', 2)]

In [21]:
# Frequency of each 'Operation' value across the loaded listings.
operation_pairs = rdd.map(lambda row: (row['Operation'], 1))
operationRDD = operation_pairs.reduceByKey(lambda left, right: left + right)
operationRDD.take(100)

[('sale', 3905)]

In [22]:
# Frequency of each 'Bedrooms' value across the loaded listings.
bedroom_pairs = rdd.map(lambda row: (row['Bedrooms'], 1))
bedroomsRDD = bedroom_pairs.reduceByKey(lambda left, right: left + right)
bedroomsRDD.take(100)

[(1, 1811),
 (0, 2),
 (3, 335),
 (2, 1550),
 (4, 121),
 (6, 21),
 (7, 14),
 (8, 6),
 (5, 44),
 (9, 1)]

In [78]:
# Frequency of each 'Rooms' value across the loaded listings.
room_pairs = rdd.map(lambda row: (row['Rooms'], 1))
roomsRDD = room_pairs.reduceByKey(lambda left, right: left + right)
roomsRDD.take(100)

[(3, 1450),
 (2, 953),
 (0, 68),
 (6, 67),
 (5, 248),
 (1, 271),
 (4, 789),
 (7, 22),
 (8, 14),
 (10, 13),
 (11, 2),
 (12, 1),
 (13, 2),
 (9, 4),
 (15, 1)]

In [23]:
# Frequency of each 'District Name' value across the loaded listings.
district_pairs = rdd.map(lambda row: (row['District Name'], 1))
districtRDD = district_pairs.reduceByKey(lambda left, right: left + right)
districtRDD.take(100)

[('Sant Martí', 1),
 ('Ciutat Vella', 347),
 ('Nou Barris', 32),
 ('Sant Andreu', 10),
 ('Sants-Montjuïc', 1567),
 ('Sarrià-Sant Gervasi', 352),
 ('Horta-Guinardó', 111),
 ('Les Corts', 495),
 ('Eixample', 896),
 ('Gràcia', 94)]

In [88]:
# Frequency of each 'PropertyType' value across the loaded listings.
property_pairs = rdd.map(lambda row: (row['PropertyType'], 1))
propertyRDD = property_pairs.reduceByKey(lambda left, right: left + right)
propertyRDD.take(50)

[('flat', 3340),
 ('studio', 62),
 ('penthouse', 224),
 ('chalet', 155),
 ('duplex', 123),
 ('countryHouse', 1)]

In [116]:

# Raw predictors used as-is. The two code columns are strings in the schema and
# are converted to numbers when LabeledPoint builds its dense vector.
base_feature_columns = ["Bedrooms", "Rooms", "Size", "District Code", "Neighborhood Code"]

# Indicator columns are picked up by name prefix instead of hard-coded positions,
# so the feature vector stays complete when the set of distinct Floor /
# PropertyType values (and therefore the column count) changes.
indicator_columns = [name for name in idealistaDF.columns
                     if name.startswith(("Floor_", "PropertyType_"))]
feature_columns = base_feature_columns + indicator_columns

# Label = Price; features follow the order of feature_columns.
labelRDD = idealistaDF.rdd.map(
    lambda row: LabeledPoint(row['Price'], [row[name] for name in feature_columns])
)


labelRDD.take(1)

[LabeledPoint(320000.0, [1.0,3.0,88.0,10.0,64.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0])]

In [103]:
# Number of labeled points produced from the feature frame.
n_labeled_points = labelRDD.count()
n_labeled_points

3905

In [117]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

# Hold out 30% of the labeled points for evaluation; the seed fixes the partition.
split_weights = [0.7, 0.3]
trainingData, testData = labelRDD.randomSplit(split_weights, seed=42)

# Fit a small regression forest. No categorical feature arities are declared,
# so an empty mapping is passed.
model = RandomForest.trainRegressor(
    trainingData,
    categoricalFeaturesInfo={},
    numTrees=5,
    maxDepth=7,
    maxBins=300,
    seed=42,
)

# Score the held-out features, then pair every true label with its prediction.
test_features = testData.map(lambda point: point.features)
predictions = model.predict(test_features)
test_labels = testData.map(lambda point: point.label)
labelsAndPredictions = test_labels.zip(predictions)


def squared_error(pair):
    """Return the squared difference between a (label, prediction) pair."""
    residual = pair[0] - pair[1]
    return residual * residual


# Mean squared error over the test split.
testMSE = labelsAndPredictions.map(squared_error).sum() / float(testData.count())
print(f'Test Mean Squared Error = {testMSE!s}')
print('Learned regression forest model:')
print(model.toDebugString())


Test Mean Squared Error = 208891452731.2823
Learned regression forest model:
TreeEnsembleModel regressor with 5 trees

  Tree 0:
    If (feature 0 <= 3.5)
     If (feature 2 <= 132.5)
      If (feature 0 <= 1.5)
       If (feature 3 <= 2.5)
        If (feature 27 <= 0.5)
         If (feature 29 <= 0.5)
          If (feature 2 <= 81.5)
           Predict: 324083.3333333333
          Else (feature 2 > 81.5)
           Predict: 522500.0
         Else (feature 29 > 0.5)
          If (feature 20 <= 0.5)
           Predict: 343681.2921348315
          Else (feature 20 > 0.5)
           Predict: 299730.0
        Else (feature 27 > 0.5)
         If (feature 18 <= 0.5)
          Predict: 143545.45454545456
         Else (feature 18 > 0.5)
          Predict: 349000.0
       Else (feature 3 > 2.5)
        If (feature 4 <= 18.5)
         If (feature 1 <= 1.5)
          If (feature 27 <= 0.5)
           Predict: 176583.75
          Else (feature 27 > 0.5)
           Predict: 143026.66666666666
    

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def loadMongoDF(db, collection, host="10.4.41.48"):
    '''
    Read a MongoDB collection through the Spark connector and return it as a DataFrame.

    A local session configured with the MongoDB connector package is obtained
    via ``getOrCreate()`` and returned alongside the data.

    Parameters
    ----------
    db : str
        Name of the MongoDB database.
    collection : str
        Name of the collection inside ``db``.
    host : str, optional
        Address of the MongoDB server. Defaults to the address this notebook
        was written against, so existing calls behave exactly as before.

    Returns
    -------
    tuple
        ``(dataDF, spark)``: the loaded DataFrame and the session that read it.
    '''
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("myApp")
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')
        .getOrCreate()
    )

    dataDF = (
        spark.read.format("mongo")
        .option('uri', f"mongodb://{host}/{db}.{collection}")
        .load()
    )

    return dataDF, spark

## --------------- KPI 3 DF ---------------
## --> predict the number of rooms given a price and neighborhood id
# Imported here because the import lines of this cell do not provide VectorAssembler.
from pyspark.ml.feature import VectorAssembler

dataDF, spark = loadMongoDF(db='formatted', collection='data')

# Keep only the columns the model uses. Rows with missing values are dropped
# because the indexers and the assembler raise on nulls with their default settings.
subsetDF = dataDF.select('Neighborhood Id', 'Price', 'Rooms') \
                .withColumnRenamed("Neighborhood Id","Neighborhood_ID") \
                .dropna()

# Index the label. Fit on the whole subset so every room count gets an index.
labelIndexer = StringIndexer(inputCol="Rooms", outputCol="indexedRooms").fit(subsetDF)

# The neighborhood id is a string column, so it is indexed with StringIndexer
# (VectorIndexer only operates on vector columns).
featureIndexer = StringIndexer(inputCol="Neighborhood_ID", outputCol="indexedNeighborhoodID").fit(subsetDF)

# Combine the indexed neighborhood and the price into the vector column the forest reads.
assembler = VectorAssembler(inputCols=["indexedNeighborhoodID", "Price"], outputCol="features")

# Split the data into training and test sets (30% held out for testing).
# The seed makes the split reproducible across runs.
(trainingDataDF, testDataDF) = subsetDF.randomSplit([0.7, 0.3], seed=42)

# Train a RandomForest model. maxBins must be at least the number of
# categories of the categorical neighborhood feature (32 is the library default).
rf = RandomForestClassifier(labelCol="indexedRooms", featuresCol="features", numTrees=10,
                            maxBins=max(32, len(featureIndexer.labels)), seed=42)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Chain indexers, assembler and forest in a Pipeline
stages = [labelIndexer, featureIndexer, assembler, rf, labelConverter]
pipeline = Pipeline(stages=stages)

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingDataDF)

# Make predictions.
predictions = model.transform(testDataDF)

# Select example rows to display.
predictions.select("predictedLabel", "Rooms", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedRooms", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# Look the forest up by its position in the stage list rather than a fixed index.
rfModel = model.stages[stages.index(rf)]
print(rfModel)  # summary only



                                                                                

AnalysisException: cannot resolve '`Neighbordhood Id`' given input columns: [Bathrooms, Date, District Code, District Name, Floor, Increased POP, Increased RFD, Latitude, Longitude, Monthly Price (€/month), Monthly Price Increase, Neighborhood, Neighborhood Code, Neighborhood Id, Operation, POP, Price, Property Code, PropertyType, RFD, Rooms, Size, Surface Price (€/m2), Surface Price Increase, _id];;
'Project [Rooms#95L, Price#91, 'Neighbordhood Id]
+- Relation[Bathrooms#75L,Date#76,District Code#77,District Name#78,Floor#79,Increased POP#80,Increased RFD#81,Latitude#82,Longitude#83,Monthly Price (€/month)#84,Monthly Price Increase#85,Neighborhood#86,Neighborhood Code#87,Neighborhood Id#88,Operation#89,POP#90,Price#91,Property Code#92,PropertyType#93,RFD#94,Rooms#95L,Size#96,Surface Price (€/m2)#97,Surface Price Increase#98,_id#99] MongoRelation(MongoRDD[14] at RDD at MongoRDD.scala:51,Some(StructType(StructField(Bathrooms,LongType,true), StructField(Date,StringType,true), StructField(District Code,StringType,true), StructField(District Name,StringType,true), StructField(Floor,StringType,true), StructField(Increased POP,DoubleType,true), StructField(Increased RFD,DoubleType,true), StructField(Latitude,DoubleType,true), StructField(Longitude,DoubleType,true), StructField(Monthly Price (€/month),DoubleType,true), StructField(Monthly Price Increase,DoubleType,true), StructField(Neighborhood,StringType,true), StructField(Neighborhood Code,StringType,true), StructField(Neighborhood Id,StringType,true), StructField(Operation,StringType,true), StructField(POP,DoubleType,true), StructField(Price,DoubleType,true), StructField(Property Code,StringType,true), StructField(PropertyType,StringType,true), StructField(RFD,DoubleType,true), StructField(Rooms,LongType,true), StructField(Size,DoubleType,true), StructField(Surface Price (€/m2),DoubleType,true), StructField(Surface Price Increase,DoubleType,true), StructField(_id,StructType(StructField(oid,StringType,true)),true))))


In [17]:
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Columns needed for the rooms model; the id column is renamed to drop the space.
subsetDF = dataDF.select('Neighborhood Id', 'Price', 'Rooms') \
                .withColumnRenamed("Neighborhood Id","Neighborhood_ID")
subsetDF.printSchema()

# One fitted indexer per listed column; the comprehension now actually uses its
# loop variable, so adding a column to the list indexes that column.
# NOTE: StringIndexer produces a numeric index, not a one-hot vector; the
# "_OneHot" suffix is kept so the output column name does not change.
indexers = [StringIndexer(inputCol=column, outputCol=column+"_OneHot").fit(subsetDF) for column in ['Neighborhood_ID']]

pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(subsetDF).transform(subsetDF)
df_indexed.show()

root
 |-- Neighborhood_ID: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Rooms: long (nullable = true)



22/06/17 02:34:37 ERROR MongoRDD: 
-----------------------------
-----------------------------

Partitioning using the 'DefaultMongoPartitioner$' failed.

Please check the stacktrace to determine the cause of the failure or check the Partitioner API documentation.
Note: Not all partitioners are suitable for all toplogies and not all partitioners support views.%n

-----------------------------

22/06/17 02:34:37 WARN DAGScheduler: Creating new stage failed due to exception - job: 7
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=10.4.41.48:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:177)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(

Py4JJavaError: An error occurred while calling o227.fit.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=10.4.41.48:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:177)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:147)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:98)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:278)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:182)
	at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:194)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:163)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:158)
	at com.mongodb.spark.MongoConnector.$anonfun$hasSampleAggregateOperator$1(MongoConnector.scala:234)
	at com.mongodb.spark.MongoConnector.$anonfun$withDatabaseDo$1(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
	at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:33)
	at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:135)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:292)
	at org.apache.spark.scheduler.DAGScheduler.createShuffleMapStage(DAGScheduler.scala:410)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:379)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:491)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
	at scala.collection.SetLike.map(SetLike.scala:104)
	at scala.collection.SetLike.map$(SetLike.scala:104)
	at scala.collection.mutable.AbstractSet.map(Set.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:490)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:477)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1009)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
	at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:241)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [12]:
# Learn the Rooms -> index mapping from the KPI subset.
rooms_indexer = StringIndexer(inputCol="Rooms", outputCol="indexedRooms")
labelIndexer = rooms_indexer.fit(subsetDF)

22/06/17 02:13:39 ERROR MongoRDD: 
-----------------------------
-----------------------------

Partitioning using the 'DefaultMongoPartitioner$' failed.

Please check the stacktrace to determine the cause of the failure or check the Partitioner API documentation.
Note: Not all partitioners are suitable for all toplogies and not all partitioners support views.%n

-----------------------------

22/06/17 02:13:39 WARN DAGScheduler: Creating new stage failed due to exception - job: 4
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=10.4.41.48:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:177)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(

Py4JJavaError: An error occurred while calling o137.fit.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=10.4.41.48:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:177)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:147)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:98)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:278)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:182)
	at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:194)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:163)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:158)
	at com.mongodb.spark.MongoConnector.$anonfun$hasSampleAggregateOperator$1(MongoConnector.scala:234)
	at com.mongodb.spark.MongoConnector.$anonfun$withDatabaseDo$1(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
	at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:33)
	at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:135)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:292)
	at org.apache.spark.scheduler.DAGScheduler.createShuffleMapStage(DAGScheduler.scala:410)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:379)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:491)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
	at scala.collection.SetLike.map(SetLike.scala:104)
	at scala.collection.SetLike.map$(SetLike.scala:104)
	at scala.collection.mutable.AbstractSet.map(Set.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:490)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:477)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1009)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
	at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:241)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
# Inspect the fitted label indexer (a bare last-line expression is echoed as the cell output).
labelIndexer

In [None]:
## --------------- KPI 3 DF ---------------
## --> predict the number of rooms given a price and neighborhood id
# Imported here because the notebook's import lines do not provide VectorAssembler.
from pyspark.ml.feature import VectorAssembler

dataDF, spark = loadMongoDF(db='formatted', collection='data')

# Column name fixed ('Neighborhood Id'). Rows with missing values are dropped
# because the indexers and the assembler raise on nulls with their default settings.
subsetDF = dataDF.select('Rooms', 'Price', 'Neighborhood Id').dropna()

# Index the label. Fit on the whole subset so every room count gets an index.
labelIndexer = StringIndexer(inputCol="Rooms", outputCol="indexedRooms").fit(subsetDF)

# The neighborhood id is a string column, so it is indexed with StringIndexer
# (VectorIndexer only operates on vector columns).
featureIndexer = StringIndexer(inputCol="Neighborhood Id", outputCol="indexedNeighborhoodID").fit(subsetDF)

# Combine the indexed neighborhood and the price into the vector column the forest reads.
assembler = VectorAssembler(inputCols=["indexedNeighborhoodID", "Price"], outputCol="features")

# Split the data into training and test sets (30% held out for testing).
# The seed makes the split reproducible across runs.
(trainingDataDF, testDataDF) = subsetDF.randomSplit([0.7, 0.3], seed=42)

# Train a RandomForest model. The label and feature columns must match the
# names produced above; maxBins must be at least the number of categories of
# the categorical neighborhood feature (32 is the library default).
rf = RandomForestClassifier(labelCol="indexedRooms", featuresCol="features", numTrees=10,
                            maxBins=max(32, len(featureIndexer.labels)), seed=42)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Chain indexers, assembler and forest in a Pipeline
stages = [labelIndexer, featureIndexer, assembler, rf, labelConverter]
pipeline = Pipeline(stages=stages)

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingDataDF)

# Make predictions.
predictions = model.transform(testDataDF)

# Select example rows to display.
predictions.select("predictedLabel", "Rooms", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedRooms", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# Look the forest up by its position in the stage list rather than a fixed index.
rfModel = model.stages[stages.index(rf)]
print(rfModel)  # summary only

In [None]:
## --------------- KPI 3 RDD ---------------
## --> predict the number of rooms given a price and neighborhood id
# loadMongoRDD takes the session as its first argument and returns only the RDD,
# so the notebook's existing `spark` session is passed in.
rdd_data = loadMongoRDD(spark, db='formatted', collection='data')

# Keep (label, price, neighborhood) and discard incomplete records: a missing
# value cannot be used as a label, a feature, or a column alias below.
listing_tuples = rdd_data.map(lambda x: (x['Rooms'], x['Price'], x['Neighborhood Id'])) \
    .filter(lambda values: None not in values)
listings_df = listing_tuples.toDF(['Rooms', 'Price', 'Neighborhood Id'])

# One-hot encode the categorical variable "Neighborhood Id": one 0/1 column per distinct id.
categories_id = listings_df.select('Neighborhood Id').distinct().rdd.flatMap(lambda x: x).collect()
exprs = [F.when(F.col('Neighborhood Id') == category, 1).otherwise(0).alias(category) for category in categories_id]
encoded_df = listings_df.select('Rooms', 'Price', *exprs)
# Back to an RDD for the mllib API.
encoded_rdd = encoded_df.rdd

# Label = Rooms; features = Price followed by the indicator columns, as one
# flat list (not a list wrapping the row slice).
labelRDD = encoded_rdd.map(lambda x: LabeledPoint(x[0], list(x[1:])))
(trainingData, testData) = labelRDD.randomSplit([0.7, 0.3], seed=42)

# mllib expects labels in {0, ..., numClasses - 1}, so the class count comes
# from the largest label rather than from the number of distinct labels.
numClasses = int(labelRDD.map(lambda lp: lp.label).max()) + 1
model = RandomForest.trainClassifier(trainingData,
                                        numClasses=numClasses,
                                        categoricalFeaturesInfo={},
                                        numTrees=3,
                                        featureSubsetStrategy="auto",
                                        impurity='gini',
                                        maxDepth=4,
                                        maxBins=32,
                                        seed=42)

predictions = model.predict(testData.map(lambda x: x.features))

labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

# Share of test points whose predicted class differs from the true one.
testErr = labelsAndPredictions.filter(lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))

# Save the final model with the context that trained it, then shut the session down.
sc = spark.sparkContext
model.save(sc, 'exploitation/RFModel')
spark.stop()