# NoSQL DB With Python

## 1. Configuration Setup & Connection Initialization


In [None]:
import os
import asyncio
import yaml
from typing import List, Optional, Dict, Any
from datetime import datetime, timezone
from beanie import Document, init_beanie
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from daolib.drivers.nosql.mongo_connector import MongoConnector
from daolib.drivers.nosql.config import NoSQLConnectionEntry
import pprint

In [None]:
# ---- YAML File Reader ----

class YamlFileOperator:
    """Minimal helper for loading YAML configuration files."""

    @staticmethod
    def read(file_path: str) -> Dict[str, Any]:
        """Parse the YAML file at ``file_path`` and return its content.

        Args:
            file_path: Path of the YAML file to load.

        Returns:
            The parsed document. An empty file (which ``yaml.safe_load``
            reports as ``None``) is returned as an empty dict so callers can
            always use ``.get`` on the result.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the file content is not valid YAML.
        """
        try:
            # Explicit encoding: the platform default is not UTF-8 everywhere.
            with open(file_path, 'r', encoding='utf-8') as file:
                content = yaml.safe_load(file)
        except FileNotFoundError as e:
            # Chain the original error so the OS-level details are not lost.
            raise FileNotFoundError(f"Config file not found: {file_path}") from e
        except yaml.YAMLError as e:
            raise ValueError(f"Error parsing YAML file: {e}") from e
        return {} if content is None else content

### 1.1 Configuration Constants


In [None]:
# ---- Configuration Constants ----

class Constants:
    """Configuration constants shared by the examples."""

    # Environment selection (defaults to development)
    ENVIRONMENT = os.getenv("ENVIRONMENT_FOR_RUN", "development")

    # Directory that holds the "config" folder. ``__file__`` is not defined
    # when this code runs inside a notebook kernel, so fall back to the
    # current working directory instead of failing with NameError.
    try:
        _CONFIG_ROOT = os.path.dirname(__file__)
    except NameError:
        _CONFIG_ROOT = os.getcwd()

    # Absolute path of the YAML configuration file for the selected environment
    BASE_CONFIG_PATH = os.path.abspath(
        os.path.join(_CONFIG_ROOT, "config", f"{ENVIRONMENT}.yml")
    )

    class DBConstants:
        """Database-related constants"""

        # YAML structure keys
        nosql_creds = "nosql_creds"
        mongo_inst = "mongo_inst"
        username = "username"
        host = "host"
        port = "port"
        db_name = "db_name"
        min_pool_size = "min_pool_size"
        max_pool_size = "max_pool_size"

        # Passwords from environment variables (not stored in YAML for security).
        # Each one falls back to the literal "password" when its variable is unset.
        mongo_password_dev = os.getenv("MONGO_PASSWORD_DEV", "password")
        mongo_password_stage = os.getenv("MONGO_PASSWORD_STAGE", "password")
        mongo_password_prod = os.getenv("MONGO_PASSWORD_PROD", "password")

### 1.2 Custom MongoConnector Helper


In [None]:
# ---- Custom MongoConnector for daolib ----

