# Preprocessing tutorial

In this notebook, we'll explain the major functionality of the `preprocessing` package. The example dataset below contains deliberately erroneous data. This is meant to demonstrate functionality. When using your own data, you may not necessarily need every function.

We will describe preprocessing steps for both historical training and live inference scenarios. The key difference between the two is that, for training, we can remove or impute data of poor quality. For live inference, however, we need to raise warnings about poor data quality while keeping the system running under appropriate assumptions.

The diagram below shows a typical data engineering flow for the historical training scenario. The preprocessing flow for live inference follows similar steps, except that we focus on reporting data anomalies and avoid removing or changing the data.


![flowchart](./_images/_preprocessing_steps.svg)


In the following, we first provide a detailed step-by-step guide based on the workflow for training. At the end, we show how to conduct data anomaly detection in the live scenario.

## Setup


In [1]:
# Resolve path when used in a use case project example
import sys
from pathlib import Path

import pandas as pd

sys.path.insert(0, str(Path("../../").resolve()))

# Import logging
import logging

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)  # use the module name, not the literal string "__name__"

## Load Data

The preprocessing toolkit assumes that data extraction is already completed, since extraction is specific to the source system and architecture in use. The data loading step therefore ingests pre-extracted data together with a configuration file that defines the parameters and metadata needed to process it.

For this purpose, our toolkit includes a function `get_tag_config`, which reads a configuration file and maps it to a specific `Pydantic` model based on a given schema. This function supports loading configurations from CSV or YAML files and returns a `TagsConfig` object that encapsulates all configuration parameters for further processing. These configurations are typically compiled from expert input, and this process simplifies translating structured tag information into a format that our preprocessing functions can use.

To efficiently load your tag configurations, you can use the `get_tag_config` function as demonstrated below:


In [2]:

from preprocessing.configs import get_tag_config
from preprocessing.datasets import DATA_DIR  # This is the path to the data directory

tags_raw_config = get_tag_config(
    path_to_tag_config=DATA_DIR / "sample_tags_raw_config.csv",
    config_loader="csv",  # Replace with "yaml" if you are using a YAML file
    parameters_schema="raw",  # Replace with the specific schema you need
    delimiter=";",  # Replace with the delimiter used in your configuration file
)

# Now `tags_raw_config` holds the loaded tag configurations, ready for preprocessing stages.
tags_raw_config.to_df().head(20)

Unnamed: 0,tag_name,raw_tag,description,display_name
0,iron_feed,Sys_A/PLT.ABC_.T0001,percentage of Iron in feed,% Iron Feed
1,silica_feed,Sys_A/PLT.ABC_.T0002,percentage of silica in feed,% Silica Feed
2,starch_flow,Sys_A/PLT.ABC_.T0003,Starch Flow meter,Starch Flow
3,amina_flow,Sys_A/PLT.ABC_.T0004,Amina Flow meter,Amina Flow
4,ore_pulp_flow,Sys_A/PLT.ABC_.T0005,Ore Pulp Flow meter,Ore Pulp Flow
5,ore_pulp_ph,Sys_A/PLT.ABC_.T0006,Ore Pulp cell A pH,Ore Pulp pH
6,ore_pulp_density,Sys_A/PLT.ABC_.T0007,Ore Pulp Density meter,Ore Pulp Density
7,air_flow01,Sys_B/PLT.ABC_.T0008,FC 101 Air Flow,Flotation Column 01 Air Flow
8,air_flow02,Sys_B/PLT.ABC_.T0009,FC 102 Air Flow,Flotation Column 02 Air Flow
9,air_flow03,Sys_B/PLT.ABC_.T0010,FC 103 Air Flow,Flotation Column 03 Air Flow


Based on this list and other information about the data system, the data engineer creates the pipeline to extract the raw data and also gathers meta information about the data during extraction. Sometimes the meta information can be extracted directly from the database. Most of the time, it needs to be consolidated with expert input during extraction. The example below shows some useful meta information that should be consolidated during extraction.

### Data Configurations Schema

In our toolkit, the configuration schemas are defined using Pydantic models, which helps in validating and parsing configuration data. Below is a mapping of the supported schemas:

- `"meta"` for `TagMetaParameters`
- `"resample"` for `TagResampleParameters`
- `"outliers"` for `TagOutliersParameters`
- `"on_off"` for `TagOnOffDependencyParameters`
- `"impute"` for `TagImputationParameters`

The `get_tag_config` function utilizes this schema mapping to correctly parse the configurations and instantiate objects containing the defined parameters for each kind of tag.
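
To make the idea of a schema mapping concrete, the sketch below parses a semicolon-delimited configuration into typed records. It is a simplified stand-in and not the `preprocessing` implementation: it uses standard-library dataclasses instead of Pydantic, and `MetaRow`, `SCHEMAS`, and `load_config` are hypothetical names introduced only for illustration.

```python
import csv
import io
from dataclasses import dataclass


@dataclass
class MetaRow:
    """Hypothetical stand-in for one row of a "meta" tag configuration."""

    tag_name: str
    unit: str
    min: float
    max: float


# Hypothetical schema registry: schema name -> record type
SCHEMAS = {"meta": MetaRow}


def load_config(text, parameters_schema, delimiter=";"):
    """Parse delimited text into a list of typed records for the given schema."""
    record_type = SCHEMAS[parameters_schema]
    rows = []
    for raw in csv.DictReader(io.StringIO(text), delimiter=delimiter):
        rows.append(
            record_type(
                tag_name=raw["tag_name"],
                unit=raw["unit"],
                min=float(raw["min"]),  # fails loudly if the value is not numeric
                max=float(raw["max"]),
            )
        )
    return rows


sample = "tag_name;unit;min;max\niron_feed;%;45.0;65.0\nsilica_feed;%;2.0;30.0\n"
meta_rows = load_config(sample, parameters_schema="meta")
print(meta_rows[0])
```

The point of the typed record is that malformed values are caught when the configuration is loaded rather than later in the pipeline.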

In [3]:
# Show the meta schema information

tags_meta_config = get_tag_config(
    path_to_tag_config=DATA_DIR / "sample_tags_meta_config.csv",
    config_loader="csv",
    parameters_schema="meta",
    delimiter=";",
)
tags_meta_config.to_df().head(20)

Unnamed: 0,tag_name,data_source,data_type,tag_type,unit,min,max,extract_freq
0,iron_feed,Sys_A,DataType.NUMERIC,TagType.INPUT,%,45.0,65.0,5min
1,silica_feed,Sys_A,DataType.NUMERIC,TagType.INPUT,%,2.0,30.0,5min
2,starch_flow,Sys_A,DataType.NUMERIC,TagType.INPUT,cc/min,2000.0,6000.0,5min
3,amina_flow,Sys_A,DataType.NUMERIC,TagType.INPUT,cc/min,300.0,700.0,5min
4,ore_pulp_flow,Sys_A,DataType.NUMERIC,TagType.INPUT,cc/min,370.0,410.0,5min
5,ore_pulp_ph,Sys_A,DataType.NUMERIC,TagType.INPUT,ph,9.0,11.0,5min
6,ore_pulp_density,Sys_A,DataType.NUMERIC,TagType.INPUT,density,1.5,1.8,5min
7,air_flow01,Sys_B,DataType.NUMERIC,TagType.INPUT,cc/min,200.0,300.0,5min
8,air_flow02,Sys_B,DataType.NUMERIC,TagType.INPUT,cc/min,200.0,300.0,5min
9,air_flow03,Sys_B,DataType.NUMERIC,TagType.INPUT,cc/min,200.0,300.0,5min


For this tutorial, let's assume we already have the meta information above and use the following extracted raw data. The following sections demonstrate the preprocessing functionality.

In [4]:
# Load example data

from preprocessing import datasets as preprocessing_datasets

df = preprocessing_datasets.get_sample_preprocessed_data()

Before starting the preprocessing, let's first describe our dataset to get basic descriptive statistics for each column:

In [5]:
df.describe().round(2)

Unnamed: 0,Sys_A/PLT.ABC_.T0001,Sys_A/PLT.ABC_.T0002,Sys_A/PLT.ABC_.T0003,Sys_A/PLT.ABC_.T0004,Sys_A/PLT.ABC_.T0005,Sys_A/PLT.ABC_.T0006,Sys_A/PLT.ABC_.T0007,Sys_B/PLT.ABC_.T0008,Sys_B/PLT.ABC_.T0009,Sys_B/PLT.ABC_.T0010,...,Sys_B/PLT.ABC_.T0015,Sys_B/PLT.ABC_.T0016,Sys_B/PLT.ABC_.T0017,Sys_B/PLT.ABC_.T0018,Sys_B/PLT.ABC_.T0019,Sys_B/PLT.ABC_.T0020,Sys_B/PLT.ABC_.T0021,Sys_A/PLT.ABC_.T0022,Sys_A/PLT.ABC_.T0023,Sys_A/PLT.ABC_.T0024
count,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,...,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4097.0,4415.0
mean,inf,14.65,2869.14,488.15,397.58,9.77,1.68,280.15,277.16,281.08,...,520.24,522.65,531.35,420.32,425.25,429.94,421.02,65.05,2.33,1.0
std,,6.81,950.48,83.69,8.37,0.38,0.06,29.41,29.42,28.37,...,122.18,116.05,138.59,76.59,75.03,75.51,72.56,1.12,1.12,0.07
min,42.74,1.31,54.6,242.93,376.84,8.75,1.52,175.89,178.19,177.2,...,181.93,224.91,135.21,165.73,214.74,203.7,185.06,62.05,0.6,0.0
25%,52.67,8.94,2168.97,436.04,398.85,9.54,1.65,250.09,250.1,250.09,...,416.47,449.25,405.37,351.49,350.98,354.13,350.94,64.37,1.44,1.0
50%,56.08,13.85,2908.34,502.45,399.84,9.8,1.7,299.84,299.53,299.89,...,499.62,499.82,499.59,401.27,401.13,407.55,400.99,65.21,2.0,1.0
75%,59.72,19.6,3528.73,549.52,400.59,10.03,1.72,299.95,299.98,299.95,...,599.71,599.33,600.22,496.2,497.78,497.81,462.28,65.86,3.01,1.0
max,inf,33.4,6270.16,736.98,418.07,10.81,1.83,312.3,309.89,302.78,...,859.03,827.78,884.84,675.63,674.07,698.51,655.5,68.01,5.53,1.0


We can also use the `create_summary_table` function to get an initial sense of quality issues in the data:

In [6]:
from preprocessing import create_summary_table

create_summary_table(df, tags_meta_config)

Unnamed: 0,count,mean,std,min,25%,50%,75%,max,null_count,inf_count,below_range_min_count,above_range_max_count,percent_below_range_min,percent_above_range_max
Sys_A/PLT.ABC_.T0001,4097.0,inf,,42.74,52.67,56.08,59.72,inf,318,3,201.0,24.0,4.906029,0.585794
Sys_A/PLT.ABC_.T0002,4097.0,14.651733,6.808236,1.31,8.94,13.85,19.6,33.4,318,0,45.0,198.0,1.098365,4.832804
Sys_A/PLT.ABC_.T0003,4097.0,2869.142009,950.48008,54.595483,2168.968993,2908.340847,3528.727412,6270.158798,318,0,205.0,205.0,5.003661,5.003661
Sys_A/PLT.ABC_.T0004,4097.0,488.145318,83.689937,242.927477,436.037967,502.454283,549.522256,736.982378,318,0,205.0,205.0,5.003661,5.003661
Sys_A/PLT.ABC_.T0005,4097.0,397.578386,8.370683,376.837604,398.851356,399.842656,400.589883,418.070232,318,0,205.0,205.0,5.003661,5.003661
Sys_A/PLT.ABC_.T0006,4097.0,9.767642,0.378027,8.753389,9.540878,9.79585,10.030779,10.80737,318,0,205.0,205.0,5.003661,5.003661
Sys_A/PLT.ABC_.T0007,4097.0,1.68038,0.063775,1.519926,1.651352,1.695705,1.72179,1.83243,318,0,205.0,205.0,5.003661,5.003661
Sys_B/PLT.ABC_.T0008,4097.0,280.151579,29.410048,175.885579,250.089767,299.837839,299.95135,312.295415,318,0,205.0,205.0,5.003661,5.003661
Sys_B/PLT.ABC_.T0009,4097.0,277.159709,29.423337,178.18843,250.096872,299.526717,299.9799,309.887767,318,0,205.0,205.0,5.003661,5.003661
Sys_B/PLT.ABC_.T0010,4097.0,281.082104,28.373511,177.202665,250.087672,299.888089,299.946594,302.783,318,0,205.0,205.0,5.003661,5.003661
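
As a rough illustration of how counts like those in the last columns of this table could be derived, the sketch below uses plain pandas. It is not the `create_summary_table` implementation; `summarize_quality` is a hypothetical helper, and excluding infinities from the range counts is an assumption of this sketch.

```python
import numpy as np
import pandas as pd


def summarize_quality(data, ranges):
    """Count nulls, infinities, and out-of-range values per column.

    `ranges` maps a column name to a (min, max) tuple.
    """
    rows = {}
    for col, (low, high) in ranges.items():
        values = data[col]
        # Treat infinities as missing so they are not counted as range violations
        finite = values.replace([np.inf, -np.inf], np.nan)
        rows[col] = {
            "null_count": int(values.isna().sum()),
            "inf_count": int(np.isinf(values).sum()),
            "below_range_min_count": int((finite < low).sum()),
            "above_range_max_count": int((finite > high).sum()),
        }
    return pd.DataFrame.from_dict(rows, orient="index")


toy = pd.DataFrame({"iron_feed": [50.0, np.inf, np.nan, 40.0, 70.0]})
quality = summarize_quality(toy, {"iron_feed": (45.0, 65.0)})
print(quality)
```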


More explicitly, our quality issues are:

- `"Sys_A/PLT.ABC_.T0001"` contains `inf` values
- All columns contain some `NaN` values
- The majority of columns contain outliers based on the values specified in the tag meta data configuration


***

## 1 Renaming

First we rename the raw tag names to more meaningful tag names. The raw tag names extracted from the database often have no physical meaning, so we need expert input to map the raw names to unique and informative names that can be used in Python programming.

Below is an example of the rename map, where

- `raw_tag` is the raw name of the tag from the database
- `tag_name` is the unique, meaningful name for the tag that can be used easily in Python programming

<div class="alert alert-info">
<b>Note</b>

Feature names should represent a clear, human-readable physical property, not a copy-pasted tag identifier.

* An example of a good feature name is `pump_pressure_kPa | H105`. It represents the **human-readable physical property of the process**, the unit of measurement, and a reference to the original tag from which this physical property is calculated. If the preprocessing recipe or formula for this feature changes, a new feature with a distinct name should be created, while `pump_pressure_kPa | H105` will still exist and use the old recipe or formula.

* An example of a bad feature name is `ZO.RHONH955.H105.SP` because it is not human-readable, and it is hard to tie the tag identifier to its physical meaning. Additionally, full tag identifiers are never used by SMEs in day-to-day communications.

</div>


In [7]:
tags_raw_config.to_df().head(25)

Unnamed: 0,tag_name,raw_tag,description,display_name
0,iron_feed,Sys_A/PLT.ABC_.T0001,percentage of Iron in feed,% Iron Feed
1,silica_feed,Sys_A/PLT.ABC_.T0002,percentage of silica in feed,% Silica Feed
2,starch_flow,Sys_A/PLT.ABC_.T0003,Starch Flow meter,Starch Flow
3,amina_flow,Sys_A/PLT.ABC_.T0004,Amina Flow meter,Amina Flow
4,ore_pulp_flow,Sys_A/PLT.ABC_.T0005,Ore Pulp Flow meter,Ore Pulp Flow
5,ore_pulp_ph,Sys_A/PLT.ABC_.T0006,Ore Pulp cell A pH,Ore Pulp pH
6,ore_pulp_density,Sys_A/PLT.ABC_.T0007,Ore Pulp Density meter,Ore Pulp Density
7,air_flow01,Sys_B/PLT.ABC_.T0008,FC 101 Air Flow,Flotation Column 01 Air Flow
8,air_flow02,Sys_B/PLT.ABC_.T0009,FC 102 Air Flow,Flotation Column 02 Air Flow
9,air_flow03,Sys_B/PLT.ABC_.T0010,FC 103 Air Flow,Flotation Column 03 Air Flow


We use this rename map to change the raw tag name in the data to the meaningful `tag_name`.

In [8]:
from preprocessing.utils import rename_tags

df = rename_tags(tags_raw_config=tags_raw_config, data_to_rename=df)
df.head()



Unnamed: 0,date,iron_feed,silica_feed,starch_flow,amina_flow,ore_pulp_flow,ore_pulp_ph,ore_pulp_density,air_flow01,air_flow02,...,column_level01,column_level02,column_level03,column_level04,column_level05,column_level06,column_level07,iron_conc,silica_conc,amina_flow_sample_on_off
0,03/10/2017 01:00,55.2,16.98,3162.625026,578.786678,398.753368,10.113487,1.729558,251.166672,250.226086,...,450.383776,446.891845,450.474523,449.912259,455.792161,464.38331,450.532747,66.91,1.31,0
1,03/10/2017 02:00,55.2,16.98,3133.256389,537.219661,399.871822,10.129742,1.667784,249.880589,250.21405,...,449.373361,450.249356,450.081222,450.328806,448.722983,455.501528,451.3877,67.06,1.11,0
2,03/10/2017 03:00,55.2,16.98,3479.482944,591.906744,398.763806,10.048403,1.732711,250.161328,250.104167,...,449.972878,450.868711,450.901822,451.145822,451.134189,459.981311,450.296722,66.97,1.27,0
3,03/10/2017 04:00,55.2,16.98,3228.036436,593.170106,399.866983,9.918614,1.731056,250.208772,250.204761,...,487.940706,491.462111,487.387206,494.528183,495.664011,502.76385,494.939889,66.75,1.36,0
4,03/10/2017 05:00,55.2,16.98,3327.280739,619.710806,399.615089,9.746029,1.765879,249.9178,250.160494,...,549.031539,549.983156,549.459572,549.975483,549.512533,560.6963,550.271772,66.63,1.34,0
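
For readers who want to see the mechanics, renaming from a map like this amounts to a dictionary lookup on the column names. The sketch below is a plain pandas equivalent under that assumption, not the `rename_tags` implementation; the toy frames are made up for illustration.

```python
import pandas as pd

# Toy rename map with the same two columns as the configuration above
rename_map = pd.DataFrame(
    {
        "raw_tag": ["Sys_A/PLT.ABC_.T0001", "Sys_A/PLT.ABC_.T0002"],
        "tag_name": ["iron_feed", "silica_feed"],
    }
)
raw = pd.DataFrame(
    {
        "date": ["03/10/2017 01:00"],
        "Sys_A/PLT.ABC_.T0001": [55.2],
        "Sys_A/PLT.ABC_.T0002": [16.98],
    }
)

# Tag names must be unique, otherwise two raw tags would collapse into one column
assert rename_map["tag_name"].is_unique

mapping = dict(zip(rename_map["raw_tag"], rename_map["tag_name"]))
# Columns missing from the map (here "date") are left unchanged
renamed = raw.rename(columns=mapping)
print(list(renamed.columns))
```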


## 2 Cleaning

In this step, we conduct simple data cleaning.

Some of the functions below are quite simple, and a pure pandas alternative may be preferable. However, the functions in `preprocessing` have extensive logging.

Functions with more complex functionality will have more extensive explanation.

In [9]:
from preprocessing import (
    replace_inf_values,
    remove_null_columns,
    unify_timestamp_col_name,
    round_timestamps,
    deduplicate_pandas,
    enforce_schema,
)

#### `replace_inf_values`

As its name suggests, this function simply logs and replaces infinities.

In [10]:
df = replace_inf_values(df)

INFO:preprocessing.cleaning:
number of inf values in data: 
           before_cleaning  after_cleaning
iron_feed                3               0
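
A pure pandas version of this step might look like the sketch below. It assumes that infinities are replaced with `NaN`, which the log output above does not state explicitly, and it reproduces only the before/after counts, not the package's logging.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame(
    {
        "iron_feed": [55.2, np.inf, -np.inf, 56.1],
        "silica_feed": [16.98, 17.0, 17.1, 17.2],
    }
)

# Count infinities per numeric column before cleaning
inf_before = np.isinf(toy.select_dtypes("number")).sum()
# Assumption: +/- infinity becomes NaN so later steps treat it as missing
cleaned = toy.replace([np.inf, -np.inf], np.nan)
inf_after = np.isinf(cleaned.select_dtypes("number")).sum()

print(pd.DataFrame({"before_cleaning": inf_before, "after_cleaning": inf_after}))
```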


#### `remove_null_columns`

Next, we'll remove columns that are entirely `NaN`.

In [11]:
df = remove_null_columns(df)

INFO:preprocessing.cleaning:All columns have values. Continuing...


#### `unify_timestamp_col_name`

Next, we'll rename our datetime column.

In [12]:
df = unify_timestamp_col_name(
    datetime_col="date",
    data=df,
    unified_name="timestamp"
)

INFO:preprocessing.cleaning:Rename column 'date' to 'timestamp'.


#### `enforce_schema`

This function ensures that the data type of each column in the dataset matches the type specified in the tag meta information.

In [13]:
df = enforce_schema(data=df, meta_config=tags_meta_config)

INFO:preprocessing.cleaning:Converting ['air_flow01', 'air_flow02', 'air_flow03', 'air_flow04', 'air_flow05', 'air_flow06', 'air_flow07', 'amina_flow', 'column_level01', 'column_level02', 'column_level03', 'column_level04', 'column_level05', 'column_level06', 'column_level07', 'iron_conc', 'iron_feed', 'ore_pulp_density', 'ore_pulp_flow', 'ore_pulp_ph', 'silica_conc', 'silica_feed', 'starch_flow'] columns to 'numeric' data type.
INFO:preprocessing.cleaning:Converting ['amina_flow_sample_on_off'] columns to 'boolean' data type.
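
The conversions reported in the log can be approximated in plain pandas as sketched below. The `target_types` mapping is hypothetical, and coercing unparseable entries to `NaN` is an assumption of this sketch rather than documented behaviour of `enforce_schema`.

```python
import pandas as pd

toy = pd.DataFrame(
    {
        "iron_feed": ["55.2", "bad", "56.1"],  # numeric tag stored as text
        "amina_flow_sample_on_off": [0, 1, 1],  # on/off flag stored as integers
    }
)
# Hypothetical mapping from column name to target type
target_types = {"iron_feed": "numeric", "amina_flow_sample_on_off": "boolean"}

typed = toy.copy()
for col, kind in target_types.items():
    if kind == "numeric":
        # Assumption: unparseable entries become NaN instead of raising
        typed[col] = pd.to_numeric(typed[col], errors="coerce")
    elif kind == "boolean":
        typed[col] = typed[col].astype(bool)

print(typed.dtypes)
```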


#### `round_timestamps`

Next, we'll do a simple datetime rounding. In this tutorial, we want to keep our data at a one-hour frequency, but below we show an example of rounding to a `2h` frequency. The `frequency` argument can be anything that [`Series.dt.round`](https://pandas.pydata.org/docs/reference/api/pandas.Series.dt.round.html) accepts.

In [14]:
df.head(3)

Unnamed: 0,timestamp,iron_feed,silica_feed,starch_flow,amina_flow,ore_pulp_flow,ore_pulp_ph,ore_pulp_density,air_flow01,air_flow02,...,column_level01,column_level02,column_level03,column_level04,column_level05,column_level06,column_level07,iron_conc,silica_conc,amina_flow_sample_on_off
0,03/10/2017 01:00,55.2,16.98,3162.625026,578.786678,398.753368,10.113487,1.729558,251.166672,250.226086,...,450.383776,446.891845,450.474523,449.912259,455.792161,464.38331,450.532747,66.91,1.31,0
1,03/10/2017 02:00,55.2,16.98,3133.256389,537.219661,399.871822,10.129742,1.667784,249.880589,250.21405,...,449.373361,450.249356,450.081222,450.328806,448.722983,455.501528,451.3877,67.06,1.11,0
2,03/10/2017 03:00,55.2,16.98,3479.482944,591.906744,398.763806,10.048403,1.732711,250.161328,250.104167,...,449.972878,450.868711,450.901822,451.145822,451.134189,459.981311,450.296722,66.97,1.27,0


In [15]:
df = round_timestamps(frequency="2h", data=df, datetime_col="timestamp")
df.head(3)

INFO:preprocessing.timezones:Rounding 'timestamp' to '2h' frequency.


Unnamed: 0,timestamp,iron_feed,silica_feed,starch_flow,amina_flow,ore_pulp_flow,ore_pulp_ph,ore_pulp_density,air_flow01,air_flow02,...,column_level01,column_level02,column_level03,column_level04,column_level05,column_level06,column_level07,iron_conc,silica_conc,amina_flow_sample_on_off
0,2017-03-10 00:00:00,55.2,16.98,3162.625026,578.786678,398.753368,10.113487,1.729558,251.166672,250.226086,...,450.383776,446.891845,450.474523,449.912259,455.792161,464.38331,450.532747,66.91,1.31,0
1,2017-03-10 02:00:00,55.2,16.98,3133.256389,537.219661,399.871822,10.129742,1.667784,249.880589,250.21405,...,449.373361,450.249356,450.081222,450.328806,448.722983,455.501528,451.3877,67.06,1.11,0
2,2017-03-10 04:00:00,55.2,16.98,3479.482944,591.906744,398.763806,10.048403,1.732711,250.161328,250.104167,...,449.972878,450.868711,450.901822,451.145822,451.134189,459.981311,450.296722,66.97,1.27,0
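
The underlying pandas call can be tried on its own, as in the sketch below. The month-first date format is an assumption made for this toy example; pass the explicit format that matches your own data.

```python
import pandas as pd

toy = pd.DataFrame(
    {"timestamp": ["03/10/2017 01:10", "03/10/2017 02:40", "03/10/2017 03:05"]}
)
# Assumption: month-first strings; an explicit format avoids ambiguous parsing
toy["timestamp"] = pd.to_datetime(toy["timestamp"], format="%m/%d/%Y %H:%M")
# Round every timestamp to the nearest multiple of two hours
toy["rounded"] = toy["timestamp"].dt.round("2h")
print(toy)
```

Note that rounding to a coarser frequency maps several original timestamps onto the same value, and timestamps exactly halfway between two multiples need care: in the output above, `01:00` was rounded to `00:00` while `03:00` was rounded to `04:00`.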


#### `deduplicate_pandas`

Next, we'll remove duplicate rows.

In [16]:
df = deduplicate_pandas(data=df)
df.sort_values(["timestamp"]).head(3)

INFO:preprocessing.cleaning:Dataframe shape before dedup: (4415, 25)
INFO:preprocessing.cleaning:Dataframe shape after dedup: (4257, 25)
INFO:quality_check_logger:Dropped 158 duplicate timestamps


Unnamed: 0,timestamp,iron_feed,silica_feed,starch_flow,amina_flow,ore_pulp_flow,ore_pulp_ph,ore_pulp_density,air_flow01,air_flow02,...,column_level01,column_level02,column_level03,column_level04,column_level05,column_level06,column_level07,iron_conc,silica_conc,amina_flow_sample_on_off
0,2017-03-10 00:00:00,55.2,16.98,3162.625026,578.786678,398.753368,10.113487,1.729558,251.166672,250.226086,...,450.383776,446.891845,450.474523,449.912259,455.792161,464.38331,450.532747,66.91,1.31,0
1,2017-03-10 02:00:00,55.2,16.98,3133.256389,537.219661,399.871822,10.129742,1.667784,249.880589,250.21405,...,449.373361,450.249356,450.081222,450.328806,448.722983,455.501528,451.3877,67.06,1.11,0
2,2017-03-10 04:00:00,55.2,16.98,3479.482944,591.906744,398.763806,10.048403,1.732711,250.161328,250.104167,...,449.972878,450.868711,450.901822,451.145822,451.134189,459.981311,450.296722,66.97,1.27,0
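
In plain pandas, deduplication is a single `drop_duplicates` call. The sketch below drops rows that are identical in every column; whether `deduplicate_pandas` compares full rows or only timestamps is not shown here, so treat that choice as an assumption.

```python
import pandas as pd

toy = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2017-03-10 00:00", "2017-03-10 00:00", "2017-03-10 02:00"]
        ),
        "iron_feed": [55.2, 55.2, 55.4],
    }
)
shape_before = toy.shape
# Exact duplicate rows are dropped; the first occurrence is kept
deduplicated = toy.drop_duplicates(keep="first").reset_index(drop=True)
print(f"shape before: {shape_before}, after: {deduplicated.shape}")
```

To deduplicate on the timestamp alone, `subset=["timestamp"]` can be passed to `drop_duplicates`.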


## 3 Outlier handling


After the first basic cleaning step, we would like to deal with the outliers in the dataset.
The `min` and `max` from the meta schema can be used as a starting point for determining outliers. However, they may not always be available or accurate, so we rely on expert input to decide what the more reasonable ranges are and how to handle outliers (e.g. drop or clip). This information should be consolidated in an input parameter file for outlier handling.

It is important to select appropriate outlier detection and handling methods based on the nature of the outliers. Below we provide a summary guide on how to detect and deal with them in different scenarios.

#### 3.1 Outliers caused by Process and Business change
These "outliers" are related to changes in the underlying process that generates the data. They should be labelled with the corresponding process and only be used for modeling that process. Potential scenarios include:
* Unreliable / unsteady periods
* Unusual incidents, e.g. system shutdown due to power cut, upgrade, etc.
* Changes of equipment, materials, or procedures

Usually, we need business guidance to identify these outliers. For example, the process engineer can help the data engineer use sensor values to define steady-period filters. We can also use data-driven methods such as [***Weibull Analysis***](../../../../diagnostics/src/diagnostics/notebooks/weibull_analysis.ipynb) to diagnose these different operating regimes.

When modeling, these outliers are usually removed to differentiate between different modes of the process.

#### 3.2 Outliers caused by data quality
After removing the process/business related outliers, we can check for outliers caused by other unexpected reasons and data quality issues.
Here we only consider univariate outliers. For outlier detection using multivariate models, please refer to the [`scikit-learn` outlier detection documentation](https://scikit-learn.org/stable/modules/outlier_detection.html#outlier-detection) for some implementations; an example of using `EllipticEnvelope` can be found in this [Brix post](https://brix.quantumblack.com/post/oai-mineral-processing-diagnostics-power-curve-analysis-4fadc:latest/01_power_load_curve_analysis.ipynb).

For univariate outliers, we can usually use ranges defined by experts or data-driven methods to identify the outliers. Below is an example of input parameters for outlier handling, where `range_min` and `range_max` are provided by expert input.

In [17]:
tags_outliers_config = get_tag_config(
    path_to_tag_config=DATA_DIR / "sample_tags_outliers_config.csv",
    config_loader="csv",
    parameters_schema="outliers",
    delimiter=";",
)
tags_outliers_config.to_df().head()

Unnamed: 0,tag_name,range_min,range_max,special_values,outlier_rules
0,iron_feed,45.0,65.0,9999.0,OutliersRule.DROP
1,silica_feed,2.0,30.0,,OutliersRule.DROP
2,starch_flow,2000.0,6000.0,-inf,OutliersRule.DROP
3,amina_flow,300.0,700.0,,OutliersRule.DROP
4,ore_pulp_flow,370.0,410.0,,OutliersRule.DROP


#### `get_tag_range`
We can use `get_tag_range` to retrieve the ranges to be used in outlier processing.

In [18]:
from preprocessing import get_tag_range

td_tag_ranges = get_tag_range(outliers_config=tags_outliers_config)
td_tag_ranges

{'iron_feed': (45.0, 65.0),
 'silica_feed': (2.0, 30.0),
 'starch_flow': (2000.0, 6000.0),
 'amina_flow': (300.0, 700.0),
 'ore_pulp_flow': (370.0, 410.0),
 'ore_pulp_ph': (9.0, 11.0),
 'ore_pulp_density': (1.5, 1.8),
 'air_flow01': (200.0, 300.0),
 'air_flow02': (200.0, 300.0),
 'air_flow03': (200.0, 300.0),
 'air_flow04': (200.0, 300.0),
 'air_flow05': (200.0, 300.0),
 'air_flow06': (200.0, 300.0),
 'air_flow07': (200.0, 300.0),
 'column_level01': (200.0, 800.0),
 'column_level02': (200.0, 800.0),
 'column_level03': (200.0, 800.0),
 'column_level04': (200.0, 800.0),
 'column_level05': (200.0, 800.0),
 'column_level06': (200.0, 800.0),
 'column_level07': (200.0, 800.0),
 'iron_conc': (62.0, 78.0),
 'silica_conc': (1.0, 4.0),
 'amina_flow_sample_on_off': (0.0, 1.0)}

#### `calculate_tag_range`

In some situations, the tag ranges may not be given, or we would like to derive an initial outlier range from the data. For this we can use `calculate_tag_range` to get the ranges of tags based on different methods, including:

- `min-max`: use the min and max values of the data
- `IQR`: use 1.5 * the interquartile range (IQR) as the deviation distance from the 1st and 3rd quartiles to calculate the lower and upper limits
- `3-sigma`: use the 3-sigma rule to calculate the lower and upper limits

Users need to provide the data and a list of tag names for which they want to get the ranges:
- `data`: data frame that contains the actual values for the tags
- `list_of_tags`: list of tags to calculate the ranges on. When not provided by the user, all the numerical tags in `data` will be used.

When using the data driven method to get the ranges, it is often useful to ensure the calculated ranges are within the observed range by setting `within_observed_range=True`, especially for data that are not normally distributed.
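
The `IQR` and `3-sigma` rules described above can be written out in a few lines of pandas. This is a minimal sketch of the textbook rules, with hypothetical helper names `iqr_range` and `three_sigma_range`; the actual `calculate_tag_range` may differ in details such as the quantile interpolation it uses.

```python
import pandas as pd


def iqr_range(values, within_observed_range=True, k=1.5):
    """Return (lower, upper) limits from the IQR rule for a numeric Series."""
    q1, q3 = values.quantile(0.25), values.quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    if within_observed_range:
        # Never report a limit outside what was actually observed
        lower, upper = max(lower, values.min()), min(upper, values.max())
    return float(lower), float(upper)


def three_sigma_range(values):
    """Return (mean - 3 * std, mean + 3 * std) for a numeric Series."""
    mean, std = values.mean(), values.std()
    return float(mean - 3 * std), float(mean + 3 * std)


toy = pd.Series([1, 2, 3, 4, 5, 6, 7, 8, 100], dtype=float)
print(iqr_range(toy))                               # limits capped to the data
print(iqr_range(toy, within_observed_range=False))  # raw IQR limits
print(three_sigma_range(toy))
```

As a worked check, assume the last value columns of the `describe` output earlier correspond to `iron_conc` (quartiles 64.37 and 65.86, maximum 68.01). The IQR is 1.49, so the rule gives 64.37 - 2.235 = 62.135 and 65.86 + 2.235 = 68.095, and capping the upper limit at the observed maximum yields 68.01.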

In [19]:
from preprocessing import calculate_tag_range

tag_ranges = calculate_tag_range(
    data=df,
    method="IQR",
    list_of_tags=["iron_conc", "silica_conc", "ore_pulp_ph"],
    within_observed_range=True
)
tag_ranges

{'iron_conc': (62.13500000000001, 68.01),
 'silica_conc': (0.6, 5.364999999999999),
 'ore_pulp_ph': (8.806027247500001, 10.7656296995)}

We can also use the data-driven ranges to update the ranges in the outlier parameter data. This can be done by calling the `update_tag_range` function with the outlier configuration and the dictionary of tag ranges to be updated.

In [20]:
from preprocessing import update_tag_range

updated_tag_outliers_config = update_tag_range(
    tags_outliers_config=tags_outliers_config,
    tag_range=tag_ranges,
)

tag_ranges = get_tag_range(outliers_config=updated_tag_outliers_config)
tag_ranges

{'iron_feed': (45.0, 65.0),
 'silica_feed': (2.0, 30.0),
 'starch_flow': (2000.0, 6000.0),
 'amina_flow': (300.0, 700.0),
 'ore_pulp_flow': (370.0, 410.0),
 'ore_pulp_ph': (8.806027247500001, 10.7656296995),
 'ore_pulp_density': (1.5, 1.8),
 'air_flow01': (200.0, 300.0),
 'air_flow02': (200.0, 300.0),
 'air_flow03': (200.0, 300.0),
 'air_flow04': (200.0, 300.0),
 'air_flow05': (200.0, 300.0),
 'air_flow06': (200.0, 300.0),
 'air_flow07': (200.0, 300.0),
 'column_level01': (200.0, 800.0),
 'column_level02': (200.0, 800.0),
 'column_level03': (200.0, 800.0),
 'column_level04': (200.0, 800.0),
 'column_level05': (200.0, 800.0),
 'column_level06': (200.0, 800.0),
 'column_level07': (200.0, 800.0),
 'iron_conc': (62.13500000000001, 68.01),
 'silica_conc': (0.6, 5.364999999999999),
 'amina_flow_sample_on_off': (0.0, 1.0)}

#### `remove_outlier`

Now we can use the outlier configuration from above to remove outliers from the data. The other argument of this function, `rule`, has two possible values:

- `"clip"`: apply a clipping so that all values end up in the interval `[range_min, range_max]`.
- `"drop"`: remove rows that contain outlier values.

Usually, `drop` is used when there is enough data. Users should consult business and process experts to understand where `clip` is more appropriate.

In [21]:
from preprocessing import remove_outlier

df_removed_outliers, remove_outliers_summary = remove_outlier(
    data=df,
    outliers_config=tags_outliers_config,
    rule="drop",
)
remove_outliers_summary


Unnamed: 0,tag_name,outlier_percentage
1,iron_feed,4.75
2,silica_feed,5.24
3,starch_flow,21.78
4,amina_flow,6.98
5,ore_pulp_flow,8.46
6,ore_pulp_ph,5.4
7,ore_pulp_density,4.04
8,air_flow01,21.85
9,air_flow02,29.08
10,air_flow03,13.86
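
The difference between the two rules can be seen on a toy frame, as sketched below in plain pandas. One caveat: this sketch implements "drop" by masking out-of-range values as `NaN` rather than deleting whole rows. That is an assumption, chosen because the missing-value percentages reported in the imputation step match the outlier percentages above; it is not a statement about how `remove_outlier` is implemented.

```python
import pandas as pd

toy = pd.DataFrame(
    {"iron_feed": [44.0, 50.0, 66.0], "silica_feed": [10.0, 31.0, 20.0]}
)
ranges = {"iron_feed": (45.0, 65.0), "silica_feed": (2.0, 30.0)}

clipped = toy.copy()
masked = toy.copy()
for col, (low, high) in ranges.items():
    # "clip": pull out-of-range values back to the nearest limit
    clipped[col] = toy[col].clip(lower=low, upper=high)
    # "drop" (assumed here to mean masking): out-of-range values become NaN
    masked[col] = toy[col].where(toy[col].between(low, high))

# Share of masked values per tag, in percent
outlier_percentage = masked.isna().mean().mul(100).round(2)
print(clipped, masked, outlier_percentage, sep="\n\n")
```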


## 4 Imputing

Next we'll demonstrate a simple imputing strategy.

(*TODO*: refactor interpolate_cols to allow different interpolation setups from an interpolation input)

In [22]:
from preprocessing import interpolate_cols

#### `interpolate_cols`

This function expects `data` and `cols_list`. If `cols_list` is `None`, interpolation will be performed on all numeric columns.

Any other keyword arguments will be passed to [`DataFrame.interpolate`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.interpolate.html).

In [23]:
imputation_config = get_tag_config(
    path_to_tag_config=DATA_DIR / "sample_tags_imputation_config.csv",
    config_loader="csv",
    parameters_schema="impute",
    delimiter=";",
)

df_interpolated, missing_data_summary = interpolate_cols(
    data=df_removed_outliers,
    impute_config=imputation_config,
    limit=5,
)
missing_data_summary


INFO:preprocessing.imputing:Interpolating 'iron_feed' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'silica_feed' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'starch_flow' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'amina_flow' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'ore_pulp_flow' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'ore_pulp_ph' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'ore_pulp_density' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'air_flow01' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'air_flow02' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'air_flow03' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'air_flow04' column using 'linear' method.
INFO:preprocessing.imputing:Interpolating 'air_flow05'

Unnamed: 0,tag_name,imputation_rule,missing_count,missing_percentage
0,iron_feed,ImputationRule.LINEAR,202,4.745126
1,silica_feed,ImputationRule.LINEAR,223,5.238431
2,starch_flow,ImputationRule.LINEAR,927,21.775899
3,amina_flow,ImputationRule.LINEAR,297,6.976744
4,ore_pulp_flow,ImputationRule.LINEAR,360,8.45666
5,ore_pulp_ph,ImputationRule.LINEAR,230,5.402866
6,ore_pulp_density,ImputationRule.LINEAR,172,4.040404
7,air_flow01,ImputationRule.LINEAR,930,21.846371
8,air_flow02,ImputationRule.LINEAR,1238,29.081513
9,air_flow03,ImputationRule.LINEAR,590,13.859525
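
To see what the `limit` keyword does once it reaches `DataFrame.interpolate`, the sketch below applies linear interpolation to a toy frame with `limit=2`. It illustrates the pandas call only; the toy data and the smaller limit are chosen for readability.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame(
    {
        "iron_feed": [55.0, np.nan, np.nan, np.nan, 59.0],
        "silica_feed": [16.0, np.nan, 18.0, 19.0, 20.0],
    }
)

missing_count = toy.isna().sum()
# Linear interpolation; `limit` caps how many consecutive NaNs are filled
interpolated = toy.interpolate(method="linear", limit=2)
print(missing_count)
print(interpolated)
```

The gap of three missing `iron_feed` values is longer than the limit, so its last value stays missing. This keeps long outages from being silently replaced by a straight line.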


## 5 On off logic

In general, `preprocessing` expects data from some physical process. Often Optimus projects work with sensors for machinery. These sensors usually have a corresponding column specifying whether the particular machine is on or off.

In the example here, `amina_flow` has a corresponding `amina_flow_sample_on_off` column describing whether the equipment is "on" or "off". Where `amina_flow_sample_on_off` is zero or `False`, `amina_flow` will be set to zero.

In [24]:
from preprocessing import set_off_equipment_to_zero

#### `set_off_equipment_to_zero`

This function carries out the operation described above.

In [25]:
tags_dep_config = get_tag_config(
    path_to_tag_config=DATA_DIR / "sample_tags_on_off_dependencies_config.csv",
    config_loader="csv",
    parameters_schema="on_off",
    delimiter=";",
)

df_on_off = set_off_equipment_to_zero(
    data=df_interpolated,
    meta_config=tags_meta_config,
    on_off_dep_config=tags_dep_config,
)
df_on_off.head(10)

INFO:preprocessing.on_off_logic:Setting '{'amina_flow'}' to zero when 'amina_flow_sample_on_off' is off.


Unnamed: 0,timestamp,iron_feed,silica_feed,starch_flow,amina_flow,ore_pulp_flow,ore_pulp_ph,ore_pulp_density,air_flow01,air_flow02,...,column_level01,column_level02,column_level03,column_level04,column_level05,column_level06,column_level07,iron_conc,silica_conc,amina_flow_sample_on_off
0,2017-03-10 00:00:00,55.2,16.98,3162.625026,0.0,398.753368,10.113487,1.729558,251.166672,250.226086,...,450.383776,446.891845,450.474523,449.912259,455.792161,464.38331,450.532747,66.91,1.31,0
1,2017-03-10 02:00:00,55.2,16.98,3133.256389,0.0,399.871822,10.129742,1.667784,249.880589,250.21405,...,449.373361,450.249356,450.081222,450.328806,448.722983,455.501528,451.3877,67.06,1.11,0
2,2017-03-10 04:00:00,55.2,16.98,3479.482944,0.0,398.763806,10.048403,1.732711,250.161328,250.104167,...,449.972878,450.868711,450.901822,451.145822,451.134189,459.981311,450.296722,66.97,1.27,0
3,2017-03-10 04:00:00,55.2,16.98,3228.036436,0.0,399.866983,9.918614,1.731056,250.208772,250.204761,...,487.940706,491.462111,487.387206,494.528183,495.664011,502.76385,494.939889,66.75,1.36,0
4,2017-03-10 04:00:00,55.2,16.98,3327.280739,0.0,399.615089,9.746029,1.765879,249.9178,250.160494,...,549.031539,549.983156,549.459572,549.975483,549.512533,560.6963,550.271772,66.63,1.34,0
5,2017-03-10 06:00:00,55.2,16.98,3405.162222,0.0,399.749344,9.892237,1.765064,249.898294,250.111022,...,550.599567,549.929139,549.089244,549.6097,549.2207,561.051644,551.090767,66.85,1.15,0
6,2017-03-10 08:00:00,54.95,17.4,2865.878428,0.0,400.345133,10.180426,1.765422,250.350667,250.276594,...,549.539978,550.77905,550.995183,550.459972,549.721689,569.619278,550.575817,65.76,2.76,0
7,2017-03-10 08:00:00,54.95,17.4,3306.753627,0.0,400.128744,10.180203,1.766156,249.806094,250.236878,...,550.3123,550.0977,549.316628,549.635772,548.447728,567.598956,550.011017,65.89,2.65,0
8,2017-03-10 08:00:00,54.95,17.4,3784.487111,0.0,400.278467,9.940503,1.76005,250.063789,249.903689,...,549.879239,549.245961,550.3712,551.37,549.148528,563.176128,550.217228,66.68,1.73,0
9,2017-03-10 10:00:00,54.95,17.4,3316.753911,0.0,399.773667,10.104197,1.761051,250.096767,250.002978,...,550.143128,549.326794,549.141906,548.212558,546.767772,556.476717,549.219422,66.52,1.81,0


## 6 Resampling

During this step, we resample our data to a desired frequency.

(*TODO*: similar to interpolation, this will need to be refactored so that different resample parameters can be used)

In [26]:
from preprocessing import resample_data

#### `resample_data`

This function resamples the data with the method specified in the resample configuration. In addition to the dataset, the resample configuration, and the name of the datetime column, `resample_data` accepts:

- `errors`: either `"raise"` or `"coerce"`. With `"raise"`, an error is thrown if a column has no aggregation method in the configuration; otherwise, the default method is used.
- `default_method`: method to use if none is specified in the configuration.
- `resample_kwargs`: an optional dictionary of keyword arguments to pass to [DataFrame.resample](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.resample.html).
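
To make the roles of `errors` and `default_method` concrete, here is a minimal sketch in plain pandas. It is not the implementation of `resample_data`: the helper name `resample_sketch`, its `methods` dictionary, and the toy columns are assumptions made for illustration, and the 3-hour frequency with a 2-hour offset are example values only.

```python
import pandas as pd


def resample_sketch(data, methods, timestamp_col, rule="3h", offset="2h",
                    default_method="mean", errors="coerce"):
    """Resample each column with its own aggregation method (illustrative only)."""
    value_cols = [c for c in data.columns if c != timestamp_col]
    unconfigured = [c for c in value_cols if c not in methods]
    if unconfigured and errors == "raise":
        raise ValueError(f"No aggregation method configured for: {unconfigured}")
    # With errors="coerce", unconfigured columns fall back to the default method
    agg = {c: methods.get(c, default_method) for c in value_cols}
    return (
        data.set_index(timestamp_col)
        .resample(rule, offset=offset)
        .agg(agg)
        .reset_index()
    )


# Hypothetical toy data: six hourly readings for two made-up tags
toy = pd.DataFrame({
    "timestamp": pd.date_range("2017-03-10 00:00", periods=6, freq="1h"),
    "flow": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
    "level": [10.0, 20.0, 30.0, 40.0, 50.0, 60.0],
})

# Only "flow" is configured; "level" uses the default method ("mean")
resampled = resample_sketch(toy, {"flow": "mean"}, "timestamp")
print(resampled)
```

With a 2-hour offset, the 3-hour bins start at 23:00, 02:00, 05:00 and so on, so the first bin is labelled with the previous day even though the data starts at midnight.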

In [27]:
resample_config = get_tag_config(
    path_to_tag_config=DATA_DIR / "sample_tags_resample_config.csv",
    config_loader="csv",
    parameters_schema="resample",
    delimiter=";",
)

df_resampled = resample_data(
    data=df_on_off,
    resample_config=resample_config,
    timestamp_col="timestamp",
    errors="coerce",
)
df_resampled.head(3)

Unnamed: 0,timestamp,iron_feed,silica_feed,starch_flow,amina_flow,ore_pulp_flow,ore_pulp_ph,ore_pulp_density,air_flow01,air_flow02,...,column_level01,column_level02,column_level03,column_level04,column_level05,column_level06,column_level07,iron_conc,silica_conc,amina_flow_sample_on_off
0,2017-03-09 23:00:00,55.2,16.98,3162.625026,0.0,398.753368,10.113487,1.729558,251.166672,250.226086,...,450.383776,446.891845,450.474523,449.912259,455.792161,464.38331,450.532747,66.91,1.31,0.0
1,2017-03-10 02:00:00,55.2,16.98,3292.014127,0.0,399.529425,9.960697,1.724358,250.042122,250.170868,...,484.079621,485.640833,484.457456,486.494574,486.258429,494.735747,486.724021,66.8525,1.27,0.0
2,2017-03-10 05:00:00,55.2,16.98,3405.162222,0.0,399.749344,9.892237,1.765064,249.898294,250.111022,...,550.599567,549.929139,549.089244,549.6097,549.2207,561.051644,551.090767,66.85,1.15,0.0


## 7 Output the full preprocessing parameters as a single Tag dictionary

Finally, we can collect all the preprocessing parameters used above into one summary table that serves as a single source of truth.

In [28]:
from preprocessing import preprocessing_output_summary

summary = preprocessing_output_summary(
    tags_raw_config,
    tags_meta_config,
    tags_outliers_config,
    imputation_config,
    tags_dep_config,
    resample_config,
    remove_outliers_summary,
    missing_data_summary,
)
summary


Unnamed: 0,tag_name,raw_tag,description,display_name,data_source,data_type,tag_type,unit,min,max,...,special_values,outlier_rules,imputation_rule,on_off_dependencies,resample_method,resample_freq,resample_offset,outlier_percentage,missing_count,missing_percentage
0,iron_feed,Sys_A/PLT.ABC_.T0001,percentage of Iron in feed,% Iron Feed,Sys_A,DataType.NUMERIC,TagType.INPUT,%,45.0,65.0,...,9999.0,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,4.75,202,4.745126
1,silica_feed,Sys_A/PLT.ABC_.T0002,percentage of silica in feed,% Silica Feed,Sys_A,DataType.NUMERIC,TagType.INPUT,%,2.0,30.0,...,,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,5.24,223,5.238431
2,starch_flow,Sys_A/PLT.ABC_.T0003,Starch Flow meter,Starch Flow,Sys_A,DataType.NUMERIC,TagType.INPUT,cc/min,2000.0,6000.0,...,-inf,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,21.78,927,21.775899
3,amina_flow,Sys_A/PLT.ABC_.T0004,Amina Flow meter,Amina Flow,Sys_A,DataType.NUMERIC,TagType.INPUT,cc/min,300.0,700.0,...,,OutliersRule.DROP,ImputationRule.LINEAR,[amina_flow_sample_on_off],ResampleMethod.MEAN,3H,2H,6.98,297,6.976744
4,ore_pulp_flow,Sys_A/PLT.ABC_.T0005,Ore Pulp Flow meter,Ore Pulp Flow,Sys_A,DataType.NUMERIC,TagType.INPUT,cc/min,370.0,410.0,...,,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,8.46,360,8.45666
5,ore_pulp_ph,Sys_A/PLT.ABC_.T0006,Ore Pulp cell A pH,Ore Pulp pH,Sys_A,DataType.NUMERIC,TagType.INPUT,ph,9.0,11.0,...,,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,5.4,230,5.402866
6,ore_pulp_density,Sys_A/PLT.ABC_.T0007,Ore Pulp Density meter,Ore Pulp Density,Sys_A,DataType.NUMERIC,TagType.INPUT,density,1.5,1.8,...,,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,4.04,172,4.040404
7,air_flow01,Sys_B/PLT.ABC_.T0008,FC 101 Air Flow,Flotation Column 01 Air Flow,Sys_B,DataType.NUMERIC,TagType.INPUT,cc/min,200.0,300.0,...,,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,21.85,930,21.846371
8,air_flow02,Sys_B/PLT.ABC_.T0009,FC 102 Air Flow,Flotation Column 02 Air Flow,Sys_B,DataType.NUMERIC,TagType.INPUT,cc/min,200.0,300.0,...,,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,29.08,1238,29.081513
9,air_flow03,Sys_B/PLT.ABC_.T0010,FC 103 Air Flow,Flotation Column 03 Air Flow,Sys_B,DataType.NUMERIC,TagType.INPUT,cc/min,200.0,300.0,...,,OutliersRule.DROP,ImputationRule.LINEAR,[],ResampleMethod.MEAN,3H,2H,13.86,590,13.859525


## 8 Anomaly detection in live data

Here we show how to use the anomaly detector classes to identify and report data anomalies. We continue with the example dataset above, looking at the data after the basic cleaning step but before the outlier handling step.

Data anomalies can have many causes, and users usually need to define detection methods specific to their use case. We provide `AnomalyDetector` as the base class to build on, along with a few commonly used detectors for live inference.

In general, an `AnomalyDetector` is applied to the high-frequency time-series data before resampling.

* **Initialization**: the class is initialized by providing
    * the `time_window` for detecting anomalies (the `resample` frequency is usually a good time window for detection)
* **`detect` method**: the `detect` method defines how anomalies are detected and outputs a dataframe that records
    * `name`: the name of the tag/feature
    * `is_anomaly`: whether the data is an anomaly
    * `time_window`: the time window used for checking the anomaly
    * `comments`: any additional information about the anomaly

Below we show two detectors:

* `RangeDetector`, which detects anomalies based on the expected data range
* `MissingValuesDetector`, which detects unexpected missing values

#### `RangeDetector`

Below we show how to apply the `RangeDetector` to a single tag, `iron_conc`.

In [29]:
from preprocessing import RangeDetector

# create some mock live data to test the range detector
df_live = df.set_index("timestamp").resample("5h").first()

# create a range detector
range_detector = RangeDetector(
    time_window="6h",  # use a 6-hour window
    threshold=0.5,  # flag a window when more than 50% of its data is outside the range
    tag_range=tag_ranges,  # use the provided tag ranges
)

iron_conc_anomaly_table = range_detector.detect(df_live["iron_conc"])

# show the first 5 rows of the anomaly table where iron_conc is outside the range
iron_conc_anomaly_table[iron_conc_anomaly_table["is_anomaly"]].head(5)

Unnamed: 0,timestamp,name,is_anomaly,anomaly_type,time_window,comments,outlier_percentage,lower_bound,upper_bound
25,2017-03-16 06:00:00,iron_conc,True,out of range,6h,iron_conc is out of range more than threshold ...,1.0,62.135,68.01
26,2017-03-16 12:00:00,iron_conc,True,out of range,6h,iron_conc is out of range more than threshold ...,1.0,62.135,68.01
27,2017-03-16 18:00:00,iron_conc,True,out of range,6h,iron_conc is out of range more than threshold ...,1.0,62.135,68.01
28,2017-03-17 00:00:00,iron_conc,True,out of range,6h,iron_conc is out of range more than threshold ...,1.0,62.135,68.01
29,2017-03-17 06:00:00,iron_conc,True,out of range,6h,iron_conc is out of range more than threshold ...,1.0,62.135,68.01


This table can be passed to a front-end monitoring dashboard to inform users about the data quality of the monitored tag. In practice, we would monitor a list of key variables, and a similar table can be created for each variable of interest.
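
The windowed range check can be approximated in a few lines of pandas. The sketch below is an assumption about the general technique (fraction of out-of-range points per window compared against a threshold), not the source of `RangeDetector`; the function name `out_of_range_fraction`, the bounds, and the toy series are hypothetical.

```python
import pandas as pd


def out_of_range_fraction(series, lower, upper, time_window="6h"):
    """Fraction of observed points outside [lower, upper] in each time window."""
    outside = ((series < lower) | (series > upper)).astype(float)
    # Mask missing readings so they are excluded from the fraction
    outside = outside.where(series.notna())
    return outside.resample(time_window).mean()


# Hypothetical hourly pH readings covering two 6-hour windows
idx = pd.date_range("2017-03-16 00:00", periods=12, freq="1h")
ph = pd.Series(
    [10.0] * 6 + [12.5, 12.7, 10.1, 12.9, 13.0, 10.2],
    index=idx,
    name="ore_pulp_ph",
)

fraction = out_of_range_fraction(ph, lower=9.0, upper=11.0)
is_anomaly = fraction > 0.5  # flag windows where more than half the points are out of range
print(pd.DataFrame({"outlier_fraction": fraction, "is_anomaly": is_anomaly}))
```

In this toy series the first window is entirely in range, while four of the six readings in the second window exceed the upper bound, so only the second window is flagged.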


#### `MissingValuesDetector`

Similarly, we show how to create a missing-value anomaly table for the `ore_pulp_ph` tag.

In [30]:
from preprocessing import MissingValuesDetector

missing_detector = MissingValuesDetector(
    time_window="6h", # use 6 hours window
    threshold=0.5, # when more than 50% of the data is missing, raise an anomaly
)
ph_missing_table = missing_detector.detect(df_live['ore_pulp_ph'])

# show the first 5 rows of the anomaly table where the ore_pulp_ph is missing
ph_missing_table[ph_missing_table["is_anomaly"]].head(5)

Unnamed: 0,timestamp,name,is_anomaly,anomaly_type,time_window,comments,missing_percentage
25,2017-03-16 06:00:00,ore_pulp_ph,True,missing values,6h,ore_pulp_ph is missing more than threshold = 5...,1.0
26,2017-03-16 12:00:00,ore_pulp_ph,True,missing values,6h,ore_pulp_ph is missing more than threshold = 5...,1.0
27,2017-03-16 18:00:00,ore_pulp_ph,True,missing values,6h,ore_pulp_ph is missing more than threshold = 5...,1.0
28,2017-03-17 00:00:00,ore_pulp_ph,True,missing values,6h,ore_pulp_ph is missing more than threshold = 5...,1.0
29,2017-03-17 06:00:00,ore_pulp_ph,True,missing values,6h,ore_pulp_ph is missing more than threshold = 5...,1.0
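
One subtlety of window-based missing-value checks is the difference between a reading recorded as NaN and a timestamp that never arrives. The following sketch, which assumes a plain pandas approach and is not taken from `MissingValuesDetector`, shows that windows with no rows at all produce NaN fractions and have to be handled explicitly. Treating such windows as fully missing is a design choice of this sketch.

```python
import numpy as np
import pandas as pd

# Hypothetical hourly readings with a 12-hour gap in the timestamps themselves
idx = pd.date_range("2017-03-16 00:00", periods=6, freq="1h").append(
    pd.date_range("2017-03-16 18:00", periods=6, freq="1h")
)
values = pd.Series(
    [10.0] * 6 + [np.nan, np.nan, 10.1, np.nan, np.nan, 10.2],
    index=idx,
    name="ore_pulp_ph",
)

# Fraction of NaN readings per 6-hour window; windows without any rows yield NaN
missing_fraction = values.isna().astype(float).resample("6h").mean()

# Assumption of this sketch: a window with no rows counts as 100% missing
missing_fraction_filled = missing_fraction.fillna(1.0)
flags = missing_fraction_filled > 0.5
print(pd.DataFrame({"missing_fraction": missing_fraction_filled, "is_anomaly": flags}))
```

Without the `fillna(1.0)` step, the two empty windows would compare as `False` against the threshold and go unreported.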


### User-defined anomaly detection method

We show an example of how to create a user-defined anomaly detector.

Assume that for the same tag `ore_pulp_ph`, we expect the pH to change only by a small amount, say no more than 0.5, within the check window. Any larger change within a window is treated as an anomaly.

In [31]:
from preprocessing import AnomalyDetector

class ChangeLimitDetector(AnomalyDetector):
    """A class for detecting anomalies based on a limit on value change.

    Attributes:
        time_window (str): The time window for detecting anomalies, e.g. "1H".
        max_change (float): The maximum change allowed in the time window.

    Methods:
        detect(data): Detect anomalies in a time series based on the maximum change in value.
    """
    def __init__(
        self,
        time_window: str,
        max_change: float,
    ):
        super().__init__(time_window)
        self.max_change = max_change

    def detect(self, data: pd.Series) -> pd.DataFrame:
        """Detect anomalies in a time series based on maximum change in value.

        Args:
            data: The time series with datetime index to be checked for anomalies.

        Returns:
            A dataframe with one row per time window and columns:
             - "name": indicating the name of the tag/feature
             - "is_anomaly": indicating whether the change within the window exceeds the limit
             - "comments": indicating any additional information about the anomaly
             - "time_window": indicating the window in which the anomaly was detected
             - "max_change": indicating the maximum change in the time window
        """
        time_window = self.time_window
        max_change = self.max_change

        df_anomaly = pd.DataFrame(
            {
                "max_value": data.resample(time_window).max(),
                "min_value": data.resample(time_window).min(),
            },
        )
        df_anomaly["max_change"] = df_anomaly["max_value"] - df_anomaly["min_value"]

        df_anomaly["is_anomaly"] = df_anomaly["max_change"] > max_change
        df_anomaly["comments"] = ""
        anomaly_comments = (
            f"{data.name} is changing more than {max_change}"
            f" in the {time_window} window. "
        )
        df_anomaly.loc[df_anomaly["is_anomaly"], "comments"] = anomaly_comments
        df_anomaly["name"] = data.name
        df_anomaly["time_window"] = time_window

        return df_anomaly[[
            "name", "is_anomaly", "time_window", "comments", "max_change",
        ]].reset_index()

In [32]:
# Apply the ChangeLimitDetector to the `ore_pulp_ph` tag
ph_change_detector = ChangeLimitDetector(
    time_window="6h", # use 6 hours window
    max_change=0.5, # when the change is more than 0.5, raise an anomaly
)
ph_change_table = ph_change_detector.detect(df_live['ore_pulp_ph'])

# show the first 5 rows of the anomaly table where the ore_pulp_ph is changing more than 0.5
ph_change_table[ph_change_table["is_anomaly"]].head(5)

Unnamed: 0,timestamp,name,is_anomaly,time_window,comments,max_change
155,2017-04-17 18:00:00,ore_pulp_ph,True,6h,ore_pulp_ph is changing more than 0.5 in the 6...,0.69518
200,2017-04-29 00:00:00,ore_pulp_ph,True,6h,ore_pulp_ph is changing more than 0.5 in the 6...,0.515501
210,2017-05-01 12:00:00,ore_pulp_ph,True,6h,ore_pulp_ph is changing more than 0.5 in the 6...,0.923539
245,2017-05-10 06:00:00,ore_pulp_ph,True,6h,ore_pulp_ph is changing more than 0.5 in the 6...,0.565771
260,2017-05-14 00:00:00,ore_pulp_ph,True,6h,ore_pulp_ph is changing more than 0.5 in the 6...,0.634832


### Apply multiple detectors on multiple variables

Above we applied individual anomaly detectors to a single tag. In practice, we might want to apply several different anomaly detectors to a set of variables.
For example,

* on `ore_pulp_ph`, we want to check range, missing values and change limit
* on `iron_conc`, we want to check range and missing values

The outputs are then combined and fed into the monitoring UI. To combine and group this anomaly information, we also provide a function `detect_data_anomaly`. This function takes a dictionary that describes which `AnomalyDetector` instances to use for each tag and applies them to the given data.

In [33]:
from preprocessing import detect_data_anomaly

# create a dictionary with the anomaly detectors for each tag
anomaly_detector_config = {
    "ore_pulp_ph": [range_detector, missing_detector, ph_change_detector],
    "iron_conc": [range_detector, missing_detector],
}

anomaly_table = detect_data_anomaly(
    data=df_live,
    anomaly_detectors=anomaly_detector_config,
)

anomaly_table[anomaly_table["is_anomaly"]]

Unnamed: 0,timestamp,name,is_anomaly,anomaly_type,time_window,comments,outlier_percentage,lower_bound,upper_bound,missing_percentage,max_change
25,2017-03-16 06:00:00,ore_pulp_ph,True,out of range,6h,ore_pulp_ph is out of range more than threshol...,1.0,8.806027,10.76563,,
26,2017-03-16 12:00:00,ore_pulp_ph,True,out of range,6h,ore_pulp_ph is out of range more than threshol...,1.0,8.806027,10.76563,,
27,2017-03-16 18:00:00,ore_pulp_ph,True,out of range,6h,ore_pulp_ph is out of range more than threshol...,1.0,8.806027,10.76563,,
28,2017-03-17 00:00:00,ore_pulp_ph,True,out of range,6h,ore_pulp_ph is out of range more than threshol...,1.0,8.806027,10.76563,,
29,2017-03-17 06:00:00,ore_pulp_ph,True,out of range,6h,ore_pulp_ph is out of range more than threshol...,1.0,8.806027,10.76563,,
...,...,...,...,...,...,...,...,...,...,...,...
3016,2017-03-28 00:00:00,iron_conc,True,missing values,6h,iron_conc is missing more than threshold = 50....,,,,1.0,
3017,2017-03-28 06:00:00,iron_conc,True,missing values,6h,iron_conc is missing more than threshold = 50....,,,,1.0,
3018,2017-03-28 12:00:00,iron_conc,True,missing values,6h,iron_conc is missing more than threshold = 50....,,,,1.0,
3019,2017-03-28 18:00:00,iron_conc,True,missing values,6h,iron_conc is missing more than threshold = 50....,,,,1.0,
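
Conceptually, combining detectors amounts to looping over the tag-to-detectors dictionary and stacking the resulting tables. The sketch below illustrates that idea under stated assumptions: `WindowMaxDetector` and `combine_detections` are hypothetical names, and this is not the source of `detect_data_anomaly`.

```python
import pandas as pd


class WindowMaxDetector:
    """Hypothetical stand-in detector: flags windows whose maximum exceeds a limit."""

    def __init__(self, time_window, limit):
        self.time_window = time_window
        self.limit = limit

    def detect(self, data):
        window_max = data.resample(self.time_window).max()
        return pd.DataFrame({
            "name": data.name,
            "is_anomaly": window_max > self.limit,
            "time_window": self.time_window,
            "window_max": window_max,
        }).reset_index()


def combine_detections(data, detectors_by_tag):
    """Run every detector configured for each tag and stack the resulting tables."""
    tables = [
        detector.detect(data[tag])
        for tag, detectors in detectors_by_tag.items()
        for detector in detectors
    ]
    return pd.concat(tables, ignore_index=True)


# Hypothetical live data: two tags sampled every 3 hours
idx = pd.date_range("2017-03-16", periods=4, freq="3h", name="timestamp")
live = pd.DataFrame({"a": [1.0, 2.0, 9.0, 1.0], "b": [0.0, 0.0, 0.0, 0.0]}, index=idx)

detector = WindowMaxDetector("6h", limit=5.0)
table = combine_detections(live, {"a": [detector], "b": [detector]})
print(table[table["is_anomaly"]])
```

Because `pd.concat` aligns tables on column names, a column that only some detectors emit is filled with NaN in the rows from the other detectors, which is consistent with the empty cells in the combined table above.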


#### `create_detectors_dict`

Instead of creating the detectors one by one and hard-coding them into a dictionary, we also provide a function `create_detectors_dict()` that initializes the anomaly detectors from a dictionary keyed by their string names and builds the per-tag dictionary.

In [34]:
from preprocessing import create_detectors_dict

# define the parameters to initialize the different anomaly detectors
anomaly_parameters = {
    "preprocessing.MissingValuesDetector": {  # detector name
        "time_window": "3H",  # time window for detecting anomalies
        "threshold": 0.5,  # threshold for detecting anomalies
    },
    "preprocessing.RangeDetector": {  # detector name
        "time_window": "3H",  # time window for detecting anomalies
        "threshold": 0.5,  # threshold for detecting anomalies
        "tag_range": {  # range for detecting anomalies
            "iron_feed": [30, 70],  # range for iron_feed
            "silica_feed": [0, 35],  # range for silica_feed
            "ore_pulp_ph": [8.5, 12],  # range for ore_pulp_ph
        },
    },
}

create_detectors_dict(
    anomaly_parameters,
    ["iron_feed", "silica_feed", "ore_pulp_ph"],
)

{'iron_feed': [MissingValuesDetector(time_window=3H, threshold=0.5),
  RangeDetector(time_window=3H, threshold=0.5, tag_range={'iron_feed': [30, 70], 'silica_feed': [0, 35], 'ore_pulp_ph': [8.5, 12]})],
 'silica_feed': [MissingValuesDetector(time_window=3H, threshold=0.5),
  RangeDetector(time_window=3H, threshold=0.5, tag_range={'iron_feed': [30, 70], 'silica_feed': [0, 35], 'ore_pulp_ph': [8.5, 12]})],
 'ore_pulp_ph': [MissingValuesDetector(time_window=3H, threshold=0.5),
  RangeDetector(time_window=3H, threshold=0.5, tag_range={'iron_feed': [30, 70], 'silica_feed': [0, 35], 'ore_pulp_ph': [8.5, 12]})]}
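
A common way to turn dotted string names into objects is `importlib`. The sketch below shows that general pattern with standard-library classes standing in for detectors. The helper names `load_object` and `build_per_tag` are hypothetical, and whether `create_detectors_dict` works this way internally, or shares instances across tags as this sketch does, is an assumption.

```python
import importlib


def load_object(dotted_path):
    """Resolve a 'package.module.Name' string to the object it names."""
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)


def build_per_tag(parameters, tags):
    """Instantiate each configured class once and list the instances under every tag."""
    instances = [load_object(path)(**kwargs) for path, kwargs in parameters.items()]
    return {tag: list(instances) for tag in tags}


# Standard-library classes stand in for detector classes in this illustration
params = {
    "datetime.timedelta": {"hours": 3},
    "fractions.Fraction": {"numerator": 1, "denominator": 2},
}
result = build_per_tag(params, ["iron_feed", "silica_feed"])
print(result)
```

Keeping the configuration as plain strings and numbers means it can be stored in a YAML or JSON file and loaded at runtime without code changes.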