# DATA 304 – Module 6, Session 2

## MongoDB with Python (PyMongo): Documents, Queries, Aggregations, Joins
**Goal:** Work with a MongoDB collection from Python. Insert JSON-like documents, run queries, use the aggregation pipeline, and integrate with pandas.

**What you'll learn:**
- Connect to MongoDB (local or cloud)
- Insert and query documents
- Use projection, sorting, and operators ($gt, $in, etc.)
- Build aggregation pipelines ($match, $group, $project, $unwind, $lookup)
- Create indexes and view query plans
- Load results into pandas

**Prereqs:** Python 3, `pandas`. For database access you need **either** a local MongoDB server (e.g., Docker) **or** a cloud instance (MongoDB Atlas). If neither is reachable, the notebook falls back to `mongomock`.

## 0) Setup
If `pymongo` or `mongomock` is missing, uncomment the `pip install` line.

In [1]:
# If needed, install packages (uncomment if missing in your environment)
# !pip install pymongo mongomock pandas --quiet
import os, json
import pandas as pd
print('pandas', pd.__version__)

pandas 2.2.2


## 1) Connect to MongoDB (with graceful fallback)
The code tries, in this order:
1. The URI in the `MONGODB_URI` environment variable, if set
2. Otherwise, a local MongoDB server at `mongodb://localhost:27017/`
3. If that connection fails, `mongomock`, an in-memory emulator for practice

In [2]:
from pprint import pprint
client = None
db = None
using_mock = False

try:
    from pymongo import MongoClient
    uri = os.environ.get('MONGODB_URI', 'mongodb://localhost:27017/')
    client = MongoClient(uri, serverSelectionTimeoutMS=1000)
    _ = client.server_info()  # force connection
    db = client['data304_m6']
    print('Connected to MongoDB at', uri)
except Exception as e:
    print('Real MongoDB not available -> trying mongomock. Error was:', str(e))
    try:
        import mongomock
        client = mongomock.MongoClient()
        db = client['data304_m6']
        using_mock = True
        print('Using mongomock (in-memory MongoDB emulator).')
    except Exception as ee:
        raise RuntimeError('Neither real MongoDB nor mongomock available. Install pymongo or mongomock.') from ee

Real MongoDB not available -> trying mongomock. Error was: localhost:27017: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 1.0s, Topology Description: <TopologyDescription id: 68dbeb79e6d83968ade37a8b, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>
Using mongomock (in-memory MongoDB emulator).
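The connection cell takes its URI from `MONGODB_URI` and otherwise uses the local default. Below is a minimal sketch of that choice as a standalone helper. The name `resolve_mongo_uri` is hypothetical, and unlike the cell above it treats an empty variable as unset, which is an assumption made here for convenience.

```python
import os

DEFAULT_URI = "mongodb://localhost:27017/"

def resolve_mongo_uri(env=None):
    """Return the URI to try first (hypothetical helper, not part of PyMongo)."""
    env = os.environ if env is None else env
    # Assumption: an empty or whitespace-only value counts as "not set".
    uri = (env.get("MONGODB_URI") or "").strip()
    return uri or DEFAULT_URI

# A plain dict stands in for the environment so the example has no side effects.
local_uri = resolve_mongo_uri({})
custom_uri = resolve_mongo_uri({"MONGODB_URI": "mongodb://db.example.invalid:27017/"})
print(local_uri)
print(custom_uri)
```

The hostname in the second call is a placeholder; a real Atlas connection string would go there instead.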


## 2) Create sample collections and insert documents
We will create three collections: `users`, `products`, and `orders`.

In [3]:
users = db['users']
products = db['products']
orders = db['orders']

# Clean slate for reruns
users.delete_many({})
products.delete_many({})
orders.delete_many({})

users.insert_many([
    {"_id": 1, "name": "Alice", "age": 29, "email": "alice@example.com", "tags": ["python", "sql"], "address": {"city": "Knoxville", "state": "TN"}},
    {"_id": 2, "name": "Bob",   "age": 35, "email": "bob@example.com",   "tags": ["marketing"],        "address": {"city": "Nashville", "state": "TN"}},
    {"_id": 3, "name": "Carla", "age": 41, "email": "carla@example.com", "tags": ["python", "ml"],     "address": {"city": "Atlanta",   "state": "GA"}},
])

products.insert_many([
    {"_id": 101, "sku": "ABC-001", "name": "Widget A", "category": "widgets", "price": 19.99},
    {"_id": 102, "sku": "DEF-002", "name": "Widget B", "category": "widgets", "price": 29.99},
    {"_id": 201, "sku": "GHI-010", "name": "Gadget X", "category": "gadgets", "price": 49.99}
])

orders.insert_many([
    {"_id": 1001, "user_id": 1, "status": "A", "items": [{"product_id": 101, "qty": 2}, {"product_id": 201, "qty": 1}], "amount": 89.97},
    {"_id": 1002, "user_id": 2, "status": "P", "items": [{"product_id": 102, "qty": 1}],                       "amount": 29.99},
    {"_id": 1003, "user_id": 1, "status": "A", "items": [{"product_id": 102, "qty": 3}],                       "amount": 89.97}
])
print('Seeded users/products/orders.')

Seeded users/products/orders.


## 3) Basic `find` queries
Filter, projection, sort.

In [4]:
# All users aged > 30, project name and age only, sort by age desc
cur = users.find(
    {"age": {"$gt": 30}},
    {"_id": 0, "name": 1, "age": 1},
).sort("age", -1)

In [5]:
print(cur)

<mongomock.collection.Cursor object at 0x74d80a4beff0>


In [6]:
for doc in cur:
    print(doc)

{'name': 'Carla', 'age': 41}
{'name': 'Bob', 'age': 35}


In [7]:
# The loop in the previous cell already consumed the cursor, so nothing is left to read
list(cur)

[]
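The empty list appears because a cursor can be read only once. A plain Python iterator behaves the same way, so it can serve as a rough analogy. The sketch below uses `iter()` as a stand-in and does not touch the database.

```python
docs = [{"name": "Carla", "age": 41}, {"name": "Bob", "age": 35}]

one_shot = iter(docs)         # stands in for a cursor
first_pass = list(one_shot)   # consumes every document
second_pass = list(one_shot)  # nothing left to read

print(first_pass)
print(second_pass)
```

In practice, either rebuild the query (as the next cell does) or materialize the results once with `list(...)` and reuse that list. PyMongo cursors also offer a `rewind()` method; whether `mongomock` behaves identically there is not checked in this notebook.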

In [8]:
cur = users.find(
    {"age": {"$gt": 30}},
    {"_id": 0, "name": 1, "age": 1},
).sort("age", -1)
list(cur)

[{'name': 'Carla', 'age': 41}, {'name': 'Bob', 'age': 35}]

In [9]:
# Users with tag 'python'
list(
    users.find(
        {"tags": "python"}, 
        {"_id": 0, "name": 1, "tags": 1}
    )
)

[{'name': 'Alice', 'tags': ['python', 'sql']},
 {'name': 'Carla', 'tags': ['python', 'ml']}]

In [10]:
# Users in Tennessee (nested field match)
list(
    users.find(
        {"address.state": "TN"}, 
        {"_id": 0, "name": 1, "address": 1}
    )
)

[{'name': 'Alice', 'address': {'city': 'Knoxville', 'state': 'TN'}},
 {'name': 'Bob', 'address': {'city': 'Nashville', 'state': 'TN'}}]
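The last two queries rely on two matching rules: an equality filter on an array field matches when the array contains the value, and dot notation reaches into nested documents. The sketch below imitates just those two rules in plain Python. It is a deliberate simplification, and the helper names `get_path` and `matches_equal` are hypothetical; MongoDB's real matching covers many more cases.

```python
def get_path(doc, dotted):
    """Follow a dotted path such as 'address.state' through nested dicts."""
    current = doc
    for key in dotted.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

def matches_equal(doc, field, wanted):
    """Simplified equality match: arrays match if they contain the value."""
    value = get_path(doc, field)
    if isinstance(value, list):
        return wanted in value
    return value == wanted

sample_users = [
    {"name": "Alice", "tags": ["python", "sql"], "address": {"state": "TN"}},
    {"name": "Bob", "tags": ["marketing"], "address": {"state": "TN"}},
    {"name": "Carla", "tags": ["python", "ml"], "address": {"state": "GA"}},
]

python_users = [u["name"] for u in sample_users if matches_equal(u, "tags", "python")]
tn_users = [u["name"] for u in sample_users if matches_equal(u, "address.state", "TN")]
print(python_users, tn_users)
```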

## 4) Aggregation pipeline basics
Use `$match`, `$group`, `$project`.

In [11]:
pipeline = [
    {"$match": 
            {"status": "A"}
    },
    {"$group": 
        {
            "_id": "$user_id", 
            "orders": {"$sum": 1}, 
            "total": {"$sum": "$amount"}
        }
    },
    {"$project": 
        {
            "_id": 0, 
            "user_id": "$_id", 
            "orders": 1, 
            "total": 1
        }
    }
]
list(orders.aggregate(pipeline))

[{'orders': 2, 'total': 179.94, 'user_id': 1}]
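For intuition, the three stages can be mimicked in plain Python on the same order data. This is only a sketch of what each stage computes, not how the server executes the pipeline.

```python
from collections import defaultdict

orders_data = [
    {"_id": 1001, "user_id": 1, "status": "A", "amount": 89.97},
    {"_id": 1002, "user_id": 2, "status": "P", "amount": 29.99},
    {"_id": 1003, "user_id": 1, "status": "A", "amount": 89.97},
]

# $match: keep only orders with status "A"
matched = [o for o in orders_data if o["status"] == "A"]

# $group: one accumulator per user_id, counting orders and summing amounts
groups = defaultdict(lambda: {"orders": 0, "total": 0.0})
for o in matched:
    acc = groups[o["user_id"]]
    acc["orders"] += 1
    acc["total"] += o["amount"]

# $project: rename the group key to user_id and keep the two totals
result = [
    {"user_id": uid, "orders": acc["orders"], "total": acc["total"]}
    for uid, acc in groups.items()
]
print(result)
```

Bob's order has status `"P"`, so it is filtered out before grouping, which is why only one group remains.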

## 5) `$unwind` and `$lookup` (join-like)
Explode arrays and join with `products` to compute totals per product.

In [12]:
pipeline = [
    {"$unwind": "$items"},
    {"$lookup": {"from": "products", "localField": "items.product_id", "foreignField": "_id", "as": "prod"}},
    {"$unwind": "$prod"},
    {"$group": {"_id": "$prod.sku", "qty": {"$sum": "$items.qty"}, "revenue": {"$sum": {"$multiply": ["$items.qty", "$prod.price"]}}}},
    {"$project": {"_id": 0, "sku": "$_id", "qty": 1, "revenue": 1}},
    {"$sort": {"revenue": -1}}
]
list(orders.aggregate(pipeline))

[{'qty': 4, 'revenue': 119.96, 'sku': 'DEF-002'},
 {'qty': 1, 'revenue': 49.99, 'sku': 'GHI-010'},
 {'qty': 2, 'revenue': 39.98, 'sku': 'ABC-001'}]

SQL equivalent (assuming a normalized `order_items` table):  

```sql
SELECT p.sku,  
    SUM(i.qty) AS qty,  
    SUM(i.qty * p.price) AS revenue  
FROM orders o  
JOIN order_items i ON o.id = i.order_id  
JOIN products p ON i.product_id = p.id  
GROUP BY p.sku  
ORDER BY revenue DESC;  
```
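The `$unwind`, `$lookup`, and `$group` steps of the pipeline above can also be mimicked in plain Python, which may help when reading the stages. The sketch copies the seeded data into local variables and rounds revenue for display; the rounding is an addition here and is not part of the pipeline.

```python
from collections import defaultdict

products_by_id = {
    101: {"sku": "ABC-001", "price": 19.99},
    102: {"sku": "DEF-002", "price": 29.99},
    201: {"sku": "GHI-010", "price": 49.99},
}
orders_data = [
    {"_id": 1001, "items": [{"product_id": 101, "qty": 2}, {"product_id": 201, "qty": 1}]},
    {"_id": 1002, "items": [{"product_id": 102, "qty": 1}]},
    {"_id": 1003, "items": [{"product_id": 102, "qty": 3}]},
]

totals = defaultdict(lambda: {"qty": 0, "revenue": 0.0})
for order in orders_data:
    for item in order["items"]:                        # $unwind: one row per item
        prod = products_by_id.get(item["product_id"])  # $lookup: find the product
        if prod is None:                               # second $unwind drops non-matches
            continue
        acc = totals[prod["sku"]]                      # $group by sku
        acc["qty"] += item["qty"]
        acc["revenue"] += item["qty"] * prod["price"]

# $project and $sort, with revenue rounded to cents for display only
rows = sorted(
    ({"sku": sku, "qty": acc["qty"], "revenue": round(acc["revenue"], 2)}
     for sku, acc in totals.items()),
    key=lambda r: r["revenue"],
    reverse=True,
)
for row in rows:
    print(row)
```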

## 6) Indexes and explain
Create an index and review an example query plan.

In [13]:
try:
    users.create_index([('age', 1)])
    plan = users.find({"age": {"$gt": 30}}).explain()
    # Show selected parts of the plan for brevity.
    # pprint is needed here: a bare expression inside a try block is not displayed.
    plan_keys = ['queryPlanner']
    pprint({k: plan.get(k) for k in plan_keys})
except Exception as e:
    print('Explain may be limited in mongomock:', e)

Explain may be limited in mongomock: 'Cursor' object has no attribute 'explain'
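On a real server, the usual question to ask of a plan is whether the query used the index (an `IXSCAN` stage) or scanned the whole collection (`COLLSCAN`). The sketch below walks a plan dict and lists its stage names. The helper `plan_stages` is hypothetical, and `sample_plan` is a hand-written illustration of the classic `queryPlanner.winningPlan` shape, not captured server output. Some server versions nest the plan differently, in which case this helper returns an empty list.

```python
def plan_stages(plan):
    """List stage names from the winning plan, outermost first."""
    node = plan.get("queryPlanner", {}).get("winningPlan", {})
    stages = []
    while isinstance(node, dict) and "stage" in node:
        stages.append(node["stage"])
        node = node.get("inputStage")
    return stages

# Hand-written example of the assumed shape; real explain output has more fields.
sample_plan = {
    "queryPlanner": {
        "winningPlan": {
            "stage": "FETCH",
            "inputStage": {"stage": "IXSCAN", "indexName": "age_1"},
        }
    }
}

stages = plan_stages(sample_plan)
uses_index = "IXSCAN" in stages
print(stages, uses_index)
```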


## 7) Pandas integration
Convert cursors and aggregation results to DataFrames.

In [14]:
df_users = pd.DataFrame(list(users.find({}, {'_id': 0})))
df_users

|   | name | age | email | tags | address |
|---|---|---|---|---|---|
| 0 | Alice | 29 | alice@example.com | [python, sql] | {'city': 'Knoxville', 'state': 'TN'} |
| 1 | Bob | 35 | bob@example.com | [marketing] | {'city': 'Nashville', 'state': 'TN'} |
| 2 | Carla | 41 | carla@example.com | [python, ml] | {'city': 'Atlanta', 'state': 'GA'} |
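The `address` column holds whole dicts, which is awkward to filter or group on. One option is to flatten nested documents into dotted keys before building the DataFrame. The `flatten` helper below is a hypothetical, pure-Python sketch of that idea; it leaves lists such as `tags` untouched.

```python
def flatten(doc, prefix=""):
    """Turn nested dicts into a single level with dotted keys."""
    flat = {}
    for key, value in doc.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat

user_docs = [
    {"name": "Alice", "age": 29, "address": {"city": "Knoxville", "state": "TN"}},
    {"name": "Carla", "age": 41, "address": {"city": "Atlanta", "state": "GA"}},
]

flat_rows = [flatten(d) for d in user_docs]
print(flat_rows[0])
```

Passing `flat_rows` to `pd.DataFrame` would then give one column per dotted key. pandas also ships `pd.json_normalize`, which performs a similar flattening directly on a list of dicts.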


In [15]:
df_rev = pd.DataFrame(list(orders.aggregate([
    {"$unwind": "$items"},
    {"$lookup": {"from": "products", "localField": "items.product_id", "foreignField": "_id", "as": "prod"}},
    {"$unwind": "$prod"},
    {"$group": {"_id": "$prod.category", "revenue": {"$sum": {"$multiply": ["$items.qty", "$prod.price"]}}}},
    {"$project": {"_id": 0, "category": "$_id", "revenue": 1}}
])))
df_rev

|   | revenue | category |
|---|---|---|
| 0 | 49.99 | gadgets |
| 1 | 159.94 | widgets |


## 8) Parameterized-style filters
Build filters as Python dicts and pass user input in as values, rather than assembling queries from strings.

In [16]:
def get_users_by_states(states):
    q = {"address.state": {"$in": list(states)}}
    proj = {"_id": 0, "name": 1, "address": 1}
    return list(users.find(q, proj))

get_users_by_states(["TN"])

[{'name': 'Alice', 'address': {'city': 'Knoxville', 'state': 'TN'}},
 {'name': 'Bob', 'address': {'city': 'Nashville', 'state': 'TN'}}]
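Two details are worth guarding against when the values come from user input. A bare string such as `"TN"` would be split into characters by `list(states)`, and a dict value such as `{"$ne": None}` would be interpreted as an operator rather than a literal. The sketch below builds only the filter dict, so it runs without a database. The name `build_state_filter` is hypothetical, and normalizing to upper case is an assumption about how states are stored.

```python
def build_state_filter(states):
    """Build an $in filter on address.state from validated string input."""
    if isinstance(states, str):
        states = [states]  # avoid list("TN") -> ["T", "N"]
    cleaned = []
    for state in states:
        if not isinstance(state, str):
            # Reject dicts and other types so operators cannot be smuggled in.
            raise TypeError(f"state must be a string, got {type(state).__name__}")
        cleaned.append(state.strip().upper())  # assumption: codes are stored upper-case
    return {"address.state": {"$in": cleaned}}

print(build_state_filter("tn"))
print(build_state_filter([" tn", "GA"]))
```

The returned dict can be passed as the first argument to `users.find(...)` in place of `q` in the function above.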

## 9) Mini-Exercises
Write queries directly in the cells below.

Q1: Find orders with amount >= 50, return _id and amount, sorted by amount desc

In [17]:
list(orders.find({"amount": {"$gte": 50}}, {"_id": 1, "amount": 1}).sort("amount", -1))

[{'amount': 89.97, '_id': 1001}, {'amount': 89.97, '_id': 1003}]

Q2: Aggregation: total amount per status, sorted by total desc

In [18]:
list(orders.aggregate([
    {"$group": {"_id": "$status", "total": {"$sum": "$amount"}}},
    {"$project": {"_id": 0, "status": "$_id", "total": 1}},
    {"$sort": {"total": -1}}
]))

[{'total': 179.94, 'status': 'A'}, {'total': 29.99, 'status': 'P'}]

Q3: Using $lookup, list each order with user name

In [19]:
list(orders.aggregate([
    {"$lookup": {"from": "users", "localField": "user_id", "foreignField": "_id", "as": "u"}},
    {"$unwind": "$u"},
    {"$project": {"_id": 1, "user": "$u.name", "amount": 1, "status": 1}}
]))

[{'_id': 1001, 'status': 'A', 'amount': 89.97, 'user': 'Alice'},
 {'_id': 1002, 'status': 'P', 'amount': 29.99, 'user': 'Bob'},
 {'_id': 1003, 'status': 'A', 'amount': 89.97, 'user': 'Alice'}]

## 10) Cleanup (optional)

In [20]:
# Drops the practice database; comment out the next line to keep the data
client.drop_database('data304_m6')
print('Done.')

Done.


---
**End of Session 2 Notebook**

Tips:
- To use a real server locally, run MongoDB in Docker:
  `docker run -d --name mdb -p 27017:27017 mongo:7`
- Or set `MONGODB_URI` to a MongoDB Atlas connection string.
- Then rerun the connection cell.