In [1]:
import datetime
import json
import os

from bson.decimal128 import Decimal128
from bson.objectid import ObjectId
from pymongo import MongoClient
from pymongo.errors import OperationFailure

In [2]:
class JSONEncoder(json.JSONEncoder):
    """JSON encoder that also serializes the non-JSON types found in query results.

    In addition to what ``json.JSONEncoder`` supports natively:

    * ``datetime.datetime`` / ``datetime.date`` values are emitted as ISO-8601
      strings (the documents printed in this notebook carry an ``InvoiceDate``
      datetime, which the stock encoder rejects with ``TypeError``).
    * ``ObjectId`` and ``Decimal128`` values are emitted as their ``str()`` form.

    Any other unsupported type still raises ``TypeError`` via the base class.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        Raises:
            TypeError: if ``o`` is of a type this encoder does not know about.
        """
        # ``datetime.datetime`` is a subclass of ``datetime.date``; both are
        # listed so the intent is obvious to the reader.
        if isinstance(o, (datetime.datetime, datetime.date)):
            return o.isoformat()
        if isinstance(o, (ObjectId, Decimal128)):
            return str(o)
        # Defer to the base class so unsupported types raise TypeError.
        return super().default(o)

## Data source

If you do not change the data URI (*course_cluster_uri*), you can execute most
of this notebook; however, you will not be able to write to the database.

To successfully execute the pipelines in this notebook that end with the `$out` (*save*) stage,
point to your own Atlas cluster, into which you have imported the *retail.csv* dataset.


In [3]:
# Connection settings.
#
# The connection string is read from the MONGODB_URI environment variable so
# that personal credentials never have to be written into the notebook. Export
# MONGODB_URI (pointing at your own cluster) before starting Jupyter if you want
# to run the cells that write with $out.
#
# When MONGODB_URI is not set, fall back to the course cluster URI that this
# notebook originally shipped with.
course_cluster_uri = os.environ.get(
    "MONGODB_URI",
    "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin",
)
course_client = MongoClient(course_cluster_uri)

In [4]:
# Handle on the `retail` collection of the `eco` database.
retail_col = course_client.get_database('eco').get_collection('retail')

In [5]:
# $group stage: one output document per (InvoiceNo, CustomerID, Country)
# combination, keeping the latest InvoiceDate and collecting every line item
# into an `Items` array.
assemble = {
    "$group": {
        # Each key field maps to the input field of the same name.
        "_id": {key: "$" + key for key in ("InvoiceNo", "CustomerID", "Country")},
        "InvoiceDate": {"$max": "$InvoiceDate"},
        "Items": {
            "$push": {
                field: "$" + field
                for field in ("StockCode", "Description", "Quantity", "UnitPrice")
            }
        },
    }
}

In [6]:
# $project stage: flatten the compound group key so that the invoice number
# becomes `_id` and the remaining key parts become top-level fields.
beautify = {
    "$project": dict(
        _id="$_id.InvoiceNo",
        InvoiceDate="$InvoiceDate",
        CustomerID="$_id.CustomerID",
        Country="$_id.Country",
        Items=1,  # keep the array as-is
    )
}

In [7]:
# Group the rows into invoices, then reshape them; allowDiskUse is passed
# through to the server with the aggregate command.
cursor = retail_col.aggregate([assemble, beautify], allowDiskUse=True)

In [None]:
# Pull just the first document from the cursor for inspection.
retail_doc = next(cursor)

In [None]:
# Pretty-print the sampled document with the notebook's custom encoder.
print(JSONEncoder(indent=4).encode(retail_doc))

In [8]:
# $addFields stage: add `TotalPrice`, the sum of Quantity * UnitPrice over
# every element of the `Items` array.
computed = {
    "$addFields": {
        "TotalPrice": {
            "$reduce": {
                "input": "$Items",
                # Seed the running total with a Decimal128 zero (presumably so
                # the sum stays a decimal like UnitPrice -- confirm against data).
                "initialValue": Decimal128("0.00"),
                # $$value is the running total, $$this the current item.
                "in": {
                    "$add": [
                        "$$value",
                        {"$multiply": ["$$this.Quantity", "$$this.UnitPrice"]},
                    ]
                },
            }
        }
    }
}

