## Aggregation pipelines

In MongoDB, aggregation pipelines let us chain multiple operations within a single command. These can include operations we've seen before, such as queries, projections, sorting and pagination, as well as additional operations that are not available through those commands.

An aggregation pipeline in MongoDB consists of one or more stages, each with its own operators. Most stages can be used in any order and as many times as needed, but keep in mind that some of them are restricted in where they can appear in the pipeline or in how many times they can be used.

To use an aggregation pipeline in pymongo, we call the `aggregate` method of a collection. Its main parameter is a list of dictionaries, one per stage. Each dictionary must have a single key, the name of the stage, whose value defines how the stage works. The output of each stage is used as the input of the next one, until the end of the list is reached.
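
As a rough mental model only (this is not how the server executes pipelines), the chaining can be pictured in plain Python: each stage is a function from a list of documents to a new list, and the functions are applied in order. The stage functions `match` and `project` below are hypothetical stand-ins for `$match` and `$project`, limited to equality filters and inclusion projections.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Stage = Callable[[List[Doc]], List[Doc]]


def match(**equals: Any) -> Stage:
    """Toy stand-in for $match: keep documents whose fields equal the given values."""
    return lambda docs: [d for d in docs if all(d.get(k) == v for k, v in equals.items())]


def project(*fields: str) -> Stage:
    """Toy stand-in for an inclusion $project: keep only the listed fields."""
    return lambda docs: [{k: d[k] for k in fields if k in d} for d in docs]


def run_pipeline(docs: List[Doc], stages: List[Stage]) -> List[Doc]:
    # The output of each stage becomes the input of the next one
    return reduce(lambda current, stage: stage(current), stages, docs)


sample = [
    {'_id': 'a', 'ndet': 5, 'firstmjd': 1.0},
    {'_id': 'b', 'ndet': 7, 'firstmjd': 2.0},
]
result = run_pipeline(sample, [match(ndet=7), project('_id', 'firstmjd')])
print(result)  # [{'_id': 'b', 'firstmjd': 2.0}]
```

An empty list of stages simply returns the input unchanged, which mirrors how an empty pipeline returns the documents of the collection.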

In [None]:
from pymongo import MongoClient

client = MongoClient(host='localhost', port=27017, username='mongo', password='mongo')

objects = client.alerce.objects  # This is the collection we'll be using

We'll now check some of the more commonly used stages.

### `$match`

The `$match` stage is equivalent to performing a query and uses the same operators we've seen for the first parameter of the `find` method. This stage doesn't support projections (there is a separate stage for that).

In [None]:
docs = objects.aggregate([{'$match': {'ndet': {'$gte': 400}}}])
docs

The output of `aggregate` is a `CommandCursor`. This is different from the `Cursor` we saw for the output of `find`, but it is still iterable. 

Unfortunately, this type does not have the `explain` method implemented, although there is a somewhat cumbersome workaround if required:

In [None]:
db = client.alerce  # We need the database, not the collection
db.command(
    'aggregate',  # command name
    'objects',  # collection the command runs on
    pipeline=[{'$match': {'ndet': {'$gte': 400}}}],  # extra command fields are passed as keyword arguments
    explain=True
)

**`mongosh`:** The aggregate method works in the exact same way, although in this case the `explain` method is actually implemented:

```javascript
db.objects.aggregate([{$match: {ndet: {$gte: 400}}}]).explain()
```

### `$project`

As the name implies, the `$project` stage is equivalent to the projection we've seen in the previous module:

In [None]:
docs = objects.aggregate([{'$project': {'ndet': True, '_id': False}}])

for doc in docs:
    print(doc)

Stages can be combined in any order (apart from the restrictions mentioned above). Keep in mind that each stage sees the documents as produced by the previous one, so if a field is renamed, later stages must refer to it by its new name. The following two blocks give the same result:

In [None]:
docs = objects.aggregate([
    {'$project': {'detections': '$ndet', '_id': False}},  # Renaming the field
    {'$match': {'detections': {'$gte': 400}}}  # We need to use the new name
])

for doc in docs:
    print(doc)

In [None]:
docs = objects.aggregate([
    {'$match': {'ndet': {'$gte': 400}}},  # Using ndet    
    {'$project': {'detections': '$ndet', '_id': False}},  # Renaming the field
])

for doc in docs:
    print(doc)

However, the two differ greatly in terms of performance. By starting with the match, we only need to rename the field for the 3 matched documents. Using the reverse order, we rename the field for every document in the collection and only then select the relevant ones.

**It is recommended to start a pipeline with a `$match` that restricts the number of results as much as possible since, when placed at the beginning of the pipeline, it can use indexes (as can a leading `$sort`), whereas most other stages cannot. Other stages applied over large collections or poorly filtered results can take a long time to finish.**
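
To make the difference concrete, here is a toy, pure-Python count of how many documents get renamed in each order. The 1000-document collection is made up, and real costs also depend on indexes and on MongoDB's own pipeline optimizer, which can reorder some stages by itself.

```python
from typing import Any, Dict, List, Tuple

Doc = Dict[str, Any]

# Made-up collection: 1000 documents, only 3 of which have ndet >= 400
collection: List[Doc] = [
    {'_id': i, 'ndet': 400 + i if i < 3 else i % 100} for i in range(1000)
]


def project_then_match(docs: List[Doc]) -> Tuple[List[Doc], int]:
    renamed = [{'detections': d['ndet']} for d in docs]  # every document gets renamed
    result = [d for d in renamed if d['detections'] >= 400]
    return result, len(renamed)


def match_then_project(docs: List[Doc]) -> Tuple[List[Doc], int]:
    matched = [d for d in docs if d['ndet'] >= 400]  # filter first
    result = [{'detections': d['ndet']} for d in matched]  # only matched documents get renamed
    return result, len(matched)


slow_result, slow_renames = project_then_match(collection)
fast_result, fast_renames = match_then_project(collection)
print(slow_renames, fast_renames)  # 1000 3
```

Both orders produce the same three documents; only the amount of work differs.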

### `$set` and `$addFields`

These two stages do the same thing, although `$set` is only available starting with MongoDB 4.2. They behave like `$project` when creating a new field, except that the new fields are added to the existing ones instead of us having to select everything that goes into the output:

In [None]:
docs = objects.aggregate([
    {'$match': {'ndet': {'$gte': 400}}},
    {'$set': {'deltamjd': {'$subtract': ['$lastmjd', '$firstmjd']}}},
])

for doc in docs:
    print(doc)

The new fields are always added at the end of the dictionary. More than one field can be added in a single stage:

In [None]:
docs = objects.aggregate([
    {'$match': {'ndet': {'$gte': 400}}},
    {'$set': {
        'deltamjd': {
            '$subtract': ['$lastmjd', '$firstmjd']
        },
        'stamp_classified': {  # True if 'stamp_classifier' is one of the classifier names in the array
            '$in': ['stamp_classifier', '$probabilities.classifier_name']
        }
    }},
])

for doc in docs:
    print(doc)
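
The difference between `$project` and `$set`/`$addFields` can be pictured with plain Python dictionaries. This is only an analogy on a made-up document: a projection builds a new dictionary with the listed fields, while `$set` behaves like merging new keys into the existing dictionary, which also appends them at the end.

```python
from typing import Any, Dict

Doc = Dict[str, Any]

# Made-up document with the fields used in the examples above
doc: Doc = {'_id': 'hypothetical_aid', 'ndet': 450, 'firstmjd': 58000.5, 'lastmjd': 59000.5}

# $project-like (with '_id' excluded): only the listed fields survive
projected = {'deltamjd': doc['lastmjd'] - doc['firstmjd']}

# $set/$addFields-like: the new field is merged into the existing ones, at the end
with_new_field = {**doc, 'deltamjd': doc['lastmjd'] - doc['firstmjd']}

print(list(projected))       # ['deltamjd']
print(list(with_new_field))  # ['_id', 'ndet', 'firstmjd', 'lastmjd', 'deltamjd']
```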

### `$unwind`

The `$unwind` stage is used with arrays: it "disassembles" the array, producing one document per array element for each of the retrieved documents:

In [None]:
docs = objects.aggregate([
    {'$match': {'ndet': {'$gte': 400}}},
    {'$unwind': '$probabilities'},
])

for doc in docs:
    print(doc)

As you can see, the `_id`s are now repeated, and each output document corresponds to one element of the original `probabilities` array: the field `probabilities` now holds a single element of the array we began with.
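
A toy, pure-Python version of `$unwind` may help to see what happens to each document. It is a sketch assuming a top-level field (no dotted paths) and the default behaviour, where documents whose field is missing, `null` or an empty array produce no output, and a non-array value is treated like a one-element array.

```python
from typing import Any, Dict, Iterable, Iterator

Doc = Dict[str, Any]


def unwind(docs: Iterable[Doc], field: str) -> Iterator[Doc]:
    """Toy $unwind: one output document per element of doc[field]."""
    for doc in docs:
        values = doc.get(field)
        if values is None or values == []:
            continue  # missing, null or empty arrays are dropped by default
        if not isinstance(values, list):
            yield doc  # a non-array value behaves like a one-element array
            continue
        for value in values:
            yield {**doc, field: value}  # copy of the document holding a single element


docs = [
    {'_id': 'a', 'probabilities': [{'class_name': 'VS'}, {'class_name': 'SN'}]},
    {'_id': 'b', 'probabilities': []},
    {'_id': 'c'},
]
for doc in unwind(docs, 'probabilities'):
    print(doc)
```

Only the two elements of document `'a'` come out, each with the same `_id`, just like the repeated `_id`s in the output above.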

Combined with `$filter`, `$unwind` allows us to get one entry per matching element when searching by, for instance, classifier, class and probability:

In [None]:
classifier = 'stamp_classifier'
class_ = 'VS'
min_prob = 0.7

docs = objects.aggregate([
    {
        '$match': {
            'probabilities': {
                '$elemMatch': {
                    'classifier_name': classifier,
                    'class_name': class_,
                    'probability': {'$gte': min_prob}
                }
            }
        }
    },
    {  # The previous stage returns whole documents, so 'probabilities' still holds the full array
        '$set': {
            'probabilities': {
                '$filter': {
                    'input': '$probabilities',
                    'cond': {
                        '$and': [
                            {'$eq': ['$$this.classifier_name', classifier]},
                            {'$eq': ['$$this.class_name', class_]},
                            {'$gte': ['$$this.probability', min_prob]}
                        ]
                    }
                }
            }
        }
    },
    {
        '$unwind': '$probabilities'
    },
])

for doc in docs:
    print(doc)

**Warning:** Due to how the selection of array elements works, the conditions in the `$match` and `$set` stages of a query like the one above must agree, but nothing enforces this. It is very easy to be testing some queries and then forget to update a value or a field in either the `$elemMatch` or the `$filter` operator, resulting in valid but meaningless queries. *Pay close attention when creating these types of queries.*
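
One way to reduce that risk is to generate both stages from a single set of conditions. The helper `filtered_array_stages` below is hypothetical (not part of pymongo), and it is a sketch limited to equality and the comparison operators `$eq`, `$ne`, `$gt`, `$gte`, `$lt` and `$lte`, whose names are shared by the query and aggregation languages.

```python
from typing import Any, Dict, List

Stage = Dict[str, Any]


def filtered_array_stages(array_field: str, conditions: Dict[str, Any]) -> List[Stage]:
    """Hypothetical helper: build $match, $set/$filter and $unwind from ONE set of conditions.

    Each condition value is either a plain value (equality) or a dict of
    comparison operators such as {'$gte': 0.7}. Other operators (e.g. $in)
    have different semantics in aggregation expressions and are not supported.
    """
    clauses = []
    for field, condition in conditions.items():
        path = f'$$this.{field}'  # current array element inside $filter
        if isinstance(condition, dict):
            clauses.extend({op: [path, value]} for op, value in condition.items())
        else:
            clauses.append({'$eq': [path, condition]})
    return [
        {'$match': {array_field: {'$elemMatch': conditions}}},
        {'$set': {array_field: {'$filter': {'input': f'${array_field}', 'cond': {'$and': clauses}}}}},
        {'$unwind': f'${array_field}'},
    ]


pipeline = filtered_array_stages('probabilities', {
    'classifier_name': 'stamp_classifier',
    'class_name': 'VS',
    'probability': {'$gte': 0.7},
})
```

The resulting list could then be passed to `objects.aggregate(pipeline)`; it should be equivalent to the pipeline written by hand above, so changing a value in one place updates both stages.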

### `$group`

This stage groups the results based on an expression. For instance, here we select the top-ranked class of each object and then group the results by class name:

In [None]:
classifier = 'stamp_classifier'
ranking = 1

docs = objects.aggregate([
    {
        '$match': {
            'probabilities': {
                '$elemMatch': {
                    'classifier_name': classifier,
                    'ranking': ranking
                }
            }
        }
    },
    {
        '$set': {
            'probabilities': {
                '$filter': {
                    'input': '$probabilities',
                    'cond': {
                        '$and': [
                            {'$eq': ['$$this.classifier_name', classifier]},
                            {'$eq': ['$$this.ranking', ranking]}
                        ]
                    }
                }
            }
        }
    },
    {
        '$unwind': '$probabilities'
    },
    {
        '$group': {
            '_id': '$probabilities.class_name'
        }
    }
])

for doc in docs:
    print(doc)

As seen above, by default only the `_id` field is present. This field always holds the value of the field or expression used for grouping. Any other output field must be defined in the same `$group` stage, using accumulator expressions:

In [None]:
classifier = 'stamp_classifier'
ranking = 1

docs = objects.aggregate([
    {
        '$match': {
            'probabilities': {
                '$elemMatch': {
                    'classifier_name': classifier,
                    'ranking': ranking
                }
            }
        }
    },
    {
        '$set': {
            'probabilities': {
                '$filter': {
                    'input': '$probabilities',
                    'cond': {
                        '$and': [
                            {'$eq': ['$$this.classifier_name', classifier]},
                            {'$eq': ['$$this.ranking', ranking]}
                        ]
                    }
                }
            }
        }
    },
    {
        '$unwind': '$probabilities'
    },
    {
        '$group': {
            '_id': '$probabilities.class_name',
            'nobjects': {'$count': {}},  # Count documents (always receives empty dict)
            'earliestmjd': {'$min': '$firstmjd'},
            'latestmjd': {'$max': '$lastmjd'}
        }
    },
    {
        '$match': {
            'nobjects': {'$gte': 10}  # keep only groups with at least ten elements
        }
    }
])

for doc in docs:
    print(doc)

Note that the `$count` accumulator is only available from MongoDB 5.0 onwards. For older versions, the same result can be achieved with `{'$sum': 1}`.
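
If the same code has to run against servers of different versions, the accumulator can be chosen from the server version string (which pymongo exposes, for instance, as `client.server_info()['version']`). The helper `count_accumulator` below is a hypothetical sketch that only looks at the major version number.

```python
from typing import Any, Dict


def count_accumulator(server_version: str) -> Dict[str, Any]:
    """Hypothetical helper: pick an accumulator that counts the documents in a group.

    The $count accumulator needs MongoDB 5.0+; {'$sum': 1} also works on older versions.
    """
    major = int(server_version.split('.')[0])
    return {'$count': {}} if major >= 5 else {'$sum': 1}


group_stage = {
    '$group': {
        '_id': '$probabilities.class_name',
        'nobjects': count_accumulator('4.4.18'),  # made-up version string
    }
}
print(group_stage)  # {'$group': {'_id': '$probabilities.class_name', 'nobjects': {'$sum': 1}}}
```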

### `$bucket` and `$bucketAuto`

These stages are similar to `$group`, but instead of grouping by the exact value of an expression, they group documents into bins (buckets) of values. For instance:

In [None]:
docs = objects.aggregate([
    {
        '$bucketAuto': {
            'groupBy': '$ndet',  # receives an expression
            'buckets': 4,  # number of buckets
            'output': {  # fields in the output document
                'count': {'$count': {}},
                'minmjd': {'$min': '$firstmjd'},
                'maxmjd': {'$max': '$lastmjd'}
            },
        }
    },
])

for doc in docs:
    print(doc)

As can be seen above, `$bucketAuto` generates the bins automatically, trying to split the documents as evenly as possible. The limits of each bin are stored in the `_id` field of the returned documents. The optional `granularity` field makes the limits follow a series of preferred numbers (in the example below, the [Renard series](https://en.wikipedia.org/wiki/Renard_series) R10; for other options check the [documentation](https://www.mongodb.com/docs/v6.0/reference/operator/aggregation/bucketAuto/)):

In [None]:
docs = objects.aggregate([
    {
        '$bucketAuto': {
            'groupBy': '$ndet',  # receives an expression
            'buckets': 4,  # number of buckets
            'output': {  # fields in the output document
                'count': {'$count': {}},
                'minmjd': {'$min': '$firstmjd'},
                'maxmjd': {'$max': '$lastmjd'}
            },
            'granularity': 'R10'  # Using Renard series
        }
    },
])

for doc in docs:
    print(doc)

In the case of `$bucket`, the binning limits must be explicitly provided:

In [None]:
classifier = 'stamp_classifier'

docs = objects.aggregate([
    {
        '$match': {
            'probabilities.classifier_name': classifier,
        }
    },
    {
        '$set': {
            'probabilities': {
                '$filter': {
                    'input': '$probabilities',
                    'cond': {
                        '$eq': ['$$this.classifier_name', classifier],
                    }
                }
            }
        }
    },
    {
        '$unwind': '$probabilities'
    },
    {
        '$bucket': {
            'groupBy': '$probabilities.probability',
            'boundaries': [0, .25, .5, .75, 1],  # bin boundaries
            'output': {
                'nobjects': {'$count': {}},
                'top_class': {
                    '$top': {  # get top (first) result based on a sort (requires MongoDB 5.2+)
                        'output': '$probabilities.class_name',  # can be any expression
                        'sortBy': {'probabilities.probability': -1}
                    }
                },
                'ids': {'$addToSet': '$_id'}  # generate array with _id of all documents in bucket
            },
            'default': 'oob'  # documents outside the buckets will be put in this _id
        }
    }
])

for doc in docs:
    print(doc)

The bin boundaries always include the lower limit and exclude the upper one. This explains the out-of-bounds result we found:

In [None]:
objects.find_one({'_id': 'AL17ldggwgicoxfuy'})

Version 1.0.4 of the stamp classifier assigns this object a probability of exactly 1 of being an asteroid, which falls outside our buckets because the upper limit of the last one (1) is exclusive.
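
The boundary rule can be checked with a small pure-Python sketch. The function `bucket_id` is hypothetical and only mimics how `$bucket` assigns a value to a bucket: the `_id` of a bucket is its inclusive lower limit, and values below the first boundary or at or above the last one go to the `default` bucket.

```python
from bisect import bisect_right
from typing import Any, Sequence


def bucket_id(value: float, boundaries: Sequence[float], default: Any = 'oob') -> Any:
    """Toy version of the $bucket rule: lower limit inclusive, upper limit exclusive."""
    if value < boundaries[0] or value >= boundaries[-1]:
        return default
    # Index of the last boundary that is <= value
    return boundaries[bisect_right(boundaries, value) - 1]


boundaries = [0, .25, .5, .75, 1]
ids = [bucket_id(p, boundaries) for p in (0, 0.25, 0.74, 0.999, 1)]
print(ids)  # [0, 0.25, 0.5, 0.75, 'oob']
```

A probability of exactly 1 ends up in `'oob'`, matching the object we just inspected.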

### `$sort`, `$limit` and `$skip`

The `aggregate` method doesn't accept sort, limit and skip as options the way `find` does. They can still be used, but as stages in the pipeline. Both `$skip` and `$limit` work in the same way as the corresponding `find` options:

In [None]:
docs = objects.aggregate([
    {'$project': {'ndet': True, 'firstmjd': True, 'lastmjd': True}},
    {'$skip': 5},
    {'$limit': 10}
])

for doc in docs:
    print(doc)

Note that the order matters:

In [None]:
docs = objects.aggregate([
    {'$project': {'ndet': True, 'firstmjd': True, 'lastmjd': True}},
    {'$limit': 10},
    {'$skip': 5},
])

for doc in docs:
    print(doc)

In the first case we skip the first five results and then return up to ten documents. In the second we take up to ten documents (without skipping), and *then* skip the first five of those, so at most five documents are returned.
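
Python list slicing gives a quick way to reason about the two orders. In this sketch a list of 20 integers is a made-up stand-in for the documents flowing through the pipeline.

```python
docs = list(range(20))  # stand-in for 20 documents, in pipeline order

skip_then_limit = docs[5:][:10]  # {'$skip': 5}, then {'$limit': 10}
limit_then_skip = docs[:10][5:]  # {'$limit': 10}, then {'$skip': 5}

print(skip_then_limit)  # [5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
print(limit_then_skip)  # [5, 6, 7, 8, 9]
```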

For `$sort`, we use a dictionary that maps each field to its direction, unlike the `sort` option of `find`, where we had to use a list of pairs. Otherwise, the results are the same:

In [None]:
docs = objects.aggregate([
    {'$project': {'ndet': True, 'firstmjd': True, 'lastmjd': True}},
    {'$sort': {'ndet': 1, 'firstmjd': -1}},  # sort first by ndet ascending, then by firstmjd descending
    {'$skip': 5},
    {'$limit': 10}
])

for doc in docs:
    print(doc)
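
Since the key order of the `$sort` dictionary defines which field is the primary sort key, a list of pairs written for `find` can be turned into a `$sort` stage with `dict()`, which keeps insertion order (guaranteed since Python 3.7). The small sample documents below are made up, and the `sorted` call only reproduces the same ordering in plain Python for comparison.

```python
from typing import Any, Dict, List, Tuple

# Sort specification as a list of (field, direction) pairs, as used with find(sort=...)
find_sort: List[Tuple[str, int]] = [('ndet', 1), ('firstmjd', -1)]

# Equivalent $sort stage; dict() preserves the order of the pairs
sort_stage: Dict[str, Any] = {'$sort': dict(find_sort)}
print(sort_stage)  # {'$sort': {'ndet': 1, 'firstmjd': -1}}

# Plain-Python equivalent of that ordering: ndet ascending, then firstmjd descending
docs = [
    {'ndet': 2, 'firstmjd': 10.0},
    {'ndet': 1, 'firstmjd': 5.0},
    {'ndet': 1, 'firstmjd': 7.0},
]
ordered = sorted(docs, key=lambda d: (d['ndet'], -d['firstmjd']))
```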

### `$replaceRoot` and `$replaceWith`

These stages do the same thing with slightly different syntax; `$replaceWith` is only available starting with MongoDB 4.2. They replace each input document with an embedded document (more generally, with any expression that evaluates to a document):

In [None]:
docs = objects.aggregate([
    {'$unwind': '$probabilities'},
    {'$replaceWith': '$probabilities'},  # use the probabilities subdocument as return documents
    {'$limit': 5}
])

for doc in docs:
    print(doc)

With `$replaceRoot`, the expression is passed through the `newRoot` field, but otherwise it works in exactly the same way:

In [None]:
docs = objects.aggregate([
    {'$unwind': '$probabilities'},
    {'$replaceRoot': {'newRoot': '$probabilities'}},
    {'$limit': 5}
])

for doc in docs:
    print(doc)
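
In plain Python, `$unwind` followed by `$replaceWith` amounts to yielding the embedded documents themselves. The sketch below is only an analogy, assuming `probabilities` is an array of subdocuments. It also shows the `$mergeObjects` variant used in the `$out` example that follows, where, as in `$mergeObjects`, fields from later objects win when names clash.

```python
from typing import Any, Dict, Iterable, Iterator

Doc = Dict[str, Any]


def unwind_and_replace(docs: Iterable[Doc], field: str) -> Iterator[Doc]:
    """Toy equivalent of [{'$unwind': f'${field}'}, {'$replaceWith': f'${field}'}]."""
    for doc in docs:
        for element in doc.get(field) or []:
            yield element  # the embedded document becomes the whole output document


def unwind_and_merge_id(docs: Iterable[Doc], field: str) -> Iterator[Doc]:
    """Toy equivalent of replacing with {'$mergeObjects': [{'aid': '$_id'}, f'${field}']}."""
    for doc in docs:
        for element in doc.get(field) or []:
            yield {'aid': doc['_id'], **element}  # later object wins on clashing keys


docs = [{'_id': 'obj1', 'probabilities': [{'class_name': 'VS', 'probability': 0.8}]}]
print(list(unwind_and_replace(docs, 'probabilities')))   # [{'class_name': 'VS', 'probability': 0.8}]
print(list(unwind_and_merge_id(docs, 'probabilities')))  # [{'aid': 'obj1', 'class_name': 'VS', 'probability': 0.8}]
```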

### `$out`

The `$out` stage writes the results of the pipeline to a collection, creating it if it doesn't exist and replacing its contents if it does. Unlike other stages, it *must* be the last stage, and therefore it can appear only once in the pipeline. The form used below, which specifies both the database and the collection, requires MongoDB 4.4 or later:

In [None]:
docs = objects.aggregate([
    {'$unwind': '$probabilities'},
    {'$replaceWith': {'$mergeObjects': [{'aid': '$_id'}, '$probabilities']}},  # add the aid to the probabilities document 
    {'$out': {'db': 'alerce', 'coll': 'probabilities'}}  # write in (existing) database 'alerce' and (new) collection 'probabilities'
])

for doc in docs:
    print(doc)

Notice that nothing is printed when iterating over `docs`. This is expected when using `$out`, since the results are written to the collection instead of being returned. Let's check the new collection:

In [None]:
docs = client.alerce.probabilities.find({}, limit=4)

for doc in docs:
    print(doc)

### `$lookup`

This stage allows for joining two collections (they must be in the same database) based on a shared key:

In [None]:
from pprint import pprint

docs = objects.aggregate([
    {'$project': {'probabilities': False}},  # remove existing probabilities to avoid duplicate field
    {
        '$lookup': {
            'from': 'probabilities',  # collection to join
            'localField': '_id',  # field in current collection (objects) to perform the join over
            'foreignField': 'aid',  # field in external collection (probabilities) to perform the join over
            'as': 'probs'  # name of the new array field holding the joined documents
        }
    },
    {'$limit': 1}  #  we'll see one as an example
])

for doc in docs:
    pprint(doc)

It is also possible to run a pipeline on the joined collection. Combining `pipeline` with `localField` and `foreignField`, as below, requires MongoDB 5.0 or later:

In [None]:
docs = objects.aggregate([
    {
        '$match': {
            'ndet': {'$gt': 200, '$lt': 400}
        }
    },
    {
        '$project': {
            'probabilities': False  # remove existing probabilities to avoid duplicate field
        }
    },
    {
        '$lookup': {
            'from': 'probabilities',  # collection to join
            'localField': '_id',  # field in current collection (objects) to perform the join over
            'foreignField': 'aid',  # field in external collection (probabilities) to perform the join over
            'as': 'probs',  # name of the new array field holding the joined documents
            'pipeline': [
                {
                    '$match': {
                        'classifier_name': 'stamp_classifier', 
                        'ranking': 1,
                        'class_name': 'VS',
                        'probability': {'$gte': 0.7}
                    }
                },
                {
                    '$project': {
                        '_id': False,
                        'aid': False
                    }
                }
            ]
        }
    }
])

for doc in docs:
    pprint(doc)

Notice that most objects have an empty `probs` array. This is because `$lookup` keeps every document that reaches it, even if there is no matching document in the collection being joined: it always performs a left outer join. We can add another stage to mimic an inner join, but keep in mind that if the input collection is too large, this might be highly inefficient, even if the second collection has very few elements.

In [None]:
docs = objects.aggregate([
    {
        '$match': {
            'ndet': {'$gt': 200, '$lt': 400}
        }
    },
    {
        '$project': {
            'probabilities': False  # remove existing probabilities to avoid duplicate field
        }
    },
    {
        '$lookup': {
            'from': 'probabilities',  # collection to join
            'localField': '_id',  # field in current collection (objects) to perform the join over
            'foreignField': 'aid',  # field in external collection (probabilities) to perform the join over
            'as': 'probs',  # name of the new array field holding the joined documents
            'pipeline': [
                {
                    '$match': {
                        'classifier_name': 'stamp_classifier', 
                        'ranking': 1,
                        'class_name': 'VS',
                        'probability': {'$gte': 0.7}
                    }
                },
                {
                    '$project': {
                        '_id': False,
                        'aid': False
                    }
                }
            ]
        }
    },
    {
        '$match': {
            'probs': {'$ne': []}  # keep only documents whose 'probs' array is not empty
        }
    }
])

for doc in docs:
    pprint(doc)
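
The join semantics can be summarised with a toy, pure-Python version of `$lookup` on made-up data. It is a sketch assuming scalar keys and plain equality matching; the nested scan also hints at why the join gets expensive for large inputs (the real server can use an index on the foreign field).

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]


def lookup(local: List[Doc], foreign: List[Doc], local_field: str,
           foreign_field: str, as_: str) -> List[Doc]:
    """Toy $lookup: a left outer join, every local document is kept."""
    return [
        {**doc, as_: [f for f in foreign if f.get(foreign_field) == doc.get(local_field)]}
        for doc in local
    ]


objects_sample = [{'_id': 'obj1', 'ndet': 250}, {'_id': 'obj2', 'ndet': 300}]
probabilities_sample = [{'aid': 'obj1', 'class_name': 'VS', 'probability': 0.9}]

left = lookup(objects_sample, probabilities_sample, '_id', 'aid', 'probs')
inner = [doc for doc in left if doc['probs'] != []]  # like {'$match': {'probs': {'$ne': []}}}
print(len(left), len(inner))  # 2 1
```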

## Summary

There are several other stages we haven't covered here. See the [aggregation stages reference](https://www.mongodb.com/docs/v6.0/reference/operator/aggregation-pipeline/#std-label-aggregation-pipeline-operator-reference) for all of them, including those seen above.

Aggregation pipelines can be used for more complex queries and give fine-grained control over the returned fields. However, if a simple query is all that's required, it is best to use `find`.

Recommendations for performance:

* The first stages in the pipeline should reduce the number of documents as much as possible
* Be mindful of the number of stages; keep the pipeline as short as possible