class MongoHelper(MongoConnector):
    """
    Concrete implementation of MongoConnector for async examples.
    Loads configuration from YAML files and environment variables.
    Supports development, staging, and production environments.
    """

    def read_and_load_config(self) -> NoSQLConnectionEntry:
        """
        Load MongoDB configuration from YAML and environment variables.

        Connection settings come from the ``nosql_creds -> mongo_inst`` section
        of the YAML file at ``Constants.BASE_CONFIG_PATH``; missing keys fall
        back to the defaults given below. The password is never read from
        YAML: it is picked from ``Constants.DBConstants`` according to the
        ``ENVIRONMENT_FOR_RUN`` environment variable.

        Returns:
            NoSQLConnectionEntry: Configuration object with connection details

        Raises:
            Exception: Whatever ``YamlFileOperator.read`` raises is logged and
                re-raised unchanged.
        """
        environment = os.getenv('ENVIRONMENT_FOR_RUN', 'development')
        print(f"Loading Mongo Config for: {environment} from {Constants.BASE_CONFIG_PATH}")

        # 1. Read YAML using the file operator
        try:
            configs_data = YamlFileOperator.read(Constants.BASE_CONFIG_PATH)
        except Exception as e:
            print(f"Error reading config: {e}")
            # Bare raise keeps the original traceback intact.
            raise

        # 2. Extract Mongo Config Section (Safe get)
        # Structure: nosql_creds -> mongo_inst
        # ``or {}`` also covers an empty file and sections present but set to null.
        nosql_section = (configs_data or {}).get(Constants.DBConstants.nosql_creds) or {}
        mongo_data = nosql_section.get(Constants.DBConstants.mongo_inst) or {}

        # 3. Determine Password based on Environment (Passwords are NOT in YAML)
        # Staging has its own password; any unrecognised environment keeps
        # the production password, as before.
        passwords_by_environment = {
            "development": Constants.DBConstants.mongo_password_dev,
            "staging": Constants.DBConstants.mongo_password_stage,
        }
        password = passwords_by_environment.get(
            environment, Constants.DBConstants.mongo_password_prod
        )

        # 4. Return Config Object (Used by Parent init() to build the connection)
        return NoSQLConnectionEntry(
            username=mongo_data.get(Constants.DBConstants.username, ""),
            password=password,
            host=mongo_data.get(Constants.DBConstants.host, "localhost"),
            port=int(mongo_data.get(Constants.DBConstants.port, 27017)),
            database=mongo_data.get(Constants.DBConstants.db_name, "kubeplayground_async"),
            min_pool_size=int(mongo_data.get(Constants.DBConstants.min_pool_size, 10)),
            max_pool_size=int(mongo_data.get(Constants.DBConstants.max_pool_size, 50))
        )

### 1.3 Initialize Connection


In [None]:
# ---- Initialize Connector ----
async def setup_async_connection():
    """Create a ``MongoHelper`` for the notebook's document models and initialise it.

    ``Person``, ``Address``, ``Author`` and ``Book`` are looked up when this
    coroutine runs, so all four must already be defined at that point.

    Returns:
        The initialised ``MongoHelper`` instance.
    """
    registered_models = [Person, Address, Author, Book]
    mongo = MongoHelper(document_models=registered_models)
    await mongo.init()
    print("✓ Async MongoDB connection established via daolib")
    return mongo

In [None]:
printer = pprint.PrettyPrinter(indent=4)

# Initialize the async connection
connector = await setup_async_connection()

# Clean up previous data
db: AsyncIOMotorDatabase = connector.get_database()
await db.person_collection.delete_many({})
await db.addresses.delete_many({})
print("✓ Database cleared for fresh start")

## 2. Async CRUD Operations with daolib

This section demonstrates CRUD (Create, Read, Update, Delete) operations using daolib's `MongoConnector` and Beanie for async operations with MongoDB.


### 2.1 Define Document Models


In [None]:
# ---- Define Beanie Document Models ----

class Address(Document):
    """Postal address, stored as its own document or embedded in a ``Person``."""
    street: str
    number: int
    city: str
    country: str
    zip: str
    # String id of the owning person when the address is stored by reference
    owner_id: Optional[str] = None

    class Settings:
        # Beanie takes the collection name from ``name``; any other attribute
        # is ignored and the class name would be used as the collection name.
        name = "addresses"


class Person(Document):
    """Person document with optional embedded addresses"""
    first_name: str
    last_name: str
    age: int
    married: Optional[bool] = None
    # Embedded addresses; stays None until the first address is added
    addresses: Optional[List[Address]] = None

    class Settings:
        # Beanie takes the collection name from ``name``; any other attribute
        # is ignored and the class name would be used as the collection name.
        name = "person_collection"

We define Beanie `Document` models for our data entities. The connector binds these models to MongoDB when it is initialized with them (see the `document_models` argument in `setup_async_connection`).


In [None]:
# Create and insert persons using Beanie
async def create_persons_async(
    first_names: List[str],
    last_names: List[str],
    ages: List[int]
) -> List[str]:
    """Insert one person per (first name, last name, age) triple.

    The three lists are read in lockstep; if their lengths differ, the extra
    entries of the longer lists are ignored.

    Args:
        first_names: First names, one per person.
        last_names: Last names, aligned with ``first_names``.
        ages: Ages, aligned with ``first_names``.

    Returns:
        The ids of the inserted documents as strings, in insertion order.
        An empty list when there is nothing to insert.
    """
    persons = [
        Person(first_name=first_name, last_name=last_name, age=age)
        for first_name, last_name, age in zip(first_names, last_names, ages)
    ]

    # A bulk insert of zero documents is rejected by the driver, so skip it.
    if not persons:
        return []

    # insert_many returns an InsertManyResult, not the inserted documents;
    # the generated ids are exposed through ``inserted_ids``.
    result = await Person.insert_many(persons)
    return [str(inserted_id) for inserted_id in result.inserted_ids]

