# Introduction to Monggregate

Aggregations pipelines are the tool that allow the MongoDB database management system (DBMS) to really rival its SQL counterparts.
However, they can be hard to get a grip onto, especially for python developers.

Monggregate is a tool to help you build your MongoDB aggregation pipelines in python almost instinctively. 
In order to demonstrate its capabilities, I am going to reuse the examples from this [amazing tutorial](https://www.mongodb.com/developer/languages/python/python-quickstart-aggregation/) by Mark Smith.

## Prerequisites

The tutorial uses the *sample_mflix* dataset and requires at least python 3.6. 

However as of now, Monggregate is only available for python 3.10 .
In order to run it correctly, you will need to add your MongoDB password in .env file as follows
MONGODB_PASSWORD="insert your password"

## Getting Started

In [2]:
import os
from pprint import pprint
from dotenv import load_dotenv

import pymongo
from pymongo import MongoClient
from monggregate import Pipeline

In [3]:
load_dotenv(verbose=True)
PWD = os.environ["MONGODB_PASSWORD"]
MONGODB_URI = f"mongodb+srv://dev:{PWD}@myserver.xciie.mongodb.net/?retryWrites=true&w=majority"

In [4]:
# Connect to your MongoDB cluster:
client = MongoClient(MONGODB_URI)

In [5]:
# Loading sample data
db = client.sample_mflix
movie_collection = db.movies

## Your First Aggregation Pipeline

You can perform ordinary MongoDB Query Language (MQL) queries using aggregation pipelines.
Here we are going to filter movies based on their title and sort them by (release ?) year.

### Findind and Sorting

In [11]:
pipeline = Pipeline(collection="movies")
pipeline.match(
    query={
        "title":"A Star Is Born"
    }
).sort(
    by="year"
)

Pipeline(on_call=<OnCallEnum.EXPORT: 'export'>, collection='movies', stages=[Match(query={'title': 'A Star Is Born'}), Sort(descending=None, ascending=True, by=['year'], query={'year': 1})])

In regular pymongo, you will need to specify that year is a reference to a field by adding "$" in front of it. Here monggregate is smart enough to understand this and will take care of adding it automatically.

In [12]:
results = movie_collection.aggregate(pipeline())
for movie in results:
   print(" * {title}, {first_castmember}, {year}".format(
         title=movie["title"],
         first_castmember=movie["cast"][0],
         year=movie["year"],
   ))

 * A Star Is Born, Janet Gaynor, 1937
 * A Star Is Born, Judy Garland, 1954
 * A Star Is Born, Barbra Streisand, 1976


### Limit the Number of Results

We can limit the numbers of output results by adding one more step that can be chained to the existing steps.
I really wanted that the library has this chaining feature to offer an experience similar to pandas or other libraries

In [13]:
pipeline = Pipeline(collection="movies")
pipeline.match(
    title = "A Star Is Born"
).sort(
    year = pymongo.DESCENDING
).limit(1)

Pipeline(on_call=<OnCallEnum.EXPORT: 'export'>, collection='movies', stages=[Match(query={'title': 'A Star Is Born'}), Sort(descending=None, ascending=True, by=None, query={'year': -1}), Limit(value=1)])

In [14]:
results = movie_collection.aggregate(pipeline())
for movie in results:
   print(" * {title}, {first_castmember}, {year}".format(
         title=movie["title"],
         first_castmember=movie["cast"][0],
         year=movie["year"],
   ))

 * A Star Is Born, Barbra Streisand, 1976


## Look Up Related Data in Other Collections

It might not be obvious for developers that have never used MongoDB, and for SQL users considering the switch to NoSQL from far away but MongoDB does allow you to model relationships thanks to the feature that I am going to present now.

The lookup stage allow you to make a left-join between two collections. For me this is one of the coolest feature of the aggregation pipeline framework.

In [6]:
pipeline = Pipeline(collection="movies")
pipeline.lookup(
    right = "comments", # collection to join with the pipeline reference collection (movies)
    left_on = "_id", # key in the left collection
    right_on = "movie_id", # "foreign_key" in the right collection
    name = "related_comments" # name of the field that will contain the match in the foreign collection
                              # the match will be included as embedded documents in the "new collection"* produced by pipeline
).limit(5)

Pipeline(on_call=<OnCallEnum.EXPORT: 'export'>, collection='movies', stages=[Lookup(right='comments', on=None, left_on='_id', right_on='movie_id', name='related_comments', let=None, pipeline=None, type_=<LookupTypeEnum.SIMPLE: 'simple'>), Limit(value=5)])

*The lookup stage itself does not produce a "new collection", or more precisely, the output that you get is not stored in memory.
In fact, the aggregation pipelines almost never modify the original documents and collections.
In order to modify the original documents and collections and store the modifications in memory, you need to use the out stage.

In [9]:
results = movie_collection.aggregate(pipeline())
for movie in results:
   print(movie["title"], " - Comments: ", movie["related_comments"])

Blacksmith Scene  - Comments:  []
The Italian  - Comments:  []
Hell's Hinges  - Comments:  [{'_id': ObjectId('5a9427648b0beebeb6957a4b'), 'name': 'Gregor Clegane', 'email': 'hafthór_júlíus_björnsson@gameofthron.es', 'movie_id': ObjectId('573a1390f29313caabcd5b9a'), 'text': 'Voluptatum voluptatem nam et accusamus ullam qui explicabo exercitationem. Ut sint facilis aut similique dolorum non. Necessitatibus unde molestias incidunt asperiores nesciunt molestias.', 'date': datetime.datetime(2015, 2, 8, 1, 28, 23)}]
Intolerance: Love's Struggle Throughout the Ages  - Comments:  []
The Blue Bird  - Comments:  []


The results here are not brilliant because most of the movies don't have any comments. Let's filter on movies with a minimum of comments.

### Matching on Array Length

In [17]:
pipeline = Pipeline(collection="movies")
pipeline.lookup(
    right = "comments",
    left_on = "_id",
    right_on = "movie_id",
    name = "related_comments"
).add_fields( # adding a comment_count field to be able to filter out documents without comments
    comment_count = {
        "$size" : "$related_comments"
    }
).match( # filtering out documents that don't have at least 2 comments
    comment_count = {
        "$gt":2
    }
).limit(5)

Pipeline(on_call=<OnCallEnum.EXPORT: 'export'>, collection='movies', stages=[Lookup(right='comments', on=None, left_on='_id', right_on='movie_id', name='related_comments', let=None, pipeline=None, type_=<LookupTypeEnum.SIMPLE: 'simple'>), Set(document={'comment_count': {'$size': '$related_comments'}}), Match(query={'comment_count': {'$gt': 2}}), Limit(value=5)])

In [18]:
results = movie_collection.aggregate(pipeline())
for movie in results:
   print(movie["title"])
   print("Comment count:", movie["comment_count"])

   # Loop through the first 5 comments and print the name and text:
   for comment in movie["related_comments"][:5]:
         print(" * {name}: {text}".format(
            name=comment["name"],
            text=comment["text"]))
   print("============================")

Upstream
Comment count: 3
 * Jordan Medina: Adipisci vel dolores tenetur sit inventore. Doloribus dolor nesciunt voluptas saepe veritatis. Mollitia eum iure ut nam.
 * Theresa Holmes: Unde ut eum doloremque expedita commodi exercitationem. Error soluta temporibus quasi. Libero quam nulla mollitia officia ipsa. Odio harum cupiditate a dignissimos.
 * Mace Tyrell: Assumenda quibusdam vel reprehenderit error. Optio voluptatibus maxime tempore velit. Architecto modi possimus officia minima eum quis quis.
Midnight
Comment count: 3
 * Teresa Thomas: Atque quaerat occaecati mollitia dolore explicabo amet. Perspiciatis natus modi similique harum. Eligendi commodi iure molestias.
 * Tommen Baratheon: Illum ipsa ipsum architecto tenetur nemo facere iste. Doloribus quasi id cum expedita voluptatibus nesciunt accusamus. Nesciunt corrupti est eum ut facere perferendis ex.
 * Lisa Rasmussen: Sed est earum harum ad necessitatibus molestias esse. Deserunt enim molestiae delectus totam odit quos iure. 

## Grouping Documents with $group

In this last example, we will finally understand the "aggregation" in "aggregation framework" by showing how to group documents.

Let's see how many movies have been released each year.

In [19]:
pipeline = Pipeline(collection="movies")
pipeline.group(by="year")

Pipeline(on_call=<OnCallEnum.EXPORT: 'export'>, collection='movies', stages=[Group(by='$year', query={'_id': '$year'})])

Straightforward, almost resembles a SQL query

In [20]:
results = movie_collection.aggregate(pipeline())
for year_summary in results:
   pprint(year_summary)

{'_id': 2006}
{'_id': 1992}
{'_id': '2006è2007'}
{'_id': '1995è'}
{'_id': 1996}
{'_id': 1981}
{'_id': '1987è'}
{'_id': 1940}
{'_id': 1942}
{'_id': 1938}
{'_id': 2002}
{'_id': 2014}
{'_id': 1945}
{'_id': 1997}
{'_id': 1925}
{'_id': 1990}
{'_id': 1911}
{'_id': 1958}
{'_id': 1931}
{'_id': '2003è'}
{'_id': 1921}
{'_id': 1949}
{'_id': 1918}
{'_id': 1941}
{'_id': 1930}
{'_id': 1893}
{'_id': 1988}
{'_id': 2004}
{'_id': 2008}
{'_id': 1961}
{'_id': '1997è'}
{'_id': 2010}
{'_id': 1971}
{'_id': 1920}
{'_id': 1987}
{'_id': 1989}
{'_id': '1986è'}
{'_id': '2007è'}
{'_id': 1916}
{'_id': 1943}
{'_id': 1994}
{'_id': 1985}
{'_id': 1973}
{'_id': 1998}
{'_id': 1999}
{'_id': 2015}
{'_id': '2005è'}
{'_id': 2001}
{'_id': 1924}
{'_id': 1948}
{'_id': 2012}
{'_id': 1912}
{'_id': 1929}
{'_id': 1964}
{'_id': 1919}
{'_id': '2012è'}
{'_id': 1991}
{'_id': 1951}
{'_id': 1923}
{'_id': 2011}
{'_id': '2006è'}
{'_id': 1891}
{'_id': 1967}
{'_id': 1936}
{'_id': 1986}
{'_id': '1981è'}
{'_id': 2007}
{'_id': 1974}
{'_id': '20

Except that the results, is not really what we expected, yet it is correct since we haven't specified what to do with the groups.

Let's fix this.

In [21]:
pipeline = Pipeline(collection="movies")
pipeline.group(
    by = "year",
    query = {
        "movie_count" : {"$sum":1}
    }
)

Pipeline(on_call=<OnCallEnum.EXPORT: 'export'>, collection='movies', stages=[Group(by='$year', query={'movie_count': {'$sum': 1}, '_id': '$year'})])

In [22]:
results = movie_collection.aggregate(pipeline())
for year_summary in results:
   pprint(year_summary)


{'_id': 1947, 'movie_count': 38}
{'_id': 1995, 'movie_count': 396}
{'_id': 1999, 'movie_count': 542}
{'_id': 1973, 'movie_count': 123}
{'_id': 1998, 'movie_count': 552}
{'_id': 2015, 'movie_count': 485}
{'_id': '2005è', 'movie_count': 2}
{'_id': 1985, 'movie_count': 204}
{'_id': 1994, 'movie_count': 318}
{'_id': 1916, 'movie_count': 4}
{'_id': 1943, 'movie_count': 36}
{'_id': 1989, 'movie_count': 244}
{'_id': '1986è', 'movie_count': 1}
{'_id': '2007è', 'movie_count': 3}
{'_id': 1971, 'movie_count': 116}
{'_id': 1920, 'movie_count': 5}
{'_id': 1961, 'movie_count': 94}
{'_id': 1987, 'movie_count': 239}
{'_id': 2008, 'movie_count': 969}
{'_id': 2011, 'movie_count': 1040}
{'_id': 1923, 'movie_count': 6}
{'_id': 1951, 'movie_count': 62}
{'_id': 1919, 'movie_count': 2}
{'_id': 1991, 'movie_count': 252}
{'_id': '2012è', 'movie_count': 3}
{'_id': 1912, 'movie_count': 2}
{'_id': 1929, 'movie_count': 10}
{'_id': 1964, 'movie_count': 113}
{'_id': 2012, 'movie_count': 1109}
{'_id': 1948, 'movie_co

This is a bit better.

Now, let's only look at the last couple of years and show the movie titles.

In [22]:
pipeline = Pipeline(collection="movies")
pipeline.match(
    year = {
        "$type" : "number"
    }
).group(
    by = "year",
    query = {
        "movie_count" : {"$sum":1},
        "movie_titles" : {"$push":"$title"}
    }
).sort(
    by = "_id", # year has in a way been renamed _id in the previous stage ...
                # this is a feature of MongoDb itself and it isn't something done by monggregate directly.
    descending = True
).limit(10)

Pipeline(on_call=<OnCallEnum.EXPORT: 'export'>, collection='movies', stages=[Match(query={'year': {'$type': 'number'}}), Group(by='$year', query={'movie_count': {'$sum': 1}, 'movie_titles': {'$push': '$title'}, '_id': '$year'}), Sort(descending=True, ascending=False, by=['_id'], query={'_id': -1})])

In [23]:
pipeline()

[{'$match': {'year': {'$type': 'number'}}},
 {'$group': {'movie_count': {'$sum': 1},
   'movie_titles': {'$push': '$title'},
   '_id': '$year'}},
 {'$sort': {'_id': -1}}]

In [24]:
results = movie_collection.aggregate(pipeline())
for year_summary in results:
   pprint(year_summary)

{'_id': 2016, 'movie_count': 1, 'movie_titles': ['The Masked Saint']}
{'_id': 2015,
 'movie_count': 485,
 'movie_titles': ['Jurassic World',
                  'The Stanford Prison Experiment',
                  'Ex Machina',
                  'Ant-Man',
                  'The Wedding Ringer',
                  'The Danish Girl',
                  "Good Ol' Boy",
                  'A Tale of Love and Darkness',
                  'Terminator Genisys',
                  'Beasts of No Nation',
                  'Aloha',
                  'Sam',
                  'Mad Max: Fury Road',
                  'Fantastic Four',
                  'Tab Hunter Confidential',
                  'Jupiter Ascending',
                  'Racing Extinction',
                  'Z for Zachariah',
                  'Cinderella',
                  'The DUFF',
                  'Catching the Sun',
                  'Drunk Stoned Brilliant Dead: The Story of the National '
                  'Lampoon',
            

## Conclusions and Next Steps

# References