# MongoDB Hands-On Workbook: VCF, bcftools, and MongoDB

Use this notebook to follow the interactive exercises for the workshop.


## Learning Objectives

At the end of this session, the students will be able to:
- **Explain** RBAC model on MongoDB
- **Understand** MongoDB CRUD operations:
  - Create
  - Read
  - Update
  - Delete
- **Explore** the profvided dataset schema with python
- **Perform** queries to the provided dataset with python
- **Explain** how aggregation works


## 0) Setup: dependencies and imports

This notebook requires a few Python packages for working with MongoDB and VCF files. In the GitHub Codespace used for the workshop these are preinstalled.

- `pymongo` — the official MongoDB Python driver (used to connect, run CRUD and aggregation operations).
- `vcfpy` — VCF parser/reader for loading variant call format (VCF) files into Python.
- `dnspython` — used by some MongoDB SRV connection strings to resolve DNS SRV/TXT records.
- `pathlib` / standard library — used for file path handling.

The next code cell performs simple imports and prints the Python version.

In [5]:
import sys
print('Python:', sys.version)
# Dependencies are provided by the GitHub Codespace environment
from pymongo import MongoClient, ASCENDING
import os, vcfpy
from dotenv import load_dotenv
from os import getenv
import pprint

Python: 3.13.5 (main, Jun 11 2025, 15:36:57) [Clang 17.0.0 (clang-1700.0.13.3)]


# Load .env if it exists and read MONGO_URI (don't echo secret).

- Obtain your connection string from the [portal](abds.hpcs.stjude.org)(SSO). What you will get:
  - Username
  - Password
  - Authentication Database
  - **Connection String**: All of above combined, along with the server address
- Set it as the environment variable `MONGO_URI` or create an env file, or paste interactively

```
# .env
MONGO_URI='mongodb+srv://xxxxxxxxxxxxxx'
```

```mermaid
sequenceDiagram
    autonumber
    participant App as Python Client (PyMongo)
    participant DNS as DNS (SRV + TXT)
    participant LB as Atlas Cluster Endpoint
    participant Pri as MongoDB Primary
    participant Sec as MongoDB Secondary

    App->>DNS: Lookup _mongodb._tcp.<cluster>.mongodb.net
    DNS-->>App: SRV hosts/ports + TXT options
    App->>LB: TLS handshake (SNI + cert verify)
    LB-->>App: TLS session established
    App->>Pri: Authenticate (SCRAM-SHA-256)
    Pri-->>App: Auth OK (serverHandshake)
    App->>Pri: CRUD (find/insert/update/delete)
    Pri-->>App: Cursor/results
    Note over Pri,Sec: Replication to secondaries (oplog)
    App->>Sec: Reads (if readPreference allows)
    Sec-->>App: Cursor/results
    App-->>LB: Heartbeats & topology discovery (hello)
    Note over App: Connection pool manages sockets, retries, timeouts
```


In [6]:
# load .env if exists
load_dotenv()
MONGO_URI = os.getenv('MONGO_URI') or input("Please set MONGO_URI to your connection string:")
assert MONGO_URI and MONGO_URI.startswith('mongodb'), 'Please check your MONGO_URI format'
client = MongoClient(MONGO_URI)
db_names = client.list_database_names()
print('Databases:', db_names)
assert 'public' in db_names, 'Expected to see a public database.'

Databases: ['public', 'zziang_db']


Databases: ['public', 'zziang_db']


