# Category 1: Basic Data Operations
This notebook demonstrates basic CRUD operations in a sharded environment:

1. Insert with validation
2. Bulk inserts with shard analysis
3. Complex updates across shards
4. Delete operations with shard awareness
5. Upsert operations
6. Atomic operations

In [None]:
# Setup and Connection
import sys
!{sys.executable} -m pip install pandas pymongo --quiet

import pandas as pd
from pymongo import MongoClient
from datetime import datetime
import json
import time
import warnings
warnings.filterwarnings('ignore')

def print_mongo(obj):
    """Pretty-print ``obj`` as JSON indented by two spaces.

    Values the json module cannot serialize natively (ObjectId, datetime, ...)
    are rendered with ``str``.
    """
    rendered = json.dumps(obj, default=str, indent=2)
    print(rendered)

import os

# Connection string for the cluster (host `router1`, presumably the mongos
# router). Set the MONGO_URI environment variable to override it; the fallback
# is the hard-coded development credential this notebook shipped with and
# should not be reused against a real deployment.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://admin:admin@router1:27017/businessdb?authSource=admin",
)
client = MongoClient(MONGO_URI)
db = client.businessdb
# MongoClient connects lazily, so constructing it proves nothing. A ping forces
# server selection and authentication, so an unreachable router or rejected
# credentials fail here instead of in a later example.
client.admin.command("ping")
print("Connected to MongoDB")

## Example 1: Insert with Validation
Task: Insert a new organization with validation rules

In [1]:
from pymongo.errors import WriteError

new_org = {
    "organizationId": "TEST123",
    "name": "Test Company",
    "industry": "Technology",
    "country": "Czech Republic",
    "founded": 2023,
    "numberOfEmployees": 100
}

try:
    result = db.organizations.insert_one(new_org)
    print(f"Inserted document with ID: {result.inserted_id}")
except WriteError as e:
    # Only server error code 121 (DocumentValidationFailure) means the document
    # was rejected by the collection's validator. Other write errors, e.g. a
    # duplicate key if organizationId is uniquely indexed and this cell is
    # re-run, are not validation failures and are reported separately.
    if e.code == 121:
        print(f"Validation error: {e}")
    else:
        print(f"Write error: {e}")
except Exception as e:
    # Connection/auth/other driver errors: reported, but kept non-fatal as
    # before so the remaining cells can still run.
    print(f"Insert failed: {e}")

# Explanation: This command demonstrates document insertion with schema validation
# The document must match the schema defined in init-collections.js

Inserted document with ID: 6771e2c5ffa110f4979adcbf


## Example 2: Bulk Insert with Shard Distribution Analysis
Task: Insert organizations and analyze how data is distributed across shards

In [None]:
bulk_orgs = [
    {
        "organizationId": f"ORG{i}",
        "name": f"Company {i}",
        "industry": "Technology",
        # f-string, so documents get "Country 0" ... "Country 4". Without the
        # prefix, every document stored the literal text "Country {i % 5}".
        "country": f"Country {i % 5}",
        "founded": 2000 + (i % 20),
        "numberOfEmployees": 50 + (i * 10)
    }
    for i in range(100)
]

try:
    result = db.organizations.insert_many(bulk_orgs)
    print(f"Inserted {len(result.inserted_ids)} documents")
except Exception as e:
    print(f"Bulk insert error: {e}")

# Per-shard storage statistics. Run through mongos, `$collStats` returns one
# document per shard that holds the collection, tagged with a `shard` field.
# (`shardCollection` is the admin command that *shards* a collection and does
# not report distribution; PyMongo's Collection has no `stats()` method.)
shard_stats = list(db.organizations.aggregate([{"$collStats": {"storageStats": {}}}]))
print("Distribution for organizations:")
for stats in shard_stats:
    storage = stats["storageStats"]
    size_mb = storage["size"] / (1024 * 1024)  # storageStats.size is in bytes
    print(f"Shard {stats.get('shard', 'n/a')}: {storage['count']} documents, {size_mb:.3f}MB")

print("Detailed stats:")
print_mongo(shard_stats)

