# Observability metrics real-time monitoring

## Use case

For the data flow diagram and the use case overview, please refer to this page:

https://support.activeviam.com/confluence/pages/viewpage.action?spaceKey=ATOTI&title=o11y+use+case

## Data model


OpenTelemetry provides a flexible data structure for metrics and their observations (data points), and the list of attributes is extensible. For the standard metrics, however, the list of fields is specified: see [Semantic Conventions](https://github.com/open-telemetry/opentelemetry-specification/tree/master/specification/metrics/semantic_conventions).

In this notebook, we create sample data with a set of assumptions:

- Metrics are limited to "System Metrics" and the list of fields is pre-defined based on the [Semantic Conventions System Metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/semantic_conventions/system-metrics.md).
- Each message carries exactly one metric.
- Each message carries exactly one data point (a single measurement, with no pre-aggregation on the publisher side).

This is a sample message based on the OpenTelemetry [proto specification](https://github.com/open-telemetry/opentelemetry-proto/blob/b54688569186e0b862bf7462a983ccf2c50c0547/opentelemetry/proto/metrics/v1/metrics.proto#L362-L385):
    
    

<details>
  <summary>Expand sample message </summary>

```python
{
    "resource": {
        "attributes": [
            {
                "key": "az",
                "value": {"stringValue": "ap-southeast-2a"},
            },  # availability zone: az1, az2, az3
            {
                "key": "container_id",
                "value": {"stringValue": "ascscs11243asc"},
            },  # 5 containers
            {
                "key": "service_id",
                "value": {"stringValue": "myservice"},
            },  # service id: identity service, media service, heartbit, content service 1000
        ]
    },
    "instrumentationLibraryMetrics": [
        {
            "instrumentationLibrary": {
                "name": "github.com/aws/aws-sdk-go/aws/service/s3"
            },
            "metrics": [
                {
                    "name": "storage.operation.duration_μs",  # 500 http.latency, http.errors.count, cpu.utilization, http.request.size
                    "doubleHistogram": {
                        "dataPoints": [
                            {
                                "labels": [
                                    {"key": "status", "value": "success"},
                                    {
                                        "key": "storage.destination",
                                        "value": "rps--ddev",
                                    },
                                    {"key": "storage.operation", "value": "GetObject"},
                                    {"key": "storage.system", "value": "s3"},
                                    {
                                        "key": "trace.id",
                                        "value": "4308d78604e6dded7896a76b9b55b57b",
                                    },
                                ],
                                "startTimeUnixNano": "1605157776531067020",
                                "timeUnixNano": "1605157786531063396",
                                "count": "1",
                                "sum": 33734.0,  # 33,734 μs, i.e. about 33.7 ms
                                "bucketCounts": [
                                    "4674869179239628800",
                                    "4674869179239628800",
                                ],
                                "explicitBounds": [0.0, 100.0],
                            }
                        ]
                    },
                }
            ],
        }
    ],
}
```
    
</details>



We flatten all observations into a single data store of data points, with one row per observation.

## Example business rules

Messages from IT services are streamed in real time, and the observations are aggregated, for instance with the 90th percentile (p-90). The aggregated values are monitored and alerts are sent, for instance:

- For premium clients, p-90 latency should stay within bounds: send an alert if p-90 latency exceeds 300 ms.
- If the average CPU utilization over the last minute is above a threshold, raise an alarm.
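The two rules can be sketched in plain Python over flattened values, independently of atoti. This is a minimal sketch under stated assumptions: the percentile uses `statistics.quantiles` with the inclusive method (linear interpolation between ranks), which may not match the quantile definition atoti uses, and the CPU threshold of 0.8 as well as the helper names `p90`, `latency_alert` and `cpu_alert` are hypothetical.

```python
import statistics
import time

P90_LATENCY_LIMIT_MS = 300.0  # rule 1: p-90 latency limit from the text
CPU_THRESHOLD = 0.8  # rule 2: hypothetical CPU utilization threshold
WINDOW_S = 60  # "over the last minute"


def p90(values):
    # 90th percentile with linear interpolation between closest ranks (needs >= 2 values)
    return statistics.quantiles(values, n=10, method="inclusive")[-1]


def latency_alert(latencies_ms):
    # Rule 1: alert when the p-90 latency exceeds the limit
    return len(latencies_ms) >= 2 and p90(latencies_ms) > P90_LATENCY_LIMIT_MS


def cpu_alert(samples, now=None):
    # Rule 2: samples are (unix_seconds, utilization) pairs; keep those from the last minute
    now = time.time() if now is None else now
    recent = [value for t, value in samples if 0 <= now - t <= WINDOW_S]
    return bool(recent) and statistics.mean(recent) > CPU_THRESHOLD
```

For example, nine latencies of 100 ms and one of 1000 ms give a p-90 of 190 ms with this interpolation, so no alert is raised; a percentile definition based on nearest rank would give a different value for small samples.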

# Memory sizing - approximation

Assumptions about cardinality:

- 90 million data points per minute
- 700 different metrics
- 15,000 different resource ids:
    - a few availability zones
    - thousands of service ids and a few container ids
- labels:
    - thousands of device ids, URLs, or hosts

Based on the example message above, 90 million data points (one minute of data) held as Python dicts take roughly 620 GiB of memory, about 7.4 KB per message. The estimate below divides the total byte count by 2**30, so its result is in GiB.

In [2]:
message = {
    "resource": {
        "attributes": [
            {
                "key": "az",
                "value": {"stringValue": "ap-southeast-2a"},
            },  # availability zone: az1, az2, az3
            {
                "key": "container_id",
                "value": {"stringValue": "ascscs11243asc"},
            },  # 5 containers
            {
                "key": "service_id",
                "value": {"stringValue": "myservice"},
            },  # service id: identity service, media service, heartbit, content service 1000
        ]
    },
    "instrumentationLibraryMetrics": [
        {
            "instrumentationLibrary": {
                "name": "github.com/aws/aws-sdk-go/aws/service/s3"
            },
            "metrics": [
                {
                    "name": "storage.operation.duration_μs",  # 500 http.latency, http.errors.count, cpu.utilization, http.request.size
                    "doubleHistogram": {
                        "dataPoints": [
                            {
                                "labels": [
                                    {"key": "status", "value": "success"},
                                    {
                                        "key": "storage.destination",
                                        "value": "rps--ddev",
                                    },
                                    {"key": "storage.operation", "value": "GetObject"},
                                    {"key": "storage.system", "value": "s3"},
                                    {
                                        "key": "trace.id",
                                        "value": "4308d78604e6dded7896a76b9b55b57b",
                                    },
                                ],
                                "startTimeUnixNano": "1605157776531067020",
                                "timeUnixNano": "1605157786531063396",
                                "count": "1",
                                "sum": 33734.0,  # 33,734 μs, i.e. about 33.7 ms
                                "bucketCounts": [
                                    "4674869179239628800",
                                    "4674869179239628800",
                                ],
                                "explicitBounds": [0.0, 100.0],
                            }
                        ]
                    },
                }
            ],
        }
    ],
}

from collections import deque
from itertools import chain
from reprlib import repr
from sys import getsizeof, stderr


def total_size(o, handlers=None, verbose=False):
    """Return the approximate memory footprint of an object and all of its contents.

    Automatically finds the contents of the following builtin containers and
    their subclasses: tuple, list, deque, dict, set and frozenset.
    To search other containers, add handlers to iterate over their contents:

        handlers = {SomeContainerClass: iter,
                    OtherContainerClass: OtherContainerClass.get_elements}

    """
    dict_handler = lambda d: chain.from_iterable(d.items())
    all_handlers = {
        tuple: iter,
        list: iter,
        deque: iter,
        dict: dict_handler,
        set: iter,
        frozenset: iter,
    }
    all_handlers.update(handlers or {})  # user handlers take precedence
    seen = set()  # track which object ids have already been seen
    default_size = getsizeof(0)  # estimate sizeof object without __sizeof__

    def sizeof(o):
        if id(o) in seen:  # do not double count the same object
            return 0
        seen.add(id(o))
        s = getsizeof(o, default_size)

        if verbose:
            print(s, type(o), repr(o), file=stderr)

        for typ, handler in all_handlers.items():
            if isinstance(o, typ):
                s += sum(map(sizeof, handler(o)))
                break
        return s

    return sizeof(o)


# Bytes per message x 90 million messages, converted to GiB (2**30 bytes)
print(total_size(message) * 90000000 / (2 ** 30), "GiB")

620.8475679159164 GiB


# Sample data

### Sample meta attributes

In [3]:
def generate_members(prefix, num):
    # Generate about `num` members split evenly across the prefixes:
    # num // len(prefix) members per prefix, e.g. "container_0", "container_1", ...
    num_suffix = num // len(prefix)
    return ["{}_{}".format(p, s) for p in prefix for s in range(num_suffix)]

In [4]:
availability_zones = ["az1", "az2", "az3"]

In [5]:
# Different containers
container_id = ["container"]
container_id_num = 5
cont_list = generate_members(container_id, container_id_num)
cont_list[:5]

['container_0', 'container_1', 'container_2', 'container_3', 'container_4']

In [6]:
# Different services
services_prefix = ["identity service", "media service", "content service"]
services_num = 1000
serv_list = generate_members(services_prefix, services_num)
serv_list[:5]

['identity service_0',
 'identity service_1',
 'identity service_2',
 'identity service_3',
 'identity service_4']

In [7]:
# Different device ids
device_prefix = ["hdgsfysdfbf"]
device_num = 2000
device_ids = generate_members(device_prefix, device_num)
device_ids[:5]

['hdgsfysdfbf_0',
 'hdgsfysdfbf_1',
 'hdgsfysdfbf_2',
 'hdgsfysdfbf_3',
 'hdgsfysdfbf_4']

### System metrics semantic conventions

https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/semantic_conventions/system-metrics.md



In [8]:
system_metrics = {
    "system.cpu.time": {
        "labels": {
            "state": ["idle", "user", "system", "interrupt"],
            "cpu": [i for i in range(32)],
        },
        "type": "Double",
    },
    "system.cpu.utilization": {
        "labels": {
            "state": ["idle", "user", "system", "interrupt"],
            "cpu": [i for i in range(32)],
        },
        "type": "Double",
    },
    "system.memory.usage": {
        "labels": {
            "state": ["idle", "user", "system", "interrupt"],
        },
        "type": "Int64",
    },
    "system.memory.utilization": {
        "labels": {
            "state": ["idle", "user", "system", "interrupt"],
        },
        "type": "Double",
    },
    "system.paging.usage": {
        "labels": {
            "state": ["used", "free"],
        },
        "type": "Int64",
    },
    "system.paging.utilization": {
        "labels": {
            "state": ["used", "free"],
        },
        "type": "Double",
    },
    "system.paging.faults": {
        "labels": {
            "type": ["minor", "major"],
        },
        "type": "Int64",
    },
    "system.paging.operations": {
        "labels": {
            "type": ["minor", "major"],
        },
        "type": "Int64",
    },
    "system.disk.io": {
        "labels": {"device": device_ids, "direction": ["read", "write"]},
        "type": "Int64",
    },
    "system.disk.operations": {
        "labels": {"device": device_ids, "direction": ["read", "write"]},
        "type": "Int64",
    },
    "system.disk.io_time": {
        "labels": {"device": device_ids},
        "type": "Double",
    },
    "system.disk.operation_time": {
        "labels": {"device": device_ids, "direction": ["read", "write"]},
        "type": "Double",
    },
    "system.disk.merged": {
        "labels": {"device": device_ids, "direction": ["read", "write"]},
        "type": "Int64",
    },
    "system.filesystem.usage": {
        "labels": {
            "device": device_ids,
            "state": ["used", "free", "reserved"],
            "type": ["ext4", "tmpfs"],
            "mode": ["rw", "ro"],
        },
        "type": "Int64",
    },
    "system.filesystem.utilization": {
        "labels": {
            "device": device_ids,
            "state": ["used", "free", "reserved"],
            "type": ["ext4", "tmpfs"],
            "mode": ["rw", "ro"],
        },
        "type": "Double",
    },
    "system.network.dropped": {
        "labels": {
            "device": device_ids,
            "direction": ["transmit", "receive"],
        },
        "type": "Int64",
    },
    "system.network.packets": {
        "labels": {
            "device": device_ids,
            "direction": ["transmit", "receive"],
        },
        "type": "Int64",
    },
    "system.network.errors": {
        "labels": {
            "device": device_ids,
            "direction": ["transmit", "receive"],
        },
        "type": "Int64",
    },
    "system.network.io": {
        "labels": {
            "device": device_ids,
            "direction": ["transmit", "receive"],
        },
        "type": "Int64",
    },
    "system.network.connections": {
        "labels": {
            "device": device_ids,
            "protocol": ["tcp", "udp", "atp", "il"],
            "state": [
                "LISTEN",
                "SYN-SENT",
                "SYN-RECEIVED",
                "ESTABLISHED",
                "FIN-WAIT-1",
                "FIN-WAIT-2",
                "CLOSE-WAIT",
                "CLOSING",
                "LAST-ACK",
                "TIME-WAIT",
                "CLOSED",
                "",
            ],
        },
        "type": "Int64",
    },
    "system.process.count": {
        "labels": {"status": ["running", "sleeping", "dead"]},
        "type": "Int64",
    },
}

### Message

In [9]:
import json
import random
import time


def generate_message(data_points_num=1):
    # This function picks a random metric, fetches required labels and populates them randomly

    az = random.choice(availability_zones)
    container = random.choice(cont_list)
    service = random.choice(serv_list)

    metric = random.choice(list(system_metrics))
    lbl_def = system_metrics[metric]["labels"]
    dps = []
    for _ in range(data_points_num):
        # Pick one random value for each label defined for this metric
        labels = [{"key": k, "value": random.choice(v)} for k, v in lbl_def.items()]
        dps.append(
            {
                "labels": labels,
                "startTimeUnixNano": str(time.time_ns()),
                "count": "1",
                # Random measurement: a float in [0, 1) for Double metrics,
                # an integer in [0, 500] for Int64 metrics
                "sum": random.random()
                if system_metrics[metric]["type"] == "Double"
                else random.randint(0, 500),
                "bucketCounts": [
                    "4674869179239628800",
                    "4674869179239628800",
                ],
                "explicitBounds": [0.0, 100.0],
            }
        )

    # Populating the message

    message = {
        "resource": {
            "attributes": [
                {
                    "key": "az",
                    "value": {"stringValue": az},
                },  # availability zone: az1, az2, az3
                {
                    "key": "container_id",
                    "value": {"stringValue": container},
                },  # 5 containers
                {
                    "key": "service_id",
                    "value": {"stringValue": service},
                },  # service id: identity service, media service, heartbit, content service 1000
            ]
        },
        "instrumentationLibraryMetrics": [
            {
                "instrumentationLibrary": {
                    "name": "github.com/aws/aws-sdk-go/aws/service/s3"
                },
                "metrics": [
                    {
                        "name": metric,  # 500 http.latency, http.errors.count, cpu.utilization, http.request.size
                        "doubleHistogram": {"dataPoints": dps},
                    }
                ],
            }
        ],
    }

    return message


print(json.dumps(generate_message(), indent=4, sort_keys=False))

{
    "resource": {
        "attributes": [
            {
                "key": "az",
                "value": {
                    "stringValue": "az2"
                }
            },
            {
                "key": "container_id",
                "value": {
                    "stringValue": "container_1"
                }
            },
            {
                "key": "service_id",
                "value": {
                    "stringValue": "content service_0"
                }
            }
        ]
    },
    "instrumentationLibraryMetrics": [
        {
            "instrumentationLibrary": {
                "name": "github.com/aws/aws-sdk-go/aws/service/s3"
            },
            "metrics": [
                {
                    "name": "system.paging.utilization",
                    "doubleHistogram": {
                        "dataPoints": [
                            {
                                "labels": [
                                    {
    

### Serializer

In [10]:
import datetime


def get_dp_labels_from_schema(schema):
    # inferring the list of all labels from the schema
    all_labels = []
    for i, v in schema.items():
        all_labels = all_labels + [*v["labels"]]
    return list(set(all_labels))


def dict_from_key_value(l):
    # Build a dict from a list of key-value pairs (resource attributes
    # or data point labels); values wrapped as {"stringValue": ...} are unwrapped
    r = {}
    for item in l:
        value = item["value"]
        if isinstance(value, dict) and "stringValue" in value:
            value = value["stringValue"]
        r[item["key"]] = value
    return r


def message_parser(obj: dict, datapoint_labels, resource_attributes):
    # Flatten one message into a single fact (one row of the data store)
    fact = {}

    # Reading the resource attributes; missing ones become empty strings
    resource_attributes_dict = dict_from_key_value(obj["resource"]["attributes"])
    for column in resource_attributes:
        fact[column] = resource_attributes_dict.get(column, "")

    # Reading the metric
    # Assuming a single value is sent in a message - single metric and single data point:
    metric = obj["instrumentationLibraryMetrics"][0]["metrics"][0]
    dp = metric["doubleHistogram"]["dataPoints"][0]

    fact["name"] = metric["name"]
    fact["value"] = dp["sum"]
    fact["startTimeUnixNano"] = dp["startTimeUnixNano"]
    # Local time, truncated to whole seconds
    fact["timestamp"] = datetime.datetime.fromtimestamp(
        int(dp["startTimeUnixNano"]) // 1_000_000_000
    ).strftime("%Y-%m-%d %H:%M:%S")

    # Reading labels; labels not defined for this metric become empty strings
    labels_dict = dict_from_key_value(dp["labels"])
    for column in datapoint_labels:
        fact[column] = labels_dict.get(column, "")

    return fact


datapoint_labels = get_dp_labels_from_schema(system_metrics)
resource_attributes = ["az", "container_id", "service_id"]

print(
    json.dumps(
        message_parser(generate_message(), datapoint_labels, resource_attributes),
        indent=4,
        sort_keys=False,
    )
)

{
    "az": "az3",
    "container_id": "container_1",
    "service_id": "identity service_57",
    "name": "system.memory.utilization",
    "value": 0.3902306457443364,
    "startTimeUnixNano": "1605685042155214000",
    "timestamp": "2020-11-17 23:37:22",
    "status": "",
    "device": "",
    "mode": "",
    "protocol": "",
    "type": "",
    "cpu": "",
    "direction": "",
    "state": "idle"
}


### csv file

In [31]:
%%time
import time

from IPython.display import clear_output


def generate_csv(num_messages):
    with open(
        "data/data_{}_{}.csv".format(str(num_messages), str(time.time_ns())), "w+"
    ) as output:
        output.write(
            ",".join(
                [
                    *message_parser(
                        generate_message(), datapoint_labels, resource_attributes
                    ).keys()
                ]
            )
            + "\n"
        )

        for i in range(num_messages):
            print(i)
            clear_output(wait=True)
            m = generate_message()
            output.write(
                ",".join(
                    [
                        str(x)
                        for x in [
                            *message_parser(
                                m, datapoint_labels, resource_attributes
                            ).values()
                        ]
                    ]
                )
                + "\n"
            )


generate_csv(100)

99
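`generate_csv` joins values with bare commas, so a label value containing a comma or a quote would shift the columns. A minimal sketch of the same step with the standard `csv` module, which quotes such fields; `write_facts_csv` is a hypothetical helper, and it takes the column order from the first fact, as `generate_csv` does for its header.

```python
import csv
import io
from itertools import chain


def write_facts_csv(facts, stream):
    # Write flattened facts (dicts with identical keys) as CSV; return the row count.
    # csv.DictWriter quotes fields that contain commas, quotes or newlines.
    facts = iter(facts)
    first = next(facts, None)
    if first is None:
        return 0
    # Column order comes from the first fact, like the header in generate_csv
    writer = csv.DictWriter(stream, fieldnames=list(first), lineterminator="\n")
    writer.writeheader()
    count = 0
    for fact in chain([first], facts):
        writer.writerow(fact)
        count += 1
    return count


if __name__ == "__main__":
    buffer = io.StringIO()
    write_facts_csv([{"service_id": "a,b", "value": 2}], buffer)
    print(buffer.getvalue(), end="")  # the "a,b" field is written quoted
```

With the notebook's functions it could be called as `write_facts_csv((message_parser(generate_message(), datapoint_labels, resource_attributes) for _ in range(100)), f)`, where `f` is a file opened with `newline=""`, as the `csv` documentation recommends.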


In [12]:
# size on disk of the generated CSV files
!cd data && ls -lah

total 32
drwxr-xr-x  4 anastasiapolyakova  staff   128B 17 Nov 23:37 .
drwxr-xr-x  7 anastasiapolyakova  staff   224B 17 Nov 23:35 ..
drwxr-xr-x  3 anastasiapolyakova  staff    96B 17 Nov 22:48 .ipynb_checkpoints
-rw-r--r--  1 anastasiapolyakova  staff    13K 17 Nov 23:37 data_100_1605685042165290000.csv


# atoti prototype

### Launching the app

In [32]:
import atoti as tt
from atoti.config import create_config

config = create_config(
    metadata_db="./metadata.db", max_memory="4G", sampling_mode=tt.sampling.FULL
)
session = tt.create_session(config=config)


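# These types are NOT passed to read_csv: parsing "timestamp" as LOCAL_DATE_TIME
# raised exceptions with the local date format, so column types are left to atoti.
# The dict is only used below to derive the list of key columns.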
types = {
    "az": tt.types.STRING,
    "container_id": tt.types.STRING,
    "service_id": tt.types.STRING,
    "name": tt.types.STRING,
    "value": tt.types.DOUBLE_NULLABLE,
    "startTimeUnixNano": tt.types.LONG_NULLABLE,
    "timestamp": tt.types.LOCAL_DATE_TIME,  # Doesn't work
    "type": tt.types.STRING,
    "cpu": tt.types.STRING,
    "protocol": tt.types.STRING,
    "state": tt.types.STRING,
    "status": tt.types.STRING,
    "mode": tt.types.STRING,
    "device": tt.types.STRING,
    "direction": tt.types.STRING,
}

keys = [f for f in [*types.keys()] if f != "value"]

data_points = session.read_csv("./data/", store_name="Metrics", keys=keys, watch=True)
cube = session.create_cube(data_points)
data_points.shape

Deleting existing "Unnamed" session to create the new one.


{'rows': 283, 'columns': 15}

In [33]:
cube.query()

|   | contributors.COUNT | value.MEAN | value.SUM |
|---|---|---|---|
| 0 | 283 | 157.87 | 44676.53 |


In [15]:
cube

### Adding an aggregation function

In [16]:
cube.measures["p-90"] = tt.agg.quantile(data_points["value"], 0.9)

In [17]:
cube.visualize()


### Streaming data

In this example, the data is stored on disk, and atoti picks up new files in `./data/` because the store was created with `watch=True`. The messages are already flattened into CSV rows.
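Because the store watches the folder, a file that is still being written could in principle be read before it is complete; the notebook later publishes the 1M-row file by moving it into `data/` with `mv`. A minimal sketch of the same idea in Python, assuming the watched folder and its parent are on the same filesystem so that `os.replace` is an atomic rename. `publish_file` is a hypothetical helper, and how atoti's watcher treats partially written files is not covered here.

```python
import os
import tempfile


def publish_file(content: str, watched_dir: str, name: str) -> str:
    # Write the whole file next to the watched folder (not inside it), then move it in.
    # os.replace is an atomic rename when source and target share a filesystem,
    # so a watcher never sees a half-written file under its final name.
    parent = os.path.dirname(os.path.abspath(watched_dir))
    fd, tmp_path = tempfile.mkstemp(dir=parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            f.write(content)
        target = os.path.join(watched_dir, name)
        os.replace(tmp_path, target)
    except BaseException:
        # Clean up the temporary file if anything failed before the rename
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return target
```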

In [30]:
!cd data && rm -r -f *

In [34]:
# real-time count
session.url + "/#/dashboard/059"

'http://localhost:53257/#/dashboard/059'

In [21]:
# records_per_file = 1000000
# generate_csv(records_per_file)

999468


In [22]:
# !cd data && ls -lah

total 262272
drwxr-xr-x  7 anastasiapolyakova  staff   224B 17 Nov 23:42 .
drwxr-xr-x  7 anastasiapolyakova  staff   224B 18 Nov 00:09 ..
drwxr-xr-x  4 anastasiapolyakova  staff   128B 17 Nov 23:38 .ipynb_checkpoints
-rw-r--r--  1 anastasiapolyakova  staff   124M 18 Nov 00:10 data_1000000_1605685369073996000.csv
-rw-r--r--  1 anastasiapolyakova  staff    13K 17 Nov 23:37 data_100_1605685042165290000.csv
-rw-r--r--  1 anastasiapolyakova  staff   119B 17 Nov 23:42 data_100_1605685324293910000.csv
-rw-r--r--  1 anastasiapolyakova  staff    13K 17 Nov 23:42 data_100_1605685334344474000.csv


In [29]:
print(124 * 90, "MB of CSV on disk for 90 million rows (one minute of data)")

11160 MB of CSV on disk for 90 million rows (one minute of data)
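Extrapolating the per-minute figure to the one-day retention asked about in the next steps is simple arithmetic. The sketch below only scales the measured CSV size (124 MB per million rows, from the listing above); it says nothing about how much memory atoti itself would need for the same rows, which was not measured here.

```python
ROWS_PER_MINUTE = 90_000_000  # from the cardinality assumptions
MINUTES_PER_DAY = 24 * 60
CSV_MB_PER_MILLION_ROWS = 124  # size of the 1M-row file in the listing above

rows_per_day = ROWS_PER_MINUTE * MINUTES_PER_DAY
csv_mb_per_minute = CSV_MB_PER_MILLION_ROWS * ROWS_PER_MINUTE // 1_000_000
csv_mb_per_day = csv_mb_per_minute * MINUTES_PER_DAY

print(f"{rows_per_day:,} rows per day")
print(f"{csv_mb_per_minute:,} MB of CSV per minute, {csv_mb_per_day:,} MB per day")
```

This gives 129.6 billion rows and about 16 million MB of raw CSV per day.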


In [37]:
!mv data_1000000_1605685369073996000.csv data/
start = time.time_ns()

In [40]:
!mv data/data_1000000_1605685369073996000.csv .
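`start` is captured just before the 1M-row file is moved into the watched folder. A sketch of turning it into an ingestion rate by polling a row count; `measure_ingestion` is a hypothetical helper, and the row count is passed in as a callable so that nothing atoti-specific is assumed beyond what the notebook already shows. Since the store has key columns, rows sharing all key values may be deduplicated and the expected count may never be reached, hence the timeout.

```python
import time


def measure_ingestion(row_count, rows_before, expected_new_rows, start_ns,
                      timeout_s=600.0, poll_s=0.5):
    """Poll row_count() until expected_new_rows have arrived; return rows per second.

    row_count: zero-argument callable returning the current number of rows.
    start_ns: time.time_ns() taken just before the data was published.
    """
    target = rows_before + expected_new_rows
    deadline = time.monotonic() + timeout_s
    rows = row_count()
    while rows < target:
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"{rows - rows_before} of {expected_new_rows} rows after {timeout_s} s"
            )
        time.sleep(poll_s)
        rows = row_count()
    elapsed_s = max((time.time_ns() - start_ns) / 1e9, 1e-9)  # avoid division by zero
    return (rows - rows_before) / elapsed_s
```

In the notebook, `rows_before` would be `data_points.shape["rows"]` read before the `mv`, and the call `measure_ingestion(lambda: data_points.shape["rows"], rows_before, 1_000_000, start)`.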

## Joining metrics with static data

In [None]:
cube.schema

In [None]:
import pandas as pd

device_attributes = session.read_pandas(
    pd.DataFrame(
        data={
            "device": device_ids,
            "deviceType": [random.choice(["Primary", "Secondary"]) for d in device_ids],
        }
    ),
    store_name="Device Attributes",
)

data_points.join(device_attributes, mapping={"device": "device"})

In [None]:
cube.schema

Now the `deviceType` field can be used to set up limits.
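The join attaches a `deviceType` to every data point that has a `device` label. A minimal sketch of what limits per device type could look like, written in plain Python over flattened rows rather than as an atoti measure; the limit values and the `breaches` helper are hypothetical.

```python
# Hypothetical per-device-type limits; real values are a business decision
LIMITS = {"Primary": 0.7, "Secondary": 0.9}


def breaches(facts, device_types, limits=LIMITS):
    # facts: flattened rows (dicts with "device" and "value")
    # device_types: device -> deviceType, the content of the "Device Attributes" store
    out = []
    for fact in facts:
        device_type = device_types.get(fact["device"])  # None when there is no match
        limit = limits.get(device_type)
        if limit is not None and fact["value"] > limit:
            out.append((fact["device"], device_type, fact["value"]))
    return out
```

Rows without a device (the empty string written by `message_parser`) find no device type, just as they find no match in the joined store, and are skipped.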

# Next steps

**Non-functional requirements**:
    
- Memory sizing: can we keep one day of observations at 90 million data points per minute?
- Sharding: can we shard the data, and how should we split it (for example, by service id)?
- Ingestion rate: how fast can we push data into the cube?
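One common answer to the sharding question is hash partitioning on the service id: every data point of a service lands in the same shard, so a per-service monitor can be evaluated inside one shard. A minimal sketch, assuming a fixed, hypothetical number of shards; it uses `zlib.crc32` rather than the built-in `hash()`, because string hashes are randomized per Python process and would not give a stable assignment. Whether and how atoti supports such partitioning is not covered here.

```python
import zlib

NUM_SHARDS = 4  # hypothetical shard count


def shard_for(service_id: str, num_shards: int = NUM_SHARDS) -> int:
    # crc32 is stable across processes and machines, unlike hash() on str,
    # so the same service id always maps to the same shard
    return zlib.crc32(service_id.encode("utf-8")) % num_shards
```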


**Functional requirements**:
    
- ActiveMonitor: real-time alerts on high-cardinality data. How many continuous queries can run simultaneously?
- Example of a monitor: one monitor with high cardinality on service id; if any of the services hangs, send an alert.
- Viewing time series in the UI: ideally one week of data, or should it be pre-aggregated?
- A function to compute a rolling-window percentile over time windows.
- How to evict data?
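A rolling-window percentile, together with eviction of expired points, can be sketched as a sliding window over timestamped values. This is a minimal single-series sketch assuming points arrive in timestamp order; it uses the nearest-rank percentile definition (which differs slightly from interpolating definitions), and removal from the sorted list is O(n), so it would not scale as-is to high-cardinality data. `RollingPercentile` is a hypothetical name.

```python
import bisect
import math
from collections import deque


class RollingPercentile:
    """Percentile of the values seen in the last window_ns nanoseconds (one series)."""

    def __init__(self, window_ns):
        self.window_ns = window_ns
        self._by_time = deque()  # (timestamp_ns, value) in arrival order
        self._sorted = []  # the same values, kept sorted for rank lookups

    def add(self, timestamp_ns, value):
        # Assumes points arrive in timestamp order
        self._by_time.append((timestamp_ns, value))
        bisect.insort(self._sorted, value)
        self.evict(timestamp_ns)

    def evict(self, now_ns):
        # Drop points that fell out of the window
        while self._by_time and self._by_time[0][0] <= now_ns - self.window_ns:
            _, old = self._by_time.popleft()
            del self._sorted[bisect.bisect_left(self._sorted, old)]  # O(n) removal

    def percentile(self, q):
        # Nearest-rank percentile for q in (0, 1]
        if not self._sorted:
            raise ValueError("no data in the window")
        rank = max(1, math.ceil(q * len(self._sorted)))
        return self._sorted[rank - 1]

    def __len__(self):
        return len(self._sorted)
```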

**Data schema**:
    
- How to manage a flexible data schema (key-value messages)?
- How to serialize one message into several facts when it contains several metrics or several data points?
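For the data-schema questions, a message with several metrics and several data points can be flattened into one fact per data point, and facts with different label sets can be aligned on the union of their keys, filling gaps with empty strings as `message_parser` does. A minimal sketch, assuming only `doubleHistogram` data points as in the sample messages, and that label keys do not collide with resource attribute keys or with `name`, `value` and `startTimeUnixNano`; `flatten_message` and `align_columns` are hypothetical names.

```python
def _key_values(pairs):
    # [{"key": k, "value": v}, ...] -> {k: v}, unwrapping {"stringValue": ...}
    out = {}
    for item in pairs:
        value = item["value"]
        if isinstance(value, dict) and "stringValue" in value:
            value = value["stringValue"]
        out[item["key"]] = value
    return out


def flatten_message(obj):
    """Yield one flat fact per data point, for any number of metrics and data points."""
    resource = _key_values(obj["resource"]["attributes"])
    for library_metrics in obj["instrumentationLibraryMetrics"]:
        for metric in library_metrics["metrics"]:
            # Only doubleHistogram data points are handled, as in the sample messages
            for dp in metric.get("doubleHistogram", {}).get("dataPoints", []):
                fact = dict(resource)
                fact["name"] = metric["name"]
                fact["value"] = dp["sum"]
                fact["startTimeUnixNano"] = dp["startTimeUnixNano"]
                fact.update(_key_values(dp["labels"]))  # labels become columns
                yield fact


def align_columns(facts):
    # Flexible schema: columns are the union of all keys, missing values become "".
    # facts must be a list, since it is iterated twice.
    columns = sorted({key for fact in facts for key in fact})
    return columns, [[fact.get(column, "") for column in columns] for fact in facts]
```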