Exception in thread Thread-7 (bg_main):
Traceback (most recent call last):
  File [35m"/var/folders/fx/vjy4z31s5hbbz4wq3sxs2n4m0000gp/T/ipykernel_18072/3046598929.py"[0m, line [35m17[0m, in [35mbg_main[0m
    [31moutput.update[0m[1;31m({"application/vnd.vscode.bg.execution.3.result": do_implementation()}, raw=True)[0m
    [31m~~~~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/Users/zziang/Documents/projects/abds_nosql_course/.venv/lib/python3.13/site-packages/IPython/core/display_functions.py"[0m, line [35m354[0m, in [35mupdate[0m
    [31mupdate_display[0m[1;31m(obj, display_id=self.display_id, **kwargs)[0m
    [31m~~~~~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/Users/zziang/Documents/projects/abds_nosql_course/.venv/lib/python3.13/site-packages/IPython/core/display_functions.py"[0m, line [35m306[0m, in [35mupdate_display[0m
    [31mdisplay[0m[1;3

In [7]:
# Your account were given rule:
#    personalDbName = {name}_db
#    roles = [
#      { role: 'read', db: 'public' },
#      { role: 'dbAdmin', db: personalDbName },
#      { role: 'readWrite', db: personalDbName }
#    ]
admin_db = client['admin']
status = admin_db.command('connectionStatus')
roles = status['authInfo']['authenticatedUserRoles']
print(roles)

[{'role': 'read', 'db': 'public'}, {'role': 'dbAdmin', 'db': 'zziang_db'}, {'role': 'readWrite', 'db': 'zziang_db'}]


Check what MongoDB roles mean [here](https://www.mongodb.com/docs/manual/reference/built-in-roles/)

### MongoDB data model quick reminder

MongoDB organizes data in a simple hierarchy:

- __Database__: a logical namespace for collections. In this workshop you'll see a shared `public` database and a personal database (e.g., `yourname_db`).
  - __Collection__: a grouping of related documents (similar to a SQL table). Example: the `variants` collection holds documents derived from VCF records. Access a collection with `db['variants']`.
    - __Document__: a JSON-like object (BSON) stored in a collection. Example document fields from a VCF ingestion: `{'chrom': '1', 'pos': 12345, 'ref': 'A', 'alt': ['T'], 'qual': 50.0, 'filter': 'PASS', 'info': {...}}`.
      - `_id`: the unique identifier for a document within its collection. If you don't provide `_id`, MongoDB automatically creates an ObjectId. You may also choose another field as `_id` if it uniquely identifies documents in that collection.
    - __Index__: a data structure that speeds up queries on one or more fields (at the cost of additional storage and write overhead).

In code: `client = MongoClient(MONGO_URI)`  ->  `db = client['<db_name>']`  ->  `coll = db['<collection>']`  ->  `coll.find_one()`

## Exercise 1 — Explore the `public` dataset (read-only)

- List collections
- Inspect a sample document, counts, indexes
- Distinct values for a field (e.g., `filter`)
- Try a forbidden operation to observe RBAC behavior

In [8]:
# get the public database
db_public = client['public']
names = db_public.list_collection_names()
print('Collections in public:', names)

# list number of documents at each collection
for name in names:
    col = db_public[name]
    num = col.count_documents({})
    print(f"{name} Collection got {num} documents")

Collections in public: ['reference', 'example']
reference Collection got 0 documents
example Collection got 2 documents
example Collection got 2 documents


In [9]:
# RBAC demo: this should fail on public (read-only)
coll = db_public['example']
if coll is not None:
    try:
        coll.delete_one({})
        print('Unexpected: delete succeeded (RBAC misconfigured?)')
    except Exception as e:
        print('Expected RBAC error:', e)

Expected RBAC error: user is not allowed to do action [remove] on [public.example], full error: {'ok': 0, 'errmsg': 'user is not allowed to do action [remove] on [public.example]', 'code': 8000, 'codeName': 'AtlasError'}


## Exercise 2 — Parse `sample.vcf` and ingest into your personal DB

- Choose your personal DB name (or set `DB_NAME` env)
- Parse VCF with vcfpy
- Insert into `variants` and create indexes

In [1]:
# load VCF doc from `public.example` collection (adjust collection name if different)
col = db_public.get_collection('example')
vcf_doc = col.find_one({'_id': 'sample.vcf'})
print('VCF Document:')
pprint.pprint(vcf_doc)

NameError: name 'db_public' is not defined

In [None]:

import io
reader = None
if vcf_doc:
    vcf_content = vcf_doc.get('content')
    if isinstance(vcf_content, (bytes, bytearray)):
        vcf_content = vcf_content.decode('utf-8')
    # create a vcfpy Reader from the in-memory string
    reader = vcfpy.Reader(io.StringIO(vcf_content))
    print('Loaded VCF content from database')
else:
    # fallback: try reading a local sample.vcf file path
    local_vcf_path = 'course_material/2-mongodb-workshop/sample.vcf'
    try:
        reader = vcfpy.Reader.from_path(local_vcf_path)
        print(f'Loaded VCF from local path: {local_vcf_path}')
    except Exception:
        print('VCF document not found in database and local fallback failed.')
        raise

# get personal DB name (safer checks)
DBs = client.list_database_names()
person_db = [db_name for db_name in DBs if db_name.endswith('_db')]
if not person_db:
    raise RuntimeError('No personal database found (expected one ending with ). Set DB_NAME env or ensure your account has a personal DB.')
if len(person_db) > 1:
    print('Multiple personal DBs found; using the first one:', person_db)
DB_NAME = person_db[0]
print('Using personal database:', DB_NAME)

db = client[DB_NAME]
variants = db['variants']

# Optional: clear previous runs (disabled by default)
CLEAR_PREVIOUS = True
if CLEAR_PREVIOUS:
    variants.delete_many({})
    print('Cleared variants collection.')
# Parse VCF and prepare docs
docs = []
for rec in reader:
    alts = [str(a.value) for a in rec.ALT]
    info = {k: v for k, v in rec.INFO.items()}
    doc = {
        'chrom': rec.CHROM,
        'pos': rec.POS,
        'id': rec.ID,
        'ref': rec.REF,
        'alt': alts,
        'qual': float(rec.QUAL) if rec.QUAL is not None else None,
        'filter': 'PASS' if rec.FILTER is None or len(rec.FILTER) == 0 else ';'.join(rec.FILTER),
        'info': info,
    }
    docs.append(doc)

# Insert with basic error handling
if docs:
    try:
        variants.insert_many(docs, ordered=False)
        # ensure needed indexes
        variants.create_index([('chrom', ASCENDING), ('pos', ASCENDING)])
        variants.create_index([('filter', ASCENDING)])
        variants.create_index([('qual', ASCENDING)])
        print('Inserted documents (this run):', len(docs))
    except Exception as e:
        print('Insert failed:', e)
        raise
else:
    print('No documents parsed from VCF; nothing inserted.')

Loaded VCF content from database
Using personal database: zziang_db
Cleared variants collection.
Cleared variants collection.
Inserted documents (this run): 10
Inserted documents (this run): 10


## Operation breakdown

Summary of actions performed in the ingestion exercise:

- Query a document from the `public` database (e.g., `public.example`) by its `_id`. The `_id` field is unique within a collection and is the recommended way to retrieve a single document.
- Parse the VCF file content (which may be stored as a string or bytes in the document) using `vcfpy.Reader` into records that can be transformed into plain Python dicts.
- Transform each VCF record into a MongoDB document with fields such as `chrom`, `pos`, `id`, `ref`, `alt`, `qual`, `filter`, and `info` and collect them for insertion.
- Insert parsed documents into the `variants` collection using `insert_many()` (consider de-duplication or `update/upsert` if you run this multiple times).
- Create indexes to speed lookups and sorts. Example: `variants.create_index([("chrom", ASCENDING), ("pos", ASCENDING)])`. Indexing helps queries like range scans or sorted results be efficient.

Safety and best practices:
- Avoid unconditional destructive operations like `variants.delete_many({})` in shared or persistent databases; require an explicit confirmation or a configuration flag before clearing data.
- When inserting many documents, handle duplicate-key or bulk write errors with try/except and consider `ordered=False` to continue on non-fatal errors.
- Add indexes after bulk inserts for faster ingestion, or plan accordingly for your write workload and index maintenance costs.

In [11]:
# Demo: _id uniqueness and DuplicateKeyError
from pymongo.errors import DuplicateKeyError
# Use a temporary collection for demonstration so we don't affect 'variants'
db = client[DB_NAME]
demo_coll = db.get_collection('demo_id_demo')
# Clean up any previous demo content
demo_coll.delete_many({})

# 1) Insert a document without _id -> MongoDB will generate an ObjectId
doc = {'name': 'example1', 'value': 123}
res = demo_coll.insert_one(doc)
print('Inserted document with auto generated _id:', res.inserted_id)

# 2) Insert a document with an explicit _id
explicit = {'_id': 'duplicated_id', 'name': 'example2'}
demo_coll.insert_one(explicit)
print('Inserted document with explicit _id: duplicated_id')

# 3) Attempt to insert another document with the same _id -> DuplicateKeyError expected
try:
    demo_coll.insert_one({'_id': 'duplicated_id', 'name': 'example2-dup'})
except DuplicateKeyError as e:
    print('DuplicateKeyError caught as expected:', e)

demo_coll.delete_many({})

Inserted document with generated _id: 68f7103188ea8e2b294bdf5e
Inserted document with explicit _id: my-custom-id
DuplicateKeyError caught as expected: E11000 duplicate key error collection: zziang_db.demo_id_demo index: _id_ dup key: { _id: "my-custom-id" }, full error: {'index': 0, 'code': 11000, 'errmsg': 'E11000 duplicate key error collection: zziang_db.demo_id_demo index: _id_ dup key: { _id: "my-custom-id" }', 'keyPattern': {'_id': 1}, 'keyValue': {'_id': 'my-custom-id'}}
Demo finished.
DuplicateKeyError caught as expected: E11000 duplicate key error collection: zziang_db.demo_id_demo index: _id_ dup key: { _id: "my-custom-id" }, full error: {'index': 0, 'code': 11000, 'errmsg': 'E11000 duplicate key error collection: zziang_db.demo_id_demo index: _id_ dup key: { _id: "my-custom-id" }', 'keyPattern': {'_id': 1}, 'keyValue': {'_id': 'my-custom-id'}}
Demo finished.


In [32]:
# Explain the query plan to show how indexes are used
query_wo_index = {'chrom': '1', 'pos': {'$gte': 1000, '$lte': 5000}}
query_w_index = {'chrom': '1', 'pos_idx': {'$gte': 1000, '$lte': 5000}}
target_collection = db_public["reference"]
# show the index

def print_index_info(collection):
    indexes = collection.index_information()
    print("Indexes on collection", collection.name)
    for name, info in indexes.items():
        print(f"Index Name: {name}, Info: {info}")
    
def print_query_plan(query, collection):
    explain_plan = collection.find(query).explain()
    pprint.pprint(explain_plan['queryPlanner']['winningPlan'])
    print("Time taken (ms):", explain_plan.get('executionStats', {}).get('executionTimeMillis', 'N/A'))


print_index_info(target_collection)
print("Query plan without index:")
print_query_plan(query_wo_index, target_collection)
print("Query plan with index:")
print_query_plan(query_w_index, target_collection)


Query plan without index:
{'filter': {'$and': [{'pos': {'$lte': 5000}}, {'pos': {'$gte': 1000}}]},
 'inputStage': {'direction': 'forward',
                'indexBounds': {'chrom': ['["1", "1"]'],
                                'pos_idx': ['[MinKey, MaxKey]']},
                'indexName': 'chrom_1_pos_idx_1',
                'indexVersion': 2,
                'isMultiKey': False,
                'isPartial': False,
                'isSparse': False,
                'isUnique': False,
                'keyPattern': {'chrom': 1, 'pos_idx': 1},
                'multiKeyPaths': {'chrom': [], 'pos_idx': []},
                'stage': 'IXSCAN'},
 'isCached': False,
 'stage': 'FETCH'}
Time taken (ms): 684
Query plan with index:
{'inputStage': {'direction': 'forward',
                'indexBounds': {'chrom': ['["1", "1"]'],
                                'pos_idx': ['[1000, 5000]']},
                'indexName': 'chrom_1_pos_idx_1',
                'indexVersion': 2,
                'isMultiKe

## Exercise 3 — Update operations

- Add `is_high_quality = (qual >= 30 and filter == "PASS")`

In [33]:
# 1) Quality flag
res1 = variants.update_many(
    {'qual': {'$ne': None}}, # for all entries that has has non-null ddquality field
    [{
        '$set': {
            'is_high_quality': {
                '$and': [
                    {'$gte': ['$qual', 30]},
                    {'$eq': ['$filter', 'PASS']}
                ]
            }
        }
    }]
)
print('Matched:', res1.matched_count, 'Modified:', res1.modified_count)

Matched: 10 Modified: 10
Matched: 10 Modified: 10


## Exercise 4 — Delete operations

- Remove low-quality records (e.g., `qual < 10` or `filter != "PASS"`)
- Pre-check counts before and after

In [13]:
pred = {'$or': [{'qual': {'$lt': 10}}, {'filter': {'$ne': 'PASS'}}]}
to_remove = variants.count_documents(pred)
print('Would remove:', to_remove)
res = variants.delete_many(pred)
print('Deleted:', res.deleted_count)
print('Remaining:', variants.estimated_document_count())

Would remove: 4
Deleted: 4
Remaining: 6
Remaining: 6


## Exercise 5 — Aggregations (≈ bcftools)

- Count by chromosome
- PASS vs non-PASS
- Transition/Transversion ratio for SNPs
- QUAL histogram

In [34]:
# 1) Count by chromosome
print(list(variants.aggregate([
  {'$group': {'_id': '$chrom', 'n': {'$sum': 1}}},
  {'$sort': {'n': -1}}
])))

# 2) PASS vs non-PASS
print(list(variants.aggregate([
  {'$group': {'_id': '$filter', 'n': {'$sum': 1}}},
  {'$sort': {'n': -1}}
])))

# 3) Ts/Tv for single-nucleotide substitutions
pipeline_tstv = [
  {'$match': {'$expr': {'$and': [
      {'$eq': [{'$strLenCP': '$ref'}, 1]},
      {'$in': [1, {'$map': {'input': '$alt', 'as': 'a', 'in': {'$strLenCP': '$$a'}}}]}
  ]}}},
  {'$unwind': '$alt'},
  {'$project': {
      'pair': ['$$ROOT.ref', '$alt']
  }},
  {'$project': {
      'class': {
        '$switch': {
          'branches': [
            {'case': {'$in': ['$pair', [['A','G'],['G','A'],['C','T'],['T','C']]]}, 'then': 'transition'}
          ],
          'default': 'transversion'
        }
      }
  }},
  {'$group': {'_id': '$class', 'n': {'$sum': 1}}}
]
print(list(variants.aggregate(pipeline_tstv)))

# 4) QUAL histogram
bins = [0,10,20,30,40,50,60,1000]
pipeline_hist = [
  {'$match': {'qual': {'$ne': None}}},
  {'$bucket': {
    'groupBy': '$qual',
    'boundaries': bins,
    'default': '>=1000',
    'output': {'count': {'$sum': 1}}
  }}
]
print(list(variants.aggregate(pipeline_hist)))

[{'_id': '1', 'n': 4}, {'_id': 'X', 'n': 1}, {'_id': '2', 'n': 1}]
[{'_id': 'PASS', 'n': 6}]
[{'_id': 'transition', 'n': 4}, {'_id': 'transversion', 'n': 1}]
[{'_id': 30, 'count': 1}, {'_id': 50, 'count': 1}, {'_id': 60, 'count': 4}]
[{'_id': 'PASS', 'n': 6}]
[{'_id': 'transition', 'n': 4}, {'_id': 'transversion', 'n': 1}]
[{'_id': 30, 'count': 1}, {'_id': 50, 'count': 1}, {'_id': 60, 'count': 4}]


### (Optional) Save aggregation results to a `variant_stats` collection

In [15]:
stats_coll = db['variant_stats']
stats_coll.delete_many({})
stats_coll.insert_many(list(variants.aggregate(pipeline_hist)))
print('Stats docs:', stats_coll.estimated_document_count())

Stats docs: 3


# Assignment

1. Query VCF slices from `public` database `example` collection. Process the data and insert the result to the personal database. (2%)
2. Query the database and get statistics like chromosome count, QUAL distribution, and ALT allele count distribution (3%)

Use the starter code cells below as a template — adapt the region(s) and any processing rules as needed.