### 2.2 Insert Operations

Create and insert sample data into the database.


In [None]:
# Insert sample data
first_names = ["Rahul", "Ananya", "Vikram", "Priya", "Arjun"]
last_names = ["Sharma", "Gupta", "Singh", "Mehta", "Verma"]
ages = [28, 24, 32, 27, 35]

inserted_ids_async = await create_persons_async(first_names, last_names, ages)
print(f"✓ Inserted {len(inserted_ids_async)} persons")
print(f"IDs: {inserted_ids_async}")

### 2.3 Read Operations

Retrieve data from the database using various query patterns.


In [None]:
# Get all persons
async def find_all_async():
    """Fetch every ``Person`` document and pretty-print each one as a dict."""
    for record in await Person.find_all().to_list():
        printer.pprint(record.dict())


# Print every person currently stored
await find_all_async()

In [None]:
# Get by ID
async def get_by_id_async(person_id: str):
    """Look up one person by its string id and pretty-print it.

    When no document has that id, a "not found" message is printed instead.
    """
    from bson import ObjectId

    # Ids are passed around as strings; convert back to ObjectId for the lookup
    match = await Person.get(ObjectId(person_id))
    if not match:
        print(f"No person found with ID: {person_id}")
        return
    printer.pprint(match.dict())


# Retrieve the first person
print(f"Fetching person with ID: {inserted_ids_async[0]}")
await get_by_id_async(inserted_ids_async[0])

In [None]:
# Search in range using Beanie query syntax
async def get_in_range_async(min_age: int, max_age: int):
    """Print all persons whose age lies between ``min_age`` and ``max_age``.

    Both bounds are inclusive. Results are sorted by age and printed without
    their ``id`` field.
    """
    age_bounds = (Person.age >= min_age, Person.age <= max_age)
    query = Person.find(*age_bounds).sort(Person.age)

    for match in await query.to_list():
        printer.pprint(match.dict(exclude={"id"}))


# Show persons aged 25 through 35 (both bounds inclusive), sorted by age
await get_in_range_async(25, 35)

### 2.4 Update Operations

Modify existing documents in the database.


In [None]:
async def update_by_id_async(person_id: str):
    """Mark a person as unmarried and add one year to their age.

    Prints a confirmation after saving, or a "not found" message when no
    document has the given id.
    """
    from bson import ObjectId

    target = await Person.get(ObjectId(person_id))
    if not target:
        print(f"Person {person_id} not found")
        return

    target.married = False
    target.age += 1
    await target.save()
    print(f"✓ Updated person {person_id}")


# Update the second person (Ananya)
# Index 1 of the inserted ids matches "Ananya" in the sample data.
target_id = inserted_ids_async[1]
await update_by_id_async(target_id)

# Verify the update
# Re-reads the document so the changed ``married`` and ``age`` values are shown.
print("\nVerifying update:")
await get_by_id_async(target_id)


### 2.5 Delete Operations

Remove documents from the database.


In [None]:
async def delete_by_id_async(person_id: str):
    """Remove the person with the given string id.

    Prints a confirmation after deleting, or a "not found" message when no
    document has that id.
    """
    from bson import ObjectId

    doomed = await Person.get(ObjectId(person_id))
    if not doomed:
        print(f"Person {person_id} not found")
        return

    await doomed.delete()
    print(f"✓ Deleted person {person_id}")


# Delete the third person (Vikram)
# Index 2 of the inserted ids matches "Vikram" in the sample data.
target_id = inserted_ids_async[2]
await delete_by_id_async(target_id)

# Verify deletion
# The lookup is expected to print the "No person found" message now.
print("\nVerifying deletion:")
await get_by_id_async(target_id)


## 3. Data Relationships

Explore different patterns for managing relationships between entities in MongoDB.

### 3.1 Embedding (One-to-Few)

Store related data directly within a parent document.