In [9]:
# Same pipeline as before, now followed by the TotalPrice computation.
cursor = retail_col.aggregate([assemble, beautify, computed], allowDiskUse=True)

In [None]:
# Take the first document of the new result to check the added field.
retail_doc = next(cursor)

In [None]:
# Pretty-print the sampled document with the notebook's custom encoder.
print(JSONEncoder(indent=4).encode(retail_doc))

In [10]:
# $out stage: write the pipeline result into the `orders` collection.
save = {"$out": "orders"}

The following cell will **fail if you are not pointing** to your own Atlas cluster,
where you have write privileges on the target collection.

In [11]:
# Full pipeline: group, reshape, compute totals, then persist via the final
# $out stage (requires write access to the target collection).
cursor = retail_col.aggregate(
    [assemble, beautify, computed, save],
    allowDiskUse=True,
)

In [15]:
# NOTE: this rebinds `assemble`. In this variant the InvoiceDate expression sits
# inside `_id`, so it is part of the group key rather than a separate
# accumulator field -- apparently on purpose, to provoke the duplicate-key
# error demonstrated by the next pipeline run.
assemble = {
    "$group": {
        "_id": dict(
            InvoiceNo="$InvoiceNo",
            CustomerID="$CustomerID",
            Country="$Country",
            InvoiceDate={"$max": "$InvoiceDate"},
        ),
        "Items": {
            "$push": {
                field: "$" + field
                for field in ("StockCode", "Description", "Quantity", "UnitPrice")
            }
        },
    }
}

The following cell demonstrates the expected error: a duplicate key on the
*_id* index that `$out` builds for the target collection. You will only see this
particular message if you are pointing to your own Atlas cluster, where you have write privileges.

In [16]:
# This run is expected to fail. With the redefined `assemble`, InvoiceDate is
# part of the group key while `beautify` keeps only the invoice number as
# `_id`, so the result can contain the same `_id` more than once and $out
# rejects it. Catch the server error and display it, so the demonstration does
# not stop a top-to-bottom run of the notebook.
try:
    cursor = retail_col.aggregate(
        [assemble, beautify, computed, save],
        allowDiskUse=True,
    )
except OperationFailure as exc:
    print("OperationFailure:", exc)

OperationFailure: insert for $out failed: { connectionId: 13, err: "E11000 duplicate key error collection: eco.tmp.agg_out.2 index: _id_ dup key: { : "536591" }", code: 11000, codeName: "DuplicateKey", n: 0, ok: 1.0 }

In [17]:
# Handle on the `orders` collection of the `eco` database (the $out target).
orders = course_client.get_database('eco').get_collection('orders')

In [18]:
# Display a single document from `orders` to inspect its shape.
orders.find_one()

{'_id': '544796',
 'Items': [{'StockCode': '85086A',
   'Description': 'CANDY SPOT HEART DECORATION',
   'Quantity': 6,
   'UnitPrice': Decimal128('0.85')}],
 'InvoiceDate': datetime.datetime(2011, 2, 23, 13, 37),
 'CustomerID': '16208',
 'Country': 'United Kingdom',
 'TotalPrice': Decimal128('5.10')}

In [24]:
# Two-stage pipeline: annotate each order with the summed item quantity and
# the number of entries in `Items`, then keep only one document.
pipeline_ = [
    {
        '$addFields': dict(
            total_quantity={'$sum': '$Items.Quantity'},
            num_items={'$size': '$Items'},
        )
    },
    {'$limit': 1},
]

In [32]:
# Run the annotation pipeline against the `orders` collection.
cursor = orders.aggregate(pipeline=pipeline_)

In [33]:
# Exhaust the cursor into a list so the cell displays every returned document.
list(cursor)

[{'_id': '544796',
  'Items': [{'StockCode': '85086A',
    'Description': 'CANDY SPOT HEART DECORATION',
    'Quantity': 6,
    'UnitPrice': Decimal128('0.85')}],
  'InvoiceDate': datetime.datetime(2011, 2, 23, 13, 37),
  'CustomerID': '16208',
  'Country': 'United Kingdom',
  'TotalPrice': Decimal128('5.10'),
  'total_quantity': 6,
  'num_items': 1}]