# IoT Micro démos

## Pré-agrégation
Le stockage des données au niveau le plus granulaire n'est généralement nécessaire que pour une durée limitée (par exemple pour une analyse détaillée des erreurs). Pour les visualisations et le stockage à long terme, les agrégations de niveau supérieur sont généralement importantes : dans les tableaux de bord en particulier, seuls quelques pixels sont disponibles, et il n'est donc pas nécessaire de charger des millions d'enregistrements pour afficher les graphiques.

Les données peuvent facilement être agrégées dans MongoDB : à l'aide de $merge (disponible depuis MongoDB 4.2), la sortie des agrégations peut être ajoutée aux collections existantes.

Pour que la fusion fonctionne, nous avons besoin d'un index unique dans la (les) collection(s) cible(s).

Dans ce notebook, nous verrons comment agréger successivement les données, en partant des données brutes, à des pas de :
* 1 minute
* 5 minutes
* 1 heure
* 1 journée

## Initialisation de la démo

In [2]:
# Install the libraries that are not already present in the runtime.
# NOTE(review): dnspython is presumably required by the mongodb+srv:// URI
# used in the connection cell - confirm against the PyMongo documentation.
!pip3 install dnspython

Collecting dnspython
[?25l  Downloading https://files.pythonhosted.org/packages/f5/2d/ae9e172b4e5e72fa4b3cfc2517f38b602cc9ba31355f9669c502b4e9c458/dnspython-2.1.0-py3-none-any.whl (241kB)
[K     |█▍                              | 10kB 12.7MB/s eta 0:00:01[K     |██▊                             | 20kB 17.3MB/s eta 0:00:01[K     |████                            | 30kB 10.4MB/s eta 0:00:01[K     |█████▍                          | 40kB 8.6MB/s eta 0:00:01[K     |██████▊                         | 51kB 4.5MB/s eta 0:00:01[K     |████████▏                       | 61kB 4.7MB/s eta 0:00:01[K     |█████████▌                      | 71kB 5.3MB/s eta 0:00:01[K     |██████████▉                     | 81kB 6.0MB/s eta 0:00:01[K     |████████████▏                   | 92kB 5.2MB/s eta 0:00:01[K     |█████████████▌                  | 102kB 5.7MB/s eta 0:00:01[K     |███████████████                 | 112kB 5.7MB/s eta 0:00:01[K     |████████████████▎               | 122kB 5.7MB/s 

In [3]:
# chargement des librairies python
import pymongo
import os
import datetime
import bson
from bson.json_util import loads, dumps, RELAXED_JSON_OPTIONS
import random
from pprint import pprint

# Connection string of the MongoDB cluster. It can be overridden with the
# MONGODB_URI environment variable so that real credentials never have to be
# written into the notebook; the public demo cluster stays the default.
CONNECTIONSTRING = os.environ.get(
    "MONGODB_URI",
    "mongodb+srv://demo_user:mongodb@demo.mfctp.mongodb.net/iot_demo?retryWrites=true&w=majority",
)

# Establish the database connection.
client = pymongo.MongoClient(CONNECTIONSTRING)
db = client.iot_demo

# Optional reset of the pre-aggregation collections.
# Left disabled on purpose to keep the daily aggregation example.
#db.iot_PT1M.drop()
#db.iot_PT5M.drop()
#db.iot_PT1H.drop()
#db.iot_PT1D.drop()

# Create the same unique (device, ts) index in every pre-aggregation
# collection; create_index returns the index name, which is printed.
for collection_name in ("iot_PT1M", "iot_PT5M", "iot_PT1H", "iot_PT1D"):
    print(db[collection_name].create_index(
        [("device", pymongo.ASCENDING), ("ts", pymongo.ASCENDING)],
        unique=True,
    ))

device_1_ts_1
device_1_ts_1
device_1_ts_1
device_1_ts_1


Insérons maintenant quelques données: 

In [4]:
# Upper bound used for the per-bucket measurement counter ("cnt").
BUCKET_SIZE = 50
# Total number of measurements generated by the ingestion loop.
CNT_MEASUREMENTS = 100_000
# Device states; one of them is picked at random for every measurement.
STATUS = ['operating', 'waiting', 'idle', 'failure']

def prepare_collections():
    """Drop the raw ``iot`` collection and recreate its two indexes.

    Dropping the collection discards every bucket document it contained.
    """
    raw = db.iot
    raw.drop()

    # Index for the open buckets of a device: it is partial and only covers
    # documents whose counter is still below BUCKET_SIZE.
    open_bucket_keys = [('device', 1), ('cnt', 1), ('max_ts', 1)]
    still_open = {'cnt': {'$lt': BUCKET_SIZE}}
    raw.create_index(open_bucket_keys, partialFilterExpression=still_open)

    # Index for queries by device and by time range.
    time_range_keys = [('device', 1), ('max_ts', 1), ('min_ts', 1)]
    raw.create_index(time_range_keys)

def write_batch(ls_batch, collection):
    """Send ``ls_batch`` to ``collection.bulk_write`` and return its result.

    A ``pymongo.errors.BulkWriteError`` is not propagated: its ``details``
    are pretty-printed and ``None`` is returned instead. Any other exception
    is left to the caller.
    """
    try:
        outcome = collection.bulk_write(ls_batch)
    except pymongo.errors.BulkWriteError as bulk_error:
        pprint(bulk_error.details)
        outcome = None
    return outcome

def ingest_data():
    """Generate CNT_MEASUREMENTS random measurements and upsert them into ``db.iot``.

    Timestamps start at the current UTC time and advance by one second per
    measurement. Each measurement is pushed into the ``m`` array of a bucket
    document of device 4711 whose ``cnt`` is below BUCKET_SIZE; when no such
    document matches, the upsert creates a new one. The update also maintains
    ``min_ts``, ``max_ts`` and ``cnt`` on the bucket.

    Updates are sent through ``write_batch`` in groups of 150, and a progress
    line is printed after every batch that did not report a bulk write error.
    """
    batch_size = 150

    # Timezone-aware UTC start: PyMongo stores naive datetimes as if they were
    # UTC, so a naive local ``now()`` would be shifted by the local offset.
    ts = datetime.datetime.now(datetime.timezone.utc)

    ls_batch = []
    for cnt in range(1, CNT_MEASUREMENTS + 1):
        # Plus one second
        ts += datetime.timedelta(seconds=1)
        m = {
            'ts': ts,
            'temperature': random.randint(0, 100),
            'rpm': random.randint(0, 2500),
            'status': random.choice(STATUS),
        }
        ls_batch.append(
            pymongo.UpdateOne(
                {
                    'device': 4711,
                    'max_ts': {'$lte': ts},
                    'cnt': {'$lt': BUCKET_SIZE},
                },
                {
                    '$push': {'m': m},
                    '$max': {'max_ts': ts},
                    '$min': {'min_ts': ts},
                    '$inc': {'cnt': 1},
                },
                upsert=True,
            )
        )

        # Flush on every full batch and once more for the final, possibly
        # partial, batch. The batch is never empty here, which matters because
        # bulk_write rejects an empty list of operations.
        if cnt % batch_size == 0 or cnt == CNT_MEASUREMENTS:
            if write_batch(ls_batch, db.iot) is not None:
                print('inserted %s records' % cnt)
            ls_batch = []
    
# Recreate the raw collection and its indexes, then load the generated measurements.
prepare_collections()
ingest_data()

inserted 150 records
inserted 300 records
inserted 450 records
inserted 600 records
inserted 750 records
inserted 900 records
inserted 1050 records
inserted 1200 records
inserted 1350 records
inserted 1500 records
inserted 1650 records
inserted 1800 records
inserted 1950 records
inserted 2100 records
inserted 2250 records
inserted 2400 records
inserted 2550 records
inserted 2700 records
inserted 2850 records
inserted 3000 records
inserted 3150 records
inserted 3300 records
inserted 3450 records
inserted 3600 records
inserted 3750 records
inserted 3900 records
inserted 4050 records
inserted 4200 records
inserted 4350 records
inserted 4500 records
inserted 4650 records
inserted 4800 records
inserted 4950 records
inserted 5100 records
inserted 5250 records
inserted 5400 records
inserted 5550 records
inserted 5700 records
inserted 5850 records
inserted 6000 records
inserted 6150 records
inserted 6300 records
inserted 6450 records
inserted 6600 records
inserted 6750 records
inserted 6900 re

## Agrégation des données à la minute
Ce pipeline d'agrégation permet d'agréger les valeurs avec un document par device et par minute.

In [5]:
# Roll the raw bucket documents up into one document per device and per
# minute, and merge the result into iot_PT1M on (device, ts).

# Calendar part -> date operator used to extract it from the measurement time.
minute_parts = {
    "year": "$year",
    "month": "$month",
    "day": "$dayOfMonth",
    "hour": "$hour",
    "minute": "$minute",
}

# Group key: the device plus every calendar part of m.ts down to the minute.
minute_key = {"device": "$device"}
for part, date_op in minute_parts.items():
    minute_key[part] = {date_op: "$m.ts"}

# avg/min/max/first/last for the numeric readings, first/last for the status.
# "first" and "last" depend on the $sort on m.ts placed before the $group.
minute_stats = {}
for reading in ("temperature", "rpm"):
    for stat in ("avg", "min", "max", "first", "last"):
        minute_stats[reading + "_" + stat] = {"$" + stat: "$m." + reading}
for stat in ("first", "last"):
    minute_stats["status_" + stat] = {"$" + stat: "$m.status"}

per_minute_pipeline = [
    # One document per measurement stored in the bucket array "m".
    {"$unwind": {"path": "$m"}},
    {"$sort": {"m.ts": 1}},
    {"$group": {"_id": minute_key, **minute_stats}},
    # Promote the device and rebuild the minute timestamp from the group key.
    {"$addFields": {
        "device": "$_id.device",
        "ts": {"$dateFromParts": {part: "$_id." + part for part in minute_parts}},
    }},
    {"$project": {"_id": 0}},
    {"$merge": {"into": "iot_PT1M", "on": ["device", "ts"]}},
]

db.iot.aggregate(per_minute_pipeline, allowDiskUse=True)

<pymongo.command_cursor.CommandCursor at 0x7f644521cc90>

Affichons maintenant un document issu de cette agrégation.

In [6]:
# Fetch a single document of the per-minute collection (no filter or sort is
# given) and pretty-print it.
result = db["iot_PT1M"].find_one()
pprint(result)

{'_id': ObjectId('5fd08d734a618ba100079d60'),
 'device': 4711,
 'rpm_avg': 1388.1333333333334,
 'rpm_first': 1130,
 'rpm_last': 226,
 'rpm_max': 2482,
 'rpm_min': 74,
 'status_first': 'failure',
 'status_last': 'idle',
 'temperature_avg': 48.93333333333333,
 'temperature_first': 24,
 'temperature_last': 19,
 'temperature_max': 100,
 'temperature_min': 0,
 'ts': datetime.datetime(2020, 12, 9, 17, 32)}


Agrégeons maintenant les données par tranches de 5 minutes à partir de l'agrégation précédente.

In [7]:
# Roll the per-minute documents up into 5-minute documents and merge the
# result into iot_PT5M on (device, ts).
#
# NOTE(review): every *_avg below is the unweighted mean of the source
# documents' averages; it equals the mean of the underlying measurements only
# when each source document aggregates the same number of samples.

# Minute of the hour rounded down to a multiple of 5: minute - (minute mod 5).
five_minute_floor = {
    "$subtract": [
        {"$minute": "$ts"},
        {"$mod": [{"$minute": "$ts"}, 5]},
    ]
}

five_minute_key = {
    "device": "$device",
    "year": {"$year": "$ts"},
    "month": {"$month": "$ts"},
    "day": {"$dayOfMonth": "$ts"},
    "hour": {"$hour": "$ts"},
    "minute": five_minute_floor,
}

# Each statistic is re-aggregated from the field of the same name.
numeric_stats = ("avg", "min", "max", "first", "last")
five_minute_stats = {}
for reading, stats in (
    ("temperature", numeric_stats),
    ("rpm", numeric_stats),
    ("status", ("first", "last")),
):
    for stat in stats:
        field = reading + "_" + stat
        five_minute_stats[field] = {"$" + stat: "$" + field}

five_minute_pipeline = [
    # Sort on (device, ts): intended to make the stage use the index on those
    # fields, and it fixes the order seen by $first / $last.
    {"$sort": {"device": 1, "ts": 1}},
    {"$group": {"_id": five_minute_key, **five_minute_stats}},
    # Promote the device and rebuild the bucket timestamp from the group key.
    {"$addFields": {
        "device": "$_id.device",
        "ts": {"$dateFromParts": {
            part: "$_id." + part
            for part in ("year", "month", "day", "hour", "minute")
        }},
    }},
    {"$project": {"_id": 0}},
    {"$merge": {"into": "iot_PT5M", "on": ["device", "ts"]}},
]

db.iot_PT1M.aggregate(five_minute_pipeline)

<pymongo.command_cursor.CommandCursor at 0x7f64451f6210>

Affichons un exemple de document.

In [8]:
# Fetch a single document of the 5-minute collection (no filter or sort is
# given) and pretty-print it.
result = db["iot_PT5M"].find_one()
pprint(result)

{'_id': ObjectId('5fd090674a618ba10007e020'),
 'device': 4711,
 'rpm_avg': 1280.4866666666667,
 'rpm_first': 1738,
 'rpm_last': 158,
 'rpm_max': 2495,
 'rpm_min': 20,
 'status_first': 'waiting',
 'status_last': 'idle',
 'temperature_avg': 50.61,
 'temperature_first': 49,
 'temperature_last': 31,
 'temperature_max': 99,
 'temperature_min': 0,
 'ts': datetime.datetime(2020, 12, 9, 14, 5)}


Les pipelines d'agrégation suivants ne diffèrent du précédent que par la clé de regroupement de l'étape $group (et, en conséquence, par les champs passés à $dateFromParts et par la collection cible de $merge).

Agrégeons les données à l'heure.

In [9]:
# Roll the 5-minute documents up into hourly documents and merge the result
# into iot_PT1H on (device, ts).
#
# NOTE(review): every *_avg below is the unweighted mean of the source
# documents' averages; it equals the mean of the underlying measurements only
# when each source document aggregates the same number of samples.

# Calendar part -> date operator; the hour is the finest part kept here.
hour_parts = {
    "year": "$year",
    "month": "$month",
    "day": "$dayOfMonth",
    "hour": "$hour",
}

hourly_key = {"device": "$device"}
for part, date_op in hour_parts.items():
    hourly_key[part] = {date_op: "$ts"}

# Each statistic is re-aggregated from the field of the same name.
numeric_stats = ("avg", "min", "max", "first", "last")
hourly_stats = {}
for reading, stats in (
    ("temperature", numeric_stats),
    ("rpm", numeric_stats),
    ("status", ("first", "last")),
):
    for stat in stats:
        field = reading + "_" + stat
        hourly_stats[field] = {"$" + stat: "$" + field}

hourly_pipeline = [
    # Sort on (device, ts): intended to make the stage use the index on those
    # fields, and it fixes the order seen by $first / $last.
    {"$sort": {"device": 1, "ts": 1}},
    {"$group": {"_id": hourly_key, **hourly_stats}},
    # Promote the device and rebuild the hour timestamp from the group key.
    {"$addFields": {
        "device": "$_id.device",
        "ts": {"$dateFromParts": {part: "$_id." + part for part in hour_parts}},
    }},
    {"$project": {"_id": 0}},
    {"$merge": {"into": "iot_PT1H", "on": ["device", "ts"]}},
]

db.iot_PT5M.aggregate(hourly_pipeline)

<pymongo.command_cursor.CommandCursor at 0x7f6445181c90>

Affichons un exemple de document.

In [10]:
# Fetch a single document of the hourly collection (no filter or sort is
# given) and pretty-print it.
result = db["iot_PT1H"].find_one()
pprint(result)

{'_id': ObjectId('5fd090bb4a618ba10007e5c4'),
 'device': 4711,
 'rpm_avg': 1254.3269444444445,
 'rpm_first': 1554,
 'rpm_last': 555,
 'rpm_max': 2500,
 'rpm_min': 0,
 'status_first': 'idle',
 'status_last': 'idle',
 'temperature_avg': 50.092222222222226,
 'temperature_first': 73,
 'temperature_last': 40,
 'temperature_max': 100,
 'temperature_min': 0,
 'ts': datetime.datetime(2020, 12, 9, 12, 0)}


Agrégation sur 1 journée.

In [11]:
# Roll the hourly documents up into daily documents and merge the result into
# iot_PT1D on (device, ts).
#
# NOTE(review): every *_avg below is the unweighted mean of the source
# documents' averages; it equals the mean of the underlying measurements only
# when each source document aggregates the same number of samples.

# Calendar part -> date operator; the day is the finest part kept here.
day_parts = {
    "year": "$year",
    "month": "$month",
    "day": "$dayOfMonth",
}

daily_key = {"device": "$device"}
for part, date_op in day_parts.items():
    daily_key[part] = {date_op: "$ts"}

# Each statistic is re-aggregated from the field of the same name.
numeric_stats = ("avg", "min", "max", "first", "last")
daily_stats = {}
for reading, stats in (
    ("temperature", numeric_stats),
    ("rpm", numeric_stats),
    ("status", ("first", "last")),
):
    for stat in stats:
        field = reading + "_" + stat
        daily_stats[field] = {"$" + stat: "$" + field}

daily_pipeline = [
    # Sort on (device, ts): intended to make the stage use the index on those
    # fields, and it fixes the order seen by $first / $last.
    {"$sort": {"device": 1, "ts": 1}},
    {"$group": {"_id": daily_key, **daily_stats}},
    # Promote the device and rebuild the day timestamp from the group key.
    {"$addFields": {
        "device": "$_id.device",
        "ts": {"$dateFromParts": {part: "$_id." + part for part in day_parts}},
    }},
    {"$project": {"_id": 0}},
    {"$merge": {"into": "iot_PT1D", "on": ["device", "ts"]}},
]

db.iot_PT1H.aggregate(daily_pipeline)

<pymongo.command_cursor.CommandCursor at 0x7f6449b400d0>

Affichage d'un document.

In [12]:
# Fetch a single document of the daily collection (no filter or sort is
# given) and pretty-print it.
result = db["iot_PT1D"].find_one()
pprint(result)

{'_id': ObjectId('5fd22d5a4a618ba1005ce3dd'),
 'device': 4711,
 'rpm_avg': 1253.56508005698,
 'rpm_first': 80,
 'rpm_last': 963,
 'rpm_max': 2500,
 'rpm_min': 0,
 'status_first': 'operating',
 'status_last': 'idle',
 'temperature_avg': 50.1075623931624,
 'temperature_first': 60,
 'temperature_last': 40,
 'temperature_max': 100,
 'temperature_min': 0,
 'ts': datetime.datetime(2020, 12, 9, 0, 0)}