In [None]:
async def add_address_embed_async(person_id: str, address_data: dict):
    """Append an address built from ``address_data`` to a person's embedded list.

    The list is created on first use. Prints a confirmation after saving, or
    a "not found" message when no person has the given id.
    """
    from bson import ObjectId

    owner = await Person.get(ObjectId(person_id))
    if not owner:
        print(f"Person {person_id} not found")
        return

    embedded = Address(**address_data)
    # Start the list the first time an address is attached
    if owner.addresses is None:
        owner.addresses = []
    owner.addresses.append(embedded)
    await owner.save()
    print(f"✓ Added address to person {person_id}")


# Add address to the second person (Ananya) who we updated earlier
target_id = inserted_ids_async[1]
# Field values for the Address model; reused later by the referencing example
address_data = {
    "street": "Bay Street",
    "number": 2706,
    "city": "San Francisco",
    "country": "United States",
    "zip": "94107"
}

await add_address_embed_async(target_id, address_data)

# Verify the embedded address
# The printed person should now carry the address inside its ``addresses`` list.
print("\nVerifying embedded address:")
await get_by_id_async(target_id)


### 3.2 Referencing (One-to-Many)

Store related data in separate collections and link them via IDs.


In [None]:
async def add_address_relationship_async(person_id: str, address_data: dict):
    """Store an address as its own document, linked to a person by id.

    The caller's ``address_data`` is left untouched; the person's id is added
    as ``owner_id`` on a copy before the address is inserted.
    """
    linked = Address(**{**address_data, "owner_id": person_id})
    await linked.insert()
    print(f"✓ Created address linked to person {person_id}")


async def find_addresses_by_owner_async(person_id: str):
    """Print every address document whose ``owner_id`` equals ``person_id``.

    Each address is printed without its ``id`` field.
    """
    owned = await Address.find(Address.owner_id == person_id).to_list()

    print(f"Addresses for Person {person_id}:")
    for entry in owned:
        printer.pprint(entry.dict(exclude={"id"}))


# Link address to the fourth person (Priya) using references
# Index 3 of the inserted ids matches "Priya" in the sample data.
target_id = inserted_ids_async[3]

# Reuses the ``address_data`` dict defined in the embedding example
await add_address_relationship_async(target_id, address_data)

# Verify the referenced address
await find_addresses_by_owner_async(target_id)


### 3.3 Many-to-Many Relationships

Demonstrate complex relationships between multiple documents using aggregation pipelines.

#### 3.3.1 Define Models for Authors and Books


In [None]:
class Author(Document):
    """Author document"""
    first_name: str
    last_name: str
    date_of_birth: datetime

    class Settings:
        # Beanie takes the collection name from ``name``; any other attribute
        # is ignored and the class name would be used as the collection name.
        name = "author"


class Book(Document):
    """Book document with references to authors"""
    title: str
    authors: List[str]  # Author ids stored as strings (not ObjectId values)
    publish_date: datetime
    type: str  # 'fiction' or 'non-fiction'
    copies: int

    class Settings:
        # Beanie takes the collection name from ``name``; any other attribute
        # is ignored and the class name would be used as the collection name.
        name = "book"

#### 3.3.2 Insert Author Data


In [None]:
# Create and insert authors
authors_data = [
    {
        "first_name": "Haruki",
        "last_name": "Murakami",
        "date_of_birth": datetime(1949, 1, 12, tzinfo=timezone.utc),
    },
    {
        "first_name": "Chimamanda",
        "last_name": "Ngozi Adichie",
        "date_of_birth": datetime(1977, 9, 15, tzinfo=timezone.utc),
    },
    {
        "first_name": "Yuval",
        "last_name": "Noah Harari",
        "date_of_birth": datetime(1976, 2, 24, tzinfo=timezone.utc),
    },
]

# Insert authors
authors = [Author(**data) for data in authors_data]
author_results = await Author.insert_many(authors)
author_ids = [str(result.id) for result in author_results]

print(f"✓ Inserted {len(author_ids)} authors")
print(f"Author IDs: {author_ids}")

murakami_id, adichie_id, harari_id = author_ids

#### 3.3.3 Insert Book Data with Author References