# Explanation: This command demonstrates bulk insertion and shard distribution analysis
# The data is inserted and then analyzed to see how it is distributed across shards

## Example 3: Advanced Upsert with Version Tracking
Task: Update or insert documents with automatic version control across shards

In [None]:
from datetime import datetime

def upsert_with_version_tracking(org_id, data):
    """Upsert an organization by ``organizationId`` and bump its ``version``.

    The fields in ``data`` are ``$set`` on the document, together with
    ``version`` (previous version + 1, or 1 for a new document),
    ``lastModified`` and ``modifiedBy``. If the organization already existed,
    a copy of its previous state is inserted into ``organizations_versions``.

    Args:
        org_id: value of the ``organizationId`` field to match/insert.
        data: mapping of fields to set. Any ``version``/``lastModified``/
            ``modifiedBy`` keys in it are overridden.

    Returns:
        The ``UpdateResult`` of the upsert.
    """
    # NOTE(review): the session only groups these operations. Without a
    # transaction, the read-then-write below is not atomic, so concurrent
    # callers can compute the same version number.
    with client.start_session() as session:
        # Previous state (None if the organization does not exist yet)
        current = db.organizations.find_one(
            {"organizationId": org_id},
            session=session
        )

        version_data = {
            "version": (current.get("version", 0) + 1 if current else 1),
            "lastModified": datetime.utcnow(),
            "modifiedBy": "system"
        }
        update_data = {**data, **version_data}

        result = db.organizations.update_one(
            {"organizationId": org_id},
            {"$set": update_data},
            upsert=True,
            session=session
        )

        if current:
            # Archive the replaced state under a fresh, server-generated _id.
            # Every version of one organization shares the live document's _id,
            # so copying it made the second archived version collide on the
            # unique _id index. The link back is kept in `documentId`.
            snapshot = {k: v for k, v in current.items() if k != "_id"}
            snapshot["documentId"] = current["_id"]
            snapshot["replacedAt"] = datetime.utcnow()
            db.organizations_versions.insert_one(snapshot, session=session)

        return result

# Example usage: create (first run) or update ORG_TEST_1
org_fields = {
    "name": "Test Organization",
    "industry": "Technology",
    "status": "active"
}
result = upsert_with_version_tracking("ORG_TEST_1", org_fields)
outcome = "new document created" if result.upserted_id else "existing document updated"
print("Operation result:", result.modified_count, "modified,", outcome)

# Verify versions: every archived snapshot of ORG_TEST_1
versions = list(db.organizations_versions.find({"organizationId": "ORG_TEST_1"}))
print(f"\nVersion history: {len(versions)} previous versions")
for snapshot in versions:
    print(f"Version {snapshot['version']} from {snapshot['lastModified']}")

## Example 4: Complex Document History System
Task: Implement a sophisticated versioning system with diff tracking

In [None]:
from deepdiff import DeepDiff

class DocumentTracker:
    """Apply ``$set`` updates to a document and archive its previous state.

    Each successful ``update_with_history`` call writes one record to the
    history collection, in the same transaction as the update. The record
    holds the full previous document and a DeepDiff of the change.
    """

    def __init__(self, collection, history_collection):
        # collection: the live collection that is updated
        # history_collection: where history records are inserted
        self.collection = collection
        self.history = history_collection

    def update_with_history(self, filter_query, updates, metadata=None):
        """Apply ``{"$set": updates}`` to the document matching ``filter_query``.

        Args:
            filter_query: filter selecting the document, used for both
                ``find_one`` and ``update_one``.
            updates: field -> new value mapping passed to ``$set``.
            metadata: optional dict stored verbatim on the history record.

        Returns:
            True once the update and its history record are committed.

        Raises:
            ValueError: if no document matches ``filter_query``. The
                transaction is aborted, so nothing is written.
            Exception: any driver error; the context manager aborts the
                transaction if the error occurs before commit.
        """
        with client.start_session() as session:
            # The context manager commits on normal exit and aborts if the body
            # raises. Unlike a manual abort in an except block, it never tries
            # to abort a transaction whose commit already failed.
            with session.start_transaction():
                # Read inside the transaction so the archived snapshot is the
                # state this update actually replaces.
                current_doc = self.collection.find_one(filter_query, session=session)

                if not current_doc:
                    raise ValueError("Document not found")

                # Diff against the document as it will look after $set.
                # Diffing against `updates` alone reported every untouched
                # field as removed.
                # NOTE(review): assumes `updates` uses top-level keys, not
                # dotted paths; confirm with callers.
                updated_doc = {**current_doc, **updates}
                diff = DeepDiff(current_doc, updated_doc, ignore_order=True)

                history_record = {
                    "documentId": current_doc["_id"],
                    "timestamp": datetime.utcnow(),
                    "previousVersion": current_doc,
                    # to_json() serializes DeepDiff's set-valued entries as
                    # lists; the to_dict() form holds sets, which BSON cannot
                    # encode.
                    "changes": json.loads(diff.to_json()),
                    "metadata": metadata or {}
                }

                self.collection.update_one(
                    filter_query,
                    {"$set": updates},
                    session=session
                )
                self.history.insert_one(history_record, session=session)
        return True

