In [None]:
'''
Licensed Materials - Property of IBM
IBM Maximo APM - Predictive Maintenance Insights On-Premises
IBM Maximo APM - Predictive Maintenance Insights SaaS 
# IBM Maximo Application Suite 
© Copyright IBM Corp. 2019,2020,2021 All Rights Reserved.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.
'''

# Maximo Predict - End of Life Curve Model Template

1. [Introduction](#introduction)
2. [Install Maximo Predict SDK](#install-maximo-apm-pmi-sdk)
3. [Setup the Model Training Pipeline](#setup-model-training-pipline)
4. [Train the Model Instance](#train-model-instance)
5. [Register the Trained Model Instance](#register-trained-model-instance)
6. [Model Template Internals](#model-template-internals)

<a id='introduction'></a>
## Introduction
Statistically, the sample mean (average age) method is acceptable for evaluating the mean life of assets when a large population has end-of-life information. But assets such as generators, transformers, reactors, and cables have a relatively long life, up to and even beyond 40 years, and generally have very limited end-of-life failure data. This algorithm is designed to address this use case: estimating mean life with limited end-of-life failure data. In fact, the algorithm works best when fewer than 20% of the assets have end-of-life failure data.

This notebook predicts the failure probability curve for a type of asset. In asset health assessment, the asset failure probability and the expected remaining life are the key aspects for analyzing asset health status.

The Failure Probability Curve model uses a statistical distribution to estimate failure probability by asset age in years. The model supports two distributions:
+ **Normal Distribution**: a small percentage of assets fail early in the life cycle, a few last beyond the average expected life span, but the majority fail around the mean life.
+ **Weibull Distribution**: the probability density function is
    $$f(x;\lambda,k) = \begin{cases} \frac{k}{\lambda}\left(\frac{x}{\lambda}\right)^{k-1}e^{-(x/\lambda)^{k}} & x\geq0, \\ 0 & x<0, \end{cases}$$
    where $k > 0$ is the shape parameter and $\lambda > 0$ is the scale parameter.
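
For reference, the density above corresponds to the following cumulative distribution function, which is the quantity a failure-probability-versus-age curve plots. These are standard results for the Weibull distribution and are not taken from the SDK code:

$$F(x;\lambda,k) = 1 - e^{-(x/\lambda)^{k}} \quad (x \geq 0), \qquad \mathrm{E}[X] = \lambda\,\Gamma\!\left(1 + \frac{1}{k}\right)$$

At $x = \lambda$ the cumulative failure probability is $1 - e^{-1} \approx 0.632$ for any shape $k$, which gives a quick sanity check on a fitted scale parameter.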
    
Data requirement -
The Failure Probability Curve model works best when only a small percentage of assets (<= 20%) are end-of-life, with at least 10 assets.
The model may not work if 40% or more of the assets are already end-of-life. On the other hand, the sample must include at least two end-of-life assets.
Note: In Maximo, the end-of-life date is the ASSET.STATUSDATE of an asset whose "status" is "DECOMMISSIONED".
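
Before training, it can help to check an asset group against these thresholds. The following is a minimal sketch, not part of pmlib: `check_end_of_life_share` is a hypothetical helper, and it reads "at least 10 assets" as a minimum group size, which is an assumption.

```python
def check_end_of_life_share(statuses):
    """Classify a list of asset status strings against the data requirement.

    Hypothetical helper for illustration only; it is not part of pmlib.
    Returns (end_of_life_count, end_of_life_share, verdict).
    """
    total = len(statuses)
    end_of_life = sum(1 for status in statuses if status == "DECOMMISSIONED")
    share = end_of_life / total if total else 0.0

    if end_of_life < 2:
        verdict = "too few end-of-life assets"   # at least two are required
    elif share >= 0.40:
        verdict = "may not work"                 # 40% or more already end-of-life
    elif share <= 0.20 and total >= 10:          # assumed reading of "at least 10 assets"
        verdict = "best range"
    else:
        verdict = "usable"
    return end_of_life, share, verdict


# Example: 2 decommissioned assets out of 12
statuses = ["DECOMMISSIONED"] * 2 + [""] * 10
print(check_end_of_life_share(statuses))
```

The verdict strings are only labels for this sketch; the model itself does not report them.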

<a id="install-maximo-apm-pmi-sdk"></a>
## Install the Maximo Predict SDK



You need the following four items to run the notebook: three credentials and the Db2 SSL certificate. You can obtain the three credentials if you are an administrator in the Maximo Predict UI.


Steps -

    APM_ID: Application Administration -> System Properties -> Filter -> Search for PMIId -> Current Value (e.g. b95ed774)
    APM_API_BASEURL: Application Administration -> Integration -> End Points -> Search for predict -> click the search result PREDICTAPI -> URL (e.g. https://predict-api.mas-pmidev1-predict.svc; note that you need only the first part of the URL)
    APM_API_KEY: Application Administration -> Go To Administration -> Copy the key from the user card (e.g. 6805t46gn3tef37pu0picpg9vcq3hsmamm1enc43), or add an API key for the user if one does not exist.

Steps to get db2_certificate.pem:
    Log in to CP4D. Click Services -> Instances. Click the Predict DB. Click "Download SSL Certificate" and save it to a file named db2_certificate.pem. Then upload db2_certificate.pem to CP4D by clicking "New data Asset".
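
The next cell reads the three credentials from a `Predict_Envs.json` data asset. Judging by the keys that cell accesses, the file is a flat JSON object. The sketch below shows that shape with placeholder values and checks for missing keys; `missing_keys` is a hypothetical helper, not part of the SDK.

```python
import json

# Keys that the notebook reads from Predict_Envs.json
REQUIRED_KEYS = ("APM_ID", "APM_API_BASEURL", "APM_API_KEY")


def missing_keys(config_text):
    """Return the required keys that are absent or empty in the JSON text."""
    config = json.loads(config_text)
    return [key for key in REQUIRED_KEYS if not config.get(key)]


# Placeholder values only; replace them with the values from your environment
example_config = """
{
    "APM_ID": "<your-pmi-id>",
    "APM_API_BASEURL": "https://<your-predict-api-host>",
    "APM_API_KEY": "<your-api-key>"
}
"""

print(missing_keys(example_config))
```

An empty list means that all three credentials are present.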



In [1]:
import json
import os

# Load the Maximo Predict credentials from the JSON file in Data Assets
config_file = '/project_data/data_asset/Predict_Envs.json'

if os.path.isfile(config_file):
    # The context manager closes the file even if parsing fails
    with open(config_file) as f:
        data = json.load(f)
    os.environ['APM_ID'] = data['APM_ID']
    os.environ['APM_API_BASEURL'] = data['APM_API_BASEURL']
    os.environ['APM_API_KEY'] = data['APM_API_KEY']
else:
    print('Please make sure the Predict_Envs.json is in the Data Assets')

In [2]:
#Override the 3 credentials if needed.
# %%capture
# # @hidden_cell
# %env APM_ID=***********
# %env APM_API_BASEURL=**************
# %env APM_API_KEY=**************


In [3]:
import os
# Strip the leading 'https://' (8 characters) to get the host that pip should trust
os.environ['TRUST_PREDICT'] = os.getenv('APM_API_BASEURL')[8:]
#print(os.getenv('TRUST_PREDICT'))


os.environ['SSL_VERIFY_APM'] = 'False'
os.environ['SSL_VERIFY_AS'] = 'False'
monitor_url= "https://masdev.api.monitor.masocp-igki4x.apps.masocp-igki4x.ibmazsp.net"
os.environ['isICP']='true'
os.environ['REST_METADATA_URL']=monitor_url
os.environ['REST_KPI_URL']=monitor_url
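
The `[8:]` slice in the cell above assumes that `APM_API_BASEURL` starts with `https://` and has no trailing path. As a hedged alternative, the host can be parsed with the standard library; `host_from_base_url` is a hypothetical helper name, not an SDK function.

```python
from urllib.parse import urlparse


def host_from_base_url(base_url):
    """Return the host (and port, if any) of an http(s) base URL."""
    parsed = urlparse(base_url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        raise ValueError("expected a URL such as https://<host>, got %r" % base_url)
    return parsed.netloc


base_url = "https://predict-api.mas-pmidev1-predict.svc"
print(host_from_base_url(base_url))
```

For a plain `https://<host>` value this gives the same result as the slice, and it also drops any trailing path.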

Then, install the Maximo Predict SDK by using the `pip` command.

In [4]:
!pip uninstall -y pmlib

Found existing installation: pmlib 8.8.1.dev4249
Uninstalling pmlib-8.8.1.dev4249:
  Successfully uninstalled pmlib-8.8.1.dev4249


In [5]:
!pip install --trusted-host ${TRUST_PREDICT}  -U ${APM_API_BASEURL}/ibm/pmi/service/rest/ds/${APM_ID}/${APM_API_KEY}/lib/download?filename=pmlib

Collecting https://masocp-igki4x-predict-api.mas-masocp-igki4x-predict.svc/ibm/pmi/service/rest/ds/masdev/inuqrvb39kbuvc4odt7ntmqr3n1of68ojjjt6o6t/lib/download?filename=pmlib
  Downloading https://masocp-igki4x-predict-api.mas-masocp-igki4x-predict.svc/ibm/pmi/service/rest/ds/masdev/inuqrvb39kbuvc4odt7ntmqr3n1of68ojjjt6o6t/lib/download?filename=pmlib (156.6 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 156.6/156.6 MB 129.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Processing //tmp/1000800000/pip-req-build-iagnxk81/dslib/watson_data_client-1.0.1-cp310-cp310-linux_x86_64.whl
Processing //tmp/1000800000/pip-req-build-iagnxk81/dslib/mat-sdk-0.31.0.zip
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Installing backend dependencies ... done
  Preparing metadata (pyproject.toml) ... done
Processing //tmp/10008000







dqlearn is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.
watson-data-client is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.
modelfactory is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.
srom is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.
Building wheels for collected packages: pmlib, aixclient, mat-sdk
  Building wheel for pmlib (setup.py) ... done
  Created wheel for pmlib: filename=pmlib-8.8.1.dev4249-py3-none-any.whl size=972581 sha256=4c70ce9687d052467fc72cb5241fa43dc9ce8542ec088bbdae204c221a16a3c4
  Stored in directory: /tmp/1000800000/pip-ephem-wheel-cache-umqj5tc3/wheels/0c/f6/88/388c6421d8ce6b9f0a544ef416beb8824660d109b9f6babc58
  Building wheel for aixclie

<a id="setup-model-training-pipline"></a>
## Set up the model training pipeline

Before you can start working on the model training pipeline, you have to set up an asset group properly in Maximo. See the IBM Maximo APM - Predictive Maintenance Insights SaaS documentation for details.

Required model pipeline configuration:

* Asset group ID: The unit of model processing is an asset group. Asset groups are managed on the IBM Maximo Health->Predict grouping page. Get the ID of the asset group to be analyzed by this model.
* Asset installation date and decommission date as the label: This model requires asset installation date (asset attribute **```installdate```** in Maximo) and asset decommission date (asset attribute **```ASSET.STATUSDATE when "status" is "DECOMMISSIONED"```** in Maximo) to extract the label for training.

Now you can set up a training pipeline based on this model template, with your own data, to train a model instance.

The following example configures a pipeline for this model using asset attributes **```installdate```** and **```ASSET.STATUSDATE when "status" is "DECOMMISSIONED"```** to extract the labels for training. This model only generates the failure probability curve and does not do scoring, so no predictions are defined.

In [6]:
# you can get asset_group_id from Health's 'Predict grouping' page.
asset_group_id='1002'

In [7]:
from pmlib.degradation_curve import DegradationCurveAssetGroupPipeline

group = DegradationCurveAssetGroupPipeline(
            asset_group_id=asset_group_id, 
            model_pipeline={
                'missing_value_analysis': False,  # skip missing value analysis
                
                "statistics_distribution_args": {
                    "distribution_type": "WEIBULL", 
                    "mean_or_scale": None,
                    "stddev_or_shape": None
                }
            })



2023-07-24T12:45:46.149 INFO::pmlib.util.setup_logging: Log level has not been set yet... setting to default level of 10
2023-07-24T12:45:46.158 INFO::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.__init__: Initializing End of Life Curve Asset Group Pipeline...
2023-07-24T12:45:46.159 INFO::pmlib.api.init_environ: Initializing environment...
2023-07-24T12:45:46.159 DEBUG::pmlib.api.init_environ: APM_ID=masdev, APM_API_BASEURL=https://masocp-igki4x-predict-api.mas-masocp-igki4x-predict.svc, APM_API_KEY=********
2023-07-24T12:45:46.160 DEBUG::pmlib.util.api_request: Making API Request: method=get, url=https://masocp-igki4x-predict-api.mas-masocp-igki4x-predict.svc/ibm/pmi/service/rest/ds/tenant?instanceId=masdev, headers={'apmapitoken': '********'}, timeout=300, ssl_verify=False, json=None, session=None, kwargs={}
2023-07-24T12:45:46.632 DEBUG::pmlib.util.api_request: Received API Response: resp.status_code=200, method=get, url=https://masocp-igki4x-predict-api.mas-masocp-ig

2023-07-24T12:45:50.137 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.__init__: Retrieved post processing configuration: post_processing=[]
2023-07-24T12:45:50.137 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.__init__: Incremental summary enabled: incremental_summary=False
2023-07-24T12:45:50.138 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.__init__: Retrieved published outputs: published_outputs={}
2023-07-24T12:45:50.138 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline._validate_pipeline_outputs: Validating pipeline outputs: current_outputs=[]
2023-07-24T12:45:50.139 DEBUG::pmlib.util.api_request: Making API Request: method=get, url=https://masocp-igki4x-predict-api.mas-masocp-igki4x-predict.svc/ibm/pmi/service/rest/assetgroup/1002/model/?instanceId=masdev, headers={'apmapitoken': '********'}, timeout=300, ssl_verify=False, json=None, session=None, kwargs={}
2023-07-24T12:45:50.640 DEBUG::pmlib.util.api_re

<a id="train-model-instance"></a>
## Train the model instance

Now you can train the model instance.

In [11]:
df = group.execute()

2023-07-24T13:03:02.273 INFO::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.execute: Starting execution of End of Life Curve Asset Group Pipeline...
2023-07-24T13:03:02.274 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.execute: Received input DataFrame: df=None
2023-07-24T13:03:02.275 DEBUG::pmlib.pipeline._ModelPipelineConfig.__init__: Initializing ModelPipelineConfig Dict with the following parameters: features=[], features_for_training=['installdate', 'statusdate', 'status'], predictions=[], features_resampled={}, inputs=(':installdate', ':statusdate', ':status'), renamed_inputs=('installdate', 'statusdate', 'status')
2023-07-24T13:03:02.276 DEBUG::pmlib.pipeline._ModelPipelineConfig.__init__: Added kwargs to ModelPipelineConfig: {'missing_value_analysis': False, 'statistics_distribution_args': {'distribution_type': 'WEIBULL', 'mean_or_scale': None, 'stddev_or_shape': None}}
2023-07-24T13:03:02.277 DEBUG::pmlib.degradation_curve.DegradationCurveAsset

2023-07-24T13:03:03.218 DEBUG::pmlib.api._refresh_asset_cache: Retrieved asset device mappings for asset group id 1002. Mappings=[{'devices': [], 'assetNum': 'ROBOARM1', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM2', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM3', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM4', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM5', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM6', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM7', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM8', 'siteId': 'BEDFORD'}, {'devices': [], 'assetNum': 'ROBOARM9', 'siteId': 'BEDFORD'}]
2023-07-24T13:03:03.218 DEBUG::pmlib.api._refresh_asset_cache: Generating device list... iterating over data_items: [':installdate', ':statusdate', ':status']
2023-07-24T13:03:03.219 DEBUG::pmlib.api._refresh_asset_cache: Iterating over data items. Current name_type: 
2023-07-24T13:03:03.220 DEBUG::pmlib.api._r

2023-07-24T13:03:03.295 DEBUG::pmlib.loader.AssetLoader.execute: Converted asset device mappings to DataFrame: df_mappings=
shape=(9, 3), 
index={0: 'int64'}, 
columns={'asset_id': 'O', 'entity_type': 'O', 'id': 'O'}, 
head(5)=
                asset_id entity_type   id
0  ROBOARM1-____-BEDFORD         NaN  NaN
1  ROBOARM2-____-BEDFORD         NaN  NaN
2  ROBOARM3-____-BEDFORD         NaN  NaN
3  ROBOARM4-____-BEDFORD         NaN  NaN
4  ROBOARM5-____-BEDFORD         NaN  NaN
2023-07-24T13:03:03.296 DEBUG::pmlib.loader.AssetLoader.execute: Iterating over entity types to prep DataFrames for merging...
2023-07-24T13:03:03.297 DEBUG::pmlib.loader.AssetLoader.execute: Current iteration: entity_type=, data_items=['statusdate', 'installdate', 'status'], is_asset_data=True
2023-07-24T13:03:03.297 DEBUG::pmlib.loader.AssetLoader.execute: Separating asset time series data from non time series data...
2023-07-24T13:03:03.298 DEBUG::pmlib.loader.AssetLoader.execute: data_items=['statusdate', 'inst

2023-07-24T13:03:04.936 DEBUG::pmlib.degradation_curve.DegradationCurveInputDataFilter.execute: df_input=
shape=(9, 3), 
index={'id': 'O', 'evt_timestamp': datetime64[ns, UTC]}, 
columns={'installdate': '<M8[ns]', 'statusdate': datetime64[ns, UTC], 'status': 'O'}, 
head(5)=
                                                       installdate  \
id                    evt_timestamp                                  
ROBOARM1-____-BEDFORD 2023-07-24 12:55:04.917807+00:00  2016-05-05   
ROBOARM2-____-BEDFORD 2023-07-24 12:56:04.917807+00:00  2016-05-05   
ROBOARM3-____-BEDFORD 2023-07-24 12:57:04.917807+00:00  2016-05-05   
ROBOARM4-____-BEDFORD 2023-07-24 12:58:04.917807+00:00  2016-05-07   
ROBOARM5-____-BEDFORD 2023-07-24 12:59:04.917807+00:00  2016-05-10   

                                                                      statusdate  \
id                    evt_timestamp                                                
ROBOARM1-____-BEDFORD 2023-07-24 12:55:04.917807+00:00 2023-06-26 

in the while loop iteration=3700
the new performance objective function value is: 0.04862861664750307 alpha=13.890080610093793 beta=5.719942002484718
in the while loop iteration=3800
the new performance objective function value is: 0.048084880273308477 alpha=13.88797661779122 beta=5.71287460273636
in the while loop iteration=3900
the new performance objective function value is: 0.04754676041314117 alpha=13.88588080536856 beta=5.705844607369419
in the while loop iteration=4000
the new performance objective function value is: 0.047014229966748596 alpha=13.883793156366544 beta=5.6988520371758415
in the while loop iteration=4100
the new performance objective function value is: 0.04648726140198916 alpha=13.881713665370315 beta=5.691896906818195
in the while loop iteration=4200
the new performance objective function value is: 0.04596582676512886 alpha=13.87964233569461 beta=5.684979225516756
in the while loop iteration=4300
the new performance objective function value is: 0.04544989769116988

2023-07-24T13:03:05.289 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saving model...
2023-07-24T13:03:05.291 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saved model to path: apm/pmi/model/1002/DegradationCurveEstimator/__1690203785
2023-07-24T13:03:05.292 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saving model...
2023-07-24T13:03:05.293 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saved model to path: apm/pmi/model/1002/DegradationCurveEstimator/__1690203785_json
2023-07-24T13:03:05.298 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.execute: Final DF with predictions: 
shape=(4, 0), 
index={'id': 'O', 'evt_timestamp': datetime64[ns, UTC]}, 
columns={}, 
df=Empty DataFrame
2023-07-24T13:03:05.300 DEBUG::pmlib.pipeline._ModelPipelineConfig.__init__: Initializing ModelPipelineConfig Dict with the following parameters: features=[], features_for_training=['installdate', 'statusdate',

After this method completes successfully, you have a trained model instance ready, and the prediction results are returned as a dataframe for verification.

<a id="register-trained-model-instance"></a>
## Register the trained model instance


If the trained model instance looks good, you can register it to Maximo Predict:

In [12]:
group.register()

2023-07-24T13:03:32.302 INFO::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register: Beginning registration of model to Maximo Predict...
2023-07-24T13:03:32.303 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register: target_pipeline_class=pmlib.degradation_curve.DegradationCurveAssetGroupPipeline
2023-07-24T13:03:32.304 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register: Model Template ID: DegradationCurveAssetGroupPipeline
2023-07-24T13:03:32.305 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register: This class name: DegradationCurveAssetGroupPipeline
2023-07-24T13:03:32.306 DEBUG::pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register: Registering model template...
2023-07-24T13:03:32.307 DEBUG::pmlib.util.api_request: Making API Request: method=post, url=https://masocp-igki4x-predict-api.mas-masocp-igki4x-predict.svc/ibm/pmi/service/rest/ds/inuqrvb39kbuvc4odt7ntmqr3n1of68ojjjt6o6t/template?i

2023-07-24T13:03:37.950 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saving model...
2023-07-24T13:03:37.951 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saving to KPI_MODEL_STORE, pickle_dump=false
2023-07-24T13:03:37.955 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saved model to path: apm/pmi/model/1002/DegradationCurveEstimator/__1690203785_input.gz
2023-07-24T13:03:37.958 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator._save_model_df_trace: training_date_range df=
shape=(2, 1), 
index={0: 'int64'}, 
columns={'evt_timestamp': datetime64[ns, UTC]}, 
head(3)=
                     evt_timestamp
0 2023-07-24 13:00:04.917807+00:00
1 2023-07-24 13:03:04.917807+00:00
2023-07-24T13:03:37.959 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saving model...
2023-07-24T13:03:37.959 DEBUG::pmlib.degradation_curve.DegradationCurveEstimator.save_model: Saving to KPI_MODEL_STORE, pickle_dump=false

'78942A87-DA27-4368-BF76-AA603D8B2702'

After registration succeeds, you can see this newly trained model instance available for the asset group on IBM Maximo Predict.

## Re-train the model instance

If a model instance already exists and you want to re-train the model, you must first unregister the existing instance. You can get the model_instance_id from the Health UI: click "Predict grouping" and find the model instance ID of your group.
You can also use the API call below to get the model_instance_id.
Then call group.unregister(predict_curve_instance_id, force=True) to unregister the model.

In [None]:
# Query whether an existing curve model instance is associated with the asset group.
# If there is one, unregister it first.
import json
import os

import urllib3

# Strip the leading 'https://' to get the host name of the Predict API service
predict_internal_svc = os.getenv('APM_API_BASEURL')[8:]
apm_id = os.getenv('APM_ID')
apm_api_key_dic = {'apmapitoken': os.getenv('APM_API_KEY')}
predict_curve_instance_id = None

asset_group_id = '1057'

query_url = '/ibm/pmi/service/rest/assetgroup/' + asset_group_id + '/model?instanceId=' + apm_id
# Certificate verification is disabled for this request
c = urllib3.HTTPSConnectionPool(predict_internal_svc, port=443, cert_reqs='CERT_NONE', assert_hostname=False)

response = c.request('GET', query_url, headers=apm_api_key_dic)
print(response.data)
response_data_json = json.loads(response.data.decode('utf8'))

for model_instance in response_data_json['modelInstanceList']:
    model_template_name = model_instance.get('modelTemplateDesc', None)
    # A curve model is identified by 'Curve' in the template description
    if model_template_name is not None and 'Curve' in model_template_name:
        predict_curve_instance_id = model_instance['instanceList'][0]['modelInstanceId']
        print(predict_curve_instance_id)


In [None]:
if predict_curve_instance_id is not None:
    group.unregister(predict_curve_instance_id,force=True)

In [None]:
from pmlib.degradation_curve import DegradationCurveAssetGroupPipeline

group = DegradationCurveAssetGroupPipeline(
            asset_group_id=asset_group_id, 
            model_pipeline={
                
                "statistics_distribution_args": {
                    "distribution_type": "WEIBULL", 
                    "mean_or_scale": None,
                    "stddev_or_shape": None
                }
            })

In [None]:
df = group.execute()

In [None]:
group.register()

## Training using CSV file

You can also train the model instance from a CSV file.
The CSV file has the following columns:
    asset 	site 	installdate 	statusdate 	status
    The value of status can be 'DECOMMISSIONED' or an empty string.
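
As an illustration of this layout, the sketch below writes and re-reads a two-row CSV with the standard library. The rows are hypothetical: the asset and site values, the date format, and the empty `statusdate` for an in-service asset are all assumptions, and the file is comma-separated because the `pd.read_csv` call in the next cell uses the default separator.

```python
import csv
import io

COLUMNS = ["asset", "site", "installdate", "statusdate", "status"]

# Hypothetical sample rows; adjust the values and date format to your data
rows = [
    {"asset": "TRAINBRAKE1", "site": "BEDFORD", "installdate": "2010-03-01",
     "statusdate": "2022-06-15", "status": "DECOMMISSIONED"},
    {"asset": "TRAINBRAKE2", "site": "BEDFORD", "installdate": "2012-08-20",
     "statusdate": "", "status": ""},
]

# Write the rows as comma-separated text with a header line
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=COLUMNS)
writer.writeheader()
writer.writerows(rows)
csv_text = buffer.getvalue()

# Read the text back and flag rows whose status is neither allowed value
parsed = list(csv.DictReader(io.StringIO(csv_text)))
invalid_assets = [row["asset"] for row in parsed
                  if row["status"] not in ("DECOMMISSIONED", "")]
print(csv_text)
print(invalid_assets)
```
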

In [None]:
import pandas as pd

# Load the asset attributes used as training data from the uploaded CSV data asset
df_data_1 = pd.read_csv('/project_data/data_asset/trainbrake_asset_attributes_degradation_curve.csv')
df_data_1.head()


In [None]:
asset_group_id='1018'

In [None]:
from pmlib.degradation_curve import DegradationCurveAssetGroupPipeline
group = DegradationCurveAssetGroupPipeline(
                    asset_group_id=asset_group_id,
                    model_pipeline={
                        'missing_value_analysis': False,  # skip missing value analysis
                        
                        'statistics_distribution_args': {
                            'distribution_type': 'WEIBULL',
                            'mean_or_scale': None,
                            'stddev_or_shape': None,
                        },
                    },
                    asset_device_mappings={
                    'TRAINBRAKE1____-BEDFORD': [], 
                    'TRAINBRAKE2____-BEDFORD': [],
                    'TRAINBRAKE3____-BEDFORD': [],
                    'TRAINBRAKE4____-BEDFORD': [],
                    'TRAINBRAKE5____-BEDFORD': [],
                    },
                   
                    data_substitution={
                    '': [
                        {
                            'df': df_data_1,
                            'keys': ['asset'],
                            'columns': ['installdate','statusdate','status'],
#                             'timestamp': 'datetime'
                        },
                    ]}
                )

In [None]:
df=group.execute()

In [None]:
model_instance_desc='End of Life Curve for group='+asset_group_id
group.register(model_instance_name='End of Life Curve',model_instance_desc=model_instance_desc)

In [None]:
# All times are in UTC. The following examples set the schedule to run every 5 minutes, every hour, every day, every week, or every month.
# Minutes and seconds are ignored: "2021-04-12 15:12:15" is the same as "2021-04-12 15:00:00" in the following examples.
#group.enable(enabled=True, schedule={"starting_at": "2021-04-09 15:35:15", "every": "5min"})
#group.enable(enabled=True, schedule={"starting_at": "2021-04-12 15:12:15", "every": "1H"})
#group.enable(enabled=True, schedule={"starting_at": "2021-04-12 15:12:15", "every": "1D"})
#group.enable(enabled=True, schedule={"starting_at": "2021-04-12 15:12:15", "every": "1W"})
#group.enable(enabled=True, schedule={"starting_at": "2021-04-12 15:12:15", "every": "1M"})
group.enable(enabled=True, schedule={"starting_at": "2021-04-12 15:12:15", "every": "5min"})
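
The comments in the scheduling cell state that the minutes and seconds of `starting_at` are ignored. A minimal sketch of that rounding, using only the standard library; `effective_start` is a hypothetical helper for illustration and does not call pmlib:

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def effective_start(starting_at):
    """Return the timestamp with minutes and seconds set to zero."""
    parsed = datetime.strptime(starting_at, TIMESTAMP_FORMAT)
    return parsed.replace(minute=0, second=0).strftime(TIMESTAMP_FORMAT)


print(effective_start("2021-04-12 15:12:15"))
```
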

<a id="model-template-internals"></a>
## Model template internals


#### Use case description

This model computes failure probability versus age in years for a type of asset. With this model, you can answer questions of the form: **What is the failure probability when the asset is N years old?**

##### Input data from Maximo. The Predictive Maintenance Insights SDK does this part.

+ Asset metadata: installation date and decommission date

##### Output

The output is a list of asset age in years versus failure probability, which is saved in Cloud Object Storage.

##### Customization points

+ The **`distribution_type`** in the **`Setup the Model Training Pipeline`** section sets the distribution type. It must be NORMAL or WEIBULL.
+ The **`mean_or_scale`** and **`stddev_or_shape`** values are the parameters of the NORMAL distribution (mean and standard deviation) or the WEIBULL distribution (scale and shape). If they are not specified, the model uses the asset metadata to calculate the failure probability curve. Otherwise, it generates the curve with the specified parameters.
+ Failure probability curve algorithm (**`DegradationCurveEstimator`** class in the following model template code)
+ Model pipeline stages control (**`DegradationCurveAssetGroupPipeline`** class in the following model template code)
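
To illustrate the second customization point, the sketch below generates a curve directly from user-specified parameters. It is not the SDK implementation: `failure_probability_curve` is a hypothetical function, and it assumes that the failure probability at a given age is the cumulative distribution function evaluated at that age.

```python
import math
from statistics import NormalDist


def failure_probability_curve(distribution_type, mean_or_scale, stddev_or_shape, max_age):
    """Return [(age_in_years, cumulative_failure_probability), ...] for ages 0..max_age."""
    if distribution_type == "NORMAL":
        # mean_or_scale is the mean life, stddev_or_shape the standard deviation
        cdf = NormalDist(mu=mean_or_scale, sigma=stddev_or_shape).cdf
    elif distribution_type == "WEIBULL":
        # mean_or_scale is the scale (lambda), stddev_or_shape the shape (k)
        def cdf(age):
            return 1.0 - math.exp(-((age / mean_or_scale) ** stddev_or_shape))
    else:
        raise ValueError("distribution_type should be NORMAL or WEIBULL")
    return [(age, cdf(age)) for age in range(max_age + 1)]


# Illustrative parameters only
weibull_curve = failure_probability_curve("WEIBULL", 40.0, 5.0, 60)
normal_curve = failure_probability_curve("NORMAL", 40.0, 5.0, 60)
print(weibull_curve[40], normal_curve[40])
```
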

##### Model workflow and description (from **`degradation_curve_model.py`** in the Predictive Maintenance Insights SDK)

+ Step 1: Collect the in-service years and retired years of the specific assets
+ Step 2: Calculate:
    + Retired asset: Age = "Retired year" - "In-service year"
    + In-service asset: Age = "Current year" - "In-service year"
    + Exposed number: how many assets have been in service longer than the age
+ Step 3: Calculate the cumulative failure probability (CFP) table:
    + Failure Probability (FP) = retired number / exposed number
    + Cumulative FP (k) = Cumulative FP (k-1) + FP (k)
    + Z values based on Cumulative FP
    + Create a table of the form: | Age | Cumulative Failure Probability | z |
+ Step 4: Estimate the mean life and the standard deviation of the normal distribution
+ Step 5: Generate the NORMAL degradation curve
+ Step 6: Derive the survival table from the CFP table: | Age | Survival Probability |
+ Step 7: Construct the maximum likelihood function
+ Step 8: Calculate the initial shape and scale parameters for the WEIBULL distribution
+ Step 9: Use the gradient-descent method to get the optimal shape and scale parameters
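
Steps 1 to 5 can be sketched with the standard library as follows. This is one plausible reading of the workflow, not the code in the SDK: the function names are hypothetical, the exposed number is assumed to count assets whose age is at least the retirement age, and the mean life and standard deviation are assumed to come from a least-squares fit of age = mean + stddev * z.

```python
from statistics import NormalDist, mean


def cumulative_failure_table(ages, retired):
    """Build rows of (age, cumulative failure probability, z), one per retirement age."""
    rows = []
    cumulative = 0.0
    for age in sorted({a for a, r in zip(ages, retired) if r}):
        retired_n = sum(1 for a, r in zip(ages, retired) if r and a == age)
        exposed_n = sum(1 for a in ages if a >= age)  # assumption: includes this age
        cumulative += retired_n / exposed_n           # CFP(k) = CFP(k-1) + FP(k)
        if 0.0 < cumulative < 1.0:                    # inv_cdf is undefined at 0 and 1
            rows.append((age, cumulative, NormalDist().inv_cdf(cumulative)))
    return rows


def estimate_normal_life(rows):
    """Least-squares fit of age = mean_life + std_dev * z over the table rows."""
    zs = [z for _, _, z in rows]
    xs = [age for age, _, _ in rows]
    z_bar, x_bar = mean(zs), mean(xs)
    slope = (sum((z - z_bar) * (x - x_bar) for z, x in zip(zs, xs))
             / sum((z - z_bar) ** 2 for z in zs))
    return x_bar - slope * z_bar, slope


# Hypothetical group: 12 assets, of which the ones aged 30 and 35 are retired
ages = [10, 12, 15, 18, 20, 22, 25, 28, 30, 32, 35, 38]
retired = [age in (30, 35) for age in ages]

rows = cumulative_failure_table(ages, retired)
mean_life, std_dev = estimate_normal_life(rows)

# Step 5: cumulative failure probability at a given age under the fitted normal
curve = NormalDist(mu=mean_life, sigma=std_dev)
print(rows)
print(mean_life, std_dev, curve.cdf(35))
```

The fit needs at least two rows with distinct z values, which is consistent with the requirement of at least two end-of-life assets.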

In [None]:
class DegradationCurveEstimator(BaseEstimator):
    def __init__(self, features, targets, predictions, statistics_distribution_args=None, **kwargs):
        super().__init__(features, targets, predictions, **kwargs)
        self.statistics_distribution_args = statistics_distribution_args
        self.installdate = kwargs['features_for_training'][0]
        self.decommissiondate = kwargs['features_for_training'][1]

    def train_model(self, df):
        # parse statistics_distribution_args
        distribution_type = self.statistics_distribution_args["distribution_type"]
        mean_or_scale = self.statistics_distribution_args["mean_or_scale"]
        stddev_or_shape = self.statistics_distribution_args["stddev_or_shape"]

        # distribution_type must be set; raise a real exception rather than a string
        if distribution_type is None:
            raise ValueError('distribution_type should be NORMAL or WEIBULL')
        
        degradation_curve_pipline = DegradationCurve(distribution_type, mean_or_scale, stddev_or_shape, self.installdate, self.decommissiondate)
        # using sample data to test degradation curve calculation
        #df_curve_training_data = degradation_curve_pipline.sample_data()
        # use real data from DB2
        df_curve_training_data = df
        self.logger.debug('df_curve_training_data=\n%s' % df_curve_training_data.head())

        final_degradation_curve = degradation_curve_pipline.fit(df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape)
        degradation_curve_model = dict()
        #degradation_curve_model["target"] =  "degradation_curve"
        degradation_curve_model["final_degradation_curve"] =  final_degradation_curve

        return degradation_curve_model
    
    def get_model_extra(self, new_model, model_path):
        extras = []

        model_json_path = model_path + '_json'
        extras.append((model_json_path, json.dumps(new_model), False, False)) # no pickle dump, not binary

        self.logger.debug('extras=%s' % str(extras))

        return extras


class DegradationCurveAssetGroupPipeline(AssetGroupPipeline):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.model_template_name = 'Failure Probability Curve'

        self.fillna = None
        self.dropna = None

    def prepare_execute(self, pipeline, model_config):
        pipeline.add_stage(DegradationCurveEstimator(**model_config))

    @staticmethod
    def generate_sample_data(**kwargs):
        return generate_degradation_curve_data(**kwargs)

In [None]:
class DegradationCurve:
    '''
    The degradation curve supports NORMAL and WEIBULL distributions
    distribution_type: the distribution type, NORMAL or WEIBULL
    mean_or_scale: the mean value of NORMAL or the scale value of WEIBULL
    stddev_or_shape: the standard deviation value of NORMAL or the shape value of WEIBULL
    '''

    @classmethod
    def metadata(cls):
        return {
            'name': cls.__name__,
            'moduleAndTargetName': '%s.%s' % (cls.__module__, cls.__name__),
            'category': 'TRANSFORMER',
            'description': 'DegradationCurve',
            'input': [
            ],
            'output': [
            ],
            'tags': [
                'EVENT'
            ]
        }

    def __init__(self, data_items, statistics_distribution_args, degradation_curve, installdate, decommissiondate):
        self.logger = logging.getLogger('analytics_service.%s.%s' % (self.__module__, self.__class__.__name__))
        self.data_items = data_items
        self.statistics_distribution_args = statistics_distribution_args
        self.degradation_curve = degradation_curve
        self.installdate = installdate
        self.decommissiondate = decommissiondate

    def execute (self, df):
        self.logger.debug('input_df=\n%s' % df.head())

        df_original = df

        # pick only needed columns for scoring 
        df = df[list(set(self.data_items) - set(df.index.names))]
        self.logger.debug('input_df_pick_need_columns=\n%s' % df.head())

        sources_not_in_column=df.index.names
        df = df.reset_index()

        # parse statistics_distribution_args
        distribution_type = self.statistics_distribution_args["distribution_type"]
        mean_or_scale = self.statistics_distribution_args["mean_or_scale"]
        stddev_or_shape = self.statistics_distribution_args["stddev_or_shape"]

        # distribution_type must be set; raise a real exception rather than a string
        if distribution_type is None:
            raise ValueError('distribution_type should be NORMAL or WEIBULL')

        # using sample data to test degradation curve calculation
        df_curve_training_data = self.sample_data()
        self.logger.debug('df_curve_training_data=\n%s' % df_curve_training_data.head())

        # fit the degradation curve
        df_score = df.astype({self._entity_type._timestamp: 'datetime64[ms]'})[[self._entity_type._df_index_entity_id, self._entity_type._timestamp]].copy()
        final_degradation_curve = self.fit(df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape)
        df_score[self.degradation_curve] = str(final_degradation_curve).strip('[]')
        self.logger.debug('df_score=\n%s' % df_score.head())

        df = df_original.merge(df_score, how='left', left_index=True, right_on=[self._entity_type._df_index_entity_id, self._entity_type._timestamp], sort=False)
        df = df.set_index(keys=sources_not_in_column)
        self.logger.debug('df_final=\n%s' % df.head())

        return df


    def fit(self, df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape):
        self.logger.debug('initial mean_or_scale=%s' % mean_or_scale)
        self.logger.debug('initial stddev_or_shape=%s' % stddev_or_shape)

        df_curve_training_data[self.installdate] = pd.to_datetime(df_curve_training_data[self.installdate]).dt.year
        df_curve_training_data[self.decommissiondate] = np.where(pd.notna(df_curve_training_data[self.decommissiondate]), pd.to_datetime(df_curve_training_data[self.decommissiondate]).dt.year, -1)
        df_curve_training_data = df_curve_training_data.astype({self.installdate: int, self.decommissiondate: int})
        self.logger.debug('df_curve_training_data=%s' % log_df_info(df_curve_training_data, head=-1))

        # initialize parameter
        mean_or_scale_final = 0.0
        stddev_or_shape_final = 0.0

        # if mean value and stddev value are specified by user, use them to generate the degradation curve
        if mean_or_scale is not None and stddev_or_shape is not None:
            self.logger.debug('generate degradation curve with user defined parameters')
            mean_or_scale_final = float(mean_or_scale)
            stddev_or_shape_final = float(stddev_or_shape)
        else:
            # generate NORMAL distribution by input data
            if distribution_type == 'NORMAL':
                self.logger.debug('calculate the mean value and stddev value for normal distribution...')
                mean_or_scale_final, stddev_or_shape_final, df_cfp = self.generate_normal_distribution(df_curve_training_data)
                self.logger.debug('calculate done')

            # generate WEIBULL distribution by input data
            if distribution_type == 'WEIBULL':   
                self.logger.debug('calculate the scale value and shape value for weibull distribution...')
                mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp = self.generate_normal_distribution(df_curve_training_data)
                mean_or_scale_final, stddev_or_shape_final = self.generate_weibull_distribution(mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp)
                self.logger.debug('calculate done')

        # return the final curve
        return self.generate_final_curve(distribution_type, mean_or_scale_final, stddev_or_shape_final)
                
    def generate_normal_distribution(self, df_curve_training_data):
        # df_curve_training_data: [assetId, installationDate, removeDate]
        # step1. pre-processing
        date_service = df_curve_training_data[self.decommissiondate] - df_curve_training_data[self.installdate]
        df_curve_training_data['retired_flag'] = np.where(date_service>=0, 1, 0)
        df_curve_training_data['date_service'] = np.where(date_service>=0, date_service, 2000 - df_curve_training_data[self.installdate])  # NOTE: 2000 is a fixed test year for the sample data; it should be datetime.datetime.now().year for real data
        self.logger.debug('df_curve_training_data_pre_processing=\n%s' % df_curve_training_data)

        # step2. calculate the exposed table
        df_exposed_table = df_curve_training_data.groupby(['date_service']).agg({'retired_flag':'sum', 'date_service':'count'})
        df_exposed_table.rename(columns={'date_service': 'pre_exposed_number'}, inplace=True)
        df_exposed_table.sort_index(inplace=True, ascending=False)
        
        df_exposed_table['exposed_number'] = df_exposed_table.pre_exposed_number.cumsum()
        del df_exposed_table['pre_exposed_number']
        df_exposed_table = df_exposed_table.reset_index()
        df_exposed_table.rename(columns={'retired_flag':'retired_number', 'date_service':'age'}, inplace=True)
        self.logger.debug('df_exposed_table=\n%s' % df_exposed_table.head(200))

        # step3. calculate cumulative failure probability (cfp) table
        # df_exposed_table: [age, retired_number, exposed_number]
        max_age = df_exposed_table['age'].max()

        max_age_idx = df_exposed_table[df_exposed_table['age'] == max_age].index.values.astype(int)[0] # 0, first row
        a1 = df_exposed_table.loc[max_age_idx, 'retired_number']
        a2 = df_exposed_table.loc[max_age_idx, 'exposed_number']
        if a1 == a2:
            print("Number of exposed assets = number of retired assets at max_age, meaning all the observed assets failed at max_age")
            # If max_age-1 exists in the df, drop the max_age row as an outlier: some assets may survive beyond max_age, given enough time to observe.
            # If max_age-1 doesn't exist in the df, relabel the max_age row as max_age-1
            if (max_age-1) == df_exposed_table.loc[max_age_idx+1, 'age']:
                df_exposed_table = df_exposed_table.drop(df_exposed_table.index[max_age_idx])
            else:
                df_exposed_table.loc[max_age_idx, 'retired_number'] = 0
                df_exposed_table.loc[max_age_idx, 'age'] = max_age - 1

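        # copy the filtered rows so that adding columns below does not write into a view of df_exposed_table
        df_cfp = df_exposed_table[df_exposed_table['retired_number'] != 0].copy()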
        self.logger.debug('df_cfp_input=\n%s' % df_cfp.head())
        df_cfp['pre_cumulative_probability'] = df_cfp['retired_number'] / df_cfp['exposed_number']
        cfp_first_line = [df_cfp['age'].min()-1, -1, -1, 0.001]
        cfp_last_line = [max_age, -1, -1, 0]
        # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
        df_cfp = pd.concat([pd.DataFrame(np.array([cfp_first_line, cfp_last_line]), columns=['age', 'retired_number', 'exposed_number', 'pre_cumulative_probability']), df_cfp], ignore_index=True)
        df_cfp.sort_values('age', inplace=True)
        df_cfp['cumulative_probability'] = df_cfp.pre_cumulative_probability.cumsum()
        df_cfp['z'] = norm.ppf(df_cfp['cumulative_probability'])
        self.logger.debug('df_cfp=\n%s' % df_cfp.head(200))

        # step4. estimate optimal mean value and stddev value
        #df_cfp: [age, retired_number, exposed_number, pre_cumulative_probability, cumulative_probability, z]
        age_mean = df_cfp['age'].mean()
        z_mean = df_cfp['z'].mean()
        df_cfp['pre_Szx'] = (df_cfp['z'] - df_cfp['z'].mean()) * (df_cfp['age'] - df_cfp['age'].mean())
        df_cfp['pre_Szz'] = (df_cfp['z'] - df_cfp['z'].mean()) * (df_cfp['z'] - df_cfp['z'].mean())
        Szx = df_cfp['pre_Szx'].sum()
        Szz = df_cfp['pre_Szz'].sum()
        stddev_or_shape_NORMAL = Szx / Szz
        mean_or_scale_NORMAL = age_mean - stddev_or_shape_NORMAL * z_mean
        self.logger.debug('mean_or_scale_NORMAL=\n%s' % mean_or_scale_NORMAL)
        self.logger.debug('stddev_or_shape_NORMAL=\n%s' % stddev_or_shape_NORMAL)
        return mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp

    def generate_weibull_distribution(self, mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp):
        # step1. get the survival table based on cfp table
        self.logger.debug('WEIBULL_input_df_cfp=\n%s' % df_cfp)
        df_cfp['survival_probablity'] = np.where(df_cfp['age'] == df_cfp['age'].min(), 1, 1 - df_cfp['cumulative_probability'])
        df_cfp_survival = df_cfp.reset_index()
        self.logger.debug('df_cfp_survival=\n%s' % df_cfp_survival)

        # step2. get initial alpha (scale) and beta (shape)
        # initial beta
        initial_beta = 0.0
        beta_criteia = sys.float_info.max
        for beta in np.arange(0.1, 100, 0.001):
            beta_estimation = ((1+2/beta)**(0.5+2/beta) * math.exp(-(1+2/beta)) * (1 + 1/12/(1+2/beta))) \
                                      / ((1+1/beta)**(1+2/beta) * math.exp(-(2+2/beta)) * (1+1/12/(1+1/beta))**2 * math.sqrt(2*math.pi)) 
            beta_criteia_now = abs(beta_estimation - (1 + stddev_or_shape_NORMAL ** 2 / mean_or_scale_NORMAL ** 2))
            if beta_criteia_now < beta_criteia:
                beta_criteia = beta_criteia_now
                initial_beta = beta
        self.logger.debug('initial_beta=\n%s' % initial_beta)
        # initial alpha
        initial_alpha = math.sqrt(stddev_or_shape_NORMAL**2 / ( math.sqrt(2*math.pi)*((1+2/initial_beta)**(0.5+2/initial_beta) \
                                    * math.exp(-(1+2/initial_beta)) * (1 + 1/12/(1+2/initial_beta))) - (2*math.pi*(1+1/initial_beta)**(1+2/initial_beta) \
                                    * math.exp(-(2+2/initial_beta)) * (1+1/12/(1+1/initial_beta))**2) ) )
        self.logger.debug('initial_alpha=\n%s' % initial_alpha)

        # step3. use the gradient descent method to search for the optimal alpha and beta
        # initialization
        age_array = df_cfp_survival['age'].tolist()
        SP_array = df_cfp_survival['survival_probablity'].tolist() # SP for survival probability
        rowcount = len(age_array)
        alpha = initial_alpha # initial value of alpha
        beta = initial_beta # initial value of beta
        eps = 0.01 # step length
        precision = 0.003 # stop criteria, it is not set too small to avoid overfitting
        performance_objective = 0 # performance object function
        gradient_alpha = 0
        gradient_beta = 0
        err_objective_iter = 0
        gradient_alpha_iter = 0
        gradient_beta_iter = 0
        
        # initial value for performance objective function
        for i in range(0,rowcount):       
            err_objective_iter = err_objective_iter + math.pow(math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta),2) 
        performance_objective = err_objective_iter
        err_objective_iter = 0
        self.logger.debug("the init performance objective function value is: " + str(performance_objective))

        # main part of gradient descent
        iteration = 0
        while abs(performance_objective) > precision:          
            for i in range(0,rowcount):
                err_objective_iter = err_objective_iter + math.pow(math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta), 2)
                gradient_alpha_iter = gradient_alpha_iter + 2 * (math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta)) * beta * math.pow((age_array[i] / alpha), beta-1) * (-age_array[i] / alpha / alpha)
                gradient_beta_iter = gradient_beta_iter + 2 * (math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta)) * math.pow((age_array[i] / alpha), beta) * math.log(age_array[i] / alpha)
            gradient_alpha = gradient_alpha_iter
            gradient_alpha_iter = 0
            gradient_beta = gradient_beta_iter
            gradient_beta_iter = 0
            performance_objective = err_objective_iter
            err_objective_iter = 0 # reset the accumulator so each pass measures only the current error
            alpha = alpha - eps * gradient_alpha # gradient descent
            beta = beta - eps * gradient_beta # gradient descent
            iteration = iteration + 1
            #print(iteration)
            if iteration == 1000 :
                break
        self.logger.debug("the optimal alpha is " + str(alpha))
        self.logger.debug("the optimal beta is " + str(beta))

        mean_or_scale_WEIBULL = alpha
        stddev_or_shape_WEIBULL = beta
        self.logger.debug('mean_or_scale_WEIBULL=\n%s' % mean_or_scale_WEIBULL)
        self.logger.debug('stddev_or_shape_WEIBULL=\n%s' % stddev_or_shape_WEIBULL)
        return mean_or_scale_WEIBULL, stddev_or_shape_WEIBULL

    def generate_final_curve(self, distribution_type, mean_or_scale_final, stddev_or_shape_final):
        self.logger.debug('generate final degradation curve')
        #final_degradation_curve = []
        final_degradation_curve = dict()
        
        if distribution_type == 'WEIBULL':
            for age in range(0, 101, 1):
                failure_probablity_for_age = (1- math.exp(-((age / mean_or_scale_final) ** stddev_or_shape_final))) * 100  # cumulative distribution function of WEIBULL, scaled to percent
                #final_degradation_curve.append([age, failure_probablity_for_age])
                final_degradation_curve[age] = failure_probablity_for_age
                
        elif distribution_type == 'NORMAL':
            for age in range(0, 101, 1):
                failure_probablity_for_age = norm(mean_or_scale_final, stddev_or_shape_final).cdf(age)  # cumulative distribution function of NORMAL, as a fraction in [0, 1]
                #final_degradation_curve.append([age, failure_probablity_for_age])
                final_degradation_curve[age] = failure_probablity_for_age
        else:
            raise ValueError('distribution type is invalid')
        self.logger.debug('final_degradation_curve=\n%s' % final_degradation_curve)    
        # return the curve as a dict keyed by age, to support both the old and new pipeline modes
        #return str(final_degradation_curve).strip('[]')
        return final_degradation_curve
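
The normal-distribution estimate in `generate_normal_distribution` amounts to a least-squares line through the points (z, age), where z is the standard-normal quantile of each cumulative failure probability: the slope plays the role of the standard deviation and the intercept the mean. The following is a minimal standalone sketch of that step, not the template's implementation. The function name `fit_normal_from_cfp` and the synthetic data are assumptions made for illustration.

```python
import numpy as np
from scipy.stats import norm


def fit_normal_from_cfp(ages, cumulative_probabilities):
    """Estimate (mean, stddev) by regressing age on the normal quantile z.

    Hypothetical helper for illustration; it mirrors the Szx / Szz step only.
    """
    ages = np.asarray(ages, dtype=float)
    z = norm.ppf(np.asarray(cumulative_probabilities, dtype=float))
    z_centered = z - z.mean()
    # slope of the least-squares line age = mean + stddev * z
    stddev = np.sum(z_centered * (ages - ages.mean())) / np.sum(z_centered ** 2)
    # intercept of the same line
    mean = ages.mean() - stddev * z.mean()
    return mean, stddev


# Synthetic check: points taken exactly from a normal CDF (mean 40, stddev 8)
ages = np.array([25.0, 30.0, 35.0, 40.0, 45.0])
cfp = norm.cdf(ages, loc=40.0, scale=8.0)
mean, stddev = fit_normal_from_cfp(ages, cfp)

# Failure probability per age, as a fraction, for ages 0..100
curve = {age: norm.cdf(age, loc=mean, scale=stddev) for age in range(0, 101)}
print(mean, stddev, curve[40])
```

Because the synthetic points lie exactly on a normal CDF, the fit should recover the generating mean and standard deviation up to floating-point error. Real cumulative failure tables are noisier, so the estimates there are only approximate.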

## How to override base class
To customize the model template, override the relevant functions. For example, **`DegradationCurveEstimator(BaseEstimator)`** is based on the base class **`BaseEstimator`**, so you can:
+ override an existing method in **`BaseEstimator`**, such as the **`train_model`** method
+ add new functions to configure the algorithm

    def train_model(self, df):
        # parse statistics_distribution_args
        distribution_type = self.statistics_distribution_args["distribution_type"]
        mean_or_scale = self.statistics_distribution_args["mean_or_scale"]
        stddev_or_shape = self.statistics_distribution_args["stddev_or_shape"]

        # distribution_type must be specified; raise a real exception (a bare string cannot be raised)
        if distribution_type is None:
            raise ValueError('distribution_type should be NORMAL or WEIBULL')
        
        degradation_curve_pipline = DegradationCurve(distribution_type, mean_or_scale, stddev_or_shape, self.installdate, self.decommissiondate)
        # using sample data to test degradation curve calculation
        #df_curve_training_data = degradation_curve_pipline.sample_data()
        # use real data from DB2
        df_curve_training_data = df
        self.logger.debug('df_curve_training_data=\n%s' % df_curve_training_data.head())

        final_degradation_curve = degradation_curve_pipline.fit(df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape)
        degradation_curve_model = dict()
        #degradation_curve_model["target"] =  "degradation_curve"
        degradation_curve_model["final_degradation_curve"] =  final_degradation_curve

        return degradation_curve_model
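
The override pattern can also be tried in isolation. The sketch below uses a hypothetical `ToyBaseEstimator` as a stand-in for `BaseEstimator` (it is not the SDK class) to show how a base class can drive the flow in `execute` while a subclass supplies `train_model`. All class names and the returned dictionary are assumptions for illustration.

```python
class ToyBaseEstimator:
    """Hypothetical stand-in for BaseEstimator; not the SDK class."""

    def __init__(self, features):
        self.features = features

    def train_model(self, df):
        # subclasses are expected to override this method
        raise NotImplementedError('subclasses provide train_model')

    def execute(self, df):
        # the base class controls the flow and delegates training
        return self.train_model(df)


class ToyCurveEstimator(ToyBaseEstimator):
    """Hypothetical subclass that overrides train_model and adds its own configuration."""

    def __init__(self, features, statistics_distribution_args):
        super().__init__(features)
        self.statistics_distribution_args = statistics_distribution_args

    def train_model(self, df):
        distribution_type = self.statistics_distribution_args.get('distribution_type')
        if distribution_type not in ('NORMAL', 'WEIBULL'):
            raise ValueError('distribution_type should be NORMAL or WEIBULL')
        # a real override would fit a curve here; this sketch only records its inputs
        return {'distribution_type': distribution_type, 'rows_seen': len(df)}


estimator = ToyCurveEstimator(['installdate'], {'distribution_type': 'WEIBULL'})
model = estimator.execute([1, 2, 3])
print(model)
```

Keeping the flow in the base class means a customized estimator only has to replace the step that differs, which is the approach the template's `train_model` override follows.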

#### Base class `BaseEstimator`

In [None]:
class BaseEstimator(BaseEstimatorFunction):
    '''Base class for estimators, supporting training/prediction/scoring.

    Note that though the AS base class supports multiple targets per estimator, we 
    are using a single target per estimator for now. So this class assumes that the 
    given targets and predictions are always one-element arrays. Also, when 
    prediction is not needed, the passed-in predictions should be an array of one 
    element 'None'.
    '''
    def __init__(self, features, targets, predictions, features_for_training=None, label_names=None, **kwargs):
        super().__init__(features, targets, predictions)
        self.models = dict()
        self.model_extras = defaultdict(list)
        self.logger = get_logger(self)
        self.features_for_training = features_for_training
        self.label_names = label_names
        self.local_model = True
        self.model_timestamp = None
        self.training_timestamp = None

    def get_model_name(self, target_name, suffix=None):
        if suffix is None:
            suffix = self.model_timestamp
        return self.generate_model_name(target_name=target_name, prefix=None, suffix=suffix)
    
    def generate_model_name(self, target_name, prefix=None, suffix=None):
        name = ['apm', 'pmi', 'model']

        if prefix is not None:
            if isinstance(prefix, str):
                prefix = [prefix]
            if len(prefix) > 0:
                name += prefix
        name.extend([self._entity_type.logical_name, self.name, target_name])
        name = '/'.join(name)

        if suffix is not None:
            if isinstance(suffix, datetime):
                name += '_' + str(calendar.timegm(suffix.timetuple()))
            else:
                name += '_' + str(suffix)

        return name

    def _get_target_name(self):
        return '_' if (self.predictions is None or len(self.predictions) == 0) else self.predictions[0]

    def _load_model(self, bucket):
        model_name = self.get_model_name(target_name=self._get_target_name()) # load with default suffix timestamp
        return (model_name, self.load_model(model_name, bucket, self.local_model))

    def load_model(self, model_path, bucket, local):
        if local:
            # local FS
            model = None
            try:
                with open(model_path, mode='rb') as file:
                    model = file.read()
            except FileNotFoundError as e:
                pass
            return pickle.loads(model) if model is not None else None
        else:
            return self._entity_type.db.cos_load(filename=model_path, bucket=bucket, binary=True)

    def _save_model(self, bucket, new_model, suffix=None, local=True):
        filename = self.get_model_name(target_name=self._get_target_name(), suffix=suffix) # save with an explicit suffix timestamp set

        objects = [(filename, new_model, True, True)] # model itself always pickle dumped and binary
        extras = self.get_model_extra(new_model, filename)
        objects.extend(extras)
        for fname, obj, pickle_dump, binary in objects:
            self.save_model(obj, fname, bucket, pickle_dump, binary, local)

        # add model to internal list for prediction usage
        self.models[filename] = new_model
        if len(extras) > 0:
            self.model_extras[filename].extend(extras)

    def save_model(self, new_model, model_path, bucket, pickle_dump, binary, local):
        if local:
            mode = 'w'
            if pickle_dump:
                new_model = pickle.dumps(new_model)
            if binary:
                mode += 'b'

            try:
                mkdirp(model_path)
            except:
                pass
            with open(model_path, mode=mode) as file:
                file.write(new_model)
        else:
            try:
                if pickle_dump:
                    self._entity_type.db.cos_save(persisted_object=new_model, filename=model_path, bucket=bucket, binary=binary)
                else:
                    # work-around to be able to not pickle save to cos
                    ret = self._entity_type.db.cos_client._cos_api_request('PUT', bucket=bucket, key=model_path, payload=new_model, binary=binary)
                    if ret is None:
                        self.logger.warning('Not able to PUT %s to COS bucket %s', model_path, bucket)
            except requests.exceptions.ReadTimeout as err:
                self.logger.warning('timeout saving %s to cos: %s' % (model_path, err))

        self.logger.debug('saved %s' % model_path)

    def get_model_extra(self, new_model, model_path):
        '''Return extra objects to be saved along with the model as a list of (path, object, pickle_dump, binary) tuples.

        A normal estimator has only one model object to save to Cloud Object Storage. Some estimators might want to save 
        other objects, possibly cached or derived from the model object, for other uses. You can override 
        this method to return a list of such additional objects, each as a (cos_path, object, pickle_dump, binary) tuple.

        It is recommended that you construct the cos_path of each extra object from the given model_path, with 
        a different suffix appended.
        '''
        return []

    def get_models_for_training(self, db, df, bucket=None):
        model_name, model = self._load_model(bucket=bucket)

        if model is not None:
            self.models[model_name] = model
            return []
        else:
            return [model]

    def get_models_for_predict(self, db, bucket=None):
        if len(self.predictions) == 0 or self.predictions[0] is None:
            return []
        else:
            return list(self.models.values())

    def conform_index(self,df,entity_id_col = None, timestamp_col = None):
        # workaround for avoiding base class adding columns
        return df

    def add_training_preprocessor(self, stage):
        self.add_preprocessor(stage)

    def execute_training_preprocessing(self, df):
        if len(self._preprocessors) == 0:
            return df
        else:
            return super().execute_preprocessing(df)

    def get_df_for_training(self, df):
        features = [] + self.features
        if self.features_for_training is not None:
            features.extend(self.features_for_training)

        df = df[features]
        df = df.reset_index(drop=True)

        return df

    def execute_train_test_split(self,df):
        # TODO disable splitting for now
        return (df, None)

    def get_df_for_prediction(self, df):
        df_for_prediction = df[self.features]
        self.logger.debug('df_for_prediction: %s' % log_df_info(df_for_prediction, head=5))
        # self.logger.debug('df_for_prediction: %s' % str(df_for_prediction.isna().any(axis='columns')))
        # df_for_prediction = df_for_prediction.dropna()
        # self.logger.debug('df_for_prediction_dropna: %s' % log_df_info(df_for_prediction, head=5))
        return df_for_prediction

    def predict(self, model, df):
        return list(zip(model.predict(df), model.predict_proba(df))) if model is not None else None

    def get_prediction_result_value_index(self):
        raise RuntimeError('required but not implemented')

    def process_prediction_result(self, df, prediction_result, model):
        if prediction_result is None:
            df[self.predictions[0]] = None

            self.logger.debug('No suitable model found. Created null predictions')
        else:
            # log only after the None check, because len(None) would raise a TypeError
            self.logger.debug('prediction_result_length=%d, prediction_result=%s' % (len(prediction_result), str(prediction_result[:10])))

            for idx in self.get_prediction_result_value_index():
                if not all([isinstance(p, (tuple, list, np.ndarray)) for p in prediction_result]):
                    break

                prediction_result = [p[idx] for p in prediction_result]

            df[self.predictions[0]] = prediction_result

            self.logger.debug('final_prediction_result_length=%d, final_prediction_result=%s' % (len(prediction_result), str(prediction_result[:10])))

        return df

    def execute(self, df=None):
        self.logger.debug('df_input: %s' % log_df_info(df, head=5))

        self.logger.debug('self.model_timestamp=%s' % self.model_timestamp)

        db = self._entity_type.db
        bucket = self.get_bucket_name()

        self.training_timestamp = None

        # transform incoming data using any preprocessors
        # include whatever preprocessing stages are required by implementing a set_preprocessors method
        required_models = self.get_models_for_training(db=db, df=df, bucket=bucket)
        if len(required_models) > 0:   
            # Training

            # only do preprocessing and splitting once
            df_train = self.execute_training_preprocessing(df)
            df_train = self.get_df_for_training(df_train)
            self.logger.debug('df_train: %s' % log_df_info(df_train, head=5))
            # df_train, df_test = self.execute_train_test_split(df_train)

            for model in required_models:
                self.logger.info('training model: %s' % model) 

                new_model = self.train_model(df_train)

                self.training_timestamp = str(calendar.timegm(datetime.utcnow().timetuple()))

                # TODO add evaluation of the new model here, before saving it
                # if df_test is not None:
                #     new_model.test(df_test)                
                #     self.evaluate_and_write_model(new_model = new_model,
                #                                   current_model = model,
                #                                   db = db,
                #                                   bucket=bucket)

                self._save_model(bucket=bucket, new_model=new_model, suffix=self.training_timestamp, local=self.local_model)

                # switch to the new one just trained
                self.model_timestamp = self.training_timestamp
        elif self.model_timestamp is not None:
            self.training_timestamp = self.model_timestamp

        # Predictions

        df_for_prediction = None
        for idx, model in enumerate(self.get_models_for_predict(db=db, bucket=bucket)):        
            # TODO deal with multiple predictions
            if df_for_prediction is None:
                df_for_prediction = self.get_df_for_prediction(df)
            df = self.process_prediction_result(df, self.predict(model, df_for_prediction), model)

        if df_for_prediction is None:
            # no prediction needed, return empty df
            df = df[[]]

        self.logger.debug('df_final: %s' % log_df_info(df, head=5))

        return df
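
The naming scheme used by `generate_model_name` can be tried outside the pipeline. The sketch below is a standalone restatement of that logic, assuming the entity type's logical name and the estimator name are plain strings. The function `build_model_name` and the example names are hypothetical.

```python
import calendar
from datetime import datetime


def build_model_name(entity_type_name, estimator_name, target_name, prefix=None, suffix=None):
    """Hypothetical standalone version of the model path logic."""
    parts = ['apm', 'pmi', 'model']
    if prefix is not None:
        # accept a single string or a list of path segments
        parts += [prefix] if isinstance(prefix, str) else list(prefix)
    parts += [entity_type_name, estimator_name, target_name]
    name = '/'.join(parts)

    if suffix is not None:
        if isinstance(suffix, datetime):
            # datetimes become epoch seconds, with the time tuple read as UTC
            name += '_' + str(calendar.timegm(suffix.timetuple()))
        else:
            name += '_' + str(suffix)
    return name


name = build_model_name(
    'substation_transformer',        # hypothetical entity type name
    'DegradationCurveEstimator',
    'degradation_curve',
    suffix=datetime(2021, 1, 1),
)
print(name)
```

Appending the training timestamp as a suffix gives every trained model its own path, so a newly trained model does not overwrite an earlier one.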