# Modeling Heating Problems in Manhattan

In this notebook we will continue to use the data from the first notebook, a combination of NYC 311 Complaints data and PLUTO land use and geographic data. The purpose of this notebook is to demonstrate the methodology of building a predictive model with Spark.

***

## Read the Data
We previously saved our data in the Object Store; the following code provides the appropriate credentials to be able to access and read our data.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('NewYorkCase').getOrCreate()

In [3]:
modelData=spark.read.csv('../datasets/mnModelData.csv',header=True,inferSchema=True)

In [5]:
modelData.show()

+-----+-------------+-------+---------+----------+-----------+---------+--------+
|  Zip|ZipHeatingCnt|Borough|BldgCount|avgBldgAge|avgBldgArea|  avgXcrd| avgYCrd|
+-----+-------------+-------+---------+----------+-----------+---------+--------+
|10012|            1|     MN|     1168|     143.0|    23483.0| 984755.0|203548.0|
|10011|            1|     MN|     2080|     145.0|    26692.0| 984425.0|209172.0|
|10014|            1|     MN|     1959|     148.0|    17391.0| 982940.0|206799.0|
|10463|            1|     MN|      220|     204.0|    15722.0|1008961.0|258548.0|
|10003|            3|     MN|     1881|     139.0|    28940.0| 987540.0|205715.0|
|10040|            1|     MN|      429|     188.0|    40602.0|1003705.0|251912.0|
|10022|            3|     MN|     1101|     189.0|    82191.0| 993309.0|215562.0|
|10065|            1|     MN|     1052|     127.0|    42394.0| 993913.0|217993.0|
|10035|            3|     MN|     1514|     443.0|    13521.0|1001455.0|231364.0|
|10028|         

In [6]:
modelData.printSchema()

root
 |-- Zip: integer (nullable = true)
 |-- ZipHeatingCnt: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- BldgCount: integer (nullable = true)
 |-- avgBldgAge: double (nullable = true)
 |-- avgBldgArea: double (nullable = true)
 |-- avgXcrd: double (nullable = true)
 |-- avgYCrd: double (nullable = true)



## Feature Selection

We decided to keep things simple for this example problem so don't have too many to choose from but let's evaluate our features to see which should be included. We can use Pearson's correlation to do this. This calculation takes two numeric columns of equal length and returns a value between 1 and -1. If something close to 1 is returned, there is a strong positive correlation, meaning that if the value in one of the columns increases, so will the value in the other column. A -1 value would indicate the opposite, as the value in one column increases, the other would decrease. A result close to 0 would indicate there is no clear relationship.

In [7]:
modelData.stat.corr("ZipHeatingCnt", "BldgCount")

0.018134457383525877

In [8]:
modelData.stat.corr("ZipHeatingCnt", "avgBldgAge")

0.1455650158192505

In [9]:
modelData.stat.corr("ZipHeatingCnt", "avgBldgArea")

0.001339762980715712

This shows that there is a medium negative correlation between the number of complaints and the average building area; that the larger the average building area, the lower the number of complaints. To improve the model, it might be worth understanding what times of buildings these are, e.g. are they business rather than residential?

There are very low correlations for both building count and average building age (if any) indicating they are likely to only add little value if we include them in our model and we definitely couldn't use these as on there own to predict the number of complaints.

We will go ahead with all three features.

## Random Forest Regression Modeling

The target variable we wish to predict is a linear value, therefore Random Forest Regression is an ideal candidate.

We're not going to do any further Feature Extraction or Transformations in this notebook so let's remind ourselves of our input data we will be using

In [10]:
from pyspark.ml.feature import VectorAssembler
FeatureAssembler=VectorAssembler(inputCols=['avgBldgArea','avgBldgAge','BldgCount'], outputCol="features")


In [11]:
#modelDataTran = FeatureAssembler.transform(modelData)
#modelDataTran = modelDataTran.select(['features', 'ZipHeatingCnt'])
#modelDataTran.show(10,False)

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
 
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
#featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(modelDataTran)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = modelData.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="features",labelCol='ZipHeatingCnt')
#rf = RandomForestRegressor(featuresCol="features",labelCol='ZipHeatingCnt')

# Chain indexer and forest in a Pipeline
#pipeline = Pipeline(stages=[featureIndexer, rf])
pipeline = Pipeline(stages=[FeatureAssembler, rf])

# Train model.  This also runs the indexer.
RF_Model = pipeline.fit(trainingData)
#RF_Model = rf.fit(trainingData)


In [141]:
preds_train = RF_Model.transform(trainingData)
preds_train.select("prediction","ZipHeatingCnt","features").show(5,False)