# Initialize tracker: live collection plus the collection receiving history records
tracker = DocumentTracker(db.organizations, db.organizations_history)

# Example usage: deactivate ORG_TEST_1 and record who did it and why
status_change = {"status": "inactive", "lastUpdateReason": "Company restructuring"}
audit_info = {"updatedBy": "admin", "reason": "status change"}
try:
    tracker.update_with_history(
        {"organizationId": "ORG_TEST_1"},
        status_change,
        metadata=audit_info
    )
    print("Update successful with history tracking")
except Exception as e:
    print(f"Error: {e}")

## Example 5: Cross-Shard Atomic Operations
Task: Implement complex atomic operations across multiple shards

In [None]:
def atomic_multi_org_operation(operations):
    """Execute multiple operations across different shards atomically.

    Args:
        operations: list of dicts applied in order to ``db.organizations``
            inside one transaction. Each has a ``'type'`` of ``'update'``
            (with ``'filter'`` and ``'update'``) or ``'insert'`` (with
            ``'document'``).

    Returns:
        One ``{'operation', 'success', 'result'}`` dict per operation, where
        ``result`` is the driver's UpdateResult / InsertOneResult.

    Raises:
        Exception: message "Transaction failed: ..." wrapping the original
            error, which is chained as ``__cause__``. An unsupported ``'type'``
            surfaces this way too, after the transaction is aborted.
    """
    with client.start_session() as session:
        try:
            # Commits on normal exit and aborts if the body raises. It never
            # calls abort after a failed commit, which would raise
            # InvalidOperation and hide the real error.
            with session.start_transaction():
                results = []
                for op in operations:
                    op_type = op['type']
                    if op_type == 'update':
                        result = db.organizations.update_one(
                            op['filter'],
                            op['update'],
                            session=session
                        )
                    elif op_type == 'insert':
                        result = db.organizations.insert_one(
                            op['document'],
                            session=session
                        )
                    else:
                        # Without this, an unknown type re-recorded the previous
                        # operation's result as a success (or hit an unbound name).
                        raise ValueError(f"Unsupported operation type: {op_type!r}")
                    results.append({
                        'operation': op_type,
                        'success': True,
                        'result': result
                    })
        except Exception as e:
            raise Exception(f"Transaction failed: {str(e)}") from e
        return results

# Example usage: one update and one insert, committed together or not at all
mark_merging = {
    'type': 'update',
    'filter': {'organizationId': 'ORG_TEST_1'},
    'update': {'$set': {'status': 'merging'}}
}
add_second_org = {
    'type': 'insert',
    'document': {
        'organizationId': 'ORG_TEST_2',
        'name': 'Test Org 2',
        'status': 'active'
    }
}
ops = [mark_merging, add_second_org]

try:
    results = atomic_multi_org_operation(ops)
    print("All operations completed successfully")
    for entry in results:
        print(f"{entry['operation']}: {entry['result']}")
except Exception as e:
    print(f"Transaction failed: {e}")

