# MongoDB Aggregation Pipeline In Python

Learn about the stages and operators you can combine to build a MongoDB Aggregation Pipeline.

Stages: [match](#match), [project](#project), [unset](#unset), [limit](#limit), [skip](#skip), [sort](#sort), [count](#count), [sortByCount](#sortByCount), [unwind](#unwind), [group](#group), [addFields](#addFields), [sample](#sample), [lookup](#lookup), [unionWith](#unionWith), [out](#out), [merge](#merge)

Operators: [size](#size-(operator)), [in](#in-(operator)), [arrayElemAt](#arrayElemAt-(operator)), [first](#first-(operator)), [count](#count-(accumulator-operator)), [sum](#sum-(accumulator-operator)), [first, last](#first,-last-(accumulator-operators)), [push](#push-(accumulator-operator)), [addToSet](#addToSet-(accumulator-operator)), [regexMatch](#regexMatch-(operator)), [cond](#cond-(operator)), [Date](#Date-Operators), [expr](#expr-(operator)), [ifNull](#ifNull-(operator)), [type](#type-(operator)), [switch](#switch-(operator))

In [11]:
from pymongo import MongoClient

In [12]:
# Connection settings: a MongoDB server on localhost at port 27017 and the
# name of the database these examples run against.
mongodb_uri = "mongodb://localhost:27017/"
db_name = "aggregation_test"

In [13]:
# Create the client from the URI and select the working database by name.
client = MongoClient(mongodb_uri)
db = client[db_name]

### Helper Function

In [23]:
def print_cursor(cursor):
    """Print every item yielded by *cursor*, each followed by an empty line."""
    for record in cursor:
        print(record)
        print()

### Inserting Some Sample Data

In [15]:
# Load the project's sample-data helper and populate the database.
# Judging by the message printed below, insert_data aborts the inserts when
# the users, products, or orders collections already contain entries.
import aggregatehelper.insert_aggregation_sample_data as iasd

iasd.insert_data(mongodb_uri, db_name)

Entries already exist in the aggregation_test database in the users, products, or orders collection. Insert commands aborted.


### match

In [21]:
# Keep only the products whose name equals "Pens".
pens_filter = {"$match": {"name": "Pens"}}
match_cursor = db.products.aggregate([pens_filter])

In [22]:
# Iterate the cursor and print each matching product.
print_cursor(match_cursor)

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}

{'_id': ObjectId('640c70acd17bdad3315514fc'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}



In [49]:
# aggregate() returns a CommandCursor, which is iterated like a regular cursor.
# Keep products tagged "Beauty" or "Home" (either tag is enough).
beauty_or_home = {"$or": [{"tags": "Beauty"}, {"tags": "Home"}]}
match_cursor = db.products.aggregate([{"$match": beauty_or_home}])

In [50]:
# Print every product that carries at least one of the two tags.
print_cursor(match_cursor)

{'_id': ObjectId('640b4ff43c493ec195566199'), 'name': 'Mug', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Home', 'Kitchen']}

{'_id': ObjectId('640b4ff43c493ec19556619a'), 'name': 'Moisturizer', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619c'), 'name': 'Face Cleanser', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619d'), 'name': 'Concealer Makeup', 'seller_id': ObjectId('640b4ff43c493ec195566196'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619e'), 'name': 'Eyeliner', 'seller_id': ObjectId('640b4ff43c493ec195566197'), 'tags': ['Beauty']}



### project

In [51]:
# Drop _id, keep tags, and expose "name" under a new key: a "$"-prefixed
# string on the right-hand side refers to the value of an existing field.
rename_projection = {"_id": 0, "name_as_product_name": "$name", "tags": 1}
project_cursor = db.products.aggregate([{"$project": rename_projection}])

In [52]:
# Print the reshaped product documents.
print_cursor(project_cursor)

{'tags': ['Home', 'Kitchen'], 'name_as_product_name': 'Mug'}

{'tags': ['Beauty'], 'name_as_product_name': 'Moisturizer'}

{'tags': ['Office', 'School'], 'name_as_product_name': 'Pens'}

{'tags': ['Beauty'], 'name_as_product_name': 'Face Cleanser'}

{'tags': ['Beauty'], 'name_as_product_name': 'Concealer Makeup'}

{'tags': ['Beauty'], 'name_as_product_name': 'Eyeliner'}

{'tags': ['Office', 'School'], 'name_as_product_name': 'Pens'}



In [53]:
# Filter to "Pens" first, then reshape the documents that survive the filter.
pipeline = [
    {"$match": {"$or": [{"name": "Pens"}]}},
    {"$project": {"_id": 0, "name_as_product_name": "$name", "tags": 1}},
]
match_project_cursor = db.products.aggregate(pipeline)

print_cursor(match_project_cursor)

{'tags': ['Office', 'School'], 'name_as_product_name': 'Pens'}

{'tags': ['Office', 'School'], 'name_as_product_name': 'Pens'}



In [54]:
# Prints nothing: the $project stage renamed "name" to "name_as_product_name",
# so the final $match looks for a "name" field that no longer exists.
pipeline = [
    {"$match": {"$or": [{"name": "Pens"}]}},
    {"$project": {"_id": 0, "name_as_product_name": "$name", "tags": 1}},
    {"$match": {"name": "Pens"}},
]
match_project_cursor = db.products.aggregate(pipeline)

print_cursor(match_project_cursor)

In [55]:
# Matching on the renamed field works: a stage only sees the field names
# produced by the stages before it.
pipeline = [
    {"$match": {"$or": [{"name": "Pens"}]}},
    {"$project": {"_id": 0, "product_name": "$name", "tags": 1}},
    {"$match": {"product_name": "Pens"}},
]
match_project_cursor = db.products.aggregate(pipeline)

print_cursor(match_project_cursor)

{'tags': ['Office', 'School'], 'product_name': 'Pens'}

{'tags': ['Office', 'School'], 'product_name': 'Pens'}



### unset

In [56]:
# $unset is the counterpart of an inclusion projection: list the fields to
# discard instead of the fields to keep.
fields_to_drop = ["_id", "seller_id"]
unset_project_cursor = db.products.aggregate([{"$unset": fields_to_drop}])
print_cursor(unset_project_cursor)

{'name': 'Mug', 'tags': ['Home', 'Kitchen']}

{'name': 'Moisturizer', 'tags': ['Beauty']}

{'name': 'Pens', 'tags': ['Office', 'School']}

{'name': 'Face Cleanser', 'tags': ['Beauty']}

{'name': 'Concealer Makeup', 'tags': ['Beauty']}

{'name': 'Eyeliner', 'tags': ['Beauty']}

{'name': 'Pens', 'tags': ['Office', 'School']}



In [57]:
# Filter to "Pens" first, then strip the id fields from what is left.
pipeline = [
    {"$match": {"$or": [{"name": "Pens"}]}},
    {"$unset": ["_id", "seller_id"]},
]
unset_project_cursor = db.products.aggregate(pipeline)
print_cursor(unset_project_cursor)

{'name': 'Pens', 'tags': ['Office', 'School']}

{'name': 'Pens', 'tags': ['Office', 'School']}



### limit

In [58]:
# Pass only the first three documents down the pipeline.
first_three = {"$limit": 3}
limit_project_cursor = db.products.aggregate([first_three])
print_cursor(limit_project_cursor)

{'_id': ObjectId('640b4ff43c493ec195566199'), 'name': 'Mug', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Home', 'Kitchen']}

{'_id': ObjectId('640b4ff43c493ec19556619a'), 'name': 'Moisturizer', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}



### skip

In [59]:
# Discard the first two documents and pass on the rest.
skip_two = {"$skip": 2}
skip_project_cursor = db.products.aggregate([skip_two])
print_cursor(skip_project_cursor)

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}

{'_id': ObjectId('640b4ff43c493ec19556619c'), 'name': 'Face Cleanser', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619d'), 'name': 'Concealer Makeup', 'seller_id': ObjectId('640b4ff43c493ec195566196'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619e'), 'name': 'Eyeliner', 'seller_id': ObjectId('640b4ff43c493ec195566197'), 'tags': ['Beauty']}

{'_id': ObjectId('640c70acd17bdad3315514fc'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}



In [60]:
# Skip two documents, then take the next three (the third to fifth documents).
pipeline = [{"$skip": 2}, {"$limit": 3}]
skip_and_limit_project_cursor = db.products.aggregate(pipeline)
print_cursor(skip_and_limit_project_cursor)

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}

{'_id': ObjectId('640b4ff43c493ec19556619c'), 'name': 'Face Cleanser', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619d'), 'name': 'Concealer Makeup', 'seller_id': ObjectId('640b4ff43c493ec195566196'), 'tags': ['Beauty']}



In [61]:
# Stage order matters: limiting to three documents first and then skipping two
# leaves a single document (the third one).
pipeline = [{"$limit": 3}, {"$skip": 2}]
limit_and_skip_project_cursor = db.products.aggregate(pipeline)
print_cursor(limit_and_skip_project_cursor)

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}



### sort

In [62]:
# Sort by name in ascending (alphabetical) order; -1 would sort descending.
by_name_ascending = {"$sort": {"name": 1}}
sort_cursor = db.products.aggregate([by_name_ascending])
print_cursor(sort_cursor)

### count

In [63]:
# Count the products tagged "Beauty"; the single result document stores the
# total under the key "beauty_products_count".
pipeline = [
    {"$match": {"tags": "Beauty"}},
    {"$count": "beauty_products_count"},
]
count_cursor = db.products.aggregate(pipeline)
print_cursor(count_cursor)

### sortByCount

In [64]:
# $sortByCount groups the documents by the given expression (here the whole
# tags array), counts each group, and returns the groups ordered from the
# highest count to the lowest.
count_by_tags = {"$sortByCount": "$tags"}
sortByCount_cursor = db.products.aggregate([count_by_tags])
print_cursor(sortByCount_cursor)

### size (operator)

In [65]:
# Add a computed num_tags field holding the length of each product's tags array.
size_projection = {"_id": 0, "name": 1, "num_tags": {"$size": "$tags"}, "tags": "$tags"}
project_cursor = db.products.aggregate([{"$project": size_projection}])
print_cursor(project_cursor)

In [66]:
# Same projection as the previous cell; only the printed field order differs.
# Including tags with 1 shows it ahead of the computed num_tags field, whereas
# the computed form "tags": "$tags" keeps num_tags in front of tags.
size_projection = {"_id": 0, "name": 1, "num_tags": {"$size": "$tags"}, "tags": 1}
project_cursor = db.products.aggregate([{"$project": size_projection}])
print_cursor(project_cursor)

### in (operator)

In [67]:
# Flag each product with whether "Beauty" appears in its tags array.
has_beauty_tag = {"$in": ["Beauty", "$tags"]}
project_cursor = db.products.aggregate([
    {"$project": {"_id": 0, "name": 1, "is_beauty_product": has_beauty_tag, "tags": "$tags"}}
])
print_cursor(project_cursor)

### arrayElemAt (operator)

In [68]:
# $arrayElemAt picks the element at a given index; index 0 is the first tag.
tag_at_index_0 = {"$arrayElemAt": ["$tags", 0]}
project_cursor = db.products.aggregate([
    {"$project": {"_id": 0, "name": 1, "first_tag": tag_at_index_0, "tags": "$tags"}}
])
print_cursor(project_cursor)

In [69]:
# Index 1 selects the SECOND tag, so the projected field is named second_tag
# (it was misleadingly called first_tag).
# When the index is past the end of the array (e.g. a product with a single
# tag), $arrayElemAt yields no value and the projected field is omitted from
# that document entirely - see the products that have only one tag.
project_cursor = db.products.aggregate([
    {"$project": {"_id": 0, "name": 1, "second_tag": {"$arrayElemAt": ["$tags", 1]}, "tags": "$tags"}}
])
print_cursor(project_cursor)

### first (operator)

In [70]:

# $first returns the first element of an array, like $arrayElemAt with index 0.
leading_tag = {"$first": "$tags"}
project_cursor = db.products.aggregate([
    {"$project": {"_id": 0, "name": 1, "first_tag": leading_tag, "tags": "$tags"}}
])
print_cursor(project_cursor)

In [71]:
# $last returns the final element of an array. The projected field is named
# last_tag to match what it holds (it was misleadingly called first_tag).
project_cursor = db.products.aggregate([
    {"$project": {"_id": 0, "name": 1, "last_tag": {"$last": "$tags"}, "tags": "$tags"}}
])
print_cursor(project_cursor)

### unwind

In [91]:
# $unwind flattens an array field: a document whose tags array has N elements
# becomes N documents, one per element, each carrying all the other fields of
# the original document.
pipeline = [
    {"$unwind": "$tags"},
    {"$unset": ["_id", "seller_id"]},
]
unwind_cursor = db.products.aggregate(pipeline)
print_cursor(unwind_cursor)

In [73]:
# Same idea on orders: emit one document per entry of each order's items array.
pipeline = [
    {"$unwind": "$items"},
    {"$unset": "_id"},
]
unwind_cursor = db.orders.aggregate(pipeline)
print_cursor(unwind_cursor)

In [74]:
# includeArrayIndex stores each element's position within the original array
# in an extra field, named "tag_index" here.
pipeline = [
    # {"$size": 2} matches arrays with exactly two elements (not "more than two").
    {"$match": {"tags": {"$size": 2}}},
    # Emit one document per tag and record the tag's position in the array.
    {"$unwind": {"path": "$tags", "includeArrayIndex": "tag_index"}},
    {"$unset": ["_id", "seller_id"]},
]
unwind_cursor = db.products.aggregate(pipeline)
print_cursor(unwind_cursor)

### group

In [98]:
# Group by the complete tags array: one output document per distinct array.
group_by_tags = {"$group": {"_id": "$tags"}}
groups_cursor = db.products.aggregate([group_by_tags])
print_cursor(groups_cursor)

In [101]:
# Unwind first so that grouping happens per individual tag rather than per
# distinct tags array.
pipeline = [
    {"$unwind": {"path": "$tags", "includeArrayIndex": "tag_index"}},
    {"$group": {"_id": "$tags"}},
]
groups_cursor = db.products.aggregate(pipeline)
print_cursor(groups_cursor)

In [77]:
# Same per-tag grouping, using the short string form of $unwind.
pipeline = [
    {"$unwind": "$tags"},
    {"$group": {"_id": "$tags"}},
]
groups_cursor = db.products.aggregate(pipeline)
print_cursor(groups_cursor)

### count (accumulator operator)

In [78]:
# The $count accumulator reports how many documents fall into each group.
entries_per_group = {"$count": {}}
groups_cursor = db.products.aggregate([
    {"$group": {"_id": "$tags", "num_entries": entries_per_group}}
])
print_cursor(groups_cursor)

### sum (accumulator operator)

In [79]:
# Intermediate view: only unwind the order items so that each item is printed
# as its own document. The $group stage is deliberately left commented out
# here; the next cell enables it to sum the quantities.
groups_cursor = db.orders.aggregate([
    {"$unwind": "$items"},
    # {"$group": {"_id": "$items.product_id", "total_quantity": {"$sum": "$items.quantity" }}}
])
print_cursor(groups_cursor)

In [80]:
# Sum the ordered quantity of every product across all orders.
quantity_per_product = {
    "$group": {"_id": "$items.product_id", "total_quantity": {"$sum": "$items.quantity"}}
}
groups_cursor = db.orders.aggregate([{"$unwind": "$items"}, quantity_per_product])
print_cursor(groups_cursor)

### first, last (accumulator operators)

In [81]:
# Per tags group: the number of products plus the names of the first and the
# last product encountered in that group.
group_summary = {
    "_id": "$tags",
    "num_entries": {"$count": {}},
    "first": {"$first": "$name"},
    "last": {"$last": "$name"},
}
groups_cursor = db.products.aggregate([{"$group": group_summary}])
print_cursor(groups_cursor)

### push (accumulator operator)

In [82]:
# $push appends a value to an array for every document in the group, much like
# list.append in Python; here it collects the product names of each group.
collect_names = {"$push": "$name"}
groups_cursor = db.products.aggregate([
    {"$group": {"_id": "$tags", "products": collect_names}}
])
print_cursor(groups_cursor)

### addToSet (accumulator operator)

In [83]:
# $addToSet also collects values per group but keeps each distinct value once.
distinct_names = {"$addToSet": "$name"}
groups_cursor = db.products.aggregate([
    {"$group": {"_id": "$tags", "products": distinct_names}}
])
print_cursor(groups_cursor)

In [84]:
# The collected value may be a sub-document: each set entry pairs a product
# name with its seller id.
name_and_seller = {"name": "$name", "seller_id": "$seller_id"}
groups_cursor = db.products.aggregate([
    {"$group": {"_id": "$tags", "products": {"$addToSet": name_and_seller}}}
])
print_cursor(groups_cursor)

### $$ROOT (system variable)

In [85]:
# $$ROOT refers to the whole document currently passing through the stage, so
# each group collects its complete product documents.
whole_documents = {"$addToSet": "$$ROOT"}
groups_cursor = db.products.aggregate([
    {"$group": {"_id": "$tags", "products": whole_documents}}
])
print_cursor(groups_cursor)

### addFields

In [27]:
# $addFields keeps the existing fields and adds new ones: a constant string
# and the computed number of tags.
extra_fields = {"my_new_field": "hi there", "num_tags": {"$size": "$tags"}}
pipeline = [
    {"$match": {"name": "Pens"}},
    {"$addFields": extra_fields},
]
add_fields_cursor = db.products.aggregate(pipeline)
print_cursor(add_fields_cursor)

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School'], 'my_new_field': 'hi there', 'num_tags': 2}

{'_id': ObjectId('640c70acd17bdad3315514fc'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School'], 'my_new_field': 'hi there', 'num_tags': 2}



### sample

In [42]:
# $sample passes a random selection of documents (three here) on to the rest
# of the pipeline.
pipeline = [
    {"$sample": {"size": 3}},
    {"$unset": ["_id", "seller_id"]},
]
sample_cursor = db.products.aggregate(pipeline)

print_cursor(sample_cursor)

{'name': 'Mug', 'tags': ['Home', 'Kitchen']}

{'name': 'Pens', 'tags': ['Office', 'School']}

{'name': 'Eyeliner', 'tags': ['Beauty']}



### lookup

In [44]:
# $lookup works like a join: for every product, attach the matching documents
# from another collection of the same database.
join_sellers = {"$lookup": {
    "from": "users",  # collection to join with
    "localField": "seller_id",  # field on the input (products) documents
    "foreignField": "_id",  # field on the "from" (users) documents
    "as": "sellers",  # array field that receives the matching users
}}
lookup_cursor = db.products.aggregate([join_sellers])
print_cursor(lookup_cursor)

{'_id': ObjectId('640b4ff43c493ec195566199'), 'name': 'Mug', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Home', 'Kitchen'], 'sellers': [{'_id': ObjectId('640b4ff43c493ec195566193'), 'name': 'Sarah'}]}

{'_id': ObjectId('640b4ff43c493ec19556619a'), 'name': 'Moisturizer', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty'], 'sellers': [{'_id': ObjectId('640b4ff43c493ec195566193'), 'name': 'Sarah'}]}

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School'], 'sellers': [{'_id': ObjectId('640b4ff43c493ec195566194'), 'name': 'Bob'}]}

{'_id': ObjectId('640b4ff43c493ec19556619c'), 'name': 'Face Cleanser', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty'], 'sellers': [{'_id': ObjectId('640b4ff43c493ec195566193'), 'name': 'Sarah'}]}

{'_id': ObjectId('640b4ff43c493ec19556619d'), 'name': 'Concealer Makeup', 'seller_id': ObjectId('640b4ff43c493ec195566196'),

In [51]:
# One row per product: the product name plus its seller's name, taken from the
# first element of the joined sellers array.
join_sellers = {"$lookup": {
    "from": "users",  # collection to join with
    "localField": "seller_id",  # field on the input (products) documents
    "foreignField": "_id",  # field on the "from" (users) documents
    "as": "sellers",
}}
name_pairs = {
    "$project": {"_id": 0, "product_name": "$name", "seller_name": {"$first": "$sellers.name"}}
}
lookup_cursor = db.products.aggregate([join_sellers, name_pairs])
print_cursor(lookup_cursor)

{'product_name': 'Mug', 'seller_name': 'Sarah'}

{'product_name': 'Moisturizer', 'seller_name': 'Sarah'}

{'product_name': 'Pens', 'seller_name': 'Bob'}

{'product_name': 'Face Cleanser', 'seller_name': 'Sarah'}

{'product_name': 'Concealer Makeup', 'seller_name': 'Lisa'}

{'product_name': 'Eyeliner', 'seller_name': 'Jessica'}

{'product_name': 'Pens', 'seller_name': 'Bob'}



In [61]:
# For every user, list the names of the products that user sells (an empty
# list when the user sells nothing).
join_products = {"$lookup": {
    "from": "products",
    "localField": "_id",
    "foreignField": "seller_id",
    "as": "products",
}}
user_with_products = {"$project": {"_id": 0, "user": "$name", "product": "$products.name"}}
lookup_pr = db.users.aggregate([join_products, user_with_products])

print_cursor(lookup_pr)

{'user': 'Sarah', 'product': ['Mug', 'Moisturizer', 'Face Cleanser']}

{'user': 'Bob', 'product': ['Pens', 'Pens']}

{'user': 'Jose', 'product': []}

{'user': 'Lisa', 'product': ['Concealer Makeup']}

{'user': 'Jessica', 'product': ['Eyeliner']}

{'user': 'Tina', 'product': []}



In [63]:
# List each seller with the names of their products, keeping only the users
# who sell at least one product.
pipeline = [
    {"$lookup": {
        "from": "products",
        "localField": "_id",
        "foreignField": "seller_id",
        "as": "products",
    }},
    # Count the joined products so that users without any can be filtered out.
    {"$addFields": {"num_products": {"$size": "$products"}}},
    # Keep the users that have one or more products.
    {"$match": {"num_products": {"$gte": 1}}},
    {"$project": {"_id": 0, "seller_name": "$name", "product": "$products.name"}},
]
lookup_pr = db.users.aggregate(pipeline)

print_cursor(lookup_pr)

{'seller_name': 'Sarah', 'product': ['Mug', 'Moisturizer', 'Face Cleanser']}

{'seller_name': 'Bob', 'product': ['Pens', 'Pens']}

{'seller_name': 'Lisa', 'product': ['Concealer Makeup']}

{'seller_name': 'Jessica', 'product': ['Eyeliner']}



In [67]:
# Alternative that starts from products: collect the product names per seller
# id, then resolve each id to the seller's name with a $lookup.
names_per_seller = {"$group": {"_id": "$seller_id", "product_names": {"$push": "$name"}}}
resolve_seller = {"$lookup": {
    "from": "users",
    "localField": "_id",  # the group key, i.e. the seller id
    "foreignField": "_id",
    "as": "sellers",
}}
seller_and_products = {
    "$project": {"_id": 0, "seller_name": {"$first": "$sellers.name"}, "products": "$product_names"}
}
lookup_pr = db.products.aggregate([names_per_seller, resolve_seller, seller_and_products])

print_cursor(lookup_pr)

{'seller_name': 'Lisa', 'products': ['Concealer Makeup']}

{'seller_name': 'Bob', 'products': ['Pens', 'Pens']}

{'seller_name': 'Sarah', 'products': ['Mug', 'Moisturizer', 'Face Cleanser']}

{'seller_name': 'Jessica', 'products': ['Eyeliner']}



### unionWith

In [68]:
# $unionWith appends the documents of another collection (users) to the
# documents already flowing through the pipeline (products).
append_users = {"$unionWith": "users"}
union_with_cursor = db.products.aggregate([append_users])

print_cursor(union_with_cursor)

{'_id': ObjectId('640b4ff43c493ec195566199'), 'name': 'Mug', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Home', 'Kitchen']}

{'_id': ObjectId('640b4ff43c493ec19556619a'), 'name': 'Moisturizer', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619b'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}

{'_id': ObjectId('640b4ff43c493ec19556619c'), 'name': 'Face Cleanser', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619d'), 'name': 'Concealer Makeup', 'seller_id': ObjectId('640b4ff43c493ec195566196'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec19556619e'), 'name': 'Eyeliner', 'seller_id': ObjectId('640b4ff43c493ec195566197'), 'tags': ['Beauty']}

{'_id': ObjectId('640c70acd17bdad3315514fc'), 'name': 'Pens', 'seller_id': ObjectId('640b4ff43c493ec195566194'), 'tags': ['Office', 'School']}

{'

### regexMatch (operator)

In [70]:
# Text to search for in both product names and seller names.
user_search = "is"


In [73]:
# $regexMatch evaluates to True when the input matches the regex. Flag every
# document of both collections, keep the matches, then drop the helper field.
name_matches = {"$regexMatch": {"input": "$name", "regex": user_search}}
pipeline = [
    {"$unionWith": "users"},
    {"$addFields": {"matched_result1": name_matches}},
    {"$match": {"matched_result1": True}},
    {"$unset": "matched_result1"},
]
union_with_cursor = db.products.aggregate(pipeline)

print_cursor(union_with_cursor)

{'_id': ObjectId('640b4ff43c493ec19556619a'), 'name': 'Moisturizer', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec195566196'), 'name': 'Lisa'}



In [75]:
# $regexMatch is case sensitive by default; the "i" option makes the match
# case-insensitive, so the upper-case search text finds the same documents.
user_search = "IS"
name_matches = {"$regexMatch": {"input": "$name", "regex": user_search, "options": "i"}}
pipeline = [
    {"$unionWith": "users"},
    {"$addFields": {"matched_result1": name_matches}},
    {"$match": {"matched_result1": True}},
    {"$unset": "matched_result1"},
]
union_with_cursor = db.products.aggregate(pipeline)

print_cursor(union_with_cursor)

{'_id': ObjectId('640b4ff43c493ec19556619a'), 'name': 'Moisturizer', 'seller_id': ObjectId('640b4ff43c493ec195566193'), 'tags': ['Beauty']}

{'_id': ObjectId('640b4ff43c493ec195566196'), 'name': 'Lisa'}



### out
*Note: This stage can overwrite all the data in a collection. Use it with caution.*

In [84]:
# $out writes the pipeline's result documents into a collection, giving a
# stored snapshot of the results (comparable to a materialised view). It acts
# like an insert at the end of the pipeline, with one important difference:
#
# !!!! $out completely replaces the target collection !!!!
# !!!! Running the cell a second time deletes the previously stored data !!!!
out_cursor = db.products.aggregate([
    {"$match": {"tags": "Beauty"}},
    # Use db_name rather than repeating the literal database name, so the
    # target always stays in the database this notebook is connected to.
    {"$out": {"db": db_name, "coll": "beauty_products"}}
])
# Expected to print nothing: $out stores the documents instead of returning them.
print_cursor(out_cursor)

### merge
*Note: This stage can overwrite data within a collection. Use it with caution.*

In [81]:
# $merge adds the results to the target collection instead of replacing the
# whole collection. With no options given the defaults apply: documents that
# match an existing one on _id are merged into it, all others are inserted.
# No print is needed: $merge writes the documents rather than returning them.
merge_cursor = db.products.aggregate([
    {"$match": {"tags": "Beauty"}},
    # Use db_name rather than repeating the literal database name, so the
    # target always stays in the database this notebook is connected to.
    {"$merge": {"into": {"db": db_name, "coll": "beauty_products"}}}
])



In [89]:
# Upsert complete documents into the target collection.
# $merge options:
#   on             - field(s) used to match documents; they must be covered by
#                    a unique index
#   whenMatched    - "merge", "replace", "keepExisting", "fail", or a pipeline
#   whenNotMatched - "insert", "discard", or "fail"
merge_cursor = db.products.aggregate([
    {"$match": {"tags": "Beauty"}},
    # $project is left disabled here so that whole documents are written.
    # {"$project": {"myNewColumn": "test"}},
    {"$merge": {
        # db_name keeps the target in the database the notebook is connected to.
        "into": {"db": db_name, "coll": "beauty_products"},
        "on": "_id",  # _id always has a unique index
        "whenMatched": "replace",  # overwrite the existing document entirely
        "whenNotMatched": "insert"  # add documents missing from the target
    }}
])


In [86]:
# Write the projected documents (_id plus myNewColumn) over matching documents
# only; documents without a match in the target are skipped.
# NOTE: "replace" swaps the whole matched document for the projected one, so
# the other fields are lost. Use "merge" to add myNewColumn while keeping the
# existing fields.
# $merge options:
#   on             - field(s) used to match documents; they must be covered by
#                    a unique index
#   whenMatched    - "merge", "replace", "keepExisting", "fail", or a pipeline
#   whenNotMatched - "insert", "discard", or "fail"
merge_cursor = db.products.aggregate([
    {"$match": {"tags": "Beauty"}},
    {"$project": {"myNewColumn": "test"}},
    {"$merge": {
        # db_name keeps the target in the database the notebook is connected to.
        "into": {"db": db_name, "coll": "beauty_products"},
        "on": "_id",  # _id always has a unique index
        "whenMatched": "replace",  # overwrite the existing document entirely
        "whenNotMatched": "discard"  # silently drop documents with no match
    }}
])



In [87]:
# Same projected write as before, but abort with an error as soon as a
# pipeline document has no match in the target (see the OperationFailure
# printed below).
# NOTE: "replace" swaps the whole matched document for the projected one, so
# the other fields are lost. Use "merge" to add myNewColumn while keeping the
# existing fields.
# $merge options:
#   on             - field(s) used to match documents; they must be covered by
#                    a unique index
#   whenMatched    - "merge", "replace", "keepExisting", "fail", or a pipeline
#   whenNotMatched - "insert", "discard", or "fail"
merge_cursor = db.products.aggregate([
    {"$match": {"tags": "Beauty"}},
    {"$project": {"myNewColumn": "test"}},
    {"$merge": {
        # db_name keeps the target in the database the notebook is connected to.
        "into": {"db": db_name, "coll": "beauty_products"},
        "on": "_id",  # _id always has a unique index
        "whenMatched": "replace",  # overwrite the existing document entirely
        "whenNotMatched": "fail"  # raise an error when a document has no match
    }}
])


OperationFailure: PlanExecutor error during aggregation :: caused by :: $merge could not find a matching document in the target collection for at least one document in the source collection, full error: {'ok': 0.0, 'errmsg': 'PlanExecutor error during aggregation :: caused by :: $merge could not find a matching document in the target collection for at least one document in the source collection', 'code': 13113, 'codeName': 'MergeStageNoMatchingDocument'}

### cond (operator)

In [95]:
# $cond is an if/then/else expression: is_bob is True only for the user whose
# name equals "Bob".
is_bob = {"$cond": {"if": {"$eq": ["$name", "Bob"]}, "then": True, "else": False}}
cond_cursor = db.users.aggregate([
    {"$project": {"_id": 0, "name": 1, "is_bob": is_bob}}
])
print_cursor(cond_cursor)

{'name': 'Sarah', 'is_bob': False}

{'name': 'Bob', 'is_bob': True}

{'name': 'Jose', 'is_bob': False}

{'name': 'Lisa', 'is_bob': False}

{'name': 'Jessica', 'is_bob': False}

{'name': 'Tina', 'is_bob': False}



In [97]:
# $cond combined with an array operator: the condition checks whether "Beauty"
# is one of the product's tags.
is_beauty = {"$cond": {"if": {"$in": ["Beauty", "$tags"]}, "then": True, "else": False}}
cond_cursor = db.products.aggregate([
    {"$project": {"_id": 0, "name": 1, "is_beauty_product": is_beauty}}
])
print_cursor(cond_cursor)

{'name': 'Mug', 'is_beauty_product': False}

{'name': 'Moisturizer', 'is_beauty_product': True}

{'name': 'Pens', 'is_beauty_product': False}

{'name': 'Face Cleanser', 'is_beauty_product': True}

{'name': 'Concealer Makeup', 'is_beauty_product': True}

{'name': 'Eyeliner', 'is_beauty_product': True}

{'name': 'Pens', 'is_beauty_product': False}



### $$NOW (system variable)

In [98]:
# $$NOW is a system variable holding the current datetime; here it stands in
# for Bob's join date.
pipeline = [
    {"$match": {"name": "Bob"}},
    {"$project": {"_id": 0, "name": 1, "date_joined": "$$NOW"}},
]
now_cursor = db.users.aggregate(pipeline)
print_cursor(now_cursor)

{'name': 'Bob', 'date_joined': datetime.datetime(2023, 3, 11, 16, 19, 54, 692000)}



### Date Operators

In [100]:
# $dateAdd shifts a date by a number of units: the premium expiry is set to
# seven days after the (simulated) join date.
expiry_date = {"$dateAdd": {"startDate": "$date_joined", "unit": "day", "amount": 7}}
pipeline = [
    {"$match": {"name": "Bob"}},
    {"$addFields": {"date_joined": "$$NOW"}},
    {"$project": {"_id": 0, "name": 1, "premium_exp_date": expiry_date}},
]
date_add_cursor = db.users.aggregate(pipeline)
print_cursor(date_add_cursor)

{'name': 'Bob', 'premium_exp_date': datetime.datetime(2023, 3, 18, 16, 26, 51, 256000)}



In [103]:
# $dateDiff returns the distance between two dates in the requested unit: the
# expiry is two weeks after the join date, so premium_days_left comes out as 14.
expiry_date = {"$dateAdd": {"startDate": "$date_joined", "unit": "week", "amount": 2}}
days_left = {"$dateDiff": {"startDate": "$date_joined", "endDate": "$premium_exp_date", "unit": "day"}}
pipeline = [
    {"$match": {"name": "Bob"}},
    {"$addFields": {"date_joined": "$$NOW"}},
    {"$project": {"_id": 0, "name": 1, "date_joined": 1, "premium_exp_date": expiry_date}},
    {"$addFields": {"premium_days_left": days_left}},
]
date_diff_cursor = db.users.aggregate(pipeline)
print_cursor(date_diff_cursor)

{'name': 'Bob', 'date_joined': datetime.datetime(2023, 3, 11, 16, 32, 19, 738000), 'premium_exp_date': datetime.datetime(2023, 3, 25, 16, 32, 19, 738000), 'premium_days_left': 14}



In [107]:
# $dateToParts splits a date into its components (year, month, day, hour,
# minute, second, millisecond).
date_parts = {"$dateToParts": {"date": "$date_joined"}}
pipeline = [
    {"$match": {"name": "Bob"}},
    {"$addFields": {"date_joined": "$$NOW"}},
    {"$project": {"_id": 0, "name": 1, "date_joined": 1, "date_joined_parts": date_parts}},
]
date_to_parts_cursor = db.users.aggregate(pipeline)
print_cursor(date_to_parts_cursor)

{'name': 'Bob', 'date_joined': datetime.datetime(2023, 3, 11, 16, 36, 1, 936000), 'date_joined_parts': {'year': 2023, 'month': 3, 'day': 11, 'hour': 16, 'minute': 36, 'second': 1, 'millisecond': 936}}



In [108]:
# $year extracts a single component, the year, from a date.
join_year = {"$year": "$date_joined"}
pipeline = [
    {"$match": {"name": "Bob"}},
    {"$addFields": {"date_joined": "$$NOW"}},
    {"$project": {"_id": 0, "name": 1, "date_joined": 1, "date_joined_year": join_year}},
]
date_to_parts_cursor = db.users.aggregate(pipeline)
print_cursor(date_to_parts_cursor)

{'name': 'Bob', 'date_joined': datetime.datetime(2023, 3, 11, 16, 37, 21, 949000), 'date_joined_year': 2023}



### expr (operator)

In [115]:
# Goal: keep the users whose premium expiry date is fewer than 10 days away.
#
# Without $expr, the day difference first has to be stored in its own field
# before $match can compare it:
# add_fields_cursor = db.users.aggregate([
#     {"$match": {"name": "Bob"}},
#     {"$addFields": {"date_joined": "$$NOW"}},
#     # date diff operator
#     {"$addFields":{"premium_exp_date": {
#          "$dateAdd": {"startDate": "$date_joined", "unit": "day", "amount": 7}
#     }}},
#     # I had to add fields to each entry
#     {"$addFields": {"premium_days_left": {
#         "$dateDiff": {"startDate": "$date_joined", "endDate": "$premium_exp_date", "unit": "day"}
#     }}},
#      {"$match": {"premium_days_left": {"$lt":10}}},
# ])
#
# print_cursor(add_fields_cursor)

# With $expr, $match evaluates the $dateDiff expression inline, so no
# temporary premium_days_left field has to be added to the documents.
print("with expression-----------------------------------------------")
days_until_expiry = {
    "$dateDiff": {"startDate": "$date_joined", "endDate": "$premium_exp_date", "unit": "day"}
}
pipeline = [
    {"$match": {"name": "Bob"}},
    {"$addFields": {"date_joined": "$$NOW"}},
    # The expiry date is seven days after the (simulated) join date.
    {"$addFields": {"premium_exp_date": {
        "$dateAdd": {"startDate": "$date_joined", "unit": "day", "amount": 7}
    }}},
    {"$match": {"$expr": {"$lt": [days_until_expiry, 10]}}},
]
add_fields_cursor = db.users.aggregate(pipeline)

print_cursor(add_fields_cursor)

with expression-----------------------------------------------
{'_id': ObjectId('640b4ff43c493ec195566194'), 'name': 'Bob', 'date_joined': datetime.datetime(2023, 3, 11, 16, 53, 8, 53000), 'premium_exp_date': datetime.datetime(2023, 3, 18, 16, 53, 8, 53000)}



### ifNull (operator)

### type (operator)

### switch (operator)