# OMF CONNECT data services Developer Quick Start Guide



### Prerequisites
To follow along with this guide, you need access to the following in AVEVA Data Hub:
* Client-Credentials Client Id and Secret
* Tenant
* Namespace 

Basic Python and REST knowledge is helpful as this notebook utilizes these to make API calls to AVEVA Data Hub.


# Overview

This guide illustrates an example of creating an OMF connection and configuring it to send data into AVEVA Data Hub and the Sequential Data Store (SDS). 

As an example we will send data for a drone, including its location, battery, and operating temperature. We cover how to create the OMF connection along with the appropriate OMF types and containers, and finally send data to our OMF endpoint. Once the data is sent to AVEVA Data Hub we will validate the ingress process by reading some data values from SDS.

### The quick start is structured as follows:
* [OMF Connection Basics](#omf-connection-basics)
* [Introduction of the Code](#introduction-of-the-code)
* [Configuring the OMF Data Flow](#configuring-the-omf-data-flow)
* [Validation](#validation)

The following cell reads the application settings that are required in order to execute the notebook. See the [readme](README.md) for further instructions. 

In [1]:
import json

def read_config():
    with open('appsettings.json') as c:
        config = json.load(c)
    return config

appsettings = read_config()
resource = appsettings['Resource']
clientId = appsettings['ClientId']
clientSecret = appsettings['ClientSecret']
tenantId = appsettings['TenantId']
namespaceId = appsettings['NamespaceId']
apiversion = appsettings['ApiVersion']
OMFConnectionName = appsettings['OMFConnectionName']
OMFConnectionDescription = appsettings['OMFConnectionDescription']
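
A key missing from `appsettings.json` would otherwise surface only as a `KeyError` part-way through the cell above. As a minimal sketch, the keys read in that cell can be checked up front. The helper name `missing_settings` and the example values are hypothetical and not part of the sample.

```python
# Keys read from appsettings.json by the cell above
REQUIRED_KEYS = [
    'Resource', 'ClientId', 'ClientSecret', 'TenantId',
    'NamespaceId', 'ApiVersion', 'OMFConnectionName',
    'OMFConnectionDescription',
]

def missing_settings(config):
    """Return the required keys that are absent or empty in config."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]

# Example with an incomplete, made-up configuration
example = {'Resource': 'https://example.invalid', 'ClientId': 'abc'}
print(missing_settings(example))
```

An empty list means every setting the notebook reads is present.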

# OMF Connection Basics
An OMF connection allows you to post data, using a client-credentials client, to the OMF endpoint of a namespace. The OMF endpoint is defined for an OMF connection in a namespace and allows users to make POST calls with OMF messages to be processed. We will use this endpoint to send all of the OMF messages in this guide. The data is then ingressed to AVEVA Data Hub and the Sequential Data Store.

An OMF Connection is made up of one or more Client Credential Clients, a Topic, and a Subscription. Data is sent to a Topic via a Client, where the data is buffered and made available for the Subscription. The Subscription relays data from the Topic to the Sequential Data Store in the namespace that the Subscription resides in. 

The following diagram illustrates the flow of a namespace in SDS with two OMF endpoints receiving data from different clients in AVEVA Data Hub.

![Connection](images/OMF_FLOW.png)

To understand how OMF connections are configured in AVEVA Data Hub, it helps to first understand the basics of client-credentials clients.

#### Client Credential Client
Client-credentials clients are used for machine-to-machine communication without the presence of a user. These clients are issued an identifier and secret upon creation, which are later used for authentication against AVEVA Data Hub. More than one secret can be created for a client. Because they access resources on AVEVA Data Hub and are not associated to users, these clients can be assigned any of the roles in the tenant. 

## Creating OMF Connections

OMF Connections only need to be created once for an application or system sending data to AVEVA Data Hub and there are two ways to do so, described below.

### Creating an OMF connection using the AVEVA Data Hub Portal

If the OMF Connection does not need to be created programmatically, the following video shows how you can create the connection in the AVEVA Data Hub Portal, which features a simple UI that will guide you through the process: https://www.youtube.com/watch?v=52lAnkGC1IM. If you choose to create, or have already created, the connection in the portal for this guide, skip the [Creating our OMF Connection section](#creating-our-omf-connection).

### Creating an OMF connection programmatically

If the entire process needs to be executed programmatically, the process is slightly different. We guide you through creating a connection for our example in the [Creating our OMF Connection section](#creating-our-omf-connection).


# Introduction of the Code

Following is a script that defines the code that will be used in this notebook, along with documentation for the methods.

#### getToken
This method shows how we can programmatically retrieve a bearer token used for authenticating against AVEVA Data Hub when making API calls. We first get the token endpoint from the publicly available identity endpoint, and then use that endpoint to make a POST call specifying our client id and secret to get a response including our token. This will by default expire after 3600 seconds and will allow us to make API calls carrying the privileges of the client specified. 
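
Because the token is valid for some time, it does not have to be requested before every API call. The following is a minimal sketch of caching a token until shortly before it expires. The class `TokenCache`, its `fetch` argument, and the 60-second margin are assumptions for illustration. The `getToken` method in this notebook returns only the token, so it would need to return the `expires_in` value as well to be used as `fetch`.

```python
import time

class TokenCache:
    """Caches a bearer token until shortly before it expires."""

    def __init__(self, fetch, margin_seconds=60, clock=time.time):
        self._fetch = fetch            # callable returning (token, expires_in_seconds)
        self._margin = margin_seconds  # refresh this many seconds before expiry
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        # Fetch a new token only when none is cached or it is about to expire
        if self._token is None or self._clock() >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = self._clock() + float(expires_in)
        return self._token

# Example with a stand-in fetcher that records how often it is called
calls = []

def fake_fetch():
    calls.append(1)
    return f'token-{len(calls)}', 3600

cache = TokenCache(fake_fetch)
first = cache.get()
second = cache.get()
print(first == second, len(calls))
```

The second call to `get` reuses the cached token, so the stand-in fetcher runs only once.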

#### createOMFConnection
This method creates a new OMF Connection in AVEVA Data Hub mapped to the specified ClientIds by making a POST call to the endpoint. A given ClientId may only be mapped to one OmfConnection per namespace.

#### checkOMFConnectionState
This method verifies the current state of the OMF connection in AVEVA Data Hub. It is used to make sure that the creation of the OMF connection has finished before proceeding with the next steps of the code.

#### createType, createContainer, sendData
These methods send OMF messages to the OMF endpoint of the specified namespace in order to create types, containers, or post data. Note the headers added to the specific methods; these are required when sending OMF messages to indicate how to process the sent message.
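
The three methods differ only in the `messagetype` header. As a sketch, the shared headers could be built by one helper. The name `omf_headers` is hypothetical, and the header values are the ones used by the methods in this notebook.

```python
def omf_headers(token, message_type, action='create'):
    """Build the headers used when posting an OMF message."""
    if message_type not in ('type', 'container', 'data'):
        raise ValueError(f'Unknown OMF message type: {message_type}')
    return {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
        'omfversion': '1.2',
        'action': action,
        'messageformat': 'json',
        'messagetype': message_type,
    }

# Example with a placeholder token
headers = omf_headers('<token>', 'container')
print(headers['messagetype'], headers['action'])
```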

#### getSdsType, getSdsStream, getLastValue
These methods make API calls to SDS in order to retrieve types, streams, and the last recorded value for a certain stream.

In [2]:
import json
import requests
import time

def getToken():
    # Get OAuth endpoint configuration
    endpoint = json.loads(requests.get(
        f'{resource}/identity/.well-known/openid-configuration').content)
    token_endpoint = endpoint.get('token_endpoint')

    tokenInformation = requests.post(
        token_endpoint,
        data={'client_id': clientId,
                'client_secret': clientSecret,
                'grant_type': 'client_credentials'})

    token = json.loads(tokenInformation.content)

    # A successful response carries 'expires_in' (seconds) and 'access_token'
    expiration = token.get('expires_in', None)
    if expiration is None:
        # Use .get so that a response without an 'error' field does not raise KeyError
        raise Exception(
            f'Failed to get token, check client id/secret: {token.get("error")}')

    return token['access_token']

def createOMFConnection(OMFConnectionJson):
    token = getToken()
    headers = {"Authorization": f'Bearer {token}', "Content-Type": "application/json"}
    response = requests.post(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/OmfConnections', 
                            data=OMFConnectionJson, 
                            headers=headers)

    if response.status_code == 202:
        return json.loads(response.text)["Id"]
    else:
        raise Exception(f'Failed to create OMF Connection with message: {response.text}, status code: {response.status_code}')

def checkOMFConnectionState(OMFConnectionIdRead):
    token = getToken()
    headers = {"Authorization": f'Bearer {token}', "Content-Type": "application/json"}
    response = requests.get(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/OmfConnections/{OMFConnectionIdRead}', 
                            headers=headers)
    return json.loads(response.text)['State']

def getSdsType(typeId):
    token = getToken()
    headers = {
        "Authorization": f'Bearer {token}', 
    }

    response = requests.get(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/Types/{typeId}', 
                            headers=headers)
    print(json.loads(response.text))
    print()

def getSdsStream(streamId):
    token = getToken()
    headers = {
        "Authorization": f'Bearer {token}', 
    }

    response = requests.get(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/Streams/{streamId}', 
                            headers=headers)
    print(json.loads(response.text))
    print()

def getLastValue(streamId):
    token = getToken()
    headers = {
        "Authorization": f'Bearer {token}', 
    }

    response = requests.get(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/Streams/{streamId}/Data/Last', 
                            headers=headers)
    print(json.loads(response.text))
    print()

def createType(typeJson):
    token = getToken()
    headers = {
        "Authorization": f'Bearer {token}', 
        "Content-Type": "application/json",
        "omfversion": "1.2",
        "action": "create",
        "messageformat": "json",
        "messagetype": "type"
    }

    response = requests.post(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/omf', 
                            data=typeJson, 
                            headers=headers)

    if response.status_code == 202:
        print(f'Request sent with status code 202, operation-id = ' + json.loads(response.text)["Operation-Id"])
    else:
        raise Exception(f'Failed to create type with message: {response.text}, status code: {response.status_code}')

def createContainer(containerJson):
    token = getToken()
    headers = {
        "Authorization": f'Bearer {token}', 
        "Content-Type": "application/json",
        "omfversion": "1.2",
        "action": "create",
        "messageformat": "json",
        "messagetype": "container"
    }

    response = requests.post(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/omf', 
                            data=containerJson, 
                            headers=headers)

    if response.status_code == 202:
        print(f'Request sent with status code 202, operation-id = ' + json.loads(response.text)["Operation-Id"])
    else:
        raise Exception(f'Failed to create container with message: {response.text}, status code: {response.status_code}')

def sendData(dataJson):
    token = getToken()
    headers = {
        "Authorization": f'Bearer {token}', 
        "Content-Type": "application/json",
        "omfversion": "1.2",
        "action": "create",
        "messageformat": "json",
        "messagetype": "data"
    }

    response = requests.post(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/omf', 
                            data=dataJson, 
                            headers=headers)

    if response.status_code == 202:
        print(f'Request sent with status code 202, operation-id = ' + json.loads(response.text)["Operation-Id"])
    else:
        raise Exception(f'Failed to send data with message: {response.text}, status code: {response.status_code}')

def cleanup():
    token = getToken()
    headers = {"Authorization": f'Bearer {token}'}
    print(f'Deleting OMFConnection {OMFConnectionId}')
    requests.delete(f'{resource}/api/{apiversion}/Tenants/{tenantId}/Namespaces/{namespaceId}/OmfConnections/{OMFConnectionId}', 
                    headers=headers)

def test_finished():
    global finished
    assert finished

    cleanup()

## Creating our OMF Connection

Using the methods defined in the code block above, we will create an OMF connection.

In [3]:
# Create the OMF Connection
OMFConnection = {
    "Name": OMFConnectionName,
    "Description": OMFConnectionDescription,
    "ClientIds": [
        clientId
    ],
}

OMFConnectionId = createOMFConnection(json.dumps(OMFConnection))

# Query CONNECT data services for the status of the OMF connection creation
while checkOMFConnectionState(OMFConnectionId) != 'Active':
    print(f'Creating OMF Connection in CONNECT data services')
    time.sleep(5)


print(f'Created OMF Connection Id: {OMFConnectionId}')


Created OMF Connection Id: ebfd2a25-2694-49cb-9d33-f896c18f4370
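
The polling loop in the cell above waits indefinitely if the connection never becomes `Active`. The following is a minimal sketch of the same polling with a timeout. The helper name `wait_for_state`, the default timeout, and the `'Creating'` state used in the example are assumptions for illustration.

```python
import time

def wait_for_state(check_state, target='Active', timeout=300, interval=5,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll check_state() until it returns target or timeout seconds pass."""
    deadline = clock() + timeout
    while True:
        state = check_state()
        if state == target:
            return state
        if clock() >= deadline:
            raise TimeoutError(
                f'State is still {state!r} after {timeout} seconds')
        sleep(interval)

# Example with a stand-in check that reports 'Active' on the third call
states = iter(['Creating', 'Creating', 'Active'])
print(wait_for_state(lambda: next(states), sleep=lambda seconds: None))
```

In this notebook the check could be `lambda: checkOMFConnectionState(OMFConnectionId)`.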


# Checkpoint
At this point in the guide you should have an OMF Connection created in AVEVA Data Hub for the namespace and client Id used in this notebook. If not, please see the [OMF Connection Basics](#omf-connection-basics) section and follow the steps before continuing.

# Configuring the OMF Data Flow

OMF message types fall into three categories: Type, Container, and Data, which are described below. Each message type creates a different type of data and contains keywords that define characteristics of the data. Most of the message types are used to create the structure of the data and give it meaning.

## OMF Types
Types are used within OMF to define the structure of data. Dynamic types are used by containers to create streams of sequentially indexed data. Static types are used by the endpoint to define non-streamed data, such as assets. Enum types are used to create an array of name/value pairs that defines a limited set of values for a property in a dynamic or static type. The type messages are used to create and delete types, which we configure by setting the *action* header in our call to *create* or *delete*.

A type message is interpreted by AVEVA Data Hub as an SdsType in the Sequential Data Store. Because SdsTypes are immutable, update operations are not supported.

OMF types best practices:
* Reuse types whenever possible and avoid creating one type per stream when it is not necessary. For example, if you have 5 different containers that send integer data, create 1 type and reuse it for all 5 containers. When OMF types are ingressed to AVEVA Data Hub they are translated into SDS types, and creating a large number of duplicated types could complicate working with your data across AVEVA Data Hub. Types also do not need to be specific to OMF applications; you could reuse a type from a different OMF application or one that is already available on AVEVA Data Hub.

* All properties within a type should be inter-dependent, where the properties are used together to make sense of the data.  For example, we will be creating a type for drone location, where the properties will be Timestamp, Longitude, Latitude, and Altitude.  Removing or omitting one of these properties would result in the type being incomplete, and unable to store the drone’s location properly. Also, adding a property to this Type, such as the battery measurement, is not advised, because it is not a dependent property, and we may want to report this value at a different frequency. Missing a property at an event causes a default value to be recorded which will either misrepresent or break interpolation of that property around that event key.

For more information on types, see https://docs.aveva.com/bundle/omf/page/types/type-messages.html

## Creating our OMF Types

We will create two OMF types for this guide, one type for sending location data including coordinates and altitude, and a simple type to be shared for temperature and battery percentage data.

The following defines our first type:
```json
{
    "id": "DroneLocationType",
    "type": "object",
    "classification": "dynamic",
    "extrapolation": "all",
    "properties": {
        "Timestamp": { "type": "string", "isindex": true, "format": "date-time" },
        "Long": { "type": "number", "format": "float32" },
        "Lat": { "type": "number", "format": "float32" },
        "Alt": { "type": "number", "format": "float32" }
    }
}
```
Next we will define a simple time value type that will be shared between the simple data streams:
```json
{
    "id": "DroneDataType",
    "type": "object",
    "classification": "dynamic",
    "extrapolation": "all",
    "properties": {
        "Timestamp": { "type": "string", "isindex": true, "format": "date-time" },
        "Value": { "type": "number", "format": "float32" }
    }
}
```

Below we call our createType method to create these types. Note that OMF messages support a JSON array of objects, so we can batch all of our types and create them in one HTTP call.

In [4]:
droneTypes = [{
    "id": "DroneLocationType",
    "type": "object",
    "classification": "dynamic",
    "properties": {
        "Timestamp": { "type": "string", "isindex": True, "format": "date-time" },
        "Long": { "type": "number", "format": "float32" },
        "Lat": { "type": "number", "format": "float32" },
        "Alt": { "type": "number", "format": "float32" }
    }
},
{
    "id": "DroneDataType",
    "type": "object",
    "classification": "dynamic",
    "properties": {
        "Timestamp": { "type": "string", "isindex": True, "format": "date-time" },
        "Value": { "type": "number", "format": "float32" }
    }
}]

createType(json.dumps(droneTypes))

Request sent with status code 202, operation-id = 6c5b9ba3b781934c9822597d14f1cf1e


## OMF Request Responses
In the output from the preceding code you should see an operation-id. When posting OMF messages to AVEVA Data Hub, a successful request returns a 202 Accepted response code along with an operation-id. This response indicates that the supplied JSON was accepted as valid and that the request has been submitted for processing. The response comes from the OMF API accepting the request body; the OMF message itself is still being processed by AVEVA Data Hub, and successful processing is not guaranteed at this point.

If there are any issues with the data flow, contact AVEVA Technical Support with this operation-id. 

The following diagram illustrates the basic flow of an OMF message request
![OMFMessage](images/omfmessage.png)
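
As a sketch of the response handling described in this section, the operation-id can be extracted in one place and kept for later troubleshooting. The helper name `operation_id_from_response` and the example body are hypothetical. The `Operation-Id` field name is the one read by the methods in this notebook.

```python
import json

def operation_id_from_response(status_code, body_text):
    """Return the operation-id of an accepted OMF request, else raise."""
    if status_code != 202:
        raise RuntimeError(
            f'OMF request not accepted, status code: {status_code}, '
            f'message: {body_text}')
    return json.loads(body_text)['Operation-Id']

# Example with a made-up response body
print(operation_id_from_response(202, '{"Operation-Id": "abc123"}'))
```

A returned operation-id only means the request was accepted for processing, not that processing succeeded.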


## OMF Containers
An OMF container message uses an OMF type as a template to create a way to collect and group data events, and the container message is interpreted as an SdsStream in the Sequential Data Store. OMF Containers can be defined for Types whose classification is dynamic, and they provide streams of data events. Each Container has a unique ID defined by the user.

Immediately after a type has been registered using a type message, containers may be created using that type. Each container represents an instance of the OMF type, meaning that for this example, if we had 100 drones we would create one container for each drone's location, where every container would be of type DroneLocationType.

For more information on containers, see https://docs.aveva.com/bundle/omf/page/containers/container-messages.html
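
To illustrate the one-type, many-containers relationship, the following sketch builds one location container definition per drone, all sharing DroneLocationType. The helper name `location_containers` and the `Drone001Location` naming scheme are hypothetical. The resulting list could be passed to `json.dumps` and sent as a single container message.

```python
def location_containers(drone_ids, type_id='DroneLocationType'):
    """Build one container definition per drone, all sharing one type."""
    return [
        {'id': f'{drone_id}Location', 'typeid': type_id}
        for drone_id in drone_ids
    ]

# Example with 100 made-up drone ids: Drone001 ... Drone100
drone_ids = [f'Drone{n:03d}' for n in range(1, 101)]
containers = location_containers(drone_ids)
print(len(containers), containers[0])
```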



## Creating our OMF Containers

The next step in writing OMF data is to create our OMF containers.

The following are our OMF container definitions:

```json
{
    "id": "DroneLocationContainer",
    "typeid": "DroneLocationType"
}

{
    "id": "DroneBatteryContainer",
    "typeid": "DroneDataType"
}

{
    "id": "DroneTemperatureContainer",
    "typeid": "DroneDataType"
}
```
Note that, as mentioned in the types section, we are using the DroneDataType for both the battery and temperature containers following the best practices for OMF types.

We are using very simple containers that simply specify the TypeId and then use default properties.  Please see the OMF Container documentation for a list of all the possible options.

Below we call our createContainer method to create these containers. Note that OMF messages support a JSON array of objects, so we have the option to batch all of our containers into one HTTP call or to send one container per call.

In [5]:
locationAndBatteryContainer = [{
    "id": "DroneLocationContainer",
    "typeid": "DroneLocationType"
},
{
    "id": "DroneBatteryContainer",
    "typeid": "DroneDataType"
}]

createContainer(json.dumps(locationAndBatteryContainer))

tempContainer = [{
    "id": "DroneTemperatureContainer",
    "typeid": "DroneDataType"
}]

createContainer(json.dumps(tempContainer))

Request sent with status code 202, operation-id = 81e964db5ae606fcb1bbdc1c698eb487
Request sent with status code 202, operation-id = bbbc257c645c30df48ab059334f6c0f4


## Sending OMF Data

An OMF data message sends actual data events, like time-series data, to be stored. A data message is mapped to generic SDS values in the Sequential Data Store, where every index can have at most one value.

As our types and containers are now defined, we will write data to the containers using simple example data based on our types' properties.

In [6]:
data = [{
    "containerid": "DroneTemperatureContainer",
    "values": [{
        "Timestamp": "2021-12-11T22:23:23.430Z",
        "Value": "98.3"
    },
    {
        "Timestamp": "2021-12-11T22:24:23.430Z",
        "Value": "96.3"
    },
    {
        "Timestamp": "2021-12-11T22:25:23.430Z",
        "Value": "99.3"
    }]
},
{
    "containerid": "DroneLocationContainer",
    "values": [{
        "Timestamp": "2021-12-11T22:23:23.430Z",
        "Long": "-77.0364",
        "Lat": "38.8951",
        "Alt": "230"
    },
    {
        "Timestamp": "2021-12-11T22:24:23.430Z",
        "Long": "-77.0364",
        "Lat": "38.8951",
        "Alt": "230"
    },
    {
        "Timestamp": "2021-12-11T22:25:23.430Z",
        "Long": "-77.0364",
        "Lat": "38.8951",
        "Alt": "230"
    }]
},
{
    "containerid": "DroneBatteryContainer",
    "values": [{
        "Timestamp": "2021-12-11T22:23:23.430Z",
        "Value": "24"
    },
    {
        "Timestamp": "2021-12-11T22:24:23.430Z",
        "Value": "24"
    },
    {
        "Timestamp": "2021-12-11T22:25:23.430Z",
        "Value": "23"
    }]
}]

sendData(json.dumps(data))

Request sent with status code 202, operation-id = b321904856db99407292875189df430a
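
In an application the events would usually be built from measurements rather than typed by hand. The following is a minimal sketch that builds one data message for a container of DroneDataType with evenly spaced timestamps. The helper name `data_message` and the 60-second step are assumptions. The values are passed as JSON numbers here, whereas the cell above sends them as strings; check the OMF documentation before relying on either form.

```python
from datetime import datetime, timedelta, timezone

def data_message(container_id, start, values, step_seconds=60):
    """Build one OMF data message with evenly spaced timestamps."""
    events = []
    for i, value in enumerate(values):
        stamp = (start + timedelta(seconds=i * step_seconds)).astimezone(timezone.utc)
        # ISO 8601 in UTC with millisecond precision, as in the cell above
        text = stamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
        events.append({'Timestamp': text, 'Value': value})
    return {'containerid': container_id, 'values': events}

start = datetime(2021, 12, 11, 22, 23, 23, 430000, tzinfo=timezone.utc)
message = data_message('DroneBatteryContainer', start, [24, 24, 23])
print(message['values'][0]['Timestamp'])
```

A list of such messages can be serialized with `json.dumps` and passed to `sendData`.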


## Overall workflow of developing and running an OMF application

To illustrate the workflow of developing and running your OMF application we can divide it into two parts.

The first part is performed only once: we define our data by creating the types and containers required. In the second part we continuously write data to our OMF connection using the containers previously created.

![Workflow](images/application.png)
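
The two parts can be sketched as a single function that performs the setup once and then sends each batch of data. The function `run_application` and its arguments are hypothetical. In this notebook the callables would wrap `createType`, `createContainer`, and `sendData`.

```python
def run_application(create_types, create_containers, send, batches):
    """Define the data structures once, then send each batch of data."""
    # Part one: performed once
    create_types()
    create_containers()
    # Part two: repeated for as long as there is data to send
    sent = 0
    for batch in batches:
        send(batch)
        sent += 1
    return sent

# Example with stand-in callables that only record what was called
log = []
count = run_application(
    lambda: log.append('types'),
    lambda: log.append('containers'),
    lambda batch: log.append(('data', batch)),
    batches=[[1], [2]],
)
print(count, log)
```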


## Validation

At this point in the guide we have created and sent:
* Types defining our data structures
* Containers to collect and group data events in streams
* Data for our OMF structures

To validate that the data messages made it to AVEVA Data Hub and were ingressed properly, we will make a call to the SDS APIs to read the created SdsTypes, SdsStreams, and data values.

In [7]:
print('DroneLocationType SdsType = ')
getSdsType('DroneLocationType')

print('DroneDataType SdsType = ')
getSdsType('DroneDataType')

print('DroneLocationContainer SdsStream = ')
getSdsStream('DroneLocationContainer')

print('DroneBatteryContainer SdsStream = ')
getSdsStream('DroneBatteryContainer')

print('DroneTemperatureContainer SdsStream = ')
getSdsStream('DroneTemperatureContainer')

print('DroneLocationContainer Data = ')
getLastValue('DroneLocationContainer')

print('DroneBatteryContainer Data = ')
getLastValue('DroneBatteryContainer')

print('DroneTemperatureContainer Data = ')
getLastValue('DroneTemperatureContainer')

global finished
finished = True

DroneLocationType SdsType = 
{'Id': 'DroneLocationType', 'Name': 'DroneLocationType', 'Description': '', 'SdsTypeCode': 1, 'Properties': [{'Id': 'Timestamp', 'Name': 'Timestamp', 'Description': '', 'IsKey': True, 'SdsType': {'Id': 'DroneLocationType.Timestamp', 'Name': 'DroneLocationType.Timestamp', 'Description': '', 'SdsTypeCode': 16}}, {'Id': 'Long', 'Name': 'Long', 'Description': '', 'SdsType': {'Id': 'DroneLocationType.Long', 'Name': 'DroneLocationType.Long', 'Description': '', 'SdsTypeCode': 13}}, {'Id': 'Lat', 'Name': 'Lat', 'Description': '', 'SdsType': {'Id': 'DroneLocationType.Lat', 'Name': 'DroneLocationType.Lat', 'Description': '', 'SdsTypeCode': 13}}, {'Id': 'Alt', 'Name': 'Alt', 'Description': '', 'SdsType': {'Id': 'DroneLocationType.Alt', 'Name': 'DroneLocationType.Alt', 'Description': '', 'SdsTypeCode': 13}}]}

DroneDataType SdsType = 
{'Id': 'DroneDataType', 'Name': 'DroneDataType', 'Description': '', 'SdsTypeCode': 1, 'Properties': [{'Id': 'Timestamp', 'Name': 'Timest
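
The printed SdsType dictionaries can be hard to scan. As a sketch, assuming the dictionary shape shown in the DroneLocationType output above, each property can be reduced to its id, its SdsTypeCode, and whether it is the key. The helper name `summarize_sds_type` is hypothetical. The `getSdsType` method in this notebook prints the response rather than returning it, so it would need to return the parsed dictionary to be used this way.

```python
def summarize_sds_type(sds_type):
    """Return (property id, SdsTypeCode, is key) for each property."""
    return [
        (prop['Id'], prop['SdsType']['SdsTypeCode'], prop.get('IsKey', False))
        for prop in sds_type['Properties']
    ]

# Trimmed copy of the DroneLocationType output above
example = {
    'Id': 'DroneLocationType',
    'Properties': [
        {'Id': 'Timestamp', 'IsKey': True, 'SdsType': {'SdsTypeCode': 16}},
        {'Id': 'Long', 'SdsType': {'SdsTypeCode': 13}},
    ],
}
print(summarize_sds_type(example))
```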

In [8]:
test_finished()

Deleting OMFConnection ebfd2a25-2694-49cb-9d33-f896c18f4370


#### We have now ingressed data using OMF into AVEVA Data Hub and the Sequential Data Store