# **Amazon Lookout for Equipment** - Demonstration on an anonymized expander dataset
*Part 5: Scheduling regular inference calls*

In [None]:
BUCKET = '<YOUR_BUCKET_NAME_HERE>'
PREFIX = 'data/scheduled_inference'

## Initialization
---
In this notebook, we will update the repository structure to add an inference directory in the data folder:
```
/lookout-equipment-demo
|
+-- data/
|   |
|   +-- inference/
|   |   |
|   |   |-- input/
|   |   |
|   |   \-- output/
|   |
|   +-- labelled-data/
|   |   \-- labels.csv
|   |
|   \-- training-data/
|       \-- expander/
|           |-- subsystem-01
|           |   \-- subsystem-01.csv
|           |
|           |-- subsystem-02
|           |   \-- subsystem-02.csv
|           |
|           |-- ...
|           |
|           \-- subsystem-24
|               \-- subsystem-24.csv
|
+-- dataset/
|   |-- labels.csv
|   |-- tags_description.csv
|   |-- timeranges.txt
|   \-- timeseries.zip
|
+-- notebooks/
|   |-- 1_data_preparation.ipynb
|   |-- 2_dataset_creation.ipynb
|   |-- 3_model_training.ipynb
|   |-- 4_model_evaluation.ipynb
|   \-- 5_inference_scheduling.ipynb        <<< This notebook <<<
|
+-- utils/
    |-- lookout_equipment_utils.py
    \-- lookoutequipment.json
```

### Imports

In [None]:
%%sh
pip -q install --upgrade pip
pip -q install --upgrade awscli boto3 sagemaker
aws configure add-model --service-model file://../utils/lookoutequipment.json --service-name lookoutequipment

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
import boto3
import datetime
import os
import pandas as pd
import pprint
import pyarrow as pa
import pyarrow.parquet as pq
import sagemaker
import s3fs
import sys
import time
import uuid
import warnings

# Helper functions for managing Lookout for Equipment API calls:
sys.path.append('../utils')
import lookout_equipment_utils as lookout

### Parameters

In [None]:
warnings.filterwarnings('ignore')

DATA       = os.path.join('..', 'data')
RAW_DATA   = os.path.join('..', 'dataset')
INFER_DATA = os.path.join(DATA, 'inference')


os.makedirs(os.path.join(INFER_DATA, 'input'), exist_ok=True)
os.makedirs(os.path.join(INFER_DATA, 'output'), exist_ok=True)

ROLE_ARN = sagemaker.get_execution_role()
REGION_NAME = boto3.session.Session().region_name

In [None]:
lookout_client = lookout.get_client(region_name=REGION_NAME)

## Create an inference scheduler
---
When you navigate to the model details page of the console, you will see that no inference is scheduled yet:

![Schedule Starting point](../assets/schedule_start.png)

### Scheduler configuration
Let's create a new inference schedule: some parameters are mandatory, while others offer some added flexibility.

#### Parameters

* Set `DATA_UPLOAD_FREQUENCY` to the frequency at which data will be uploaded for inference. Allowed values are `PT5M`, `PT10M`, `PT15M`, `PT30M` and `PT1H`. This is both the frequency of the inference scheduler and how often data are uploaded to the source bucket.
* Set `INFERENCE_DATA_SOURCE_BUCKET` to the S3 bucket of your inference data
* Set `INFERENCE_DATA_SOURCE_PREFIX` to the S3 prefix of your inference data
* Set `INFERENCE_DATA_OUTPUT_BUCKET` to the S3 bucket where you want inference results
* Set `INFERENCE_DATA_OUTPUT_PREFIX` to the S3 prefix where you want inference results
* Set `ROLE_ARN_FOR_INFERENCE` to the role to be used to **read** data to infer on and **write** inference output
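`DATA_UPLOAD_FREQUENCY` is an ISO 8601 duration, while the sample data generation later in this notebook works with a plain number of minutes (`frequency = 5`). The two must agree. A minimal sketch of that conversion, restricted to the allowed values listed above; `upload_frequency_to_minutes` is a hypothetical helper, not part of the Lookout for Equipment API:

```python
import re

# Allowed values listed in this notebook for DATA_UPLOAD_FREQUENCY:
ALLOWED_FREQUENCIES = ('PT5M', 'PT10M', 'PT15M', 'PT30M', 'PT1H')


def upload_frequency_to_minutes(frequency: str) -> int:
    """Convert an allowed ISO 8601 duration such as 'PT5M' to minutes."""
    if frequency not in ALLOWED_FREQUENCIES:
        raise ValueError(f'{frequency!r} is not one of {ALLOWED_FREQUENCIES}')
    # 'PT' prefix, then an integer, then M (minutes) or H (hours):
    match = re.fullmatch(r'PT(\d+)([MH])', frequency)
    value, unit = int(match.group(1)), match.group(2)
    return value * 60 if unit == 'H' else value


print(upload_frequency_to_minutes('PT5M'))  # 5
print(upload_frequency_to_minutes('PT1H'))  # 60
```

Rejecting anything outside the allowed list up front surfaces a typo in the notebook before any API call is made.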

In [None]:
# Name of the inference scheduler you want to create
INFERENCE_SCHEDULER_NAME = 'lookout-demo-model-v1-scheduler'

# Name of the model on which you want to create this inference scheduler
MODEL_NAME_FOR_CREATING_INFERENCE_SCHEDULER = 'lookout-demo-model-v1'

# Mandatory parameters:
INFERENCE_DATA_SOURCE_BUCKET = BUCKET
INFERENCE_DATA_SOURCE_PREFIX = f'{PREFIX}/input/'
INFERENCE_DATA_OUTPUT_BUCKET = BUCKET
INFERENCE_DATA_OUTPUT_PREFIX = f'{PREFIX}/output/'
ROLE_ARN_FOR_INFERENCE = ROLE_ARN
DATA_UPLOAD_FREQUENCY = 'PT5M'

#### Optional parameters

* Set `DATA_DELAY_OFFSET_IN_MINUTES` to the number of minutes by which you expect data uploads to be delayed. This acts as a time buffer for uploading data.
* Set `INPUT_TIMEZONE_OFFSET`. The allowed values are: +00:00, +00:30, +01:00, ... +11:30, +12:00, -00:00, -00:30, -01:00, ... -11:30, -12:00
* Set `TIMESTAMP_FORMAT`. The allowed values are `EPOCH`, `yyyy-MM-dd-HH-mm-ss` or `yyyyMMddHHmmss`. This is the format of the timestamp used as the suffix of the input data file name. Lookout for Equipment uses it to understand which files to run inference on (so that you don't need to remove previous files to let the scheduler find which one to run on).
* Set `COMPONENT_TIMESTAMP_DELIMITER`. The allowed values are `-`, `_` or a space. This is the delimiter character used to separate the component from the timestamp in the input filename.
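Taken together, the delimiter and timestamp format define the input file naming convention; the sample data generation later in this notebook uses `f'{component}_{current_timestamp}.csv'`. The sketch below builds such names for each listed `TIMESTAMP_FORMAT` and checks an `INPUT_TIMEZONE_OFFSET` against the list of allowed values given above. Both `build_input_filename` and `is_valid_timezone_offset` are hypothetical helpers, and treating a naive timestamp as UTC for `EPOCH` is an assumption:

```python
import re
from datetime import datetime, timezone

# TIMESTAMP_FORMAT values listed above, translated to strftime patterns.
# EPOCH is handled separately as whole seconds since 1970-01-01 UTC.
STRFTIME_PATTERNS = {
    'yyyy-MM-dd-HH-mm-ss': '%Y-%m-%d-%H-%M-%S',
    'yyyyMMddHHmmss': '%Y%m%d%H%M%S',
}
ALLOWED_DELIMITERS = ('-', '_', ' ')

# +/-HH:MM in 30-minute steps, from 00:00 up to 12:00, as listed above.
TIMEZONE_OFFSET_PATTERN = re.compile(r'[+-](?:(?:0\d|1[01]):[03]0|12:00)')


def is_valid_timezone_offset(offset: str) -> bool:
    return TIMEZONE_OFFSET_PATTERN.fullmatch(offset) is not None


def build_input_filename(component: str, timestamp: datetime,
                         delimiter: str = '_',
                         timestamp_format: str = 'yyyyMMddHHmmss') -> str:
    """Return '<component><delimiter><timestamp>.csv'."""
    if delimiter not in ALLOWED_DELIMITERS:
        raise ValueError(f'Unsupported delimiter: {delimiter!r}')
    if timestamp_format == 'EPOCH':
        # Assumption: a naive timestamp is interpreted as UTC.
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        suffix = str(int(timestamp.timestamp()))
    else:
        suffix = timestamp.strftime(STRFTIME_PATTERNS[timestamp_format])
    return f'{component}{delimiter}{suffix}.csv'


print(build_input_filename('subsystem-01', datetime(2021, 1, 27, 9, 10)))
# subsystem-01_20210127091000.csv
print(is_valid_timezone_offset('+00:00'), is_valid_timezone_offset('+05:45'))
# True False
```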

In [None]:
DATA_DELAY_OFFSET_IN_MINUTES = None
INPUT_TIMEZONE_OFFSET = '+00:00'
COMPONENT_TIMESTAMP_DELIMITER = '_'
TIMESTAMP_FORMAT = 'yyyyMMddHHmmss'

### Create the inference scheduler
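The CreateInferenceScheduler API creates a scheduler **and** starts it: this means it starts costing you right away. However, you can stop and start an existing scheduler at will (see the end of this notebook). The cell below only collects the scheduler parameters; the scheduler itself is created once some inference data has been uploaded to the input location: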

In [None]:
scheduler = lookout.LookoutEquipmentScheduler(
    scheduler_name=INFERENCE_SCHEDULER_NAME,
    model_name=MODEL_NAME_FOR_CREATING_INFERENCE_SCHEDULER,
    region_name=REGION_NAME
)

scheduler_params = {
    'input_bucket': INFERENCE_DATA_SOURCE_BUCKET,
    'input_prefix': INFERENCE_DATA_SOURCE_PREFIX,
    'output_bucket': INFERENCE_DATA_OUTPUT_BUCKET,
    'output_prefix': INFERENCE_DATA_OUTPUT_PREFIX,
    'role_arn': ROLE_ARN_FOR_INFERENCE,
    'upload_frequency': DATA_UPLOAD_FREQUENCY,
    'delay_offset': DATA_DELAY_OFFSET_IN_MINUTES,
    'timezone_offset': INPUT_TIMEZONE_OFFSET,
    'component_delimiter': COMPONENT_TIMESTAMP_DELIMITER,
    'timestamp_format': TIMESTAMP_FORMAT
}

scheduler.set_parameters(**scheduler_params)
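For reference, the parameters gathered above roughly correspond to the fields of a CreateInferenceScheduler request. The sketch below assembles such a request as a plain dictionary, assuming the request shape documented for boto3's `create_inference_scheduler`; the `lookout_equipment_utils` helper may build it differently, and `build_create_scheduler_request` is a hypothetical name. The bucket and role ARN values are placeholders, and nothing is sent to AWS:

```python
import uuid


def build_create_scheduler_request(scheduler_name, model_name, input_bucket,
                                   input_prefix, output_bucket, output_prefix,
                                   role_arn, upload_frequency,
                                   delay_offset=None, timezone_offset='+00:00',
                                   component_delimiter='_',
                                   timestamp_format='yyyyMMddHHmmss'):
    """Build keyword arguments for a create_inference_scheduler call."""
    request = {
        'ModelName': model_name,
        'InferenceSchedulerName': scheduler_name,
        'DataUploadFrequency': upload_frequency,
        'DataInputConfiguration': {
            'S3InputConfiguration': {'Bucket': input_bucket, 'Prefix': input_prefix},
            'InputTimeZoneOffset': timezone_offset,
            'InferenceInputNameConfiguration': {
                'TimestampFormat': timestamp_format,
                'ComponentTimestampDelimiter': component_delimiter,
            },
        },
        'DataOutputConfiguration': {
            'S3OutputConfiguration': {'Bucket': output_bucket, 'Prefix': output_prefix},
        },
        'RoleArn': role_arn,
        # Idempotency token, so that a retried call does not create a duplicate:
        'ClientToken': str(uuid.uuid4()),
    }
    # DATA_DELAY_OFFSET_IN_MINUTES is optional: only send it when it is set.
    if delay_offset is not None:
        request['DataDelayOffsetInMinutes'] = delay_offset
    return request


request = build_create_scheduler_request(
    scheduler_name='lookout-demo-model-v1-scheduler',
    model_name='lookout-demo-model-v1',
    input_bucket='my-example-bucket',                        # placeholder
    input_prefix='data/scheduled_inference/input/',
    output_bucket='my-example-bucket',                       # placeholder
    output_prefix='data/scheduled_inference/output/',
    role_arn='arn:aws:iam::123456789012:role/ExampleRole',   # placeholder
    upload_frequency='PT5M',
)
# With a boto3 'lookoutequipment' client, this could then be sent as:
# client.create_inference_scheduler(**request)
```

Leaving `DataDelayOffsetInMinutes` out when it is `None` mirrors the notebook's choice of `DATA_DELAY_OFFSET_IN_MINUTES = None` for "no delay buffer".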

## Prepare the inference data
---
Let's prepare some data and send it to the S3 input location our scheduler will monitor:

In [None]:
# Let's load all our original signals:
all_tags_fname = os.path.join(DATA, 'training-data', 'expander.parquet')
table = pq.read_table(all_tags_fname)
all_tags_df = table.to_pandas()
del table
all_tags_df.head()

Let's load the tags description: this dataset comes with a tag description file including:

* `Tag`: the tag name as it is recorded by the customer in their historian system (for instance the [Honeywell process history database](https://www.honeywellprocess.com/en-US/explore/products/advanced-applications/uniformance/Pages/uniformance-phd.aspx))
* `UOM`: the unit of measure for the recorded signal
* `Subsystem`: an ID linked to the part of the asset this sensor is attached to

From there, we can collect the list of components (subsystem column):

In [None]:
tags_description_fname = os.path.join(RAW_DATA, 'tags_description.csv')
tags_description_df = pd.read_csv(tags_description_fname)
components = tags_description_df['Subsystem'].unique()
tags_description_df.head()
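The extraction loop that follows selects, for each component, the tags whose `Subsystem` matches it. The same grouping can be sketched with the standard library alone; the rows below are made up and only reuse the column names described above, and `tags_by_component` is a hypothetical helper (with pandas, `tags_description_df.groupby('Subsystem')['Tag'].apply(list)` gives a comparable mapping):

```python
import csv
import io
from collections import defaultdict


def tags_by_component(csv_text):
    """Map each Subsystem value to the list of its Tag names, in file order."""
    groups = defaultdict(list)
    for row in csv.DictReader(io.StringIO(csv_text)):
        groups[row['Subsystem']].append(row['Tag'])
    return dict(groups)


# Made-up rows with the column names described above:
sample = """Tag,UOM,Subsystem
signal-001,degC,subsystem-01
signal-002,bar,subsystem-01
signal-003,rpm,subsystem-02
"""
print(tags_by_component(sample))
# {'subsystem-01': ['signal-001', 'signal-002'], 'subsystem-02': ['signal-003']}
```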

To build our sample inference dataset, we will extract the last few minutes of the evaluation period of the original time series:

In [None]:
# How many sequences do we want to extract:
num_sequences = 3

# The scheduling frequency in minutes:
frequency = 5

# Round the current time down to the previous multiple of `frequency` minutes.
# This reference is computed once, so that all sequences stay aligned even if
# the loop below happens to run across a scheduling boundary:
now = datetime.datetime.now()
base_tm = now - datetime.timedelta(
    minutes=now.minute % frequency,
    seconds=now.second,
    microseconds=now.microsecond
)

# Loops through each sequence:
start = all_tags_df.index.max() + datetime.timedelta(minutes=-frequency * num_sequences + 1)
for i in range(num_sequences):
    end = start + datetime.timedelta(minutes=frequency - 1)

    # Each sequence is stamped one scheduling step after the previous one:
    tm = base_tm + datetime.timedelta(minutes=frequency * i)
    current_timestamp = tm.strftime('%Y%m%d%H%M%S')

    # For each sequence, we need to loop through all components:
    print(f'Extracting data from {start} to {end}:')
    new_index = None
    for component in components:
        # Extracting the dataframe for this component and this particular time range:
        signals = list(tags_description_df.loc[(tags_description_df['Subsystem'] == component), 'Tag'])
        signals_df = all_tags_df.loc[start:end, signals]
        
        # We need to reset the index to match the time 
        # at which the scheduler will run inference:
        if new_index is None:
            new_index = pd.date_range(
                start=tm,
                periods=signals_df.shape[0], 
                freq='1min'
            )
        signals_df.index = new_index
        signals_df.index.name = 'Timestamp'
        signals_df = signals_df.reset_index()
        signals_df['Timestamp'] = signals_df['Timestamp'].dt.strftime('%Y-%m-%dT%H:%M:%S.%f')

        # Export this file in CSV format:
        component_fname = os.path.join(INFER_DATA, 'input', f'{component}_{current_timestamp}.csv')
        signals_df.to_csv(component_fname, index=None)
    
    start = start + datetime.timedelta(minutes=+frequency)
    
# Upload the whole folder to S3, in the input location:
INFERENCE_INPUT = os.path.join(INFER_DATA, 'input')
!aws s3 cp --recursive --quiet $INFERENCE_INPUT s3://$BUCKET/$PREFIX/input
    
# Now that we've prepared the data, create the scheduler by running:
create_scheduler_response = scheduler.create()
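The window arithmetic in the cell above can be checked in isolation: it takes the last `num_sequences * frequency` one-minute rows of the original signals and cuts them into consecutive, inclusive windows of `frequency` rows each. A minimal sketch, assuming one-minute sampling as in the cell above; `extraction_windows` is a hypothetical name and the reference date is arbitrary:

```python
from datetime import datetime, timedelta


def extraction_windows(last_timestamp, num_sequences, frequency):
    """Return inclusive (start, end) pairs of `frequency` one-minute rows each,
    covering the last num_sequences * frequency minutes up to last_timestamp."""
    windows = []
    start = last_timestamp - timedelta(minutes=frequency * num_sequences - 1)
    for _ in range(num_sequences):
        end = start + timedelta(minutes=frequency - 1)
        windows.append((start, end))
        start += timedelta(minutes=frequency)
    return windows


for start, end in extraction_windows(datetime(2020, 1, 1, 12, 0), 3, 5):
    print(start.time(), '->', end.time())
# 11:46:00 -> 11:50:00
# 11:51:00 -> 11:55:00
# 11:56:00 -> 12:00:00
```

The last window ends exactly on the last available timestamp, and windows neither overlap nor leave gaps, because `.loc[start:end]` on a datetime index includes both bounds.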

Our scheduler is now running and its inference history is currently empty:

![Scheduler created](../assets/schedule_created.png)

## Get inference results
---

### List inference executions

**Let's now wait for 5-15 minutes to give the scheduler some time to run its first inferences.** Once the wait is over, we can use the ListInferenceExecutions API for our current inference scheduler. The only mandatory parameter is the scheduler name.

You can also choose a time period over which to query inference executions. If you don't specify one, all executions for the inference scheduler will be listed. To specify a time range, you can do this:

```python
START_TIME_FOR_INFERENCE_EXECUTIONS = datetime.datetime(2010,1,3,0,0,0)
END_TIME_FOR_INFERENCE_EXECUTIONS = datetime.datetime(2010,1,5,0,0,0)
```

With these values, executions that ran after `2010-01-03 00:00:00` and before `2010-01-05 00:00:00` will be listed.

You can also choose to query for executions in a particular status; the allowed statuses are `IN_PROGRESS`, `SUCCESS` and `FAILED`.

In [None]:
START_TIME_FOR_INFERENCE_EXECUTIONS = None
END_TIME_FOR_INFERENCE_EXECUTIONS = None
EXECUTION_STATUS = None

execution_summaries = []

while len(execution_summaries) == 0:
    execution_summaries = scheduler.list_inference_executions(
        start_time=START_TIME_FOR_INFERENCE_EXECUTIONS,
        end_time=END_TIME_FOR_INFERENCE_EXECUTIONS,
        execution_status=EXECUTION_STATUS
    )
    if len(execution_summaries) == 0:
        print('WAITING FOR THE FIRST INFERENCE EXECUTION')
        time.sleep(60)
        
    else:
        print('FIRST INFERENCE EXECUTED\n')
        break
            
execution_summaries
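The loop above waits indefinitely if the scheduler never produces an execution (for instance when no input file matches the expected name). A bounded variant is sketched below as a hypothetical helper, `poll_until`; it only assumes that the fetch function returns an empty list while nothing is available, as `scheduler.list_inference_executions` does in the cell above:

```python
import time


def poll_until(fetch, timeout_seconds=1800, interval_seconds=60, sleep=time.sleep):
    """Call fetch() until it returns a non-empty result or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        result = fetch()
        if result:
            return result
        # Give up rather than sleeping past the deadline:
        if time.monotonic() + interval_seconds > deadline:
            raise TimeoutError(f'No result after {timeout_seconds} seconds')
        print('WAITING FOR THE FIRST INFERENCE EXECUTION')
        sleep(interval_seconds)


# Usage in this notebook could look like:
# execution_summaries = poll_until(lambda: scheduler.list_inference_executions(
#     start_time=None, end_time=None, execution_status=None))
```

`time.monotonic()` keeps the deadline immune to wall-clock adjustments, and the injectable `sleep` makes the helper easy to exercise without actually waiting.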

We have configured this scheduler to run every five minutes. After a few minutes we can also see the history in the console populated with its first execution:

![Inference history](../assets/schedule_inference_history.png)

When the scheduler starts (for example at `datetime.datetime(2021, 1, 27, 9, 15)`), it looks for **a single** CSV file in the input location with a filename that contains a timestamp set to the previous step. For example, a file named:

* subsystem-01_2021012709**10**00.csv will be found and ingested
* subsystem-01_2021012709**15**00.csv will **not** be ingested (it will be ingested at the next inference execution)

In addition, when opening the file `subsystem-01_20210127091000.csv`, the scheduler will look for any row with a date between the DataStartTime and the DataEndTime of the inference execution. If it doesn't find such a row, an internal exception will be thrown.
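The matching rule described above can be sketched as follows for a scheduler running every `frequency_minutes` minutes. This is an illustration under stated assumptions, not the service's implementation: it assumes the data window of an execution spans exactly the previous scheduling step, that the start bound is inclusive and the end bound exclusive, and it ignores `DATA_DELAY_OFFSET_IN_MINUTES`. `expected_input` and `rows_in_window` are hypothetical helpers:

```python
from datetime import datetime, timedelta


def expected_input(component, run_time, frequency_minutes, delimiter='_'):
    """Return the file name and the (start, end) data window for one execution.

    Assumption: the execution started at run_time reads the file stamped one
    scheduling step earlier, and its data window covers that whole step.
    """
    data_start = run_time - timedelta(minutes=frequency_minutes)
    data_end = run_time
    fname = f"{component}{delimiter}{data_start.strftime('%Y%m%d%H%M%S')}.csv"
    return fname, data_start, data_end


def rows_in_window(timestamps, data_start, data_end):
    """Keep timestamps with data_start <= t < data_end (bounds are assumed)."""
    return [t for t in timestamps if data_start <= t < data_end]


fname, start, end = expected_input('subsystem-01', datetime(2021, 1, 27, 9, 15), 5)
print(fname)  # subsystem-01_20210127091000.csv
rows = [datetime(2021, 1, 27, 9, m) for m in range(8, 17)]
print(len(rows_in_window(rows, start, end)))  # 5
```

An empty result from `rows_in_window` corresponds to the case where the execution fails because no row falls inside its window.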

### Get actual prediction results

After each successful inference, a CSV file is written to the output location of your bucket. Each inference creates a new folder containing a single `results.csv` file. Let's read these files and display their content here:

In [None]:
results_df = scheduler.get_predictions()
results_df.to_csv(os.path.join(INFER_DATA, 'output', 'results.csv'))
results_df
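`scheduler.get_predictions()` comes from the helper module and its internals are not shown in this notebook. As a rough local alternative, the sketch below assumes the output prefix has been mirrored to a local folder (for example with `aws s3 cp --recursive`, as done for the input) and that each execution folder sits directly under it with its `results.csv`. `read_all_results` is a hypothetical helper, and the folder names and columns in the demo are made up:

```python
import csv
import tempfile
from pathlib import Path


def read_all_results(output_root):
    """Read every <execution folder>/results.csv found directly under output_root.

    Returns (folder_name, rows) tuples sorted by folder name, where rows is a
    list of dicts keyed by the CSV header.
    """
    collected = []
    for path in sorted(Path(output_root).glob('*/results.csv')):
        with path.open(newline='') as f:
            collected.append((path.parent.name, list(csv.DictReader(f))))
    return collected


# Demo on a throwaway directory; folder names and columns are made up:
with tempfile.TemporaryDirectory() as tmp:
    for folder, value in [('run-2', '0'), ('run-1', '1')]:
        (Path(tmp) / folder).mkdir()
        (Path(tmp) / folder / 'results.csv').write_text(
            f'timestamp,prediction\n2021-01-27T09:10:00,{value}\n'
        )
    demo = read_all_results(tmp)

print(demo)
```

Keeping the folder name next to each set of rows preserves which execution produced which predictions.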

## Inference scheduler operations
---
### Stop inference scheduler
**Be frugal**: running the scheduler is the main cost driver of Amazon Lookout for Equipment. Use the following API to stop an already running inference scheduler. This will stop the periodic inference executions:

In [None]:
scheduler.stop()

### Start an inference scheduler
You can restart any `STOPPED` inference scheduler using this API:

In [None]:
scheduler.start()

### Delete an inference scheduler
You can delete a **stopped** scheduler you no longer need. Note that you can only have one scheduler per model.

In [None]:
scheduler.stop()
scheduler.delete()
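The operations above form a small lifecycle: stop a running scheduler, restart a stopped one, and delete only a stopped one. A simplified model of those rules is sketched below. The status names follow the scheduler statuses reported by Lookout for Equipment (`PENDING`, `RUNNING`, `STOPPING`, `STOPPED`), but which status each action requires is an assumption drawn from this notebook's text, not an exhaustive statement of the API's rules. `check_action` and `actions_to_delete` are hypothetical helpers:

```python
# Status each action is assumed to require, based on the text above:
REQUIRED_STATUS = {
    'stop': 'RUNNING',
    'start': 'STOPPED',
    'delete': 'STOPPED',
}


def check_action(action: str, current_status: str) -> None:
    """Raise if `action` is not expected to succeed from `current_status`."""
    required = REQUIRED_STATUS[action]
    if current_status != required:
        raise RuntimeError(
            f'Cannot {action} a scheduler in status {current_status}: '
            f'it must be {required} first.'
        )


def actions_to_delete(current_status: str) -> list:
    """Return the sequence of actions needed to delete the scheduler."""
    if current_status == 'STOPPED':
        return ['delete']
    if current_status == 'RUNNING':
        return ['stop', 'delete']
    # PENDING or STOPPING: wait until the scheduler settles before acting.
    raise RuntimeError(f'Wait for the scheduler to leave {current_status} first.')


print(actions_to_delete('RUNNING'))  # ['stop', 'delete']
```

In practice a scheduler may pass through `STOPPING` before reaching `STOPPED`, so a real stop-then-delete sequence would likely need to wait for the `STOPPED` status between the two calls.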

## Conclusion
---
In this notebook, we used the model created in part 3 of this series, configured a scheduler and extracted the predictions obtained after it executed a few inferences.