In [3]:
from pymongo import MongoClient
from bson.objectid import ObjectId
from bson.decimal128 import Decimal128
import json

In [4]:
class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, ObjectId) or isinstance(o, Decimal128):
            return str(o)
        return json.JSONEncoder.default(self, o)

## Data source

If you do not change the data uri (*course_cluster_uri*), you can execute most
of this notebook, however you will not be able to write to the database.

To execute successfully the pipelines with an $out/save stage in this notebook,
point to your own Atlas cluster into which you will have imported the *retail.csv* dataset.


In [5]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = MongoClient(course_cluster_uri)

In [6]:
retail_col = course_client['coursera-agg']['retail']

In [7]:
assemble = {
    "$group": {
        "_id": {
            "InvoiceNo": "$InvoiceNo",
            "CustomerID": "$CustomerID",
            "Country": "$Country"
        },
        "InvoiceDate": { "$max": "$InvoiceDate" },
        "Items": {
            "$push": {
                "StockCode": "$StockCode",
                "Description": "$Description",
                "Quantity": "$Quantity",
                "UnitPrice": "$UnitPrice"
            }
        }
    }
}

In [8]:
beautify = {
    "$project": {
        "_id": "$_id.InvoiceNo",
        "InvoiceDate": "$_id.InvoiceDate",
        "CustomerID": "$_id.CustomerID",
        "Country": "$_id.Country",
        "Items": 1
    }
}

In [9]:
cursor = retail_col.aggregate([
    assemble,
    beautify
  ],
  allowDiskUse=True)

In [10]:
retail_doc = cursor.next()

In [11]:
print(json.dumps(retail_doc, cls=JSONEncoder, indent=4))