In [None]:
# Create and insert books with author references
books_data = [
    {
        "title": "Kafka on the Shore",
        "authors": [murakami_id],
        "publish_date": datetime(2002, 9, 12, tzinfo=timezone.utc),
        "type": "fiction",
        "copies": 12,
    },
    {
        "title": "Norwegian Wood",
        "authors": [murakami_id],
        "publish_date": datetime(1987, 9, 4, tzinfo=timezone.utc),
        "type": "fiction",
        "copies": 9,
    },
    {
        "title": "Half of a Yellow Sun",
        "authors": [adichie_id],
        "publish_date": datetime(2006, 9, 12, tzinfo=timezone.utc),
        "type": "fiction",
        "copies": 7,
    },
    {
        "title": "We Should All Be Feminists",
        "authors": [adichie_id],
        "publish_date": datetime(2014, 1, 1, tzinfo=timezone.utc),
        "type": "non-fiction",
        "copies": 15,
    },
    {
        "title": "Sapiens: A Brief History of Humankind",
        "authors": [harari_id],
        "publish_date": datetime(2011, 1, 1, tzinfo=timezone.utc),
        "type": "non-fiction",
        "copies": 20,
    },
]

# Insert books
books = [Book(**data) for data in books_data]
book_results = await Book.insert_many(books)
book_ids = [str(result.id) for result in book_results]

print(f"✓ Inserted {len(book_ids)} books")
print(f"Book IDs: {book_ids}")


#### 3.3.4 Join Authors with Their Books

Use an aggregation pipeline with `$lookup` to perform a join operation.


In [None]:
# Query with aggregation pipeline for many-to-many joins
async def get_authors_with_books():
    """Print every author together with the title, type and copies of their books.

    The join is done server-side with ``$lookup``. Books store author ids as
    strings while an author's ``_id`` is an ObjectId, so the id is converted
    with ``$toString`` first; comparing the raw ``_id`` against the string
    list would never match.
    """
    pipeline = [
        {
            # String form of the author's id, used only as the join key
            "$addFields": {"author_id_str": {"$toString": "$_id"}}
        },
        {
            "$lookup": {
                "from": "book",
                "localField": "author_id_str",
                "foreignField": "authors",
                "as": "books"
            }
        },
        {
            # Inclusion projection: the temporary join key is dropped here
            "$project": {
                "first_name": 1,
                "last_name": 1,
                "books": {
                    "$map": {
                        "input": "$books",
                        "in": {
                            "title": "$$this.title",
                            "type": "$$this.type",
                            "copies": "$$this.copies"
                        }
                    }
                }
            }
        }
    ]

    results = await Author.aggregate(pipeline).to_list()

    print("Authors and their Books:")
    for result in results:
        printer.pprint(result)


# Print each author with the books joined in by the aggregation
await get_authors_with_books()

#### 3.3.5 Advanced Aggregation with Calculated Fields

Calculate statistics like total books and copies per author.


In [None]:
# Advanced aggregation with calculated fields
async def get_authors_with_book_count():
    """Print, per author, how many books they have and the total copies of them.

    The join is done server-side with ``$lookup``. Books store author ids as
    strings while an author's ``_id`` is an ObjectId, so the id is converted
    with ``$toString`` first; comparing the raw ``_id`` against the string
    list would never match and every count would be zero.
    """
    pipeline = [
        {
            # String form of the author's id, used only as the join key
            "$addFields": {"author_id_str": {"$toString": "$_id"}}
        },
        {
            "$lookup": {
                "from": "book",
                "localField": "author_id_str",
                "foreignField": "authors",
                "as": "books"
            }
        },
        {
            "$addFields": {
                "total_books": {"$size": "$books"},
                "total_copies": {"$sum": "$books.copies"}
            }
        },
        {
            # Inclusion projection: the joined books and the temporary join
            # key are dropped, only names and totals remain
            "$project": {
                "first_name": 1,
                "last_name": 1,
                "total_books": 1,
                "total_copies": 1
            }
        }
    ]

    results = await Author.aggregate(pipeline).to_list()

    print("Author Book Statistics:")
    for result in results:
        printer.pprint(result)


# Print the per-author book and copy totals
await get_authors_with_book_count()

## 4. Cleanup

Close the database connection and clean up resources.


In [None]:
# Cleanup: close the connection
# NOTE(review): close() is called without await — confirm the connector's
# close method is synchronous.
connector.close()
print("✓ MongoDB connection closed")