## Example 6: Parallel Bulk Operations
Task: Execute bulk operations with parallel processing and shard awareness

In [None]:
from concurrent.futures import ThreadPoolExecutor
from pymongo import UpdateOne, InsertOne
import threading

class ShardedBulkOperator:
    """Run PyMongo bulk-write requests in parallel, fixed-size batches.

    ``execute`` splits the request list into consecutive batches of
    ``batch_size`` and submits each batch as one ``bulk_write`` call to a
    thread pool of ``max_workers`` threads. It then sums the per-batch
    inserted/modified/deleted counts.

    NOTE: batches run concurrently, so there is no ordering between batches.
    With ``ordered=False`` (the default) the server may also apply the
    requests *inside* a batch in arbitrary order. Pass ``ordered=True`` when a
    batch contains dependent requests, e.g. an update of a document inserted
    earlier in the same batch.
    """

    def __init__(self, collection, batch_size=100, max_workers=4, ordered=False):
        # A non-positive batch size would make execute() skip everything
        # (negative step -> empty range) or fail late (zero step).
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        self.collection = collection
        self.batch_size = batch_size
        self.max_workers = max_workers
        # Forwarded as bulk_write(ordered=...) for every batch.
        self.ordered = ordered

    def _process_batch(self, operations):
        """Send one batch as a single bulk_write and return its counts.

        On a BulkWriteError, the writes that did succeed are still counted,
        using the counters in the error's ``details``. Any other error is
        printed and re-raised.
        """
        # No explicit session: the driver runs each bulk_write in an implicit
        # session that it ends itself. Sessions from start_session() must be
        # ended explicitly, which the per-thread sessions never were.
        try:
            result = self.collection.bulk_write(operations, ordered=self.ordered)
        except Exception as e:
            # Imported here because it is only needed on the failure path.
            from pymongo.errors import BulkWriteError

            if not isinstance(e, BulkWriteError):
                print(f"Batch error: {str(e)}")
                raise
            details = e.details
            write_errors = details.get('writeErrors', [])
            first = write_errors[0].get('errmsg') if write_errors else e
            print(f"Batch error: {len(write_errors)} write error(s), first: {first}")
            return {
                'inserted': details.get('nInserted', 0),
                'modified': details.get('nModified', 0),
                'deleted': details.get('nRemoved', 0)
            }
        return {
            'inserted': result.inserted_count,
            'modified': result.modified_count,
            'deleted': result.deleted_count
        }

    def execute(self, operations):
        """Run ``operations`` (a sliceable sequence of requests); return totals.

        Returns a dict with 'inserted', 'modified' and 'deleted' counts summed
        over all batches. A batch whose processing raises is printed and
        excluded from the totals.
        """
        batches = [operations[i:i + self.batch_size]
                   for i in range(0, len(operations), self.batch_size)]

        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self._process_batch, batch)
                       for batch in batches]

            # Collect in submission order; one failed batch does not stop the rest.
            for future in futures:
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"Failed batch: {e}")

        return self._aggregate_results(results)

    def _aggregate_results(self, results):
        """Sum the 'inserted'/'modified'/'deleted' counts of per-batch dicts."""
        totals = {'inserted': 0, 'modified': 0, 'deleted': 0}
        for r in results:
            for k in totals:
                totals[k] += r.get(k, 0)
        return totals

# Example usage: 500 InsertOne requests, each directly followed in the list by
# an UpdateOne that sets the just-inserted document's status to 'updated'
bulk_ops = []
for n in range(0, 1000, 2):
    bulk_ops.append(InsertOne({
        'organizationId': f'BULK_{n}',
        'name': f'Bulk Organization {n}',
        'status': 'active'
    }))
    bulk_ops.append(UpdateOne(
        {'organizationId': f'BULK_{n}'},
        {'$set': {'status': 'updated'}}
    ))

operator = ShardedBulkOperator(db.organizations)
results = operator.execute(bulk_ops)
print("Bulk operation results:")
for label, key in (("Inserted", "inserted"), ("Modified", "modified"), ("Deleted", "deleted")):
    print(f"{label}: {results[key]}")