# How to create an aggregation pipeline

Set up the connection to MongoDB and access the SensorData collection. For details, see the tutorial "how to access the sensor data".

In [None]:
from pymongo import MongoClient
from bson.json_util import dumps
import os

# Connection string for the MongoDB cluster. Prefer supplying it through the
# MONGODB_URI environment variable so that real credentials never have to be
# typed into (and saved with) the notebook. The placeholder URI below is only
# a fallback and must be edited before it can be used to connect.
uri = os.environ.get(
    "MONGODB_URI",
    "mongodb+srv://<username>:<password>@sandbox-qb8uy.mongodb.net/test?retryWrites=true",
)
client = MongoClient(uri)

# Handles to the impExplorer database and its SensorData collection.
impExplorer = client.impExplorer
sensordata = impExplorer.SensorData

Check if the number of documents in the SensorData collection can be retrieved.

In [None]:
# An empty filter matches every document, so this yields the collection size.
sensordata.count_documents(filter={})

Set up the aggregation pipeline and execute it.

In [None]:
from datetime import datetime, timedelta, timezone

# filter for the sensor data of the past 24 hours
# `last_24hours` is the time span between the Unix epoch and "now minus 24 h",
# kept as a timedelta. Timezone-aware UTC datetimes are used on both sides of
# the subtraction because datetime.utcnow() is deprecated (Python 3.12+) and
# returns a naive value.
unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
last_24hours = datetime.now(timezone.utc) - unix_epoch - timedelta(hours=24)
match_stage = { "$match": {
    # threshold expressed in milliseconds since the epoch
    # (presumably the unit sensorTimestamp is stored in - verify against the data)
    "sensorTimestamp": {"$gte": (last_24hours.total_seconds() * 1000)}
}}

# date/time conversions to get aggregation level (two choices are implemented)
# Sub-expressions shared by the stages below.
_sensor_date = {"$toDate": "$sensorTimestamp"}
_output_timezone = "Europe/Berlin"

# Stage 1: split the converted date into its components (stored as sensorDatePrt).
timestamp_stage_1 = {
    "$addFields": {
        "sensorDatePrt": {"$dateToParts": {"date": _sensor_date}},
    }
}

# Minute snapped down to the start of its quarter hour: branches are tried in
# order (45, 30, 15) and anything below 15 falls through to the default 0.
_quarter_hour_minute = {
    "$switch": {
        "branches": [
            {"case": {"$gte": ["$sensorDatePrt.minute", start]}, "then": start}
            for start in (45, 30, 15)
        ],
        "default": 0,
    }
}

# Date rebuilt from the stage-1 parts, using the snapped minute.
_quarter_hour_date = {
    "$dateFromParts": {
        "year": "$sensorDatePrt.year",
        "month": "$sensorDatePrt.month",
        "day": "$sensorDatePrt.day",
        "hour": "$sensorDatePrt.hour",
        "minute": _quarter_hour_minute,
    }
}

# Stage 2: add both aggregation keys as formatted date strings.
timestamp_stage_2 = {
    "$addFields": {
        # hourly
        "sensorDateAgg1": {
            "$dateToString": {
                "date": _sensor_date,
                "format": "%Y-%m-%d %H:00",
                "timezone": _output_timezone,
            }
        },
        # 15 minute intervals
        "sensorDateAgg2": {
            "$dateToString": {
                "date": _quarter_hour_date,
                "format": "%Y-%m-%d %H:%M",
                "timezone": _output_timezone,
            }
        },
    }
}

# aggregate the data to the sensorDate string
grouping_key = "$sensorDateAgg2"  # edit here to choose the desired aggregation level
measured_field = "$temperature"

# One output document per distinct grouping key, carrying the number of
# readings and summary statistics of the measured field.
group_stage = {
    "$group": {
        "_id": grouping_key,
        "nbr": {"$sum": 1},
        "temp_avg": {"$avg": measured_field},
        "temp_min": {"$min": measured_field},
        "temp_max": {"$max": measured_field},
        "temp_std": {"$stdDevSamp": measured_field},
    }
}

# restructure and rename aggregated data
# The four statistics are nested under a single "temperature" sub-document.
temperature_stats = dict(
    mean="$temp_avg",
    std="$temp_std",
    min="$temp_min",
    max="$temp_max",
)

# Keep the group key and the count unchanged next to the nested statistics.
project_stage = {"$project": dict(_id=1, nbr=1, temperature=temperature_stats)}

# sort data
# descending order on the group key (_id)
sort_stage = { "$sort": { "_id": -1 } }

# add stages to pipeline
# The stages run in list order: filter, derive date fields, group, reshape, sort.
pipeline = [
    match_stage,
    timestamp_stage_1,
    timestamp_stage_2,
    group_stage,
    project_stage,
    sort_stage
]

# execute pipeline
# aggregate() returns a cursor that can be iterated only once. The documents
# are stored in a list so that cells using the result can be re-run without
# executing the pipeline again.
last_24hours_temperature = list(sensordata.aggregate(pipeline))

Show the queried results

In [None]:
# Serialize the aggregation result with bson's JSON helper (2-space indent), then print it.
result_json = dumps(last_24hours_temperature, indent=2)
print(result_json)