# Batch Predict
This notebook performs batch inference using the pre-trained, registered mlflow model. Use the **bci-avm-dask** cluster with the **Train** notebook
to see the model training.


<p align="center">
<img width=25% src="https://blockchainclimate.org/wp-content/uploads/2020/11/cropped-BCI_Logo_LR-400x333.png" alt="bciAVM" height="300"/>
</p>

[![PyPI](https://badge.fury.io/py/bciavm.svg?maxAge=2592000)](https://badge.fury.io/py/bciavm)
[![PyPI Stats](https://img.shields.io/badge/bciavm-avm-blue)](https://pypistats.org/packages/bciavm)


This notebook contains code to take a `mlflow` registered model and distribute its work with a `Dask` cluster. 
<table>
    <tr>
        <td>
            <img width=25% src="https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/dask.png" width="300">
        </td>
    </tr>
</table>

The Blockchain & Climate Institute (BCI) is a progressive think tank providing leading expertise in the deployment of emerging technologies for climate and sustainability actions. 

As an international network of scientific and technological experts, BCI is at the forefront of innovative efforts, enabling technology transfers, to create a sustainable and clean global future.

# Automated Valuation Model (AVM) 

### About
AVM is a term for a service that uses mathematical modeling combined with databases of existing properties and transactions to calculate real estate values. 
The majority of automated valuation models (AVMs) compare the values of similar properties at the same point in time. 
Many appraisers, and even Wall Street institutions, use this type of model to value residential properties. (see [What is an AVM](https://www.investopedia.com/terms/a/automated-valuation-model.asp) Investopedia.com)

For more detailed info about the AVM, please read the **About** paper found here `resources/2021-BCI-AVM-About.pdf`.

### Valuation Process
<img src="resources/valuation_process.png" height="360" >

**Key Functionality**

* **Supervised algorithms** 
* **Tree-based & deep learning algorithms** 
* **Feature engineering derived from small clusters of similar properties** 
* **Ensemble (value blending) approaches** 

### Set the required AWS Environment Variables
```shell
export ACCESS_KEY=YOURACCESS_KEY
export SECRET_KEY=YOURSECRET_KEY
export BUCKET_NAME=bci-transition-risk-data
export TABLE_DIRECTORY=/dbfs/FileStore/tables/
```

### Next Steps
Read more about bciAVM on our [documentation page](https://blockchainclimate.org/thought-leadership/#blog):

### How does it relate to BCI Risk Modeling?
<img src="resources/bci_flowchart_2.png" height="280" >


### Technical & financial support for development provided by:
<a href="https://www.gcode.ai">
    <img width=15% src="https://staticfiles-img.s3.amazonaws.com/avm/gcode_logo.png" alt="GCODE.ai"  height="25"/>
</a>


### Install [from PyPI](https://pypi.org/project/bciavm/)
```shell
pip install bciavm
```

This notebook covers the following steps:
- Import data from your local machine into the Databricks File System (DBFS)
- Download data from s3
- Train a machine learning models (or more technically, multiple models in a stacked pipeline) on the dataset
- Register the model in MLflow

<hr>

## Environment Setup

In addition to the `bciavm` package, install the following additional non-builtin libraries:

* [dask-ml](https://github.com/dask/dask-ml)

In [3]:
import os
import time
import uuid
from datetime import datetime
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask
from dask.distributed import Client
import mlflow
from mlflow.tracking import MlflowClient
import bciavm
import re
from urllib.request import urlopen
import zipfile
from io import BytesIO
from dask.distributed import wait
import shutil
import gc

In [4]:
# shutil.rmtree('/dbfs/FileStore/tables/avm_output/')
_date = str(datetime.now())
# os.mkdir('/dbfs/FileStore/tables/avm/avm_output_'+_date)

In [5]:
spark.read.format("parquet").load("mnt/bct-transition-risk-data/epc_data/byLocation/DateRun_2021-02-07/{*}/domestic/certificates").createOrReplaceTempView("EPCData")

In [6]:
%sql
SELECT *
FROM EPCData

In [7]:
spark.createDataFrame(pd.read_csv("/dbfs/FileStore/tables/ukpostcodes.csv")).createOrReplaceTempView("sqlPostcodeLonLat")
spark.createDataFrame(pd.read_csv("/dbfs/FileStore/tables/postcode_outcodes.csv")).createOrReplaceTempView("sqlOutcodeLonLat")

In [8]:
%sql

CREATE OR REPLACE TEMPORARY VIEW EPCDataFeatures AS
SELECT BUILDING_REFERENCE_NUMBER AS unit_indx
,t1.POSTCODE
,SPLIT(t1.POSTCODE, " ")[0] AS POSTCODE_OUTCODE
,POSTTOWN AS POSTTOWN_e
,PROPERTY_TYPE AS PROPERTY_TYPE_e
,TOTAL_FLOOR_AREA AS TOTAL_FLOOR_AREA_e
,NUMBER_HEATED_ROOMS AS NUMBER_HEATED_ROOMS_e
,FLOOR_LEVEL AS FLOOR_LEVEL_e
,CASE WHEN t3.latitude IS NOT NULL THEN t3.latitude ELSE t4.latitude END AS Latitude_m
,CASE WHEN t3.longitude IS NOT NULL THEN t3.longitude ELSE t4.longitude END AS Longitude_m
,CASE WHEN CAST (RIGHT(LEFT(t1.POSTCODE, 2), 1) AS INT) IS NULL THEN LEFT(t1.POSTCODE, 2) ELSE LEFT(t1.POSTCODE, 1) END AS POSTCODE_AREA
,ROW_NUMBER() OVER (PARTITION BY BUILDING_REFERENCE_NUMBER ORDER BY INSPECTION_DATE DESC) AS rownum
FROM EPCData t1
LEFT JOIN sqlPostcodeLonLat t3 ON t1.POSTCODE = t3.Postcode
LEFT JOIN sqlOutcodeLonLat t4 ON SPLIT(t1.POSTCODE, " ")[0] = t4.postcode;

DROP TABLE IF EXISTS epcHomesToScore;

CREATE TABLE epcHomesToScore AS
SELECT unit_indx
,POSTCODE
,POSTCODE_OUTCODE
,POSTTOWN_e
,PROPERTY_TYPE_e
,TOTAL_FLOOR_AREA_e
,NUMBER_HEATED_ROOMS_e
,FLOOR_LEVEL_e
,Latitude_m
,Longitude_m
,POSTCODE_AREA
FROM EPCDataFeatures
WHERE rownum = 1;

SELECT *
FROM epcHomesToScore

In [9]:
data = spark.sql("SELECT * FROM epcHomesToScore").toPandas()

In [10]:
MLFLOW_TRACKING_URI = os.environ["MLFLOW_TRACKING_URI"]
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment('/Users/mike.casale@blockchainclimate.org/Experiments/batch-predict')

In [11]:
#TODO: merge w/ SQL preprocessing (above step)
data = bciavm.utils.bci_utils.preprocess_data(data.rename({'Postcode':'POSTCODE'},axis=1), 
                                              drop_outlier=False, 
                                              split_data=False)

data.to_csv('/dbfs/FileStore/tables/avm/epcPrice.csv')
data

In [12]:
input_example=data.dropna().sample(10)
input_example.dtypes

In [13]:
c = Client('127.0.0.1:8786')

print('waiting for workers...')
c.wait_for_workers(1)

print('done...')

In [14]:
def mlflow_load_model(pred_type=None, model_name='avm', model_version='Production'):
    """Loads model from mlflow.

    Returns:
        mlflow.pyfunc loaded model
    """
    if pred_type == 'conf':
      model_name='avm-conf'
      
    return mlflow.pyfunc.load_model(
        model_uri=f"models:/{model_name}/{model_version}"
    )   
  
model = mlflow_load_model()
conf_model = mlflow_load_model(pred_type='conf')

In [15]:
model.predict(input_example)

In [16]:
conf_model.predict(input_example)

In [17]:

def get_unics(data=None):
    """Gets sample=1 for each unique combination of POSTCODE_AREA + PROPERTY_TYPE_e
       This is used to compute the confidence for all other properties which share 
       the POSTCODE_AREA + PROPERTY_TYPE_e + NUMBER_HEATED_ROOMS_e + FLOOR_LEVEL_e combination

    Returns:
        pd.dataframe 
    """
    
    try: os.mkdir('/dbfs/FileStore/tables/avm/avm_conf/')
    except:pass
    
    
    if data is None:
        data = pd.read_csv('/dbfs/FileStore/tables/avm/epcPrice.csv')
    
    df = pd.DataFrame({})
    data['key'] = data['POSTCODE_AREA'] + data['PROPERTY_TYPE_e']
    unics = data['key'].unique()
    for u in unics:
      df = df.append(data[data['key']==u].sample(1))
    
    return df

def load_model():
    return bciavm.pipelines.RegressionPipeline.load('/dbfs/FileStore/artifacts/avm_pipeline_'+str(bciavm.__version__)+'.pkl')

def predict(X, model, columns=['avm']):
    """Main prediction logic
    Returns:
        pd.dataframe 
    """    
    X['key'] = X['POSTCODE_AREA'] + X['PROPERTY_TYPE_e'] 
    unit_index = X['unit_indx'].values
    key = X['key'].values
    resp = pd.DataFrame(model.predict(X).values, columns=columns)
    resp['unit_indx'] = unit_index
    resp['key'] = key
    del X
    gc.collect()
    return resp
    
def save(preds):
    filename='/dbfs/FileStore/tables/avm/avm_output/avm_output_'+str(datetime.now())+'.parquet.gzip'
    return preds.to_parquet(filename, compression='gzip')

def f(ct):
    for x in pd.read_csv('/dbfs/FileStore/tables/avm/epcPrice.csv', chunksize=500000):
        ct = ct + 1
        start_time = datetime.now()
        model = load_model()
        preds = predict(x, model)
        save(preds)
        end_time = datetime.now()
        del x
        del preds
        model = None
        gc.collect()
        print('Duration: {}'.format(end_time - start_time), ct)

    return ct
  
try: os.mkdir('/dbfs/FileStore/tables/avm/avm_output/')
except:pass

In [18]:
ct = 0
dask.compute(f(ct))

In [19]:
preds = dd.read_parquet('/dbfs/FileStore/tables/avm/avm_output/*.parquet.gzip', compression='gzip')
preds = preds.compute()
preds = preds.drop_duplicates('unit_indx')
preds

Unnamed: 0,avm,unit_indx,key
0,151990.171875,0,PLHouse
1,193218.078125,1,NRFlat
2,341111.875000,2,SMHouse
3,280344.187500,3,SOHouse
4,172852.671875,4,LHouse
...,...,...,...
216117,307351.437500,15716117,BNHouse
216118,154638.515625,15716118,COHouse
216119,532772.562500,15716119,WRHouse
216120,108124.789062,15716120,BHouse


In [20]:
#Get all unique POSTCODE_AREA + PROPERTY_TYPE_e
try:
  unics = get_unics(data=data)
except:
  unics = get_unics()
unics

In [21]:
try: os.mkdir('/dbfs/FileStore/tables/avm/avm_conf')
except: pass

unics.to_parquet('/dbfs/FileStore/tables/avm/avm_conf/unics.parquet.gzip', compression='gzip')

In [22]:
unics = dd.read_parquet('/dbfs/FileStore/tables/avm/avm_conf/unics.parquet.gzip', compression='gzip')
unics = unics.compute()
unics

Unnamed: 0.1,Unnamed: 0,unit_indx,POSTCODE,POSTCODE_OUTCODE,POSTCODE_AREA,POSTTOWN_e,PROPERTY_TYPE_e,TOTAL_FLOOR_AREA_e,NUMBER_HEATED_ROOMS_e,FLOOR_LEVEL_e,Latitude_m,Longitude_m,key
11013509,11013509,11013509,PL7 2GT,PL7,PL,PLYMOUTH,House,46.0,3.0,0.0,50.389810,-4.023573,PLHouse
6151308,6151308,6151308,NR17 2EH,NR17,NR,ATTLEBOROUGH,Flat,63.0,0.0,,52.516199,1.013926,NRFlat
4720629,4720629,4720629,SM6 8QB,SM6,SM,WALLINGTON,House,159.0,8.0,0.0,51.360429,-0.145775,SMHouse
10982040,10982040,10982040,SO16 0BP,SO16,SO,SOUTHAMPTON,House,64.0,0.0,0.0,50.939200,-1.466033,SOHouse
11356452,11356452,11356452,L33 1SA,L33,L,LIVERPOOL,House,86.0,5.0,0.0,53.497772,-2.876649,LHouse
...,...,...,...,...,...,...,...,...,...,...,...,...,...
7597236,7597236,7597236,RM3 7TZ,RM3,RM,ROMFORD,Park home,47.0,3.0,0.0,51.619894,0.215145,RMPark home
14140014,14140014,14140014,NE49 0QN,NE49,NE,HALTWHISTLE,Park home,63.0,4.0,0.0,54.922310,-2.471466,NEPark home
13656903,13656903,13656903,AL9 7HZ,AL9,AL,HATFIELD,Park home,61.0,3.0,0.0,51.739307,-0.211835,ALPark home
10593794,10593794,10593794,DG4 6NB,DG4,DG,SANQUHAR,Maisonette,160.0,6.0,1.0,55.387299,-4.003707,DGMaisonette


In [23]:
%%time

print('Building a dask dataframe...')
ddf = dd.from_pandas(unics, npartitions=8)
X_test_arr = dask.persist(ddf)
_ = wait(X_test_arr)
X_test_arr = X_test_arr[0]

cols=['unit_id','avm','avm_lower','avm_upper','conf','ts','latest_production_version','latest_staging_version']

print('Predicting...')
confs = X_test_arr.map_partitions(
        predict, 
        model=conf_model,
        columns=cols
).compute()

confs.to_parquet('/dbfs/FileStore/tables/avm/avm_conf/confs_output.parquet.gzip', compression='gzip')
confs

In [24]:
confs = dd.read_parquet('/dbfs/FileStore/tables/avm/avm_conf/confs_output.parquet.gzip', compression='gzip')
confs = confs.compute()
confs

Unnamed: 0,unit_id,avm,avm_lower,avm_upper,conf,ts,latest_production_version,latest_staging_version,unit_indx,key
0,48571,357471.0,265926.0,439252.0,0.743909,1623744165.489511,29,,48571,GUBungalow
1,60051,68495.0,,,,1623744171.328713,29,,60051,NRPark home
2,139243,243433.0,,,,1623744174.346069,29,,139243,BAPark home
3,158227,250987.0,204725.0,412575.0,,1623744178.011655,29,,158227,WFHouse
4,207589,151513.0,122096.0,200466.0,0.676902,1623744184.665239,29,,207589,BHouse
...,...,...,...,...,...,...,...,...,...,...
59,15506551,196848.0,168859.0,255675.0,0.701156,1623744778.547672,29,,15506551,EXHouse
60,15565619,165163.0,,,,1623744784.160804,29,,15565619,HGPark home
61,15616418,58388.0,32524.0,123120.0,,1623744787.068207,29,,15616418,DHFlat
62,15689393,223733.0,199122.0,265645.0,0.812672,1623744792.967538,29,,15689393,RGHouse


In [25]:
def correct(predictions, conf_min=0.5):
    predictions[ 'avm' ] = round(predictions[ 'avm' ].astype(float), 0)
    predictions[ 'conf' ] = round(predictions[ 'conf' ].astype(float), 2)
    try :
      predictions[ 'avm' ] = np.where(predictions[ 'avm' ].astype(float) < 0.0, np.nan, predictions[ 'avm' ].astype(float))
    except :
      pass
    try :
      predictions[ 'avm_lower' ] = np.where(predictions[ 'avm_lower' ].astype(float) < 0.0, np.nan, predictions[ 'avm_lower' ].astype(float))
    except :
      pass
    try :
      predictions[ 'avm_upper' ] = np.where(predictions[ 'avm_upper' ].astype(float) < 0.0, np.nan, predictions[ 'avm_upper' ].astype(float))
    except :
      pass
    try :
      predictions[ 'avm_lower' ] = np.where(predictions[ 'avm_lower' ].astype(float) > predictions[ 'avm' ].astype(float), np.nan,
                                   predictions[ 'avm_lower' ].astype(float))
    except :
      pass
    try :
      predictions[ 'avm_upper' ] = np.where(predictions[ 'avm_upper' ].astype(float) < predictions[ 'avm' ].astype(float), np.nan,
                                   predictions[ 'avm_upper' ].astype(float))
    except :
      pass
    try :
      predictions.name = self.input_target_name
    except :
      pass

    try :
      predictions[ 'conf' ] = np.where(predictions[ 'conf' ].astype(float) < conf_min, '< 0.5',
                                   predictions[ 'conf' ].astype(float))
    except :
      pass
    
    predictions[ 'conf' ] = np.where(np.isnan(predictions[ 'avm_upper' ]), np.nan, predictions[ 'conf' ])
    predictions[ 'conf' ] = np.where(np.isnan(predictions[ 'avm_lower' ]), np.nan, predictions[ 'conf' ])
    return predictions

In [26]:
combined = preds.merge(confs.drop(['unit_indx', 'avm'],axis=1), on='key', how='left')
lower = combined['avm_lower'] / combined['avm'] - 1.0  
upper = combined['avm_upper'] / combined['avm'] - 1.0
combined['avm_upper'] = upper
combined['avm_lower'] = lower

combined[ 'avm_lower' ] = round(
            combined[ 'avm' ].astype(float) + combined[ 'avm' ].astype(float) * combined[ 'avm_lower' ].astype(float),
            0)
combined[ 'avm_upper' ] = round(
            combined[ 'avm' ].astype(float) + combined[ 'avm' ].astype(float) * combined[ 'avm_upper' ].astype(float),
            0)

combined['fsd'] = np.where((combined[ 'avm' ] - combined[ 'avm_lower' ]) >= (combined[ 'avm_upper' ] - combined[ 'avm' ]), combined[ 'avm' ] - combined[ 'avm_lower' ], combined[ 'avm_upper' ] - combined[ 'avm' ])

conf = 1.0 - combined['fsd'] / combined[ 'avm' ]
combined['conf'] = conf
combined = correct(combined, conf_min=0.5)
combined = combined.drop(['latest_staging_version', 'unit_id'], axis=1)
combined['conf'] = combined['conf'].fillna('< 0.5')
combined['avm_lower'] = np.where(np.isnan(combined['avm_lower']), combined['avm'] - combined['fsd'], combined['avm_lower'])
combined['avm_upper'] = np.where(np.isnan(combined['avm_upper']), combined['avm'] + combined['fsd'], combined['avm_upper'])
combined['avm_lower'] = np.where(combined['avm_lower'] < 0, 0.0, combined['avm_lower'])
combined = combined.drop('key',axis=1)
combined['avm_upper'] = round(combined['avm_upper'], 0)
combined['avm_lower'] = round(combined['avm_lower'], 0)
combined['fsd'] = round(combined['fsd'], 0)
combined

Unnamed: 0,avm,unit_indx,avm_lower,avm_upper,conf,ts,latest_production_version,fsd
0,151990.0,0,117895.0,180902.0,0.78,1623744632.342138,29,34095.0
1,193218.0,1,164335.0,244350.0,0.74,1623744483.552008,29,51132.0
2,341112.0,2,6714.0,675510.0,< 0.5,1623744232.581653,29,334398.0
3,280344.0,3,150273.0,410415.0,< 0.5,1623744615.831788,29,130071.0
4,172853.0,4,58294.0,287412.0,< 0.5,1623744683.505588,29,114559.0
...,...,...,...,...,...,...,...,...
15716117,307351.0,15716117,0.0,677605.0,< 0.5,1623744253.444732,29,370254.0
15716118,154639.0,15716118,0.0,410505.0,< 0.5,1623744228.757267,29,255866.0
15716119,532773.0,15716119,233575.0,831971.0,< 0.5,1623744359.778892,29,299198.0
15716120,108125.0,15716120,15784.0,200466.0,< 0.5,1623744184.665239,29,92341.0


In [27]:
_date = str(datetime.now().date())
_date

In [28]:
combined.to_parquet('/dbfs/FileStore/tables/avm/final_output_'+_date+'.parquet.gzip', compression='gzip')

In [29]:
spark_df = spark.createDataFrame(combined)

spark_df.write.mode("overwrite").saveAsTable("/dbfs/FileStore/tables/avm_output_"+_date)