{
    "Items": [
        {
            "StockCode": "71053",
            "Description": "WHITE METAL LANTERN",
            "Quantity": 6,
            "UnitPrice": "3.39"
        },
        {
            "StockCode": "85123A",
            "Description": "WHITE HANGING HEART T-LIGHT HOLDER",
            "Quantity": 6,
            "UnitPrice": "2.55"
        },
        {
            "StockCode": "21730",
            "Description": "GLASS STAR FROSTED T-LIGHT HOLDER",
            "Quantity": 6,
            "UnitPrice": "4.25"
        },
        {
            "StockCode": "84406B",
            "Description": "CREAM CUPID HEARTS COAT HANGER",
            "Quantity": 8,
            "UnitPrice": "2.75"
        },
        {
            "StockCode": "22752",
            "Description": "SET 7 BABUSHKA NESTING BOXES",
            "Quantity": 2,
            "UnitPrice": "7.65"
        },
        {
            "StockCode": "84029G",
            "Description": "KNITTED UNION FLAG HOT WATER BOTTLE",
 

In [12]:
computed = {
    "$addFields" : {
        "TotalPrice": {
            "$reduce": {
                "input": "$Items",
                "initialValue": Decimal128("0.00"),
                "in": {
                    "$add": [
                        "$$value",
                        { "$multiply": [ "$$this.Quantity", "$$this.UnitPrice" ] }
                    ]
                }
            }
        }
    }
}

In [13]:
cursor = retail_col.aggregate([
    assemble,
    beautify,
    computed
  ],
  allowDiskUse=True)

In [14]:
retail_doc = cursor.next()

In [15]:
print(json.dumps(retail_doc, cls=JSONEncoder, indent=4))

{
    "Items": [
        {
            "StockCode": "71053",
            "Description": "WHITE METAL LANTERN",
            "Quantity": 6,
            "UnitPrice": "3.39"
        },
        {
            "StockCode": "85123A",
            "Description": "WHITE HANGING HEART T-LIGHT HOLDER",
            "Quantity": 6,
            "UnitPrice": "2.55"
        },
        {
            "StockCode": "21730",
            "Description": "GLASS STAR FROSTED T-LIGHT HOLDER",
            "Quantity": 6,
            "UnitPrice": "4.25"
        },
        {
            "StockCode": "84406B",
            "Description": "CREAM CUPID HEARTS COAT HANGER",
            "Quantity": 8,
            "UnitPrice": "2.75"
        },
        {
            "StockCode": "22752",
            "Description": "SET 7 BABUSHKA NESTING BOXES",
            "Quantity": 2,
            "UnitPrice": "7.65"
        },
        {
            "StockCode": "84029G",
            "Description": "KNITTED UNION FLAG HOT WATER BOTTLE",
 

In [21]:
save = {
    "$out": "orders_new"
}


The following cell will **fail if you are not pointing** to your own Atlas group
where you have write privileges to the target collection

In [22]:
cursor = retail_col.aggregate([
    assemble,
    beautify,
    computed,
    save
  ],
  allowDiskUse=True)

OperationFailure: not authorized on coursera-agg to execute command { aggregate: "retail", pipeline: [ { $group: { _id: { InvoiceNo: "$InvoiceNo", CustomerID: "$CustomerID", Country: "$Country", InvoiceDate: { $max: "$InvoiceDate" } }, Items: { $push: { StockCode: "$StockCode", Description: "$Description", Quantity: "$Quantity", UnitPrice: "$UnitPrice" } } } }, { $project: { _id: "$_id.InvoiceNo", InvoiceDate: "$_id.InvoiceDate", CustomerID: "$_id.CustomerID", Country: "$_id.Country", Items: 1 } }, { $addFields: { TotalPrice: { $reduce: { input: "$Items", initialValue: 0.00, in: { $add: [ "$$value", { $multiply: [ "$$this.Quantity", "$$this.UnitPrice" ] } ] } } } } }, { $out: "orders_new" } ], allowDiskUse: true, cursor: {}, lsid: { id: UUID("f6d15ac7-3bb6-49cd-af43-585dedfe58ae") }, $clusterTime: { clusterTime: Timestamp(1709439827, 1), signature: { hash: BinData(0, 4275B6C864B27FA2F126FDDF84E8E64CFEC2CE9F), keyId: 7298383946943823874 } }, $db: "coursera-agg" }, full error: {'operationTime': Timestamp(1709439827, 1), 'ok': 0.0, 'errmsg': 'not authorized on coursera-agg to execute command { aggregate: "retail", pipeline: [ { $group: { _id: { InvoiceNo: "$InvoiceNo", CustomerID: "$CustomerID", Country: "$Country", InvoiceDate: { $max: "$InvoiceDate" } }, Items: { $push: { StockCode: "$StockCode", Description: "$Description", Quantity: "$Quantity", UnitPrice: "$UnitPrice" } } } }, { $project: { _id: "$_id.InvoiceNo", InvoiceDate: "$_id.InvoiceDate", CustomerID: "$_id.CustomerID", Country: "$_id.Country", Items: 1 } }, { $addFields: { TotalPrice: { $reduce: { input: "$Items", initialValue: 0.00, in: { $add: [ "$$value", { $multiply: [ "$$this.Quantity", "$$this.UnitPrice" ] } ] } } } } }, { $out: "orders_new" } ], allowDiskUse: true, cursor: {}, lsid: { id: UUID("f6d15ac7-3bb6-49cd-af43-585dedfe58ae") }, $clusterTime: { clusterTime: Timestamp(1709439827, 1), signature: { hash: BinData(0, 4275B6C864B27FA2F126FDDF84E8E64CFEC2CE9F), keyId: 7298383946943823874 } }, $db: "coursera-agg" }', 'code': 13, 'codeName': 'Unauthorized', '$clusterTime': {'clusterTime': Timestamp(1709439827, 1), 'signature': {'hash': b'Bu\xb6\xc8d\xb2\x7f\xa2\xf1&\xfd\xdf\x84\xe8\xe6L\xfe\xc2\xce\x9f', 'keyId': 7298383946943823874}}}

In [19]:
assemble = {
    "$group": {
        "_id": {
            "InvoiceNo": "$InvoiceNo",
            "CustomerID": "$CustomerID",
            "Country": "$Country",
            "InvoiceDate": { "$max": "$InvoiceDate" },
        },
        "Items": {
            "$push": {
                "StockCode": "$StockCode",
                "Description": "$Description",
                "Quantity": "$Quantity",
                "UnitPrice": "$UnitPrice"
            }
        }
    }
}

The following cell will show the expected error message of trying to build
an index on *_id*, if you are pointing to your own Atlas cluster where you
have write privileges

In [20]:
cursor = retail_col.aggregate([
    assemble,
    beautify,
    computed,
    save
  ],
  allowDiskUse=True)

OperationFailure: not authorized on coursera-agg to execute command { aggregate: "retail", pipeline: [ { $group: { _id: { InvoiceNo: "$InvoiceNo", CustomerID: "$CustomerID", Country: "$Country", InvoiceDate: { $max: "$InvoiceDate" } }, Items: { $push: { StockCode: "$StockCode", Description: "$Description", Quantity: "$Quantity", UnitPrice: "$UnitPrice" } } } }, { $project: { _id: "$_id.InvoiceNo", InvoiceDate: "$_id.InvoiceDate", CustomerID: "$_id.CustomerID", Country: "$_id.Country", Items: 1 } }, { $addFields: { TotalPrice: { $reduce: { input: "$Items", initialValue: 0.00, in: { $add: [ "$$value", { $multiply: [ "$$this.Quantity", "$$this.UnitPrice" ] } ] } } } } }, { $out: "orders_new" } ], allowDiskUse: true, cursor: {}, lsid: { id: UUID("f6d15ac7-3bb6-49cd-af43-585dedfe58ae") }, $clusterTime: { clusterTime: Timestamp(1709439807, 1), signature: { hash: BinData(0, C7A7D37C58F66DD481962CC626BF7DD6C78D34CF), keyId: 7298383946943823874 } }, $db: "coursera-agg" }, full error: {'operationTime': Timestamp(1709439817, 1), 'ok': 0.0, 'errmsg': 'not authorized on coursera-agg to execute command { aggregate: "retail", pipeline: [ { $group: { _id: { InvoiceNo: "$InvoiceNo", CustomerID: "$CustomerID", Country: "$Country", InvoiceDate: { $max: "$InvoiceDate" } }, Items: { $push: { StockCode: "$StockCode", Description: "$Description", Quantity: "$Quantity", UnitPrice: "$UnitPrice" } } } }, { $project: { _id: "$_id.InvoiceNo", InvoiceDate: "$_id.InvoiceDate", CustomerID: "$_id.CustomerID", Country: "$_id.Country", Items: 1 } }, { $addFields: { TotalPrice: { $reduce: { input: "$Items", initialValue: 0.00, in: { $add: [ "$$value", { $multiply: [ "$$this.Quantity", "$$this.UnitPrice" ] } ] } } } } }, { $out: "orders_new" } ], allowDiskUse: true, cursor: {}, lsid: { id: UUID("f6d15ac7-3bb6-49cd-af43-585dedfe58ae") }, $clusterTime: { clusterTime: Timestamp(1709439807, 1), signature: { hash: BinData(0, C7A7D37C58F66DD481962CC626BF7DD6C78D34CF), keyId: 7298383946943823874 } }, $db: "coursera-agg" }', 'code': 13, 'codeName': 'Unauthorized', '$clusterTime': {'clusterTime': Timestamp(1709439817, 1), 'signature': {'hash': b'\x8f\x0f\xbaR\xe8}\xe0\x8c\x17\xb1e\xc7I\x04\\?\x11\x8b\xef6', 'keyId': 7298383946943823874}}}