+------------------+-------------+----------------------+
|prediction        |ZipHeatingCnt|features              |
+------------------+-------------+----------------------+
|1367.7166666666667|1360         |[28940.0,139.0,1881.0]|
|163.95            |3            |[293074.0,264.0,119.0]|
|640.5383333333333 |7            |[348700.0,131.0,78.0] |
|435.58333333333337|23           |[150624.0,230.0,229.0]|
|1589.8079166666669|1609         |[23049.0,291.0,1343.0]|
+------------------+-------------+----------------------+
only showing top 5 rows



## Evaluation

In order to see how well our model works, we test is. Both our training data and test data have the correct results so we are able to compare the two for each row to understand the model's accuracy. 

In production, as you are trying to prediction something before it happens, you wouldn't have this so you would need to continue to test offline to ensure that your model is still valid.

We can going to generate predictions for both our training data and testing data to see the difference.

In [13]:
preds_train = RF_Model.transform(trainingData)
preds_train.select("prediction","ZipHeatingCnt","features").show(5,False)

+----------+-------------+----------------------+
|prediction|ZipHeatingCnt|features              |
+----------+-------------+----------------------+
|1.25      |1            |[26692.0,145.0,2080.0]|
|1.2       |1            |[51857.0,199.0,1409.0]|
|1.25      |1            |[97725.0,206.0,1100.0]|
|1.4       |1            |[18990.0,266.0,2258.0]|
|1.15      |1            |[31134.0,103.0,1207.0]|
+----------+-------------+----------------------+
only showing top 5 rows



In [14]:
# Make predictions.
predictions = RF_Model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "ZipHeatingCnt", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="ZipHeatingCnt", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = RF_Model.stages[1]
print(rfModel)  # summary only

