# IoT Microdemos


## Pre-Aggregation

Storing data at the most granular level is usually only required for a limited amount of time (e.g. for detailed error analysis). For visualizations and long-term storage, higher-level aggregations are usually more important — especially in dashboards, where only a few pixels are available, there is no need to load millions of records to draw a chart.

Data can easily be aggregated in MongoDB. With the help of `$merge` (available since MongoDB 4.2), the output of an aggregation can be merged into existing collections.

For `$merge` to work, each target collection needs a unique index on the fields used to match documents (here `device` and `ts`):

In [None]:
import pymongo
import os
import datetime
import bson
from bson.json_util import loads, dumps, RELAXED_JSON_OPTIONS
import random
from pprint import pprint

CONNECTIONSTRING = "localhost:27017"

# Connect to the server and select the "iot" database
client = pymongo.MongoClient(CONNECTIONSTRING)
db = client["iot"]

# Pre-aggregation targets, one per granularity (ISO 8601 duration suffix)
PRE_AGGREGATION_COLLECTIONS = ("iot_PT1M", "iot_PT5M", "iot_PT1H", "iot_PT1D")

# Start from empty pre-aggregation collections
for coll_name in PRE_AGGREGATION_COLLECTIONS:
    db[coll_name].drop()

# $merge matches documents on (device, ts); that requires a unique index on
# exactly these fields in every target collection. The index name is printed.
for coll_name in PRE_AGGREGATION_COLLECTIONS:
    print(db[coll_name].create_index(
        [("device", pymongo.ASCENDING), ("ts", pymongo.ASCENDING)],
        unique=True,
    ))

Let us now ingest some simulated raw data for a single device:

In [None]:
# Maximum number of measurements stored in one bucket document
BUCKET_SIZE = 50
# Total number of simulated measurements to ingest
CNT_MEASUREMENTS = 100_000
# Possible machine states, one is picked at random per measurement
STATUS = 'operating waiting idle failure'.split()

def prepare_collections():
    """Drop the raw bucket collection ``iot`` and recreate its indexes.

    Two indexes are created:

    * a partial index on (device, cnt, max_ts) that only contains buckets
      with fewer than BUCKET_SIZE measurements, i.e. the "open" buckets per
      device that can still receive data;
    * an index on (device, max_ts, min_ts) for queries per device and
      timespan.
    """
    raw = db.iot
    raw.drop()

    open_bucket_filter = {'cnt': {'$lt': BUCKET_SIZE}}
    raw.create_index(
        [('device', 1), ('cnt', 1), ('max_ts', 1)],
        partialFilterExpression=open_bucket_filter,
    )
    raw.create_index([('device', 1), ('max_ts', 1), ('min_ts', 1)])

def write_batch(ls_batch, collection):
    """Execute a list of bulk write operations against *collection*.

    Args:
        ls_batch: list of pymongo write operations (e.g. ``UpdateOne``).
        collection: the pymongo collection to write to.

    Returns:
        The result of ``collection.bulk_write`` on success, or ``None`` if
        the batch is empty or the server reported a ``BulkWriteError``
        (whose details are printed).
    """
    # pymongo's bulk_write raises InvalidOperation (not BulkWriteError) for
    # an empty list of operations, e.g. when the final flush has nothing
    # left to write. Treat that as "nothing written" instead of crashing.
    if not ls_batch:
        return None
    try:
        return collection.bulk_write(ls_batch)
    except pymongo.errors.BulkWriteError as bwe:
        pprint(bwe.details)
        return None

