In [1]:
'''
Licensed Materials - Property of IBM
IBM Maximo APM - Predictive Maintenance Insights On-Premises
IBM Maximo APM - Predictive Maintenance Insights SaaS 
© Copyright IBM Corp. 2019 All Rights Reserved.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.
'''

'\nLicensed Materials - Property of IBM\nIBM Maximo APM - Predictive Maintenance Insights On-Premises\nIBM Maximo APM - Predictive Maintenance Insights SaaS \n© Copyright IBM Corp. 2019 All Rights Reserved.\nUS Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.\n'

# Maximo APM PMI - Failure Probability Curve Model Template

1. [Introduction](#introduction)
2. [Install Maximo APM PMI SDK](#install-maximo-apm-pmi-sdk)
3. [Setup the Model Training Pipeline](#setup-model-training-pipline)
4. [Train the Model Instance](#train-model-instance)
5. [Register the Trained Model Instance](#register-trained-model-instance)
6. [Model Template Internals](#model-template-internals)

<a id='introduction'></a>
## Introduction
Statistically, to evaluate mean life of assets the sample mean or the average age method is acceptable if a big population has end-of-life information. But asset such as generators, transformers, reactors, cables, etc. have a relatively long life up to and even beyond 40 years and generally there are very limited end-of-life failure data. This algorithm is designed to address this use case (to estimate mean life with limited end-of-lift failure data). In fact the proposed algorithm works best when less than 20% of the assets has end-of-life failure data.

In this notebook, we will be predicting failure probability curve for a kind of assets. In the challenges of asset health assessment, the asset failure probability and its expected remaining life are the key aspects to analysis the asset health status.

The Failure Probability Curve model uses statistics distribution to assess the failure probability vs. year, and this model has two methods:
+ **Normal Distribution**: a small percentage of assets fail during the early life cycle, a few last beyond the average expected life span, but the majority fails within their mean life.
+ **Weibull Distribution**:
    \begin{cases} f(x;\lambda,k) =  \frac{k}{\lambda}\left(\frac{x}{\lambda}\right)^{k-1}e^{-(x/\lambda)^{k}} & x\geq0 ,\\ 0 & x<0, \end{cases}
    where k > 0 is the shape parameter and λ > 0 is the scale parameter.


<a id="install-maximo-apm-pmi-sdk"></a>
## Install Maximo APM PMI SDK

To install the SDK, you need your Maximo APM PMI instance ID, API base URL, and your API key. The Maximo APM PMI instance ID and API base URL can be found in the user welcome letter. For API key, request to your Maximo admin to create an user account first to generate one for you. Create one environment variable for each here.

In [2]:
%%capture
%env APM_ID=4ac3917e
%env APM_API_BASEURL=https://prod.pmi.apm.maximo.ibm.com
%env APM_API_KEY=dp7opk78635sbf809f07o4t53lum3c9eoaovpk9f

Then, install PMI SDK with `pip`. Note that we have to upgrade `pip` first.

In [3]:
!pip install -U pip~=18.1
!pip install pyspark
!pip install -U https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/ds/4ac3917e/dp7opk78635sbf809f07o4t53lum3c9eoaovpk9f/lib/download?filename=pmlib-1.0.0.tar.gz

Requirement already up-to-date: pip~=18.1 in /opt/conda/envs/Python36/lib/python3.6/site-packages (18.1)
Collecting https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/ds/4ac3917e/dp7opk78635sbf809f07o4t53lum3c9eoaovpk9f/lib/download?filename=pmlib-1.0.0.tar.gz
[?25l  Downloading https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/ds/4ac3917e/dp7opk78635sbf809f07o4t53lum3c9eoaovpk9f/lib/download?filename=pmlib-1.0.0.tar.gz (792kB)
[K    100% |████████████████████████████████| 798kB 60.0MB/s ta 0:00:01


Building wheels for collected packages: pmlib
  Running setup.py bdist_wheel for pmlib ... [?25ldone
[?25h  Stored in directory: /home/dsxuser/.tmp/pip-ephem-wheel-cache-_cvxcu45/wheels/27/e3/4f/8f4f27f0744ea9362917922e6990c31fc7ed89833aa12f6e3a
Successfully built pmlib
Installing collected packages: pmlib
  Found existing installation: pmlib 1.0.0
    Uninstalling pmlib-1.0.0:
      Successfully uninstalled pmlib-1.0.0
Successfully installed pmlib-1.0.0


<a id="setup-model-training-pipline"></a>
## Setup the Model Training Pipeline

Before you can start working on the model training pipeline, you have to setup an asset group properly in Maximo. See IBM Maximo APM - Predictive MaintenanceInsights SaaS User Guide for details.

Required model pipeline configuration:

* Asset group ID: The unit of model processing is an asset group. Asset groups are managed on Maximo APM UI. You need to get the ID of the asset group to be analyzed by this model.
* Asset installation date and decommission date as the label: This model requires asset installation date (Asset attribute **```installdate```** in Maximo) and asset decommission date (Asset attribute **```estendoflife```** in Maximo) to extract the latel for training.

Now you can setup a training pipeine based on this model template, with your own data, to train a model instance.

In [4]:
from pmlib.degradation_curve import DegradationCurveAssetGroupPipeline

group = DegradationCurveAssetGroupPipeline(
            asset_group_id='1016', 
            model_pipeline={
                # first feature for training must be asset installation date, and second the asset decommission date
                "features_for_training": [":installdate", ":estendoflife"],
                "statistics_distribution_args": {
                    "distribution_type": "WEIBULL", 
                    "mean_or_scale": None,
                    "stddev_or_shape": None
                }
            })



2020-03-30T09:46:33.870 pmlib.api.init_environ INFO APM_ID=4ac3917e, APM_API_BASEURL=https://prod.pmi.apm.maximo.ibm.com, APM_API_KEY=********
2020-03-30T09:46:33.872 pmlib.util.api_request INFO method=get, url=https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/ds/tenant?instanceId=4ac3917e, headers={'apmapitoken': '********'}, timeout=30, ssl_verify=True, json=None, session=None, kwargs={}
2020-03-30T09:46:35.561 pmlib.util.api_request INFO resp.status_code=200, method=get, url=https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/ds/tenant?instanceId=4ac3917e
2020-03-30T09:46:35.563 pmlib.api.init_environ DEBUG resp={
    "as_apikey": "********",
    "as_apitoken": "********",
    "as_id": null,
    "as_url": "https://api-us.connectedproducts.internetofthings.ibmcloud.com",
    "info": {
        "API_BASEURL": "https://api-us.connectedproducts.internetofthings.ibmcloud.com",
        "API_KEY": "********",
        "API_TOKEN": "********",
        "COS_BUCKET_KPI": "analytics-

2020-03-30T09:46:41.911 analytics_service.pmlib.loader.AssetLoader._validate_data_items DEBUG all_entity_types={'1016', 'IOT', 'IIOT', 'ZPMI', 'ASSET_CACHE', '1011', '1015'}
2020-03-30T09:46:41.914 analytics_service.pmlib.loader.AssetLoader._set_asset_device_mappings INFO input_asset_device_mappings=None, asset_device_mappings={}, entity_type_meta={}
2020-03-30T09:46:41.917 analytics_service.pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.__init__ DEBUG pipeline_config={'features': [], 'inputs': [':installdate', ':estendoflife'], 'renamed_inputs': ['installdate', 'estendoflife'], 'features_for_training': ['installdate', 'estendoflife'], 'targets': ['installdate', 'estendoflife'], 'predictions': [], 'features_resampled': {}, 'statistics_distribution_args': {'distribution_type': 'WEIBULL', 'mean_or_scale': None, 'stddev_or_shape': None}}
2020-03-30T09:46:41.921 analytics_service.pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.__init__ DEBUG post_processing=[], incre

The example above configured a pipeline for this model, using asset attribute **```installdate```** and **```estendoflife```** to extract the labels for training. This model only generates the failure probability curve and does not do scoring, hence no predictions defined.

<a id="train-model-instance"></a>
## Train the Model Instance

Now you can train the model instance.

In [5]:
df = group.execute()

2020-03-30T09:46:41.940 analytics_service.pmlib.cache_loader.AssetCacheRefresher.execute INFO start_ts=None, end_ts=None, entities=None
2020-03-30T09:46:47.503 pmlib.util.api_request INFO method=get, url=https://api-us.connectedproducts.internetofthings.ibmcloud.com/api/meta/v1/CTP-PMI-Democore-31/entityType/ASSET_CACHE, headers={'Content-Type': 'application/json', 'X-api-key': '********', 'X-api-token': '********', 'Cache-Control': 'no-cache'}, timeout=30, ssl_verify=True, json=None, session=None, kwargs={}
2020-03-30T09:46:48.233 pmlib.util.api_request INFO resp.status_code=200, method=get, url=https://api-us.connectedproducts.internetofthings.ibmcloud.com/api/meta/v1/CTP-PMI-Democore-31/entityType/ASSET_CACHE
2020-03-30T09:46:48.237 iotfunctions.metadata.__init__ DEBUG Initializing new entity type using iotfunctions 2.0.3
2020-03-30T09:46:48.238 iotfunctions.util.__init__ DEBUG Starting trace
2020-03-30T09:46:48.250 iotfunctions.util.__init__ DEBUG Trace name: auto_trace_ASSET_CACHE

2020-03-30T09:46:54.651 pmlib.util.api_request INFO method=post, url=https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/pmiboard/assetmeta?instanceId=4ac3917e, headers={'apmapitoken': '********'}, timeout=30, ssl_verify=True, json={'assets': [{'assetNum': 'ZIOT1001', 'siteId': 'WCM'}, {'assetNum': 'ZIOT1002', 'siteId': 'WCM'}, {'assetNum': 'ZIOT1003', 'siteId': 'WCM'}, {'assetNum': 'ZIOT1004', 'siteId': 'WCM'}, {'assetNum': 'ZIOT1005', 'siteId': 'WCM'}], 'attributes': ['estendoflife', 'installdate']}, session=None, kwargs={}
2020-03-30T09:46:56.517 pmlib.util.api_request INFO resp.status_code=200, method=post, url=https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/pmiboard/assetmeta?instanceId=4ac3917e
2020-03-30T09:46:56.532 analytics_service.pmlib.loader.AssetLoader.execute DEBUG df_loaded_asset_dimension=shape=(5, 3), index={0: 'int64'}, columns={'asset_id': 'O', 'estendoflife': '<M8[ns]', 'installdate': '<M8[ns]'}, head(5)=
            asset_id estendoflife installdate


2020-03-30T09:46:56.757 analytics_service.pmlib.degradation_curve.DegradationCurve.generate_normal_distribution DEBUG mean_or_scale_NORMAL=6.385270427417803
2020-03-30T09:46:56.758 analytics_service.pmlib.degradation_curve.DegradationCurve.generate_normal_distribution DEBUG stddev_or_shape_NORMAL=0.4482738804629455
2020-03-30T09:46:56.773 analytics_service.pmlib.degradation_curve.DegradationCurve.generate_weibull_distribution DEBUG WEIBULL_input_df_cfp=
   age  retired_number  exposed_number  pre_cumulative_probability  \
0  5.0            -1.0            -1.0                       0.001   
2  6.0             3.0             5.0                       0.600   
1  7.0            -1.0            -1.0                       0.000   

   cumulative_probability         z   pre_Szx   pre_Szz  
0                   0.001 -3.090232  2.230779  4.976375  
2                   0.601  0.255936  0.000000  1.244094  
1                   0.601  0.255936  1.115390  1.244094  
2020-03-30T09:46:56.783 analy

2020-03-30T09:46:58.412 analytics_service.pmlib.pipeline._ModelPipelineConfig.__init__ DEBUG features=[], features_for_training=['installdate', 'estendoflife'], predictions=[], features_resampled={}, inputs=[], renamed_inputs=[]
2020-03-30T09:46:58.414 analytics_service.pmlib.pipeline._ModelPipelineConfig.__init__ DEBUG kwargs={'statistics_distribution_args': {'distribution_type': 'WEIBULL', 'mean_or_scale': None, 'stddev_or_shape': None}}
2020-03-30T09:46:58.417 analytics_service.pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.execute DEBUG adjusted after model trained: loader_inputs=[], loader_names=[]
2020-03-30T09:46:58.419 analytics_service.pmlib.loader.AssetLoader._validate_mappings DEBUG asset_device_mappings={'ZIOT1001-____-WCM': ['IIOT:ZIOT1001'], 'ZIOT1002-____-WCM': ['IIOT:ZIOT1002'], 'ZIOT1003-____-WCM': ['IIOT:ZIOT1003'], 'ZIOT1004-____-WCM': ['IIOT:ZIOT1004'], 'ZIOT1005-____-WCM': ['IIOT:ZIOT1005']}
2020-03-30T09:46:58.420 analytics_service.pmlib.loader.AssetLo

Once this method completes successfully, you'll have a trained model instance reday (for next step, see below) and also with the prediction results returned as a dataframe for verification.

<a id="register-trained-model-instance"></a>
## Register the Trained Model Instance


If the trained model instance looks good, you can register it to Maximo APM PMI:

In [6]:
group.register()

2020-03-30T09:46:58.473 analytics_service.pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register DEBUG target_pipeilne_class=pmlib.degradation_curve.DegradationCurveAssetGroupPipeline, url=None
2020-03-30T09:46:58.476 analytics_service.pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register DEBUG catalog_config={'name': 'DegradationCurveAssetGroupPipeline', 'description': 'DegradationCurveAssetGroupPipeline', 'moduleAndTargetName': 'pmlib.degradation_curve.DegradationCurveAssetGroupPipeline', 'url': 'https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/ds/4ac3917e/dp7opk78635sbf809f07o4t53lum3c9eoaovpk9f/lib/download?filename=pmlib-1.0.0.tar.gz', 'category': 'TRANSFORMER', 'tags': [], 'output': [{'name': 'names', 'description': 'Provide a list of output names to be generated from the pipeline.', 'dataType': 'ARRAY', 'jsonSchema': {'minItems': 1, '$schema': 'http://json-schema.org/draft-07/schema#', 'type': 'array', 'items': {'type': 'string'}}, 'tags': []}]

2020-03-30T09:46:59.751 iotfunctions.metadata.register DEBUG found METRIC column eventtype
2020-03-30T09:46:59.751 iotfunctions.metadata.register DEBUG found METRIC column format
2020-03-30T09:46:59.756 iotfunctions.metadata.register DEBUG found METRIC column updated_utc
2020-03-30T09:47:00.440 iotfunctions.db.http_request DEBUG http request successful. status 200
2020-03-30T09:47:00.441 iotfunctions.metadata.register DEBUG Metadata registered for table apm_1016 
2020-03-30T09:47:00.444 analytics_service.pmlib.degradation_curve.DegradationCurveEstimator.get_model_extra DEBUG extras=[('apm/pmi/model/1016/DegradationCurveEstimator/__1585561618_json', '{"final_degradation_curve": {"0": 0.0, "1": 1.1199467892963222, "2": 5.229280561401184, "3": 12.535419066580033, "4": 22.59605403035132, "5": 34.52558474148053, "6": 47.20318770804353, "7": 59.50662868768457, "8": 70.51982488454898, "9": 79.66403791366682, "10": 86.72982252945589, "11": 91.82078100232512, "12": 95.2448624230252, "13": 97.39

2020-03-30T09:47:09.399 pmlib.util.api_request INFO resp.status_code=200, method=post, url=https://prod.pmi.apm.maximo.ibm.com/ibm/pmi/service/rest/ds/dp7opk78635sbf809f07o4t53lum3c9eoaovpk9f/1016/model?instanceId=4ac3917e
2020-03-30T09:47:09.401 analytics_service.pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register DEBUG <Response [200]>
2020-03-30T09:47:09.405 analytics_service.pmlib.degradation_curve.DegradationCurveAssetGroupPipeline.register DEBUG model_instance_response={'modelInstanceId': '1063996D-01F4-49DD-9A8C-564AE30FA41F', 'message': 'APM-PMI-I-0005: Created', 'status': 0}


'1063996D-01F4-49DD-9A8C-564AE30FA41F'

Once registration succeeds, you can see this newly trained model instance available for the asset group on IBM Maximo APM - AHI.

<a id="model-template-internals"></a>
## Model Template Internals


#### Use Case Description

This model deals with computing failure probabilities vs. year of a kind of assets. Using this model one can answer the questions of the pattern: **What is the failure probability when the asset is N years old ?**

##### Input Data (from Maximo, PMI SDK will do this part)

+ Assets meta data: installation date & decommission date

##### Output

The output will be a list of asset's year vs. failure probability, which is saved in Cloud Object Storage.

##### Customization Points

+ The **`distribution_type`** in cell **`Setup the Model Training Pipeline`** is the distribution type, should be NORMAL or WEIBULL)
+ The **`mean_or_scale"`** and **`stddev_or_shape`** are the parameters for NORMAL or WEIBULL distribution. If not specifed, it will use assets' meta data to calculate the Failure Probability Curve. Otherwise it will generate the curve with specified parameters.
+ Failure Probability Curve algorithm (**`DegradationCurveEstimator`** Class in below model template code)
+ Model pipeline stages control (**`DegradationCurveAssetGroupPipeline`** Class in below model template code)

##### Model Workflow and Description (from **`degradation_curve_model.py`** in PMI SDK)

+ Step 1: Collect data of in-service years and retired years of the specific assets
+ Step 2: Calculate:
                  Retired (died) one: Age = “Retired year” – “In-service year”
                  Normal one: Age = “Current year” – “In-service year”
                  Exposed number: how many reactors service longer than the Age
+ Step 3: Calculate CFP table:
                  Failure Probability (FP) = retired number / exposed number
                  Cumulative FP (k) = Cumulative FP (k-1) + FP (k) 
                  Z values based on Cumulative FP
                  Create a table following the form like : | Age | Cumulative Failure Probability | z |
+ Step 4: Estimate the mean life and the standard deviation of the Normal Distribution
+ Step 5: Generate the NORMAL Degradation Curve
+ Step 6: based on CFP table to get survival table like: | Age | Survival Probability |
+ Step 7: Construct Maximum Likelihood function
+ Step 8: Calculate Initial shape and scale parameters for WEIBULL distribution
+ Step 9: Use gradient-decent method to get optimal shape and scale parameters

In [7]:
class DegradationCurveEstimator(BaseEstimator):
    def __init__(self, features, targets, predictions, statistics_distribution_args=None, **kwargs):
        super().__init__(features, targets, predictions, **kwargs)
        self.statistics_distribution_args = statistics_distribution_args
        self.installdate = kwargs['features_for_training'][0]
        self.decommissiondate = kwargs['features_for_training'][1]

    def train_model(self, df):
        # parse statistics_distribution_args
        distribution_type = self.statistics_distribution_args["distribution_type"]
        mean_or_scale = self.statistics_distribution_args["mean_or_scale"]
        stddev_or_shape = self.statistics_distribution_args["stddev_or_shape"]

        # distribution should not be none
        if distribution_type is None:
            raise('distribution_type should be NORMAL or WEIBULL')
        
        degradation_curve_pipline = DegradationCurve(distribution_type, mean_or_scale, stddev_or_shape, self.installdate, self.decommissiondate)
        # using sample data to test degradation curve calculation
        #df_curve_training_data = degradation_curve_pipline.sample_data()
        # use real data from DB2
        df_curve_training_data = df
        self.logger.debug('df_curve_training_data=\n%s' % df_curve_training_data.head())

        final_degradation_curve = degradation_curve_pipline.fit(df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape)
        degradation_curve_model = dict()
        #degradation_curve_model["target"] =  "degradation_curve"
        degradation_curve_model["final_degradation_curve"] =  final_degradation_curve

        return degradation_curve_model
    
    def get_model_extra(self, new_model, model_path):
        extras = []

        model_json_path = model_path + '_json'
        extras.append((model_json_path, json.dumps(new_model), False, False)) # no pickle dump, not binary

        self.logger.debug('extras=%s' % str(extras))

        return extras


class DegradationCurveAssetGroupPipeline(AssetGroupPipeline):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.model_template_name = 'Failure Probability Curve'

        self.fillna = None
        self.dropna = None

    def prepare_execute(self, pipeline, model_config):
        pipeline.add_stage(DegradationCurveEstimator(**model_config))

    @staticmethod
    def generate_sample_data(**kwargs):
        return generate_degradation_curve_data(**kwargs)

NameError: name 'BaseEstimator' is not defined

In [None]:
class DegradationCurve:
    '''
    The degradation curve supports NORMAL and WEIBULL distributions
    distribution_type: the distribution type, NORMAL or WEIBULL
    mean_or_scale: the mean value of NORMAL or the scale value of WEIBULL
    stddev_or_shape: the standard deviation value of NORMAL or the shape value of WEIBULL
    '''

    @classmethod
    def metadata(cls):
        return {
            'name': cls.__name__,
            'moduleAndTargetName': '%s.%s' % (cls.__module__, cls.__name__),
            'category': 'TRANSFORMER',
            'description': 'DegradationCurve',
            'input': [
            ],
            'output': [
            ],
            'tags': [
                'EVENT'
            ]
        }

    def __init__(self, data_items, statistics_distribution_args, degradation_curve, installdate, decommissiondate):
        self.logger = logging.getLogger('analytics_service.%s.%s' % (self.__module__, self.__class__.__name__))
        self.data_items = data_items
        self.statistics_distribution_args = statistics_distribution_args
        self.degradation_curve = degradation_curve
        self.installdate = installdate
        self.decommissiondate = decommissiondate

    def execute (self, df):
        self.logger.debug('input_df=\n%s' % df.head())

        df_original = df

        # pick only needed columns for scoring 
        df = df[list(set(self.data_items) - set(df.index.names))]
        self.logger.debug('input_df_pick_need_columns=\n%s' % df.head())

        sources_not_in_column=df.index.names
        df = df.reset_index()

        # parse statistics_distribution_args
        distribution_type = self.statistics_distribution_args["distribution_type"]
        mean_or_scale = self.statistics_distribution_args["mean_or_scale"]
        stddev_or_shape = self.statistics_distribution_args["stddev_or_shape"]

        # distribution should not be none
        if distribution_type is None:
            raise('distribution_type should be NORMAL or WEIBULL')

        # using sample data to test degradation curve calculation
        df_curve_training_data = self.sample_data()
        self.logger.debug('df_curve_training_data=\n%s' % df_curve_training_data.head())

        # fit the degradation curve
        df_score = df.astype({self._entity_type._timestamp: 'datetime64[ms]'})[[self._entity_type._df_index_entity_id, self._entity_type._timestamp]].copy()
        final_degradation_curve = self.fit(df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape)
        df_score[self.degradation_curve] = str(final_degradation_curve).strip('[]')
        self.logger.debug('df_score=\n%s' % df.head())

        df = df_original.merge(df_score, how='left', left_index=True, right_on=[self._entity_type._df_index_entity_id, self._entity_type._timestamp], sort=False)
        df = df.set_index(keys=sources_not_in_column)
        self.logger.debug('df_final=\n%s' % df.head())

        return df


    def fit(self, df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape):
        self.logger.debug('initial mean_or_scale=%s' % mean_or_scale)
        self.logger.debug('initial stddev_or_shape=%s' % stddev_or_shape)

        df_curve_training_data[self.installdate] = pd.to_datetime(df_curve_training_data[self.installdate]).dt.year
        df_curve_training_data[self.decommissiondate] = np.where(pd.notna(df_curve_training_data[self.decommissiondate]), pd.to_datetime(df_curve_training_data[self.decommissiondate]).dt.year, -1)
        df_curve_training_data = df_curve_training_data.astype({self.installdate: int, self.decommissiondate: int})
        self.logger.debug('df_curve_training_data=%s' % log_df_info(df_curve_training_data, head=-1))

        # initialize parameter
        mean_or_scale_final = 0.0
        stddev_or_shape_final = 0.0

        # if mean value and stddev value are specified by user, use them to generate the degradation curve
        if mean_or_scale is not None and stddev_or_shape is not None:
            self.logger.debug('generate degradation curve with user defined parameters')
            mean_or_scale_final = float(mean_or_scale)
            stddev_or_shape_final = float(stddev_or_shape)
        else:
            # generate NORMAL distribution by input data
            if distribution_type == 'NORMAL':
                self.logger.debug('calculate the mean value and stddev value for normal distribution...')
                mean_or_scale_final, stddev_or_shape_final, df_cfp = self.generate_normal_distribution(df_curve_training_data)
                self.logger.debug('calculate done')

            # generate WEIBULL distribution by input data
            if distribution_type == 'WEIBULL':   
                self.logger.debug('calculate the scale value and shape value for normal distribution...')
                mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp = self.generate_normal_distribution(df_curve_training_data)
                mean_or_scale_final, stddev_or_shape_final = self.generate_weibull_distribution(mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp)
                self.logger.debug('calculate done')

        # return the final curve
        return self.generate_final_curve(distribution_type, mean_or_scale_final, stddev_or_shape_final)
                
    def generate_normal_distribution(self, df_curve_training_data):
        # df_curve_training_data: [assetId, installationDate, removeDate]
        # step1. pre-processing
        date_service = df_curve_training_data[self.decommissiondate] - df_curve_training_data[self.installdate]
        df_curve_training_data['retired_flag'] = np.where(date_service>=0, 1, 0)
        df_curve_training_data['date_service'] = np.where(date_service>=0, date_service, 2000 - df_curve_training_data[self.installdate])  # should be datetime.datetime.now().year, 2000 as test year for sample data
        self.logger.debug('df_curve_training_data_pre_processing=\n%s' % df_curve_training_data)

        # step2. calculate the exposed table
        df_exposed_table = df_curve_training_data.groupby(['date_service']).agg({'retired_flag':'sum', 'date_service':'count'})
        df_exposed_table.rename(columns={'date_service': 'pre_exposed_number'}, inplace=True)
        df_exposed_table.sort_index(inplace=True, ascending=False)
        
        df_exposed_table['exposed_number'] = df_exposed_table.pre_exposed_number.cumsum()
        del df_exposed_table['pre_exposed_number']
        df_exposed_table = df_exposed_table.reset_index()
        df_exposed_table.rename(columns={'retired_flag':'retired_number', 'date_service':'age'}, inplace=True)
        self.logger.debug('df_exposed_table=\n%s' % df_exposed_table.head(200))

        # step3. calculate cumulative failure probablity (cfp) table
        # df_exposed_table: [age, retired_number, exposed_number]
        max_age = df_exposed_table['age'].max()
        
        
        
        max_age_idx = df_exposed_table[df_exposed_table['age'] == max_age].index.values.astype(int)[0] # 0, first row
        #print('------------ ' + str(max_age_idx))
        a1 = (df_exposed_table.loc[max_age_idx, 'retired_number']) 
        a2 = (df_exposed_table.loc[max_age_idx, 'exposed_number'])
        if (a1 == a2) : 
            print ("Number of exposed assets = number of retired assets in max_age, meaning all the asset observed failed at max_age")
            # If max_age-1 exist in the df, drop the max_age row as outlier - some asset may survive after max_age year, we just need enough time to observe.
            # If max_age-1 doesn't exist in the df, modify max_age row to max_age-1
            if ( (max_age-1) == df_exposed_table.loc[max_age_idx+1, 'age'] ):
                #print ("max_age-1 is in !!! drop max_age row")
                df_exposed_table = df_exposed_table.drop(df_exposed_table.index[max_age_idx]) 
            else:    
                df_exposed_table.loc[max_age_idx, 'retired_number'] = 0
                df_exposed_table.loc[max_age_idx, 'age'] = max_age - 1
        #print('df_exposed_table 2 =\n%s' % df_exposed_table.head(200))        
        #print('------------')
        
        
        
        df_cfp = df_exposed_table[df_exposed_table['retired_number'] !=0]
        self.logger.debug('df_cfp_input=\n%s' % df_cfp.head())
        df_cfp['pre_cumulative_probability'] = df_cfp['retired_number'] / df_cfp['exposed_number']
        cfp_first_line = [df_cfp['age'].min()-1, -1, -1, 0.001]
        cfp_last_line = [max_age, -1, -1, 0]
        df_cfp = pd.DataFrame(np.array([cfp_first_line, cfp_last_line]), columns=['age', 'retired_number', 'exposed_number', 'pre_cumulative_probability']).append(df_cfp, ignore_index=True)
        df_cfp.sort_values('age', inplace=True)
        df_cfp['cumulative_probability'] = df_cfp.pre_cumulative_probability.cumsum()
        df_cfp['z'] = norm.ppf(df_cfp['cumulative_probability'])
        self.logger.debug('df_cfp=\n%s' % df_cfp.head(200))

        # step4. estimate optimal mean value and stddev value
        #df_cfp: [age, retired_number, exposed_number, pre_cumulative_probability, cumulative_probability, z]
        age_mean = df_cfp['age'].mean()
        z_mean = df_cfp['z'].mean()
        df_cfp['pre_Szx'] = (df_cfp['z'] - df_cfp['z'].mean()) * (df_cfp['age'] - df_cfp['age'].mean())
        df_cfp['pre_Szz'] = (df_cfp['z'] - df_cfp['z'].mean()) * (df_cfp['z'] - df_cfp['z'].mean())
        Szx = df_cfp['pre_Szx'].sum()
        Szz = df_cfp['pre_Szz'].sum()
        stddev_or_shape_NORMAL = Szx / Szz
        mean_or_scale_NORMAL = age_mean - stddev_or_shape_NORMAL * z_mean
        self.logger.debug('mean_or_scale_NORMAL=\n%s' % mean_or_scale_NORMAL)
        self.logger.debug('stddev_or_shape_NORMAL=\n%s' % stddev_or_shape_NORMAL)
        return mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp

    def generate_weibull_distribution(self, mean_or_scale_NORMAL, stddev_or_shape_NORMAL, df_cfp):
        # step1. get the survival table based on cfp table
        self.logger.debug('WEIBULL_input_df_cfp=\n%s' % df_cfp)
        df_cfp['survival_probablity'] = np.where(df_cfp['age'] == df_cfp['age'].min(), 1, 1 - df_cfp['cumulative_probability'])
        df_cfp_survival = df_cfp.reset_index()
        self.logger.debug('df_cfp_survival=\n%s' % df_cfp_survival)

        # step2. get initial alpha (scale) and beta (shape)
        # initial beta
        initial_beta = 0.0
        beta_criteia = sys.float_info.max
        for beta in np.arange(0.1, 100, 0.001):
            beta_estimation = ((1+2/beta)**(0.5+2/beta) * math.exp(-(1+2/beta)) * (1 + 1/12/(1+2/beta))) \
                                      / ((1+1/beta)**(1+2/beta) * math.exp(-(2+2/beta)) * (1+1/12/(1+1/beta))**2 * math.sqrt(2*math.pi)) 
            beta_criteia_now = abs(beta_estimation - (1 + stddev_or_shape_NORMAL ** 2 / mean_or_scale_NORMAL ** 2))
            if beta_criteia_now < beta_criteia:
                beta_criteia = beta_criteia_now
                initial_beta = beta
        self.logger.debug('initial_beta=\n%s' % initial_beta)
        # initial alpha
        initial_alpha = math.sqrt(stddev_or_shape_NORMAL**2 / ( math.sqrt(2*math.pi)*((1+2/initial_beta)**(0.5+2/initial_beta) \
                                    * math.exp(-(1+2/initial_beta)) * (1 + 1/12/(1+2/initial_beta))) - (2*math.pi*(1+1/initial_beta)**(1+2/initial_beta) \
                                    * math.exp(-(2+2/initial_beta)) * (1+1/12/(1+1/initial_beta))**2) ) )
        self.logger.debug('initial_alpha=\n%s' % initial_alpha)

        # step3. use gradient decent method to search optimal alpha and beta
        # initialization
        age_array = df_cfp_survival['age'].tolist()
        SP_array = df_cfp_survival['survival_probablity'].tolist() # SP for survivial probability
        rowcount = len(age_array)
        alpha = initial_alpha # initial value of alpha
        beta = initial_beta # initial value of beta
        eps = 0.01 # step length
        precision = 0.003 # stop criteria, it is not set too small to avoid overfitting
        performance_objective = 0 # performance object function
        gradient_alpha = 0
        gradient_beta = 0
        err_objective_iter = 0
        gradient_alpha_iter = 0
        gradient_beta_iter = 0
        
        # initial value for performance objective function
        for i in range(0,rowcount):       
            err_objective_iter = err_objective_iter + math.pow(math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta),2) 
        performance_objective = err_objective_iter
        err_objective_iter = 0
        self.logger.debug("the init performance objective function vaule is: " + str(performance_objective))

        # main part of gradient decent
        iteration = 0
        while abs(performance_objective) > precision:          
            for i in range(0,rowcount):
                err_objective_iter = err_objective_iter + math.pow(math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta), 2)
                gradient_alpha_iter = gradient_alpha_iter + 2 * (math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta)) * beta * math.pow((age_array[i] / alpha), beta-1) * (-age_array[i] / alpha / alpha)
                gradient_beta_iter = gradient_beta_iter + 2 * (math.log(SP_array[i]) + math.pow((age_array[i] / alpha), beta)) * math.pow((age_array[i] / alpha), beta) * math.log(age_array[i] / alpha)
            gradient_alpha = gradient_alpha_iter
            gradient_alpha_iter = 0
            gradient_beta = gradient_beta_iter
            gradient_beta_iter = 0
            performance_objective = err_objective_iter
            performance_objective_iter = 0
            alpha = alpha - eps * gradient_alpha # gradient decent
            beta = beta - eps * gradient_beta # gradient decent          
            iteration = iteration + 1
            #print(iteration)
            if iteration == 1000 :
                break
        self.logger.debug("the optimal alpha is " + str(alpha))
        self.logger.debug("the optimal beta is " + str(beta))

        mean_or_scale_WEIBULL = alpha
        stddev_or_shape_WEIBULL = beta
        self.logger.debug('mean_or_scale_WEIBULL=\n%s' % mean_or_scale_WEIBULL)
        self.logger.debug('stddev_or_shape_WEIBULL=\n%s' % stddev_or_shape_WEIBULL)
        return mean_or_scale_WEIBULL, stddev_or_shape_WEIBULL

    def generate_final_curve(self, distribution_type, mean_or_scale_final, stddev_or_shape_final):
        self.logger.debug('generate final degradation curve')
        #final_degradation_curve = []
        final_degradation_curve = dict()
        
        if distribution_type == 'WEIBULL':
            for age in range(0, 101, 1):
                failure_probablity_for_age = (1- math.exp(-((age / mean_or_scale_final) ** stddev_or_shape_final))) * 100  #cumulative density function of WEIBULL
                #final_degradation_curve.append([age, failure_probablity_for_age])
                final_degradation_curve[age] = failure_probablity_for_age
                
        elif distribution_type == 'NORMAL':
            for age in range(0, 101, 1):
                failure_probablity_for_age = norm(mean_or_scale_final, stddev_or_shape_final).cdf(age)  #cumulative density function of NORMAL
                #final_degradation_curve.append([(]age, failure_probablity_for_age])
                final_degradation_curve[age] = failure_probablity_for_age
        else:
            raise('distribution type is invalid')
        self.logger.debug('final_degradation_curve=\n%s' % final_degradation_curve)    
        # here return the list curve, to consider both old and new pipeline mode
        #return str(final_degradation_curve).strip('[]')
        return final_degradation_curve

## How to override base class
If you want to customize some functions in the model template, you can just override the function. For example **`DegradationCurveEstimator(BaseEstimator)`**, It is based on the base class **`BaseEstimator`** you can:
+ override the existing method in **`BaseEstimator`**, like **`train_model`** method
+ add new function to configure the algorithm

    def train_model(self, df):
        # parse statistics_distribution_args
        distribution_type = self.statistics_distribution_args["distribution_type"]
        mean_or_scale = self.statistics_distribution_args["mean_or_scale"]
        stddev_or_shape = self.statistics_distribution_args["stddev_or_shape"]

        # distribution should not be none
        if distribution_type is None:
            raise('distribution_type should be NORMAL or WEIBULL')
        
        degradation_curve_pipline = DegradationCurve(distribution_type, mean_or_scale, stddev_or_shape, self.installdate, self.decommissiondate)
        # using sample data to test degradation curve calculation
        #df_curve_training_data = degradation_curve_pipline.sample_data()
        # use real data from DB2
        df_curve_training_data = df
        self.logger.debug('df_curve_training_data=\n%s' % df_curve_training_data.head())

        final_degradation_curve = degradation_curve_pipline.fit(df_curve_training_data, distribution_type, mean_or_scale, stddev_or_shape)
        degradation_curve_model = dict()
        #degradation_curve_model["target"] =  "degradation_curve"
        degradation_curve_model["final_degradation_curve"] =  final_degradation_curve

        return degradation_curve_model

#### Base class `BaseEstimator`

In [None]:
class BaseEstimator(BaseEstimatorFunction):
    '''Base class for estimators, supporting training/prediction/scoring.

    Note that though the AS base class supports multiple targets per estimator, we 
    are stick to single target per estimator for now. So this class assume the 
    given targets and predictions are always of an 1-element array. Also, when 
    prediction is not needed, the passed in preditions should be an array of one 
    element 'None'.
    '''
    def __init__(self, features, targets, predictions, features_for_training=None, label_names=None, **kwargs):
        super().__init__(features, targets, predictions)
        self.models = dict()
        self.model_extras = defaultdict(list)
        self.logger = get_logger(self)
        self.features_for_training = features_for_training
        self.label_names = label_names
        self.local_model = True
        self.model_timestamp = None
        self.training_timestamp = None

    def get_model_name(self, target_name, suffix=None):
        if suffix is None:
            suffix = self.model_timestamp
        return self.generate_model_name(target_name=target_name, prefix=None, suffix=suffix)
    
    def generate_model_name(self, target_name, prefix=None, suffix=None):
        name = ['apm', 'pmi', 'model']

        if prefix is not None:
            if isinstance(prefix, str):
                prefix = [prefix]
            if len(prefix) > 0:
                name += prefix
        name.extend([self._entity_type.logical_name, self.name, target_name])
        name = '/'.join(name)

        if suffix is not None:
            if isinstance(suffix, datetime):
                name += '_' + str(calendar.timegm(suffix.timetuple()))
            else:
                name += '_' + str(suffix)

        return name

    def _get_target_name(self):
        return '_' if (self.predictions is None or len(self.predictions) == 0) else self.predictions[0]

    def _load_model(self, bucket):
        model_name = self.get_model_name(target_name=self._get_target_name()) # load with default suffix timestamp
        return (model_name, self.load_model(model_name, bucket, self.local_model))

    def load_model(self, model_path, bucket, local):
        if local:
            # local FS
            model = None
            try:
                with open(model_path, mode='rb') as file:
                    model = file.read()
            except FileNotFoundError as e:
                pass
            return pickle.loads(model) if model is not None else None
        else:
            return self._entity_type.db.cos_load(filename=model_path, bucket=bucket, binary=True)

    def _save_model(self, bucket, new_model, suffix=None, local=True):
        filename = self.get_model_name(target_name=self._get_target_name(), suffix=suffix) # save with explicity suffix timestamp set

        objects = [(filename, new_model, True, True)] # model itself always pickle dumped and binary
        extras = self.get_model_extra(new_model, filename)
        objects.extend(extras)
        for fname, obj, picket_dump, binary in objects:
            self.save_model(obj, fname, bucket, picket_dump, binary, local)

        # add model to internal list for prediction usage
        self.models[filename] = new_model
        if len(extras) > 0:
            self.model_extras[filename].extend(extras)

    def save_model(self, new_model, model_path, bucket, pickle_dump, binary, local):
        if local:
            mode = 'w'
            if pickle_dump:
                new_model = pickle.dumps(new_model)
            if binary:
                mode += 'b'

            try:
                mkdirp(model_path)
            except:
                pass
            with open(model_path, mode=mode) as file:
                file.write(new_model)
        else:
            try:
                if pickle_dump:
                    self._entity_type.db.cos_save(persisted_object=new_model, filename=model_path, bucket=bucket, binary=binary)
                else:
                    # work-around to be able to not pickle save to cos
                    ret = self._entity_type.db.cos_client._cos_api_request('PUT', bucket=bucket, key=model_path, payload=new_model, binary=binary)
                    if ret is None:
                        self.logger.warn('Not able to PUT %s to COS bucket %s', (model_path, bucket))
            except requests.exceptions.ReadTimeout as err:
                self.logger.warn('timeout saving %s to cos: %s' % (model_path, err))

        self.logger.debug('saved %s' % model_path)

    def get_model_extra(self, new_model, model_path):
        '''Return extra objects to be saved along with the model as a list of (path, object, pickle_dump, binary) tuplies.

        Normal estimator only has one model object to be saved to COS. Some estimtors might want to save 
        other objects, possibly caching/deriving from the model object, for other usage. You can override 
        this method to return a list of such additional objects, in the form of (cos_path, object, pickle_dump, binary) tuple.

        It is recommended to construct your extra object cos_path based on the given model_path, with 
        different suffix appended.
        '''
        return []

    def get_models_for_training(self, db, df, bucket=None):
        model_name, model = self._load_model(bucket=bucket)

        if model is not None:
            self.models[model_name] = model
            return []
        else:
            return [model]

    def get_models_for_predict(self, db, bucket=None):
        if len(self.predictions) == 0 or self.predictions[0] is None:
            return []
        else:
            return list(self.models.values())

    def conform_index(self,df,entity_id_col = None, timestamp_col = None):
        # workaround for avoiding base class adding columns
        return df

    def add_training_preprocessor(self, stage):
        self.add_preprocessor(stage)

    def execute_training_preprocessing(self, df):
        if len(self._preprocessors) == 0:
            return df
        else:
            return super().execute_preprocessing(df)

    def get_df_for_training(self, df):
        features = [] + self.features
        if self.features_for_training is not None:
            features.extend(self.features_for_training)

        df = df[features]
        df = df.reset_index(drop=True)

        return df

    def execute_train_test_split(self,df):
        # TODO disable splitting for now
        return (df, None)

    def get_df_for_prediction(self, df):
        df_for_prediction = df[self.features]
        self.logger.debug('df_for_prediction: %s' % log_df_info(df_for_prediction, head=5))
        # self.logger.debug('df_for_prediction: %s' % str(df_for_prediction.isna().any(axis='columns')))
        # df_for_prediction = df_for_prediction.dropna()
        # self.logger.debug('df_for_prediction_dropna: %s' % log_df_info(df_for_prediction, head=5))
        return df_for_prediction

    def predict(self, model, df):
        return list(zip(model.predict(df), model.predict_proba(df))) if model is not None else None

    def get_prediction_result_value_index(self):
        raise RuntimeError('required but not implemented')

    def process_prediction_result(self, df, prediction_result, model):
        self.logger.debug('prediction_result_length=%d, prediction_result=%s' % (len(prediction_result), str(prediction_result[:10])))

        if prediction_result is None:
            df[self.predictions[0]] = None

            self.logger.debug('No suitable model found. Created null predictions')
        else:
            for idx in self.get_prediction_result_value_index():
                if not all([isinstance(p, (tuple, list, np.ndarray)) for p in prediction_result]):
                    break

                prediction_result = [p[idx] for p in prediction_result]

            df[self.predictions[0]] = prediction_result

            self.logger.debug('final_prediction_result_length=%d, final_prediction_result=%s' % (len(prediction_result), str(prediction_result[:10])))

        return df

    def execute(self, df=None):
        self.logger.debug('df_input: %s' % log_df_info(df, head=5))

        self.logger.debug('self.model_timestamp=%s' % self.model_timestamp)

        db = self._entity_type.db
        bucket = self.get_bucket_name()

        self.training_timestamp = None

        # transform incoming data using any preprocessors
        # include whatever preprocessing stages are required by implementing a set_preprocessors method
        required_models = self.get_models_for_training(db=db, df=df, bucket=bucket)
        if len(required_models) > 0:   
            # Training

            # only do preprocessing and splitting once
            df_train = self.execute_training_preprocessing(df)
            df_train = self.get_df_for_training(df_train)
            self.logger.debug('df_train: %s' % log_df_info(df_train, head=5))
            # df_train, df_test = self.execute_train_test_split(df_train)

            for model in required_models:
                self.logger.info('training model: %s' % model) 

                new_model = self.train_model(df_train)

                self.training_timestamp = str(calendar.timegm(datetime.utcnow().timetuple()))

                # TODO add evaluation of the new model here, before saving it
                # if df_test is not None:
                #     new_model.test(df_test)                
                #     self.evaluate_and_write_model(new_model = new_model,
                #                                   current_model = model,
                #                                   db = db,
                #                                   bucket=bucket)

                self._save_model(bucket=bucket, new_model=new_model, suffix=self.training_timestamp, local=self.local_model)

                # switch to the new one just trained
                self.model_timestamp = self.training_timestamp
        elif self.model_timestamp is not None:
            self.training_timestamp = self.model_timestamp

        # Predictions

        df_for_prediction = None
        for idx, model in enumerate(self.get_models_for_predict(db=db, bucket=bucket)):        
            # TODO deal with multiple predictions
            if df_for_prediction is None:
                df_for_prediction = self.get_df_for_prediction(df)
            df = self.process_prediction_result(df, self.predict(model, df_for_prediction), model)

        if df_for_prediction is None:
            # no prediction needed, return empty df
            df = df[[]]

        self.logger.debug('df_final: %s' % log_df_info(df, head=5))

        return df