<img src="https://s3.amazonaws.com/edu-static.mongodb.com/lessons/M220/notebook_assets/screen_align.png" style="margin: 0 auto;">


<h1 style="text-align: center; font-size: 58px;">Change Streams</h1>

In this lesson, we're going to use change streams to track real-time changes to the data that our application is using.

### Change Streams

- Report changes at the collection level
- Accept pipelines to transform change events

As of MongoDB 3.6, change streams report changes at the collection level, so we open a change stream against a specific collection.

By default, a change stream returns every change to the data in that collection, regardless of what kind of change it is, so we can also pass an aggregation pipeline to filter or transform the change events we get back from the stream.
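As a minimal sketch of what such a pipeline can look like, the hypothetical helper below builds a `$match` stage on each change event's `operationType` field, so the stream only reports the kinds of changes we ask for. The helper name and the chosen operation types are illustrative assumptions, not part of the lesson; the resulting list is what would be passed as the `pipeline` argument to `watch()`.

```python
def operation_type_pipeline(operation_types):
    """Build a change stream pipeline that keeps only the given event types.

    Hypothetical helper for illustration. Change events carry an
    "operationType" field with values such as "insert", "update",
    "replace" or "delete".
    """
    # $in matches events whose operationType is any of the requested values
    return [{"$match": {"operationType": {"$in": list(operation_types)}}}]


# Only report updates and replacements, ignoring inserts and deletes
updates_only = operation_type_pipeline(["update", "replace"])
print(updates_only)
```

With the `inventory` collection used below, this would be opened as `inventory.watch(pipeline=updates_only)`.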

In [None]:
from pymongo import MongoClient, errors
uri = "mongodb+srv://m220-user:m220-pass@m220-lessons-mcxlm.mongodb.net/test"
client = MongoClient(uri)

So here I'm just initializing my `MongoClient` object.

In [None]:
lessons = client.lessons
inventory = lessons.inventory
inventory.drop()

fruits = [ "strawberries", "bananas", "apples" ]
for fruit in fruits:
    inventory.insert_one( { "type": fruit, "quantity": 100 } )
    
list(inventory.find())

And I'm using a new collection for this lesson, `inventory`. If you imagine we have a store that sells fruits, this collection will store the total quantities of every fruit that we have in stock.

In this case, we have a very small store that only sells three types of fruit, and I've just inserted one document per fruit to reflect that we just got a shipment of 100 units of each.

Now I'm just going to verify that our collection looks the way we expect.

(run cell)

And it looks like we have 100 of each fruit in the collection.

But people will start buying them, because, you know, people like fruit. They'll go pretty quickly, and we want to make sure we don't run out. So I'm going to open a change stream against the `inventory` collection and track its data changes in real time.

In [None]:
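try:
    # full_document='updateLookup' asks the server to include the current
    # version of the whole document in each update event
    with inventory.watch(full_document='updateLookup') as change_stream_cursor:
        # each iteration blocks until the next change event arrives
        for data_change in change_stream_cursor:
            print(data_change)
except errors.PyMongoError:
    print('Change stream closed because of an error.')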

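So here I'm opening a change stream against the `inventory` (point) collection, using the `watch()` method. `watch()` (point) returns a cursor-like change stream object, so we can iterate through it in Python, and each iteration waits until the next change event arrives. Passing `full_document='updateLookup'` tells the server to include the current version of the whole document in update events; without it, an update event only describes the fields that changed.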
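To make the effect of `full_document='updateLookup'` concrete, here is a hand-written, illustrative update event (not output captured from this lesson; the resume token and ObjectId are replaced with placeholder strings) and a hypothetical helper, `quantity_from_event`, that reads the new quantity. Without `updateLookup`, an update event only lists the changed fields under `updateDescription.updatedFields`; with it, the event also carries `fullDocument`.

```python
# Illustrative update event. Real events use a resume-token document for "_id"
# and an ObjectId for documentKey._id; placeholder strings stand in for both.
sample_update_event = {
    "_id": {"_data": "placeholder-resume-token"},
    "operationType": "update",
    "ns": {"db": "lessons", "coll": "inventory"},
    "documentKey": {"_id": "placeholder-object-id"},
    "updateDescription": {"updatedFields": {"quantity": 71}, "removedFields": []},
    # present only because the stream was opened with full_document='updateLookup'
    "fullDocument": {"_id": "placeholder-object-id", "type": "bananas", "quantity": 71},
}


def quantity_from_event(event):
    """Return the quantity reported by a change event, or None if absent.

    Hypothetical helper: prefers fullDocument (set on inserts, and on updates
    when updateLookup is used) and falls back to updateDescription.
    """
    full_document = event.get("fullDocument")
    if full_document is not None:
        return full_document.get("quantity")
    updated_fields = event.get("updateDescription", {}).get("updatedFields", {})
    return updated_fields.get("quantity")


print(quantity_from_event(sample_update_event))

# The same event as it would look without updateLookup: no fullDocument key
without_lookup = {k: v for k, v in sample_update_event.items() if k != "fullDocument"}
print(quantity_from_event(without_lookup))
```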

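We've wrapped this in a `try`/`except` block so that if something goes wrong with the connection used by the change stream, we'll know immediately, and the `with` statement makes sure the change stream is closed when we leave the block.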

(run the cell to start the loop)

(go to `updates_every_one_second` notebook and start up process)

(come back here)

So the change stream cursor is just gonna spit out anything it gets, with no filter. Any change to the data in the `inventory` collection will appear in this output.

But really, this is noise. We don't care when the quantity drops to 71 (point) or 60 (point), we only want to know when it's close to zero.

In [None]:
import logging

# only pass along change events where the document's quantity is now below 20
low_quantity_pipeline = [ { "$match": { "fullDocument.quantity": { "$lt": 20 } } } ]

try:
    with inventory.watch(pipeline=low_quantity_pipeline, full_document='updateLookup') as change_stream_cursor:
        for data_change in change_stream_cursor:
            current_quantity = data_change["fullDocument"].get("quantity")
            fruit = data_change["fullDocument"].get("type")
            msg = "There are only {0} units left of {1}!".format(current_quantity, fruit)
            print(msg)
except errors.PyMongoError:
    logging.error('Change stream closed because of an error.')

Let's say we want to know if any of our quantities (point to quantity values) dip below 20 units, so we know when to buy more.

Here I've defined a pipeline for the change event documents returned by the cursor. In this case, if the cursor returns a change event to me, it's because the document it describes now has a quantity below 20 units.
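Because `updateLookup` fetches the document some time after the change, `fullDocument` can be `null` (for example, if the document was deleted in the meantime). The `$match` stage drops such events, since a missing quantity never satisfies `$lt: 20`, but it can still help to check the filtering and message logic without a running cluster. As a sketch, the hypothetical helpers below mirror that filter in plain Python and build the same alert text as the change stream loop above; the function names and the `threshold` parameter are assumptions for illustration.

```python
LOW_QUANTITY_THRESHOLD = 20  # same cutoff as low_quantity_pipeline


def is_low_quantity(event, threshold=LOW_QUANTITY_THRESHOLD):
    """Plain-Python mirror of {"fullDocument.quantity": {"$lt": threshold}}.

    Hypothetical helper for testing; returns False when fullDocument is
    missing or None, or when quantity is not a number.
    """
    full_document = event.get("fullDocument") or {}
    quantity = full_document.get("quantity")
    # $lt against a number only matches numeric values, so strings, None and
    # booleans (a bool is an int in Python) are rejected here
    is_number = isinstance(quantity, (int, float)) and not isinstance(quantity, bool)
    return is_number and quantity < threshold


def low_quantity_message(event):
    """Build the same alert text as the change stream loop."""
    full_document = event["fullDocument"]
    return "There are only {0} units left of {1}!".format(
        full_document.get("quantity"), full_document.get("type")
    )


events = [
    {"operationType": "update", "fullDocument": {"type": "apples", "quantity": 60}},
    {"operationType": "update", "fullDocument": {"type": "strawberries", "quantity": 19}},
    {"operationType": "update", "fullDocument": None},  # deleted before the lookup
]
for event in events:
    if is_low_quantity(event):
        print(low_quantity_message(event))
```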

(open the change stream)

(go to `updates_every_one_second` and start the third cell)

(come back here)

So if we just wait for the customers to go about their business...

(wait for a print statement)

And now we know that we need to buy more strawberries!

## Summary

- Change streams can be opened against a collection
    - Tracks data changes in real time
- Aggregation pipelines can be used to transform change event documents

So change streams are a great way to track changes to the data in a collection. And if you're using MongoDB 4.0 or later, you can open a change stream against a whole database, and even a whole cluster.
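When a stream is opened on a whole database (PyMongo's `Database.watch()`) or a whole deployment (`MongoClient.watch()`), each event's `ns` field records which database and collection it came from, so a pipeline can narrow the stream back down. A minimal sketch with a hypothetical helper, `namespace_pipeline`; the database and collection names are the ones used in this lesson.

```python
def namespace_pipeline(db_name, collection_names):
    """Build a $match stage that keeps events from the given collections.

    Hypothetical helper; change events identify their source in the
    "ns" field as {"db": ..., "coll": ...}.
    """
    return [
        {
            "$match": {
                "ns.db": db_name,
                "ns.coll": {"$in": list(collection_names)},
            }
        }
    ]


inventory_only = namespace_pipeline("lessons", ["inventory"])
print(inventory_only)
```

Passed as `client.watch(pipeline=inventory_only)`, this would limit a cluster-wide stream to events from `lessons.inventory`.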

We also have the flexibility to pass an aggregation pipeline to the change stream, to transform or filter out some of the change event documents.
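Filtering is only half of it; a pipeline can also reshape the events. As a sketch (the selected fields are an illustrative assumption), the pipeline below keeps the low-quantity `$match` and adds a `$project` stage so each event only carries the operation type, fruit, and quantity. The event's `_id` is left in place because it is the resume token the driver relies on, and MongoDB 4.2 and later raise an error if a change stream pipeline modifies it.

```python
# Filter as in the lesson, then reshape each event down to a few fields.
# An inclusion projection keeps "_id" (the resume token) by default.
low_quantity_summary_pipeline = [
    {"$match": {"fullDocument.quantity": {"$lt": 20}}},
    {
        "$project": {
            "operationType": 1,
            "fullDocument.type": 1,
            "fullDocument.quantity": 1,
        }
    },
]

# Sanity check that the projection never excludes the resume token
project_stage = low_quantity_summary_pipeline[1]["$project"]
assert project_stage.get("_id", 1) != 0
print(low_quantity_summary_pipeline)
```

This would be opened with `inventory.watch(pipeline=low_quantity_summary_pipeline, full_document='updateLookup')`, keeping `updateLookup` so update events have a `fullDocument` to match and project.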