## Batch Transform (in dev)

This is a placeholder notebook for running a batch transform on new time series data with the latest model, and for exporting/logging the predictions (along with the ground-truth data, since we have it) for ingestion into DMM.

In [1]:
#import the packages we need
%matplotlib inline
import pandas as pd
import datetime

import pystan
from fbprophet import Prophet

In [2]:
import os
import numpy as np

In [3]:
import pickle

In [4]:
import boto3
from botocore.exceptions import NoCredentialsError

In [5]:
from datetime import date

## pseudocode

### 1. access latest data
 - read csv from Domino Dataset
 - depends on scheduled job for data ingestion
 - create a copy of the ingestion job and adjust it so each refresh's data is distinct, rather than a rolling 30-day window


In [6]:
import datetime
import requests
import subprocess

def refresh(save_location, days_back=1):
    """Download the BMRS half-hourly generation-by-fuel-type (FUELHH) CSV.

    Shells out to ``curl`` to fetch the report covering ``today - days_back``
    through today and writes the response body to ``save_location``.

    Args:
        save_location: Destination path for the downloaded CSV, e.g.
            '/domino/datasets/domino-goyetc/Power-Gen-Total/scratch/data.csv'.
        days_back: How many days before today the requested window starts.
            Defaults to 1, which is the window the notebook has always used.

    Returns:
        True when curl exits with status 0, False otherwise.
    """
    now = datetime.datetime.today()
    to_date = now.strftime('%Y-%m-%d')
    from_date = (now - datetime.timedelta(days=days_back)).strftime('%Y-%m-%d')

    # The dates go straight into the percent-encoded query. The previous
    # version wrapped them in literal '{' '}', which only worked because curl
    # treats braces as URL-globbing syntax and expands a one-element set to
    # its content.
    url = ("https://www.bmreports.com/bmrs/?"
           "q=ajax/filter_csv_download/FUELHH/csv/"
           "FromDate%3D{from_date}%26ToDate%3D{to_date}/&"
           "filename=GenerationbyFuelType_20191002_1657").format(
               from_date=from_date, to_date=to_date)

    # Argument-list form (no shell) so save_location is never shell-interpreted.
    # --fail makes curl exit non-zero on HTTP errors instead of saving the
    # error page as if it were data.
    process = subprocess.run(["curl", "--fail", "-o", save_location, url])
    return process.returncode == 0

refresh('/domino/datasets/domino-goyetc/Power-Gen-Total/scratch/new_inputs.csv')

True

In [7]:
#read in our data
# skiprows=1 / skipfooter=1 discard the first and last lines of the download
# (presumably non-data header/footer records -- confirm against the raw file).
# header=None because column names are assigned by hand afterwards;
# skipfooter is only supported by the python parsing engine.
df = pd.read_csv('/domino/datasets/domino-goyetc/Power-Gen-Total/scratch/new_inputs.csv', skiprows=1, skipfooter=1, header=None, engine='python')

In [8]:
#rename the columns
# Names are assigned positionally, so this list of 18 names must match the
# file's column order exactly: a record-type tag, the date, the half-hour
# index, then one column per fuel type / interconnector.
df.columns = ['HDF', 'date', 'half_hour_increment',
              'CCGT', 'OIL', 'COAL', 'NUCLEAR',
              'WIND', 'PS', 'NPSHYD', 'OCGT',
              'OTHER', 'INTFR', 'INTIRL', 'INTNED',
               'INTEW', 'BIOMASS', 'INTEM']

In [9]:
#Create a new column datetime holding the calendar date of each row (i.e. midnight of that day);
#the half-hour offset that turns it into the starting datetime of the measured increment is applied in the next cell
df['datetime'] = pd.to_datetime(df['date'], format="%Y%m%d")
df['datetime'].describe()

count                      86
unique                      2
top       2020-07-26 00:00:00
freq                       48
first     2020-07-26 00:00:00
last      2020-07-27 00:00:00
Name: datetime, dtype: object

In [10]:
# Start timestamp of each half-hour increment: increment 1 starts at 00:00 and
# every later increment starts 30 minutes after the previous one.
# The value is rebuilt from the raw 'date' and 'half_hour_increment' columns
# rather than added onto the existing 'datetime' column, so re-running this
# cell cannot shift the timestamps a second time. It is also vectorised
# instead of a row-wise apply.
df['datetime'] = (
    pd.to_datetime(df['date'], format="%Y%m%d")
    + pd.to_timedelta(30 * (df['half_hour_increment'].astype(int) - 1), unit='min')
)

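### Note: the data transformation here replaces the target variable used in the Intro to Domino documentation (CCGT power production) with _total_ power, i.e. the sum across all fuel-type columns for each half-hour period (a per-period total, not a running/cumulative sum)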

In [11]:
#save for ground truth
# Sum an explicit list of fuel-type / interconnector columns. Summing
# "everything except date/HDF/half_hour_increment" also swept in the
# non-numeric 'datetime' column (an error on recent pandas) and, on a re-run,
# the previously computed 'y' itself.
fuel_columns = ['CCGT', 'OIL', 'COAL', 'NUCLEAR',
                'WIND', 'PS', 'NPSHYD', 'OCGT',
                'OTHER', 'INTFR', 'INTIRL', 'INTNED',
                'INTEW', 'BIOMASS', 'INTEM']
df['y'] = df[fuel_columns].sum(axis=1)

In [12]:
#Prep our data - for Facebook Prophet, the time series data needs to be in a DataFrame with 2 columns named ds and y
# Keep only the timestamp and the target, renaming 'datetime' to 'ds'; 'y' already has the expected name.
df_for_prophet = df[['datetime', 'y']].rename(columns = {'datetime':'ds'})

In [13]:
#Save for logging to DMM
# X is an independent copy of the two-column ('ds', 'y') frame;
# y is the observed target column on its own.
X = df_for_prophet.copy()
y = df_for_prophet['y']

In [14]:
len(y)

86

In [15]:
X.tail()

Unnamed: 0,ds,y
81,2020-07-27 16:30:00,29195
82,2020-07-27 17:00:00,29698
83,2020-07-27 17:30:00,29580
84,2020-07-27 18:00:00,29182
85,2020-07-27 18:30:00,28736


### 2. load latest model
 - Trained models are meant to be used. There is no reason to re-train the model each time you use the model. Export or serialize the model to a file to load and reuse the model later. In Python, the pickle module implements protocols for serializing and de-serializing objects. 
 - load model.pkl

In [16]:
# Deserialize the previously exported model from 'model.pkl' (path is relative
# to the current working directory).
# NOTE(review): unpickling can execute arbitrary code -- only load model files
# that come from a trusted source.
with open('model.pkl', 'rb') as f:
    m = pickle.load(f)

### 3. use loaded model to compute predictions of 'test data', aka latest loaded data
 - note that this test data also contains the ground truth, since it is (in reality) historical


Note: don't need to create future dataframe, because we already have the data, in this demo example

..instead, predict on 'ds' column isolated from new input data

In [17]:
# Predict for exactly the timestamps present in the new input data; only the
# 'ds' column is passed, so the observed 'y' values are not given to the model.
forecast = m.predict(X[['ds']])

#inspect the dataframe
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
81,2020-07-27 16:30:00,24440.104357,15347.077257,35422.291827
82,2020-07-27 17:00:00,25216.109977,16134.414777,36274.348422
83,2020-07-27 17:30:00,26020.259422,16384.01517,36945.469108
84,2020-07-27 18:00:00,26773.001847,17687.118458,37875.590647
85,2020-07-27 18:30:00,27390.133739,17973.793638,38413.306717


In [18]:
len(forecast)

86

In [19]:
forecast.head()

Unnamed: 0,ds,trend,yhat_lower,yhat_upper,trend_lower,trend_upper,additive_terms,additive_terms_lower,additive_terms_upper,daily,daily_lower,daily_upper,weekly,weekly_lower,weekly_upper,multiplicative_terms,multiplicative_terms_lower,multiplicative_terms_upper,yhat
0,2020-07-26 00:00:00,22715.339926,9929.030056,29053.613458,14583.263035,32600.558534,-3916.775339,-3916.775339,-3916.775339,-1721.049859,-1721.049859,-1721.049859,-2195.72548,-2195.72548,-2195.72548,0.0,0.0,0.0,18798.564587
1,2020-07-26 00:30:00,22714.481089,9724.01682,28457.571274,14569.355693,32610.74458,-4476.344846,-4476.344846,-4476.344846,-2281.371311,-2281.371311,-2281.371311,-2194.973535,-2194.973535,-2194.973535,0.0,0.0,0.0,18238.136243
2,2020-07-26 01:00:00,22713.622252,9331.849155,28057.104007,14561.357249,32620.930626,-4925.967851,-4925.967851,-4925.967851,-2733.059705,-2733.059705,-2733.059705,-2192.908146,-2192.908146,-2192.908146,0.0,0.0,0.0,17787.654401
3,2020-07-26 01:30:00,22712.763415,8913.676332,27418.190586,14555.539087,32631.116672,-5288.385129,-5288.385129,-5288.385129,-3098.845708,-3098.845708,-3098.845708,-2189.539421,-2189.539421,-2189.539421,0.0,0.0,0.0,17424.378286
4,2020-07-26 02:00:00,22711.904578,8502.723967,27394.798098,14549.720925,32641.302719,-5582.73162,-5582.73162,-5582.73162,-3397.852806,-3397.852806,-3397.852806,-2184.878814,-2184.878814,-2184.878814,0.0,0.0,0.0,17129.172958


In [20]:
forecast[['ds']].tail()

Unnamed: 0,ds
81,2020-07-27 16:30:00
82,2020-07-27 17:00:00
83,2020-07-27 17:30:00
84,2020-07-27 18:00:00
85,2020-07-27 18:30:00


### 4. format data and export
 - format predictions + test data
 - format ground truth labels
 
#### Note on data to be used/exported
1. inputs --> df
2. predictions --> preds
3. preds for DMM --> df + preds (inner join on 'ds' index/column)
4. ground truth for DMM --> df_for_prophet, below.. or "X" below

### Dev: Export train, test+predictions, and ground truth labels for DMM example

In [21]:
def split_data_export(df,n,prefix):
    """Write ``df`` to ``$DOMINO_WORKING_DIR/results`` as ``n`` CSV files.

    Rows are split into ``n`` consecutive, near-equal partitions (earlier
    partitions get the extra row when the length is not divisible by ``n``).
    Each partition is written with its index.

    File names are ``{prefix}_total_{today}.csv`` when ``n == 1`` and
    ``{prefix}_total_partition{i}_{today}.csv`` (``i`` starting at 1) otherwise.

    Args:
        df: DataFrame to export.
        n: Number of partitions; must be at least 1.
        prefix: Leading part of every output file name.

    Returns:
        List of the written file paths, in partition order.

    Raises:
        ValueError: if ``n`` is smaller than 1.
        KeyError: if the DOMINO_WORKING_DIR environment variable is not set.
    """
    if n < 1:
        raise ValueError("n must be a positive integer, got {!r}".format(n))

    results_dir = os.environ['DOMINO_WORKING_DIR'] + '/results'
    # A fresh workspace may not have the results directory yet; to_csv would
    # otherwise fail on the first write.
    os.makedirs(results_dir, exist_ok=True)

    today = date.today()

    # Split row *positions* and slice with iloc, instead of handing the frame
    # itself to np.array_split (which goes through DataFrame.swapaxes,
    # deprecated in recent pandas). The resulting partitions are the same.
    row_groups = np.array_split(np.arange(len(df)), n)

    file_names = []
    for part_number, rows in enumerate(row_groups, start=1):
        if n == 1:
            name = '{0}/{1}_total_{2}.csv'.format(results_dir, prefix, today)
        else:
            name = '{0}/{1}_total_partition{2}_{3}.csv'.format(
                results_dir, prefix, part_number, today)

        df.iloc[rows].to_csv(name)
        file_names.append(name)

    return file_names

Note that here, we use the entire "new" dataset, no splitting needed

In [22]:
#preds
# Input features to ship alongside the predictions: everything except the raw
# date parts and the observed target, indexed by timestamp under the name 'ds'.
pred_data = (
    df
    .drop(columns=['date', 'half_hour_increment', 'y'])
    .rename(columns={'datetime': 'ds'})
    .set_index('ds')
)
pred_data.head()

Unnamed: 0_level_0,HDF,CCGT,OIL,COAL,NUCLEAR,WIND,PS,NPSHYD,OCGT,OTHER,INTFR,INTIRL,INTNED,INTEW,BIOMASS,INTEM
ds,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
2020-07-26 00:00:00,FUELHH,7079,0,0,5112,4415,0,217,1,121,792,252,682,44,1007,556
2020-07-26 00:30:00,FUELHH,5799,0,0,5102,4886,0,208,0,132,778,252,682,50,1006,556
2020-07-26 01:00:00,FUELHH,5001,0,0,5111,4924,0,136,0,165,780,252,682,198,1014,556
2020-07-26 01:30:00,FUELHH,4658,0,0,5119,5231,0,128,0,137,780,252,684,210,1017,556
2020-07-26 02:00:00,FUELHH,4798,0,0,5041,5467,0,127,0,138,728,252,634,60,1017,554


In [23]:
pred_data.shape

(86, 16)

same thing with forecast data.. no splitting needed

In [24]:
preds = forecast.set_index('ds')
preds.head()

Unnamed: 0_level_0,trend,yhat_lower,yhat_upper,trend_lower,trend_upper,additive_terms,additive_terms_lower,additive_terms_upper,daily,daily_lower,daily_upper,weekly,weekly_lower,weekly_upper,multiplicative_terms,multiplicative_terms_lower,multiplicative_terms_upper,yhat
ds,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
2020-07-26 00:00:00,22715.339926,9929.030056,29053.613458,14583.263035,32600.558534,-3916.775339,-3916.775339,-3916.775339,-1721.049859,-1721.049859,-1721.049859,-2195.72548,-2195.72548,-2195.72548,0.0,0.0,0.0,18798.564587
2020-07-26 00:30:00,22714.481089,9724.01682,28457.571274,14569.355693,32610.74458,-4476.344846,-4476.344846,-4476.344846,-2281.371311,-2281.371311,-2281.371311,-2194.973535,-2194.973535,-2194.973535,0.0,0.0,0.0,18238.136243
2020-07-26 01:00:00,22713.622252,9331.849155,28057.104007,14561.357249,32620.930626,-4925.967851,-4925.967851,-4925.967851,-2733.059705,-2733.059705,-2733.059705,-2192.908146,-2192.908146,-2192.908146,0.0,0.0,0.0,17787.654401
2020-07-26 01:30:00,22712.763415,8913.676332,27418.190586,14555.539087,32631.116672,-5288.385129,-5288.385129,-5288.385129,-3098.845708,-3098.845708,-3098.845708,-2189.539421,-2189.539421,-2189.539421,0.0,0.0,0.0,17424.378286
2020-07-26 02:00:00,22711.904578,8502.723967,27394.798098,14549.720925,32641.302719,-5582.73162,-5582.73162,-5582.73162,-3397.852806,-3397.852806,-3397.852806,-2184.878814,-2184.878814,-2184.878814,0.0,0.0,0.0,17129.172958


In [25]:
preds.shape

(86, 18)

In [26]:
# Join the input features with the forecast columns on the shared 'ds'
# timestamp, then expose the point forecast under the name 'y'.
# The right-hand side is derived from `forecast` rather than from `preds`
# itself, so re-running this cell yields the same frame instead of merging
# already-merged data.
preds = (
    pd.merge(pred_data, forecast.set_index('ds'), how='inner', on='ds')
    .rename(columns={'yhat': 'y'})
)
preds.shape

(86, 34)

In [27]:
preds.columns

Index(['HDF', 'CCGT', 'OIL', 'COAL', 'NUCLEAR', 'WIND', 'PS', 'NPSHYD', 'OCGT',
       'OTHER', 'INTFR', 'INTIRL', 'INTNED', 'INTEW', 'BIOMASS', 'INTEM',
       'trend', 'yhat_lower', 'yhat_upper', 'trend_lower', 'trend_upper',
       'additive_terms', 'additive_terms_lower', 'additive_terms_upper',
       'daily', 'daily_lower', 'daily_upper', 'weekly', 'weekly_lower',
       'weekly_upper', 'multiplicative_terms', 'multiplicative_terms_lower',
       'multiplicative_terms_upper', 'y'],
      dtype='object')

In [47]:
inputs_preds = split_data_export(preds,1,'inputs_and_preds')
inputs_preds

['/mnt/results/inputs_and_preds_total_2020-07-27.csv']

In [29]:
#ground truth
# Observed values keyed by timestamp, with the target relabelled 'y_gt'.
dt = X.set_index('ds').rename(columns={'y': 'y_gt'})
dt.head()

Unnamed: 0_level_0,y_gt
ds,Unnamed: 1_level_1
2020-07-26 00:00:00,20278
2020-07-26 00:30:00,19451
2020-07-26 01:00:00,18819
2020-07-26 01:30:00,18772
2020-07-26 02:00:00,18816


Quick gut check -- does the length of our ground truth dataframe match that of the predictions? 

In [31]:
len(dt)

86

In [None]:
dt.shape

Yes, yes it does.

The following step was an alternate method to create the ground truth dataframe.. redundant with above in current state, but also useful for more general purpose cross-indexing of pred date range with ground truth data

In [33]:
# Keep only the ground-truth rows whose timestamp also appears in the
# predictions index.
gt = dt[dt.index.isin(preds.index)]
# Alternative constructions, kept for reference:
#gt = pd.merge(preds, dt, how='inner', on='ds')
#gt = pd.concat([y, X[['ds']]], axis=1, sort=False).rename(columns = {'y':'y_gt'}).reset_index(drop=True).set_index('ds')
gt.shape

(86, 1)

In [36]:
gt.head()

Unnamed: 0_level_0,y_gt
ds,Unnamed: 1_level_1
2020-07-26 00:00:00,20278
2020-07-26 00:30:00,19451
2020-07-26 01:00:00,18819
2020-07-26 01:30:00,18772
2020-07-26 02:00:00,18816


In [37]:
gt.tail()

Unnamed: 0_level_0,y_gt
ds,Unnamed: 1_level_1
2020-07-27 16:30:00,29195
2020-07-27 17:00:00,29698
2020-07-27 17:30:00,29580
2020-07-27 18:00:00,29182
2020-07-27 18:30:00,28736


In [40]:
ground_truth = split_data_export(gt,1,'ground_truth')
ground_truth

['/mnt/results/ground_truth_total_2020-07-27.csv']

### 5. Use S3 APIs to export input+pred and ground truth data directly to bucket of choice
 - https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html

In [44]:
# AWS credentials come from the environment; a missing variable raises
# KeyError here. Names assigned at cell level are already module globals,
# so no `global` declaration is needed.
ACCESS_KEY = os.environ['AWS_ACCESS_KEY_ID']
SECRET_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

In [45]:
def upload_to_aws(local_file, bucket, prefix):
    """Upload one local file to S3 under ``{prefix}/{file name}``.

    Uses the module-level ACCESS_KEY / SECRET_KEY for authentication.

    Args:
        local_file: Path of the file to upload.
        bucket: Name of the destination S3 bucket.
        prefix: Key prefix ("folder") inside the bucket.

    Returns:
        True when the upload succeeded; False when the local file does not
        exist or no credentials are available. Any other error propagates.
    """
    client = boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )

    # Object key = prefix + bare file name (local directories are dropped).
    object_key = '{}/{}'.format(prefix, os.path.basename(local_file))

    try:
        client.upload_file(local_file, bucket, object_key)
    except FileNotFoundError:
        print("The file was not found")
        return False
    except NoCredentialsError:
        print("Credentials not available")
        return False

    print(object_key + " Upload Successful")
    return True

In [46]:
# Push both exports to the same bucket/prefix: inputs+predictions first,
# then ground truth.
for exported_files in (inputs_preds, ground_truth):
    for name in exported_files:
        upload_to_aws(name, 'cg-scratch', 'automation-test')

automation-test/inputs_and_preds_total_2020-07-27.csv Upload Successful
automation-test/ground_truth_total_2020-07-27.csv Upload Successful


### 6. Use DMM APIs to register new preds and ground truth data
 - specify s3 bucket 
 - PUT https://dmm-hartford.domino-pilot.com/api/v0/models/<MODEL-ID>/add_predictions
 - PUT https://dmm-hartford.domino-pilot.com/api/v0/models/<MODEL-ID>/add_ground_truths

#### A. Example: Get Model List

In [51]:
import json

In [66]:
url = "https://dmm-hartford.domino-pilot.com/api/v0/models/list_models"

payload  = {}
# The DMM API token is a secret and must not live in the notebook: it is read
# from the DMM_API_TOKEN environment variable (KeyError if unset).
# NOTE(review): the token that was previously committed here should be
# rotated, since it remains in the notebook's history.
headers = {
  'Authorization': os.environ['DMM_API_TOKEN'],
  'Content-Type': 'application/json'
}

In [67]:
# Fetch the list of models registered in DMM.
response = requests.get(url, headers=headers, data=payload)

In [68]:
# Show the registered models as a table.
# The result is stored as `models_df`, not `df`: `df` holds the input time
# series used by the earlier cells, and overwriting it here broke any re-run
# of those cells in the same kernel.
data = response.json()
models_df = pd.DataFrame(data)
models_df

Unnamed: 0,id,name,modelType,modelVersion,description,dateRegistered,modelAuthor
0,5f031bb31340042f50114e40,DivTest,classification,1,"KL - age: 0.0465, job: 0.0288, marital: 0.0177...",2020-07-06T12:40:19.072000,samit.demo@dominodatalab.com
1,5f08ba19ae054536449ce2c2,UK Power Prediction - BMRS,regression,v2,"Predicting Total Power Production in the UK, a...",2020-07-10T18:57:29.620000,Colin Goyette
2,5f0ef5111340042f50114eb5,Hartford,regression,1,,2020-07-15T12:22:41.790000,samit.thange@dominodatalab.com
3,5f0eff0a127595122a537e04,Power Generation,regression,v1.1,Predicting total power generation in the UK,2020-07-15T13:05:14.805000,DDLTestOrgAdmin
4,5f10a0e108a1fca0bd9ff6df,UK Power Prediction - BMRS,regression,v3,,2020-07-16T18:48:01.313000,DDLTestOrgAdmin
5,5f1f1cfbe76c80e6f55f857a,UK Power Gen - Total,regression,v1.2,,2020-07-27T18:29:15.547000,DDLTestOrgAdmin
6,5f04ac7bf52c482ae3f54a29,reg_test,regression,v1.0,,2020-07-07T17:10:19.243000,JaiRaju
7,5f04ac7ef52c482ae3f54a2a,hartford test,classification,v1,,2020-07-07T17:10:22.137000,ChrisDong


#### B. Register new Model

In [None]:
# TO DO

#### C. Register Predictions

In [170]:
def register_pred(file_path, MODEL_ID):
    """Register an uploaded predictions file with a DMM model.

    Sends ``PUT .../models/{MODEL_ID}/add_predictions`` with a JSON body whose
    ``dataLocation`` points at the file under the ``cg-scratch`` bucket's
    ``automation-test`` prefix.

    The API token is read from the DMM_API_TOKEN environment variable rather
    than being hard-coded in the notebook.

    Args:
        file_path: Name of the file as stored under the S3 prefix (callers
            pass the bare file name).
        MODEL_ID: DMM identifier of the model to attach the predictions to.

    Returns:
        Tuple ``(response, body)`` where ``body`` is the response text encoded
        as UTF-8 bytes. The body is also printed.

    Raises:
        KeyError: if DMM_API_TOKEN is not set.
    """
    url = "https://dmm-hartford.domino-pilot.com/api/v0/models/{}/add_predictions".format(MODEL_ID)

    # Build the body with json.dumps so the file name is always escaped
    # correctly, instead of concatenating it into a JSON string by hand.
    # NOTE(review): the recorded run of this notebook got a 400
    # "dataLocation points to an invalid file path" for this URL shape --
    # confirm the location format DMM expects.
    payload = json.dumps({
        "dataLocation": "https://s3.amazonaws.com/cg-scratch/automation-test/" + file_path
    })

    headers = {
      'Authorization': os.environ['DMM_API_TOKEN'],
      'Content-Type': 'application/json'
    }

    response = requests.request("PUT", url, headers=headers, data = payload)
    body = response.text.encode('utf8')
    print(body)

    return response, body

In [171]:
register_pred(os.path.basename(inputs_preds[0]),'5f1f1cfbe76c80e6f55f857a')

b'{"status": "Fail", "message": "[\'dataLocation points to an invalid file path\']"}'


(<Response [400]>,
 b'{"status": "Fail", "message": "[\'dataLocation points to an invalid file path\']"}')

.. fails with same code in Postman. -- Copying python - requests code sample here to try without wrapper function

## Register Ground Truth

NOTES:
1. first time around, if doing so through API, must include "data columns" details. 
2. afterwards, only data location necessary

In [172]:
def ground_truth_reg(file_path,MODEL_ID):
    """Register an uploaded ground-truth file with a DMM model.

    Sends ``PUT .../models/{MODEL_ID}/add_ground_truths`` with a JSON body
    whose ``dataLocation`` points at the file under the ``cg-scratch``
    bucket's ``automation-test`` prefix. Only the data location is sent.

    The API token is read from the DMM_API_TOKEN environment variable rather
    than being hard-coded in the notebook.

    Args:
        file_path: Name of the file as stored under the S3 prefix (callers
            pass the bare file name).
        MODEL_ID: DMM identifier of the model the ground truth belongs to.

    Returns:
        Tuple ``(response, body)`` where ``body`` is the response text encoded
        as UTF-8 bytes. The body is also printed.

    Raises:
        KeyError: if DMM_API_TOKEN is not set.
    """
    url = "https://dmm-hartford.domino-pilot.com/api/v0/models/{}/add_ground_truths".format(MODEL_ID)

    # Build the body with json.dumps so the file name is always escaped
    # correctly, instead of concatenating it into a JSON string by hand.
    payload = json.dumps({
        "dataLocation": "https://s3.amazonaws.com/cg-scratch/automation-test/" + file_path
    })

    headers = {
      'Authorization': os.environ['DMM_API_TOKEN'],
      'Content-Type': 'application/json'
    }

    response = requests.request("PUT", url, headers=headers, data = payload)
    body = response.text.encode('utf8')
    print(body)

    return response, body

In [173]:
print(os.path.basename(ground_truth[0]))

ground_truth_total_2020-07-27.csv


## Test

In [164]:
# DMM id of the model used by the test calls below. A cell-level assignment
# is already a module global, so no `global` declaration is needed.
model_id = '5f1f1cfbe76c80e6f55f857a'

In [165]:
inputs_preds[0]

'/mnt/results/inputs_and_preds_total_2020-07-27.csv'

In [177]:
os.path.basename(inputs_preds[0])

'inputs_and_preds_total_2020-07-27.csv'

In [179]:
register_pred(os.path.basename(inputs_preds[0]), model_id)

b'{"status": "Fail", "message": "[\'dataLocation points to an invalid file path\']"}'


(<Response [400]>,
 b'{"status": "Fail", "message": "[\'dataLocation points to an invalid file path\']"}')

In [180]:
ground_truth[0]

'/mnt/results/ground_truth_total_2020-07-27.csv'

In [181]:
os.path.basename(ground_truth[0])

'ground_truth_total_2020-07-27.csv'

In [175]:
# Register the ground-truth file exported by this run. The previous call
# passed a hard-coded, older *inputs_and_preds* file name to the ground-truth
# endpoint.
ground_truth_reg(os.path.basename(ground_truth[0]), model_id)

b'{"status": "Fail", "message": "[\'This ground truth dataset is already registered with model\']"}'


(<Response [400]>,
 b'{"status": "Fail", "message": "[\'This ground truth dataset is already registered with model\']"}')