# **Amazon Lookout for Equipment** - Getting started
*Part 2 - Dataset creation*

## Initialization
---
This repository is structured as follows:

```sh
. lookout-equipment-demo
|
├── data/
|   ├── interim                          # Temporary intermediate data are stored here
|   ├── processed                        # Finalized datasets are usually stored here
|   |                                    # before they are sent to S3 to allow the
|   |                                    # service to reach them
|   └── raw                              # Immutable original data are stored here
|
├── getting_started/
|   ├── 1_data_preparation.ipynb
|   ├── 2_dataset_creation.ipynb         <<< THIS NOTEBOOK <<<
|   ├── 3_model_training.ipynb
|   ├── 4_model_evaluation.ipynb
|   ├── 5_inference_scheduling.ipynb
|   └── 6_cleanup.ipynb
|
└── utils/
    └── lookout_equipment_utils.py
```

### Notebook configuration update
Amazon Lookout for Equipment is a very recent service, so we need to make sure that we have access to the latest version of the AWS Python packages. If you see a `pip` dependency error, check the `boto3` version: if it is 1.17.48 or later (1.17.48 being the first version that includes the `lookoutequipment` API), you can disregard this error and move on to the next cell:

In [None]:
!pip install --quiet --upgrade sagemaker tqdm

import boto3
print(f'boto3 version: {boto3.__version__} (should be >= 1.17.48 to include Lookout for Equipment API)')

# Restart the current notebook to ensure we take into account the previous updates:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
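
Checking the version by eye works, but comparing version strings as text can mislead (as strings, `'1.9.0'` sorts after `'1.17.48'`). The sketch below compares the numeric components instead. `version_at_least` is a hypothetical helper, not part of `boto3`, and it assumes plain dotted numeric versions:

```python
def version_at_least(installed: str, required: str) -> bool:
    """Compare dotted numeric version strings component by component."""
    def as_tuple(version: str):
        # Keep only the leading numeric components, e.g. '1.17.48' -> (1, 17, 48)
        parts = []
        for piece in version.split('.'):
            if not piece.isdigit():
                break
            parts.append(int(piece))
        return tuple(parts)

    return as_tuple(installed) >= as_tuple(required)

print(version_at_least('1.17.48', '1.17.48'))  # True
print(version_at_least('1.9.0', '1.17.48'))    # False
```

In this notebook, the first argument would be `boto3.__version__`.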

### Imports

In [None]:
import config
import os
import pandas as pd
import sagemaker
import sys
import time

from datetime import datetime

# Helper functions for managing Lookout for Equipment API calls:
sys.path.append('../utils')
import lookout_equipment_utils as lookout

In [None]:
PROCESSED_DATA = os.path.join('..', 'data', 'processed', 'getting-started')
TRAIN_DATA     = os.path.join(PROCESSED_DATA, 'training-data')

ROLE_ARN       = sagemaker.get_execution_role()
REGION_NAME    = boto3.session.Session().region_name
DATASET_NAME   = config.DATASET_NAME
BUCKET         = config.BUCKET
PREFIX         = config.PREFIX_TRAINING

In [None]:
# List of the directories from the training data 
# directory: each directory corresponds to a subsystem:
components = []
for root, dirs, files in os.walk(f'{TRAIN_DATA}'):
    for subsystem in dirs:
        if subsystem != '.ipynb_checkpoints':
            components.append(subsystem)
        
components
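
Note that `os.walk` descends recursively, so any nested folder would also be collected as a component. If only the immediate subfolders should count, a sketch along these lines may help. `list_components` is a hypothetical helper, and the temporary directory only stands in for the training data folder:

```python
import os
import tempfile

def list_components(root_dir):
    """Return the sorted names of the immediate subdirectories of root_dir."""
    with os.scandir(root_dir) as entries:
        return sorted(
            entry.name for entry in entries
            if entry.is_dir() and entry.name != '.ipynb_checkpoints'
        )

# Demo on a throwaway directory tree:
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, 'centrifugal-pump', 'nested'))
    os.makedirs(os.path.join(tmp, '.ipynb_checkpoints'))
    demo_components = list_components(tmp)

print(demo_components)  # ['centrifugal-pump']
```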

In [None]:
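# Single-row example of the expected CSV layout; `file1` (the output path) is not defined in this notebook.
row = {'Timestamp': '2019-01-01', 'Tag1': 0.25, 'Tag2': 1.1, 'Tag3': -3.14}
# DataFrame.append was removed in pandas 2.0: build the frame directly
df = pd.DataFrame([row], columns=['Timestamp', 'Tag1', 'Tag2', 'Tag3'])
df.to_csv(file1, index=False)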

## Create a dataset
---

### Create data schema

First, we need to set up the schema of your dataset. In the cell below, we define `DATASET_COMPONENT_FIELDS_MAP`, a Python dictionary (hashmap). The key of each entry in the dictionary is the `Component` name, and the value of each entry is a list of column names. The column names must exactly match the header in your CSV files, and they must appear in exactly the same order:

```python
DATASET_COMPONENT_FIELDS_MAP = {
    "Component1": ['Timestamp', 'Tag1', 'Tag2',...],
    "Component2": ['Timestamp', 'Tag1', 'Tag2',...],
    ...
    "ComponentN": ['Timestamp', 'Tag1', 'Tag2',...]
}
```
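
Since both the names and the order of the columns must match the CSV header, it can be worth checking a file before building the map. The following is a minimal sketch using only the standard library. `header_matches` is a hypothetical helper, and the in-memory sample stands in for a real CSV file:

```python
import csv
import io

def header_matches(csv_file, expected_columns):
    """Return True when the first CSV row equals expected_columns, in order."""
    header = next(csv.reader(csv_file), [])
    return header == list(expected_columns)

# In-memory sample standing in for a real file opened with open(path, newline=''):
sample = "Timestamp,Tag1,Tag2\n2019-01-01,0.25,1.1\n"

print(header_matches(io.StringIO(sample), ['Timestamp', 'Tag1', 'Tag2']))  # True
print(header_matches(io.StringIO(sample), ['Timestamp', 'Tag2', 'Tag1']))  # False
```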

We also need to make sure the component name **matches exactly** the name of the folder in S3 (everything is **case sensitive**). As an example, when creating the data schema for the example we are using here, we will build a dictionary that looks like this:
```python
DATASET_COMPONENT_FIELDS_MAP = {
    "centrifugal-pump": ['Timestamp', 'Sensor0', 'Sensor1',... , 'Sensor29']
}
```
The following cell builds this map, then converts it into a JSON schema in the format below, which is ready to be processed by Lookout for Equipment:

```json
{
  "Components": [
    {
      "ComponentName": "centrifugal-pump",
      "Columns": [
        {"Name": "Timestamp", "Type": "DATETIME"},
        {"Name": "Sensor0", "Type": "DOUBLE"},
        {"Name": "Sensor1", "Type": "DOUBLE"},
        {"Name": "Sensor2", "Type": "DOUBLE"},
        {"Name": "Sensor3", "Type": "DOUBLE"},
          
        ...
          
        {"Name": "Sensor29", "Type": "DOUBLE"}
      ]
    }
  ]
}
```
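
As a quick sanity check on a schema string, the sketch below verifies the conventions used in this notebook: each component starts with a `DATETIME` column followed by at least one `DOUBLE` column. `check_schema` is a hypothetical helper, and the rules it enforces are this notebook's conventions rather than a statement of everything the service accepts:

```python
import json

def check_schema(schema_json):
    """Return a list of problems found in a schema string (empty if none)."""
    problems = []
    schema = json.loads(schema_json)
    for component in schema.get('Components', []):
        name = component.get('ComponentName', '<unnamed>')
        columns = component.get('Columns', [])
        if len(columns) < 2:
            problems.append(f'{name}: needs a timestamp and at least one sensor')
            continue
        if columns[0].get('Type') != 'DATETIME':
            problems.append(f'{name}: first column should be DATETIME')
        for column in columns[1:]:
            if column.get('Type') != 'DOUBLE':
                problems.append(f"{name}: column {column.get('Name')} should be DOUBLE")
    return problems

# Small illustrative schema with a single sensor:
good_schema = json.dumps({
    'Components': [{
        'ComponentName': 'centrifugal-pump',
        'Columns': [
            {'Name': 'Timestamp', 'Type': 'DATETIME'},
            {'Name': 'Sensor0', 'Type': 'DOUBLE'}
        ]
    }]
})
print(check_schema(good_schema))  # []
```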

In [None]:
import json
from typing import List, Dict

def create_data_schema_from_dir(root_dir):
    """
    Generates a data schema compatible with Lookout for Equipment from a local directory
    
    PARAMS
    ======
        root_dir: string
            A path pointing to the root directory where all the CSV files are located
            
    RETURNS
    =======
        schema: string
            A JSON-formatted string ready to be used as a schema for an L4E dataset
    """
    # List of the directories from the training data 
    # directory: each directory corresponds to a subsystem:
    components = []
    for _, dirs, _ in os.walk(root_dir):
        for subsystem in dirs:
            if subsystem != '.ipynb_checkpoints':
                components.append(subsystem)

    # Loops through each subdirectory found in the root dir:
    DATASET_COMPONENT_FIELDS_MAP = dict()
    for subsystem in components:
        subsystem_tags = ['Timestamp']
        for root, _, files in os.walk(f'{root_dir}/{subsystem}'):
            for file in files:
                fname = os.path.join(root, file)
                current_subsystem_df = pd.read_csv(fname, nrows=1)
                subsystem_tags = subsystem_tags + current_subsystem_df.columns.tolist()[1:]

            DATASET_COMPONENT_FIELDS_MAP.update({subsystem: subsystem_tags})
            
    schema = create_data_schema(DATASET_COMPONENT_FIELDS_MAP)
    
    return schema

def create_data_schema(component_fields_map: Dict):
    """
    Generates a JSON formatted string from a dictionary
    
    PARAMS
    ======
        component_fields_map: dict
            A dictionary containing the field map for the dataset schema
            
    RETURNS
    =======
        schema: string
            A JSON-formatted string ready to be used as a schema for a dataset
    """
    schema = json.dumps(
        _create_data_schema_map(
            component_fields_map=component_fields_map
        )
    )
    
    return schema

def _create_data_schema_map(component_fields_map: Dict):
    """
    Generate a dictionary with the JSON format expected by Lookout for Equipment
    to be used as the schema for a dataset at ingestion, training time and
    inference time
    
    PARAMS
    ======
        component_fields_map: dict
            A dictionary containing the field map for the dataset schema

    RETURNS
    =======
        data_schema: dict
            A dictionary containing the detailed schema built from the original
            dictionary mapping
    """
    # Build the schema for the current component:
    component_schema_list = [_create_component_schema(
            component_name, 
            component_fields_map[component_name]
        ) for component_name in component_fields_map
    ]
    
    # The root of the schema is a "Components" tag:
    data_schema = dict()
    data_schema['Components'] = component_schema_list

    return data_schema

def _create_component_schema(component_name: str, field_names: List):
    """
    Build a schema for a given component and fields list
    
    PARAMS
    ======
        component_name: string
            Name of the component to build a schema for
        
        field_names: list of strings
            Name of all the fields included in this component
            
    RETURNS
    =======
        component_schema: dict 
            A dictionary containing the detailed schema for a given component
    """
    if len(field_names) == 0:
        raise Exception(f'Field names for component {component_name} should not be empty')
    if len(field_names) == 1:
        raise Exception(f'Component {component_name} must have at least one sensor beyond the timestamp')
        
    # The first field is a timestamp:
    col_list  = [{'Name': field_names[0], 'Type': 'DATETIME'}]
    
    # All the others are float values:
    col_list = col_list + [
        {'Name': field_name, 'Type': 'DOUBLE'} 
        for field_name in field_names[1:]
    ]
    
    # Build the schema for this component:
    component_schema = dict()
    component_schema['ComponentName'] = component_name
    component_schema['Columns'] = col_list
            
    return component_schema

In [None]:
import pprint

schema = create_data_schema_from_dir(TRAIN_DATA)

# The schema is a JSON string: parse it with json.loads rather than eval
pp = pprint.PrettyPrinter(depth=5)
pp.pprint(json.loads(schema))

In [None]:
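schema_input = {
    'component1': ['Timestamp', 'Tag1', 'Tag2', 'Tag3'],
    'component2': ['Timestamp', 'Tag4', 'Tag5', 'Tag6']
}
# Parse the JSON string so it can be compared with the expected dictionary below
test_output = json.loads(create_data_schema(schema_input))
test_output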

In [None]:
schema_expected_output = {
    'Components': [
        {
            'ComponentName': 'component1',
            'Columns': [
                {'Name': 'Timestamp', 'Type': 'DATETIME'},
                {'Name': 'Tag1', 'Type': 'DOUBLE'},
                {'Name': 'Tag2', 'Type': 'DOUBLE'},
                {'Name': 'Tag3', 'Type': 'DOUBLE'}
            ]
        },
        {
            'ComponentName': 'component2',
            'Columns': [
                {'Name': 'Timestamp', 'Type': 'DATETIME'},
                {'Name': 'Tag4', 'Type': 'DOUBLE'},
                {'Name': 'Tag5', 'Type': 'DOUBLE'},
                {'Name': 'Tag6', 'Type': 'DOUBLE'}
            ]
        }
    ]
}
schema_expected_output

In [None]:
test_output == schema_expected_output

In [None]:
DATASET_COMPONENT_FIELDS_MAP = dict()
for subsystem in components:
    subsystem_tags = ['Timestamp']
    for root, _, files in os.walk(f'{TRAIN_DATA}/{subsystem}'):
        for file in files:
            fname = os.path.join(root, file)
            current_subsystem_df = pd.read_csv(fname, nrows=1)
            subsystem_tags = subsystem_tags + current_subsystem_df.columns.tolist()[1:]

        DATASET_COMPONENT_FIELDS_MAP.update({subsystem: subsystem_tags})

import pprint
pp = pprint.PrettyPrinter(depth=5)
pp.pprint(json.loads(create_data_schema(DATASET_COMPONENT_FIELDS_MAP)))

In [None]:
        
lookout_dataset = lookout.LookoutEquipmentDataset(
    dataset_name=DATASET_NAME,
    component_fields_map=DATASET_COMPONENT_FIELDS_MAP,
    region_name=REGION_NAME,
    access_role_arn=ROLE_ARN
)

If you wanted to use the console, the following string would be the one to use to configure the **dataset schema**:

![Dataset creation with schema](assets/dataset-schema.png)

In [None]:
import pprint
pp = pprint.PrettyPrinter(depth=5)
pp.pprint(json.loads(lookout_dataset.dataset_schema))

### Create the dataset
The following method encapsulates the [**CreateDataset**](https://docs.aws.amazon.com/lookout-for-equipment/latest/ug/API_CreateDataset.html) API:

```python
lookout_client.create_dataset(
    DatasetName=self.dataset_name,
    DatasetSchema={
        'InlineDataSchema': "schema"
    }
)
```

In [None]:
lookout_dataset.create()

The dataset is now created. It is still empty and ready to receive the time series data that we will ingest from the S3 location prepared in the previous notebook:

![Dataset created](assets/dataset-created.png)

## Ingest data into a dataset
---
Let's double check the values of all the parameters that will be used to ingest some data into an existing Lookout for Equipment dataset:

In [None]:
ROLE_ARN, BUCKET, PREFIX, DATASET_NAME

Launch the ingestion job in the Lookout for Equipment dataset: the following method encapsulates the [**StartDataIngestionJob**](https://docs.aws.amazon.com/lookout-for-equipment/latest/ug/API_StartDataIngestionJob.html) API:

```python
lookout_client.start_data_ingestion_job(
    DatasetName=DATASET_NAME,
    RoleArn=ROLE_ARN, 
    IngestionInputConfiguration={ 
        'S3InputConfiguration': { 
            'Bucket': BUCKET,
            'Prefix': PREFIX
        }
    }
)
```

In [None]:
response = lookout_dataset.ingest_data(BUCKET, PREFIX)

The ingestion is launched. With this amount of data (around 50 MB), it should take less than 5 minutes:

![dataset_schema](assets/dataset-ingestion-in-progress.png)

We use the following cell to monitor the ingestion process by calling the [**DescribeDataIngestionJob**](https://docs.aws.amazon.com/lookout-for-equipment/latest/ug/API_DescribeDataIngestionJob.html) API every 60 seconds:

In [None]:
# Get the ingestion job ID and status:
data_ingestion_job_id = response['JobId']
data_ingestion_status = response['Status']

# Wait until ingestion completes:
print("=====Polling Data Ingestion Status=====\n")
lookout_client = lookout.get_client(region_name=REGION_NAME)
print(str(pd.to_datetime(datetime.now()))[:19], "|", data_ingestion_status)

while data_ingestion_status == 'IN_PROGRESS':
    time.sleep(60)
    describe_data_ingestion_job_response = lookout_client.describe_data_ingestion_job(JobId=data_ingestion_job_id)
    data_ingestion_status = describe_data_ingestion_job_response['Status']
    print(str(pd.to_datetime(datetime.now()))[:19], "|", data_ingestion_status)
    
print("\n=====End of Polling Data Ingestion Status=====")
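
The loop above runs for as long as the status stays `IN_PROGRESS`. If an upper bound on the wait is preferred, the same pattern can be wrapped with a timeout. This is a minimal sketch: `wait_while_in_progress` and its parameters are hypothetical, and the demo uses a fake status source instead of a real API call:

```python
import time

def wait_while_in_progress(get_status, poll_seconds=60, timeout_seconds=3600,
                           sleep=time.sleep):
    """Poll get_status() until it stops returning 'IN_PROGRESS' or time runs out."""
    waited = 0
    status = get_status()
    while status == 'IN_PROGRESS':
        if waited >= timeout_seconds:
            raise TimeoutError(f'Still IN_PROGRESS after {waited} seconds')
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status

# Demo with canned statuses and a no-op sleep, so it returns immediately:
fake_statuses = iter(['IN_PROGRESS', 'IN_PROGRESS', 'SUCCESS'])
final_status = wait_while_in_progress(
    lambda: next(fake_statuses),
    sleep=lambda seconds: None
)
print(final_status)  # SUCCESS
```

With the client used in this notebook, `get_status` could be `lambda: lookout_client.describe_data_ingestion_job(JobId=data_ingestion_job_id)['Status']`.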

If any issue arises, you can inspect the API response, which is available as a JSON document:

In [None]:
lookout_client.describe_data_ingestion_job(JobId=data_ingestion_job_id)
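
When a job fails, the part of the response that usually matters is the status and, if present, the failure reason. The sketch below condenses such a response into one line. `summarize_ingestion` is a hypothetical helper, the sample dictionary is made up for illustration, and the keys are read with `.get` in case a field is absent:

```python
def summarize_ingestion(response):
    """Build a one-line summary from a DescribeDataIngestionJob-style dict."""
    status = response.get('Status', 'UNKNOWN')
    summary = f"Job {response.get('JobId', '?')}: {status}"
    reason = response.get('FailedReason')
    if reason:
        summary += f' ({reason})'
    return summary

# Made-up response used only to exercise the helper:
example_response = {
    'JobId': 'abc123',
    'Status': 'FAILED',
    'FailedReason': 'Example reason'
}
print(summarize_ingestion(example_response))  # Job abc123: FAILED (Example reason)
```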

The ingestion should now be complete as can be seen in the console:

![Ingestion done](assets/dataset-ingestion-done.png)

## Conclusion
---

In this notebook, we created a **Lookout for Equipment dataset** and ingested into it the S3 data uploaded in the previous notebook. **Move on to the next notebook to train a model based on these data.**