def ingest_data(*, device=4711, batch_size=150):
    """Generate CNT_MEASUREMENTS simulated measurements and bucket them into ``db.iot``.

    Measurements are one second apart, starting at the current time. Each
    one is pushed into a bucket document of *device* that still has fewer
    than BUCKET_SIZE entries and whose ``max_ts`` is not later than the new
    timestamp; if no such bucket exists, the upsert creates a new one.
    ``min_ts``, ``max_ts`` and ``cnt`` are maintained on every push.

    The update operations are sent via ``write_batch`` every *batch_size*
    measurements, followed by one final flush of the remainder.

    Args:
        device: device id written to the bucket documents (default 4711,
            the value that used to be hard-coded).
        batch_size: number of operations per bulk write (default 150).

    Raises:
        ValueError: if *batch_size* is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1, got %r' % (batch_size,))

    # NOTE(review): naive local time. PyMongo stores naive datetimes as if
    # they were UTC, so roll-up boundaries follow local wall-clock time.
    ts = datetime.datetime.now()
    ls_batch = []

    for cnt in range(1, CNT_MEASUREMENTS + 1):
        # Plus one second
        ts = ts + datetime.timedelta(seconds=1)
        m = {
            'ts': ts,
            'temperature': random.randint(0, 100),
            'rpm': random.randint(0, 2500),
            'status': random.choice(STATUS)
        }
        ls_batch.append(
            pymongo.UpdateOne(
                {
                    'device': device,
                    # Only "open" buckets: not full yet and not ending after ts
                    'max_ts': {'$lte': ts},
                    'cnt': {'$lt': BUCKET_SIZE}
                },
                {
                    '$push': {'m': m},
                    '$max': {'max_ts': ts},
                    '$min': {'min_ts': ts},
                    '$inc': {'cnt': 1}
                },
                upsert=True))

        if cnt % batch_size == 0:
            if write_batch(ls_batch, db.iot) is not None:
                print('inserted %s records' % (cnt))
            ls_batch = []

    # Flush the remainder. Skip an empty batch: pymongo's bulk_write rejects
    # an empty list of operations with InvalidOperation.
    if ls_batch and write_batch(ls_batch, db.iot) is not None:
        print('inserted %s records' % (CNT_MEASUREMENTS))
    
# Start from an empty, freshly indexed raw collection, then ingest the
# simulated measurements into it.
prepare_collections()
ingest_data()

This aggregation pipeline aggregates the raw values into one document per device and minute:

In [None]:
# Aggregate the raw measurements into one document per device and minute.
# The date parts are computed without a timezone argument, i.e. in UTC.
db.iot.aggregate([
# One document per measurement
{ "$unwind": { "path": "$m" }},
# Chronological order, so that $first/$last in $group pick the
# earliest/latest value of each minute
{ "$sort": { "m.ts": 1 }},
{ "$group": {
  "_id": {
    "device": "$device",
    "year": { "$year": '$m.ts' },
    "month": { "$month": '$m.ts' },
    "day": { "$dayOfMonth": '$m.ts' },
    "hour": { "$hour": '$m.ts' },
    "minute": { "$minute": '$m.ts' }
  },
  # Number of raw measurements in this minute. Without it the higher-level
  # roll-ups could only compute an unweighted average of averages.
  "cnt": { "$sum": 1 },
  "temperature_avg": { "$avg": "$m.temperature" },
  "temperature_min": { "$min": "$m.temperature" },
  "temperature_max": { "$max": "$m.temperature" },
  "temperature_first": { "$first": "$m.temperature" },
  "temperature_last": { "$last": "$m.temperature" },
  "rpm_avg": { "$avg": "$m.rpm" },
  "rpm_min": { "$min": "$m.rpm" },
  "rpm_max": { "$max": "$m.rpm" },
  "rpm_first": { "$first": "$m.rpm" },
  "rpm_last": { "$last": "$m.rpm" },
  "status_first": { "$first": "$m.status" },
  "status_last": { "$last": "$m.status" }
}},
# Flatten the group key back into device + minute timestamp
{ "$addFields": {
  "device": '$_id.device',
  "ts": {
    "$dateFromParts": {
      "year": '$_id.year',
      "month": '$_id.month',
      "day": '$_id.day',
      "hour": '$_id.hour',
      "minute": '$_id.minute'
    }
  }
}},
{ "$project": { "_id": 0 }},
# Merge into iot_PT1M on the unique (device, ts) index: with the $merge
# defaults, matching documents are merged and new ones inserted
{ "$merge": {
  "into": 'iot_PT1M',
  "on": ['device', 'ts']
}}
])


Roll up into PT5M:

In [None]:
# Roll the minute documents up into 5-minute intervals. Averages are weighted
# by the number of raw measurements behind each source document (its "cnt"
# field, if present), so partial intervals do not skew the result.
db.iot_PT1M.aggregate([
# Ensure Index Usage via this sort stage; the (device, ts) order also makes
# $first/$last below pick the earliest/latest value of each interval
{ "$sort": {
  'device': 1,
  'ts': 1
}},
{"$group": {
  "_id": {
    "device": "$device",
    "year": { "$year": '$ts' },
    "month": { "$month": '$ts' },
    "day": { "$dayOfMonth": '$ts' },
    "hour": { "$hour": '$ts' },
    # Truncate the minute to the start of its 5-minute interval
    "minute": { "$subtract": [
        { "$minute": '$ts' },
        { "$mod": [ { "$minute": '$ts' }, 5 ] }
      ]
    }
  },
  # Total number of raw measurements. Source documents without "cnt" count
  # as 1, which degrades to a plain average of averages.
  "cnt": { "$sum": { "$ifNull": [ '$cnt', 1 ] } },
  # Weighted sums; the averages are derived from them in $addFields below
  "temperature_wsum": { "$sum": { "$multiply": [ '$temperature_avg', { "$ifNull": [ '$cnt', 1 ] } ] } },
  "temperature_min": { "$min": '$temperature_min' },
  "temperature_max": { "$max": '$temperature_max' },
  "temperature_first": { "$first": '$temperature_first' },
  "temperature_last": { "$last": '$temperature_last' },
  "rpm_wsum": { "$sum": { "$multiply": [ '$rpm_avg', { "$ifNull": [ '$cnt', 1 ] } ] } },
  "rpm_min": { "$min": '$rpm_min' },
  "rpm_max": { "$max": '$rpm_max' },
  "rpm_first": { "$first": '$rpm_first' },
  "rpm_last": { "$last": '$rpm_last' },
  "status_first": { "$first": '$status_first' },
  "status_last": { "$last": '$status_last' }
}},
{ "$addFields": {
  "device": '$_id.device',
  "ts": {
    "$dateFromParts": {
      "year": '$_id.year',
      "month": '$_id.month',
      "day": '$_id.day',
      "hour": '$_id.hour',
      "minute": '$_id.minute'
    }
  },
  "temperature_avg": { "$divide": [ '$temperature_wsum', '$cnt' ] },
  "rpm_avg": { "$divide": [ '$rpm_wsum', '$cnt' ] }
}},
# Drop the group key and the helper sums
{ "$project": { "_id": 0, "temperature_wsum": 0, "rpm_wsum": 0 }},
{ "$merge": {
  "into": 'iot_PT5M',
  "on": [ 'device', 'ts' ]
}}])

From here on, the higher-level roll-ups only differ in their source and target collections and in how the timestamp is truncated (the `$group` key and the matching `$dateFromParts`). Roll up into 1 hour:

In [None]:
# Roll the 5-minute documents up into hours. Averages are weighted by the
# number of raw measurements behind each source document (its "cnt" field,
# if present), so partial intervals do not skew the result.
db.iot_PT5M.aggregate([
# Ensure Index Usage via this sort stage; the (device, ts) order also makes
# $first/$last below pick the earliest/latest value of each hour
{ "$sort": {
  'device': 1,
  'ts': 1
}},
{"$group": {
  "_id": {
    "device": "$device",
    "year": { "$year": '$ts' },
    "month": { "$month": '$ts' },
    "day": { "$dayOfMonth": '$ts' },
    "hour": { "$hour": '$ts' }
  },
  # Total number of raw measurements. Source documents without "cnt" count
  # as 1, which degrades to a plain average of averages.
  "cnt": { "$sum": { "$ifNull": [ '$cnt', 1 ] } },
  # Weighted sums; the averages are derived from them in $addFields below
  "temperature_wsum": { "$sum": { "$multiply": [ '$temperature_avg', { "$ifNull": [ '$cnt', 1 ] } ] } },
  "temperature_min": { "$min": '$temperature_min' },
  "temperature_max": { "$max": '$temperature_max' },
  "temperature_first": { "$first": '$temperature_first' },
  "temperature_last": { "$last": '$temperature_last' },
  "rpm_wsum": { "$sum": { "$multiply": [ '$rpm_avg', { "$ifNull": [ '$cnt', 1 ] } ] } },
  "rpm_min": { "$min": '$rpm_min' },
  "rpm_max": { "$max": '$rpm_max' },
  "rpm_first": { "$first": '$rpm_first' },
  "rpm_last": { "$last": '$rpm_last' },
  "status_first": { "$first": '$status_first' },
  "status_last": { "$last": '$status_last' }
}},
{ "$addFields": {
  "device": '$_id.device',
  "ts": {
    "$dateFromParts": {
      "year": '$_id.year',
      "month": '$_id.month',
      "day": '$_id.day',
      "hour": '$_id.hour'
    }
  },
  "temperature_avg": { "$divide": [ '$temperature_wsum', '$cnt' ] },
  "rpm_avg": { "$divide": [ '$rpm_wsum', '$cnt' ] }
}},
# Drop the group key and the helper sums
{ "$project": { "_id": 0, "temperature_wsum": 0, "rpm_wsum": 0 }},
{ "$merge": {
  "into": 'iot_PT1H',
  "on": [ 'device', 'ts' ]
}}])

Roll-up into 1 day:

In [None]:
# Roll the hourly documents up into days. Averages are weighted by the
# number of raw measurements behind each source document (its "cnt" field,
# if present), so partial intervals do not skew the result.
db.iot_PT1H.aggregate([
# Ensure Index Usage via this sort stage; the (device, ts) order also makes
# $first/$last below pick the earliest/latest value of each day
{ "$sort": {
  'device': 1,
  'ts': 1
}},
{"$group": {
  "_id": {
    "device": "$device",
    "year": { "$year": '$ts' },
    "month": { "$month": '$ts' },
    "day": { "$dayOfMonth": '$ts' }
  },
  # Total number of raw measurements. Source documents without "cnt" count
  # as 1, which degrades to a plain average of averages.
  "cnt": { "$sum": { "$ifNull": [ '$cnt', 1 ] } },
  # Weighted sums; the averages are derived from them in $addFields below
  "temperature_wsum": { "$sum": { "$multiply": [ '$temperature_avg', { "$ifNull": [ '$cnt', 1 ] } ] } },
  "temperature_min": { "$min": '$temperature_min' },
  "temperature_max": { "$max": '$temperature_max' },
  "temperature_first": { "$first": '$temperature_first' },
  "temperature_last": { "$last": '$temperature_last' },
  "rpm_wsum": { "$sum": { "$multiply": [ '$rpm_avg', { "$ifNull": [ '$cnt', 1 ] } ] } },
  "rpm_min": { "$min": '$rpm_min' },
  "rpm_max": { "$max": '$rpm_max' },
  "rpm_first": { "$first": '$rpm_first' },
  "rpm_last": { "$last": '$rpm_last' },
  "status_first": { "$first": '$status_first' },
  "status_last": { "$last": '$status_last' }
}},
{ "$addFields": {
  "device": '$_id.device',
  "ts": {
    "$dateFromParts": {
      "year": '$_id.year',
      "month": '$_id.month',
      "day": '$_id.day'
    }
  },
  "temperature_avg": { "$divide": [ '$temperature_wsum', '$cnt' ] },
  "rpm_avg": { "$divide": [ '$rpm_wsum', '$cnt' ] }
}},
# Drop the group key and the helper sums
{ "$project": { "_id": 0, "temperature_wsum": 0, "rpm_wsum": 0 }},
{ "$merge": {
  "into": 'iot_PT1D',
  "on": [ 'device', 'ts' ]
}}])