+----------+-------------+--------------------+
|prediction|ZipHeatingCnt|            features|
+----------+-------------+--------------------+
|      1.25|            3|[28940.0,139.0,18...|
|       1.2|            1|[23483.0,143.0,11...|
|      1.35|            1|[31299.0,190.0,16...|
|      1.55|            1|[17391.0,148.0,19...|
|      1.25|            2|[37667.0,122.0,11...|
+----------+-------------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 1.4346
RandomForestRegressionModel (uid=rfr_23762f363ea4) with 20 trees


RMSE values are the same unit as the label feature. Therefore, one way to understand if these are good values, would be to look at the average label value; if a prediction that was smaller or larger by the RMSE value is deemed acceptable, then it is good.

In [15]:
from pyspark.sql.functions import col, avg
preds_train.agg(avg(col("ZipHeatingCnt"))).show()

+------------------+
|avg(ZipHeatingCnt)|
+------------------+
|1.4166666666666667|
+------------------+



Ok, so we shouldn't use this predictive model in practice. Since we only have 43 rows in total, let's have a look at all the label and predictions to see how far off they are

In [16]:
preds_train.withColumn('diff', col('prediction')-col('ZipHeatingCnt')).show()

+-----+-------------+-------+---------+----------+-----------+---------+--------+--------------------+----------+--------------------+
|  Zip|ZipHeatingCnt|Borough|BldgCount|avgBldgAge|avgBldgArea|  avgXcrd| avgYCrd|            features|prediction|                diff|
+-----+-------------+-------+---------+----------+-----------+---------+--------+--------------------+----------+--------------------+
|10011|            1|     MN|     2080|     145.0|    26692.0| 984425.0|209172.0|[26692.0,145.0,20...|      1.25|                0.25|
|10016|            1|     MN|     1409|     199.0|    51857.0| 989921.0|210948.0|[51857.0,199.0,14...|       1.2| 0.19999999999999996|
|10019|            1|     MN|     1100|     206.0|    97725.0| 988181.0|217985.0|[97725.0,206.0,11...|      1.25|                0.25|
|10027|            1|     MN|     2258|     266.0|    18990.0| 998178.0|234378.0|[18990.0,266.0,22...|       1.4|  0.3999999999999999|
|10028|            1|     MN|     1207|     103.0|    3



## Unseen Data

Now we are replicating what would happen in practice, new data would be passed to the model and a prediction returned. This is the value that would be actioned on in production, i.e. the appropriate 311 agency would allocate the number of staff to particular zipcodes to meet the expected demand

As before, let's read the data from Object store that we previously saved in the first notebook

In [17]:
mndf=spark.read.csv('../datasets/mnAgrData.csv',header=True,inferSchema=True)

In [18]:
mndf.count()

56

In [19]:
mndf.printSchema()


root
 |-- Borough: string (nullable = true)
 |-- ZipCode: integer (nullable = true)
 |-- BldgCount: integer (nullable = true)
 |-- avgBldgArea: double (nullable = true)
 |-- avgBldgAge: double (nullable = true)
 |-- avgXcrd: double (nullable = true)
 |-- avgYcrd: double (nullable = true)



In [20]:
#mndfTran = FeatureAssembler.transform(mndf.filter(col('avgBldgAge')!='null'))
mndfTran = mndf.filter(col('avgBldgAge')!='null')
predMN = RF_Model.transform(mndfTran)
predMN.printSchema()

root
 |-- Borough: string (nullable = true)
 |-- ZipCode: integer (nullable = true)
 |-- BldgCount: integer (nullable = true)
 |-- avgBldgArea: double (nullable = true)
 |-- avgBldgAge: double (nullable = true)
 |-- avgXcrd: double (nullable = true)
 |-- avgYcrd: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [21]:
predMN.show()

+-------+-------+---------+-----------+----------+---------+--------+--------------------+----------+
|Borough|ZipCode|BldgCount|avgBldgArea|avgBldgAge|  avgXcrd| avgYcrd|            features|prediction|
+-------+-------+---------+-----------+----------+---------+--------+--------------------+----------+
|     MN|  10119|        2|  1179439.0|      72.0| 988320.0|214761.0|[1179439.0,72.0,2.0]|     1.575|
|     MN|  10069|       18|   325768.0|     571.0| 987269.0|222199.0|[325768.0,571.0,1...|      1.45|
|     MN|  10003|     1881|    28940.0|     139.0| 987540.0|205715.0|[28940.0,139.0,18...|      1.25|
|     MN|  10017|      490|   158786.0|     197.0| 991710.0|213682.0|[158786.0,197.0,4...|      1.65|
|     MN|   null|      169|      357.0|    1958.0| 997644.0|217413.0|[357.0,1958.0,169.0]|      1.45|
|     MN|  10019|     1100|    97725.0|     206.0| 988181.0|217985.0|[97725.0,206.0,11...|      1.25|
|     MN|  10002|     1707|    25474.0|     231.0| 987075.0|200573.0|[25474.0,231.

## Save Model in ML repository

In [22]:
from dsx_ml.ml import save

model_name = 'NYC-RF-Model'
save(name = model_name,
     model = RF_Model,
     algorithm_type = 'Regression',
     test_data = testData)



Using TensorFlow backend.


{'path': '/user-home/999/DSX_Projects/NYCVK/models/NYC-RF-Model/1',
 'scoring_endpoint': 'https://dsxl-api/v3/project/score/Python27/spark-2.0/NYCVK/NYC-RF-Model/1'}

## Note above path and scoring_endpoint that you will use in following cell for REST API call
For e.g. {'path': '/user-home/999/DSX_Projects/NYCVK/models/NYC-RF-Model/1',
 'scoring_endpoint': 'https://dsxl-api/v3/project/score/Python27/spark-2.0/NYCVK/NYC-RF-Model/1'}
 

## Test Model in Web UI

Step1 Click on the project name "NYC-case-new" 

Step2 Go to "Model" part, find the mode "NYC-RF-Model_v3" and then select "Test".
<img src="../datasets/ModelTest.png">

Step3 Input value for each features and then click on "Submit".
<img src="../datasets/TestResult.png">

## Test Model in Restful API

In [23]:
modelData.show(2)

+-----+-------------+-------+---------+----------+-----------+--------+--------+
|  Zip|ZipHeatingCnt|Borough|BldgCount|avgBldgAge|avgBldgArea| avgXcrd| avgYCrd|
+-----+-------------+-------+---------+----------+-----------+--------+--------+
|10012|            1|     MN|     1168|     143.0|    23483.0|984755.0|203548.0|
|10011|            1|     MN|     2080|     145.0|    26692.0|984425.0|209172.0|
+-----+-------------+-------+---------+----------+-----------+--------+--------+
only showing top 2 rows



In [24]:
json_payload = [{
    "Zip":10012,
    "Borough":"MN",
    "BldgCount":1168,
    "avgBldgAge":143.0,
    "avgBldgArea":23483.0,
    "avgXcrd":984755.0,
    "avgYCrd":203548.0
}]

In [26]:
import requests, json
from pprint import pprint

scoring_endpoint = 'https://dsxl-api/v3/project/score/Python27/spark-2.0/NYCVK/NYC-RF-Model/1'

header_online = {'Content-Type': 'application/json', 'Authorization':os.environ['DSX_TOKEN']}

response_scoring = requests.post(scoring_endpoint, json=json_payload, headers=header_online)

response_dict = json.loads(response_scoring.content)
print("Prediction")

n = 1
for response in response_dict['object']['output']['predictions']:
    print("{}. {}".format(n,response))
    n+=1

Prediction
1. 1
