In [None]:
#pip install pymongo 
from pymongo import MongoClient
import pymongo
# Pymongo Docs 
# Use PyMongo for synchronous Python applications.
# https://www.mongodb.com/docs/drivers/pymongo/
# Use Motor for asynchronous Python applications.
# https://www.mongodb.com/docs/drivers/motor/

# pymongo documentation
# https://pymongo.readthedocs.io/en/stable/api/pymongo/index.html

In [None]:
# Generic client setup: create a client for the MongoDB server at localhost:27017
client = MongoClient('localhost', 27017)
# Names of all databases reported by the server
dbs = client.list_database_names()

# Print every database together with the names of its collections
for db in dbs:
    print(f"[{db}]=> {client[db].list_collection_names()}")

# Select the database to work with
# (this rebinds `db`, which was used as the loop variable above)
db_name = "mydb"
db = client[db_name] 
# Select the collection to work with inside that database
coll_name = "student"
col = db[coll_name]  

# use generic functions on connected collection 
# OR
# use the MongoDB Manager Class defined below

In [None]:
# MongoDB Manager Class

class MongoDBManager:
    """
    Small convenience wrapper around a PyMongo ``MongoClient``.

    The manager tracks one active database (``self.db``) and one active
    collection (``self.coll``). Most methods operate on the active collection
    unless a ``collection_name`` is passed explicitly, in which case that
    collection of the active database is used for the call only.
    For anything that is not wrapped here, call ``self.client``, ``self.db``
    or ``self.coll`` directly.

    The manager can also be used in a ``with`` statement; the connection is
    closed when the block exits.
    """

    def __init__(self, host='localhost', port=27017, database_name='mydb', coll_name="mycoll"):
        """
        Initialize the MongoDB client and select the default database/collection.
        :param host: MongoDB server hostname or IP address (default is 'localhost').
        :param port: MongoDB server port (default is 27017).
        :param database_name: Default database to perform operations on.
        :param coll_name: Default collection to perform operations on.
        """
        self.client = MongoClient(host, port)
        self.db = self.client[database_name]    # active database
        self.coll = self.db[coll_name]          # active collection
        self.databases_list = []                # last result of listing all databases

    def __enter__(self):
        """Return the manager itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block; exceptions are not suppressed."""
        self.close_connection()

    def _resolve_collection(self, collection_name=None):
        """Return the named collection of the active database, or the active collection if no name is given."""
        if collection_name is None:
            return self.coll
        return self.db[collection_name]

    def _activate(self, database_name=None, collection_name=None):
        """
        Make the given database and/or collection the active one (falsy names are ignored).
        When only the database changes, the active collection is re-bound to the
        collection of the same name in the new database, so ``self.coll`` never
        keeps pointing into the previously selected database.
        """
        if database_name:
            self.db = self.client[database_name]
            if not collection_name:
                self.coll = self.db[self.coll.name]
        if collection_name:
            self.coll = self.db[collection_name]

    def list_dbs_or_colls(self, database_name=None):
        """
        List all databases in MongoDB or list collections within a specific database.
        :param database_name: Name of the database to list collections from (default is None).
        :return: List of collection names if database_name is given, otherwise list of database names.
        """
        if database_name:
            # List collections within the specified database
            return self.client[database_name].list_collection_names()

        # List all databases in MongoDB and remember the result on the instance
        self.databases_list = self.client.list_database_names()
        return self.databases_list

    def create_db_or_coll(self, **kwargs):
        """
        Create (and select) a new database or collection based on the parameters.
        :param kwargs: Keyword arguments to specify the database and/or collection.
            - 'database_name': Name of the database to create.
            - 'collection_name': Name of the collection to create.
        :raises ValueError: If neither 'database_name' nor 'collection_name' is given.
        :raises RuntimeError: If a collection is requested while no database is selected.
        Note:  In MongoDB, a database is not created until it gets content!
        """
        database_name = kwargs.get('database_name')
        collection_name = kwargs.get('collection_name')

        if database_name is None and collection_name is None:
            raise ValueError("Either 'database_name' or/and 'collection_name' must be provided.")

        # To create a collection, a database has to be selected first.
        # (Compared with `is None`: this check deliberately avoids truth-testing the database object.)
        if collection_name and not database_name and self.db is None:
            raise RuntimeError("No database selected. Select or create a database.")

        self._activate(database_name, collection_name)

    def connect_db_or_coll(self, **kwargs):
        """
        Connect/select an existing database and/or collection.
        If no keyword arguments are given, nothing is changed and the current
        selection is returned.
        :param kwargs: Keyword arguments to specify the database and/or collection to connect to.
            - 'database_name': Name of the database to select.
            - 'collection_name': Name of the collection to select.
        :return: Tuple (database name, collection name) of the active selection, as strings.
        """
        self._activate(kwargs.get('database_name'), kwargs.get('collection_name'))

        # Tuple with the selected db and collection names
        return (self.db.name, self.coll.name)

    def drop_db_or_coll(self, name, is_collection=False):
        """
        Delete or drop a database or collection based on the parameters.
        The active database/collection selection is not changed by this call.

        :param name: Name of the database or collection to delete.
        :param is_collection: Whether to delete a collection of the active database
            (default is False, which deletes a database).
        :return: Whatever the underlying PyMongo drop call returns.
        :raises RuntimeError: If a collection should be dropped while no database is selected.
        """
        if not is_collection:
            return self.client.drop_database(name)

        if self.db is None:
            raise RuntimeError("No database selected. Use create_db_or_coll() to select or create a database.")
        return self.db.drop_collection(name)

    def insert_one(self, document, collection_name=None):
        """
        Insert a single document into a collection.
        :param document: Dictionary representing the document to insert.
        :param collection_name: Name of the collection; the active collection is used if None.
        :return: Result of the underlying ``insert_one`` call.
        """
        return self._resolve_collection(collection_name).insert_one(document)

    def insert_many(self, documents, collection_name=None):
        """
        Insert multiple documents into a collection.
        :param documents: List of dictionaries representing the documents to insert.
        :param collection_name: Name of the collection; the active collection is used if None.
        :return: Result of the underlying ``insert_many`` call.
        """
        return self._resolve_collection(collection_name).insert_many(documents)

    def find(self, query=None, limit=None, collection_name=None):
        """
        Query documents in a collection.
        :param query: Dictionary representing the query filter (default is an empty query).
        :param limit: Number of records to fetch. None = all, limit = 1 one, limit = 3, etc.
        :param collection_name: Name of the collection; the active collection is used if None.
        :return: The result of ``find_one`` (a single document) when limit <= 1,
            otherwise a cursor over the matching documents.
        """
        collection = self._resolve_collection(collection_name)
        if query is None:
            query = {}

        if limit is None:
            return collection.find(query)
        if limit <= 1:
            # A single row is returned as a document, not wrapped in a cursor
            return collection.find_one(query)
        return collection.find(query).limit(limit)

    def aggregate(self, pipleline, collection_name=None):
        """
        Run an aggregation pipeline on a collection.
        :param pipleline: List of aggregation stages (the parameter name is kept
            as-is so existing keyword callers keep working).
        :param collection_name: Name of the collection; the active collection is used if None.
        :return: Result of the underlying ``aggregate`` call.
        """
        return self._resolve_collection(collection_name).aggregate(pipleline)

    def update(self, query, update, limit="One", collection_name=None):
        """
        Update one or many documents matching the query in a collection.
        :param query: Dictionary representing the query filter.
        :param update: Dictionary representing the update to apply.
        :param limit: How many documents to update: None or "One" = first match, "Many" = all matches.
        :param collection_name: Name of the collection; the active collection is used if None.
        :return: Result of the underlying ``update_one`` / ``update_many`` call.
        :raises ValueError: If limit is not one of the supported values.
        """
        collection = self._resolve_collection(collection_name)

        if limit is None or limit == "One":
            return collection.update_one(query, update)
        if limit == "Many":
            return collection.update_many(query, update)
        # Fail loudly instead of silently doing nothing on e.g. a misspelled limit
        raise ValueError(f"Unsupported limit {limit!r}: expected None, 'One' or 'Many'.")

    def delete(self, query, limit="One", collection_name=None):
        """
        Delete one, many or all documents from a collection.
        :param query: Dictionary representing the query filter (ignored when limit is "All").
        :param limit: How many documents to delete: None or "One" = first match,
            "Many" = all matches, "All" = every document in the collection.
        :param collection_name: Name of the collection; the active collection is used if None.
        :return: Result of the underlying ``delete_one`` / ``delete_many`` call.
        :raises ValueError: If limit is not one of the supported values.
        """
        collection = self._resolve_collection(collection_name)

        # Only the first occurrence is deleted
        if limit is None or limit == "One":
            return collection.delete_one(query)
        if limit == "Many":
            return collection.delete_many(query)
        # Delete all documents in the collection
        if limit == "All":
            return collection.delete_many({})
        # Fail loudly instead of silently doing nothing on e.g. a misspelled limit
        raise ValueError(f"Unsupported limit {limit!r}: expected None, 'One', 'Many' or 'All'.")

    def execute_command(self, command):
        """
        Execute an arbitrary command on the active database.
        :param command: Dictionary representing the MongoDB command.
        :return: The result of executing the command.
        """
        return self.db.command(command)

    def close_connection(self):
        """
        Close the MongoDB connection.
        """
        self.client.close()

In [None]:
# Example Usage:
# Initialize the MongoDB client and establish a connection to the server
dbm = MongoDBManager('localhost', 27017)

# List all databases in MongoDB  => list
dbm.list_dbs_or_colls()

# List all collections within a specific database => list
dbm.list_dbs_or_colls("mydb")

# List active db and collection => tuple
dbm.connect_db_or_coll()

# Connect to db and collection => tuple
dbm.connect_db_or_coll(database_name='mydb', collection_name='student')
dbm.list_dbs_or_colls("mydb")


In [None]:
#Create db
dbm.create_db_or_coll(database_name='animals')

#Drop database
dbm.drop_db_or_coll('animals', is_collection=False)

#Create collection
dbm.create_db_or_coll(collection_name='chicken')

#Drop Collection
dbm.drop_db_or_coll('chicken', is_collection=True)

In [None]:
# List all collections within a specific database
# dbm.list_dbs_or_colls()
dbm.list_dbs_or_colls("mydb")

#Create collection
dbm.create_db_or_coll(collection_name='python')

# Connect or current db/coll
dbm.connect_db_or_coll(collection_name='python')


In [None]:
# Random data generator for dummy data, with variety of characteristics
import random
from datetime import datetime
import time
from faker import Faker

fake = Faker()

def generate_random_object():
    """
    Build one dummy document mixing several field types: strings, an integer,
    a "YYYY-MM-DD" date string, a list of integers, a nested object and
    Faker-generated text and email.
    :return: dict with the keys name, age, date, numbers, hobby, desc, email.
    """
    # Value pools the random choices are drawn from
    first_names = ["John", "Jane", "Bob", "Sara", "Mike", "Liz", "Tom", "Emma", "Dave", "Lisa", "Andy"]
    hobbies = ["Reading", "Painting", "Cooking", "Hiking", "Photography", "Gardening", "Singing"]
    skill_levels = ["beg", "interm", "advance"]

    # Pick a random moment between the Unix epoch (0) and the current time
    upper_bound = int(time.time())
    moment = datetime.fromtimestamp(random.randint(0, upper_bound))

    # Fill the document field by field, in the order the keys should appear
    record = {}
    record["name"] = random.choice(first_names)
    record["age"] = random.randint(18, 60)
    record["date"] = moment.strftime("%Y-%m-%d")
    record["numbers"] = [random.randint(1, 100) for _ in range(3)]
    hobby = {}
    hobby["name"] = random.choice(hobbies)
    hobby["level"] = random.choice(skill_levels)
    record["hobby"] = hobby
    record["desc"] = fake.text()
    record["email"] = fake.email()
    return record

# test single generated data.
print(generate_random_object())

In [None]:
# Single document to insert using insertOne
document = generate_random_object()

#Insert single record
result = dbm.insert_one( document , collection_name='python')
print(document)
print(result.inserted_id)

In [None]:
# Generate a list of objects to insert many documents
documents = []

for _ in range(200):
    random_data = generate_random_object()
    documents.append(random_data)

#Insert multiple documents
results = dbm.insert_many( documents , collection_name='python')
print(f"num of docs inserted: {len(results.inserted_ids)}")

In [None]:
from bson import ObjectId

# Custom timestamp in the desired format
custom_timestamp = "1986-11-04 01:01:01"
# Convert the custom timestamp to a (timezone-naive) datetime object
custom_datetime = datetime.strptime(custom_timestamp, "%Y-%m-%d %H:%M:%S")
# Calculate the timestamp in seconds since the Unix epoch
timestamp_seconds = int(custom_datetime.timestamp())

# Create a custom ObjectId from the timestamp; the seconds value is turned
# back into a datetime because from_datetime() is given a datetime here.
# NOTE(review): the bson documentation reportedly warns that ObjectIds built
# with from_datetime() are intended for queries and are not safe to insert,
# because they lose the usual uniqueness guarantee - confirm before reusing
# this pattern; re-running this cell presumably fails with a duplicate _id.
custom_object_id = ObjectId.from_datetime(
    datetime.fromtimestamp(timestamp_seconds)
)

# Document to insert, using the timestamp-based custom ObjectId as its _id
document = {
    "_id": custom_object_id,
    "name"  : None,
    "age"   : -1
}
print(document)

# Insert single record with the created ObjectId into the 'python' collection
result = dbm.insert_one( document , collection_name='python')
print(result.inserted_id)

In [None]:
# Find Query One
query = {}
# get one row = find.one()
row = dbm.find(query,limit=1)
print(row)

In [None]:
# Find Query All
#Find all
query = {}
cursor = dbm.find(query)  
# find() without a limit returns a cursor over all matching documents

# iterate cursor
for _ in range(1):    
    print( cursor.next())
# convert to array of documents
docs = list (cursor) 
# count the remaining documents (the one already consumed by next() above is not included)
print(f"num of docs read: {len(docs )}")


In [None]:
# Find Query Multiple
# get two records 
query = {}
curs = dbm.find(query,limit=2)
# convert to array of documents
rows = list(curs)
print(rows)


In [None]:
# query for name = "Jane"
query = { "name": "Jane" }
cursor = dbm.find(query)
# for all
print(f"num of docs read: {len(list (cursor))}")


In [None]:
# query for age > 57
query = {"age": {"$gt": 57}}
cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# query for name = "Jane" or "Bob"
query = {"name": {"$in": ["Jane", "Bob"] } }
cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# query for name = "Jane" and age > 32
query = {"name": "Jane", "age": {"$gt": 32}}
cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# query for field existence: return documents that have both a name and an age field
query = {
    "name": {"$exists": True},
    "age": {"$exists": True}
}
cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# query for nested objects, with dot notation
query = {"hobby.level": "beg","hobby.name": "Gardening" }
cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# Query with Regular Expressions
import re

# Define regEX pattern for field to search
pattern = re.compile(r'^[A]')                     # name start with A
# pattern = re.compile(r'example.net')              # match exact example.net
# pattern = re.compile(r'^[A-Za-z ]+$')             # matches names with alphabetic characters and spaces
# pattern = re.compile(r'^\d{4}-\d{2}-\d{2}$')      # matches dates in the "YYYY-MM-DD" format
# pattern = re.compile(r'^\w+@\w+\.\w{2,}$')        # matches email addresses like "user@example.com"
# pattern = re.compile(r'^[0-9a-fA-F]{24}$')        # matches valid ObjectId strings
# pattern = re.compile(r'^.*$')                     # matches any sequence of characters

# Query for documents where the "name" field matches the pattern
query = {"name": {"$regex": pattern}}
# Find documents where the 'name' field starts with 'Dan'.
query = {'name': {'$regex': '^Dan' }}

cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

query = { "name": { "$regex": "Mike|John" } }
cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# Query object array documents
# Query for Documents with Any of the Numbers in the Array
query = { "numbers": { "$in": [10, 20, 30] } }
# Query for Documents with All of the Numbers in the Array
query = { "numbers": { "$all": [8, 21, 22] } }
# Query for Documents with a Specific Number in the Array
query = { "numbers":  34 }

cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")


In [None]:
# Find documents created after defined time, using ObjectID and _id field
# Calculate the timestamp for the specific date
specific_date = datetime(2023, 1, 1)
timestamp = int(specific_date.timestamp())
# Use $gte operator with _id to find documents created after the specific date
query = {
    "_id": {"$gte": ObjectId.from_datetime(datetime.fromtimestamp(timestamp))}
}

cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# Query for Documents Within a Range of Values
# Find documents where 'age' is between 30 and 32 (both bounds inclusive).
query = {'age': {'$gte': 30, '$lte': 32} }
# Query using $gte and $lte on two fields at once.
# "date" is stored as a "YYYY-MM-DD" string, so plain string comparison orders
# the values chronologically; the lower bound ($gte) must not be later than
# the upper bound ($lte), otherwise no document can ever match.
query = {
    "age": {"$gte": 59},
    "date": {
        "$gte": "2010-05-27",
        "$lte": "2012-05-27"
    }
}

cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# Query for Documents Matching Multiple Conditions
# Find documents where 'age' is greater than 60 and 'hobby.level' is 'beg'.
query = {'$and': [{'age': {'$gt': 60}}, {'hobby.level': 'beg'}]}
# Find documents where 'age' is at least 90 or at most 20.
query = {'$or': [{'age': {'$gte': 90}}, {'age': {'$lte': 20}}]}
# Complex query using $or and $and operators:
# at least one of the $or branches must match, and every $and condition must hold.
query = {
    "$or": [
        {
            "name": "Mike",
            "age": 59
        },
        {
            "numbers": {
                "$all": [8, 21, 22]
            }
        },
        {
            "hobby.name": "Gardening",
        }
    ],
    "$and": [
        # Each regex gets its own condition: a dict literal can hold only one
        # "$regex" key, so writing two in the same dict keeps only the last one.
        {"desc": {"$regex": "invest"}},
        {"desc": {"$regex": "discussion"}},
        {"hobby.level": "beg"}
    ]
}

cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
from pymongo import MongoClient, ASCENDING, DESCENDING
# Before using the $text operator for text search, a text index must exist on the "desc" field.
# dbm.coll is used here to access the current collection directly.
# Create a text index on the "desc" field
dbm.coll.create_index( [ ("desc", pymongo.TEXT ) ])

# use of $text operator to search for documents that contain specific keywords in the "desc" field
query = {
   "$text": {
      "$search": "investment discussion"
   }
}

cursor = dbm.find(query)
print(f"num of docs read: {len(list (cursor))}")

In [None]:
# Sorting and Limiting Results:
# Sort documents by 'age' in descending order and limit the result to 4 documents.
# sort and limit are cursor methods => they can be chained after find()
query = {}
sort = [('age', pymongo.DESCENDING)]
result = dbm.find(query, limit=5).sort(sort).limit(4)
print(len(list(result)))


In [None]:
# Aggregation Pipeline
# Group the documents by name and compute the average age of each group.
# (The documents generated in this notebook have no 'city' field, so the
# grouping key is a field that actually exists in them.)
pipeline = [
    {'$group': {'_id': '$name', 'average_age': {'$avg': '$age'}}}
]
# Run the pipeline on the active collection through the manager and keep its
# result (it is no longer overwritten by an unrelated find()).
result = dbm.aggregate(pipeline)
len(list(result))

In [None]:
# Some other pipeline definitions
# Filter Documents with a Condition
pipeline = [
    {"$match": {"age": {"$gt": 50}}}
]
# Project Specific Fields
pipeline = [
    {"$project": {"name": 1, "age": 1, "email": 1}}
]
# Group and Calculate Average Age
pipeline = [
    {"$group": {"_id": "$hobby.name", "avgAge": {"$avg": "$age"}}}
]
# Sort Documents  in Descending Order
pipeline = [
    {"$sort": {"age": -1}}
]
# Limit Results to Num Documents
pipeline = [
    {"$limit": 5}
]
# Unwind and Group by Numbers
pipeline = [
    {"$unwind": "$numbers"},
    {"$group": {"_id": "$numbers", "count": {"$sum": 1}}}
]
# Output the results to a new collection (optional)
pipeline = [ 
    {"$out": "aggregation_results"}
]

In [None]:
# some Complex pipelines
pipeline = [
    # Group documents by the "hobby.name" field
    {"$group": {
        "_id": "$hobby.name",
        "count": {"$sum": 1},         # Count the documents in each group
        "maxAge": {"$max": "$age"},   # Find the maximum age in each group
        "minAge": {"$min": "$age"},   # Find the minimum age in each group
        "avgAge": {"$avg": "$age"}    # Calculate the average age in each group
    }},
    # Project the results to rename the _id field to "hobby" and exclude _id
    {"$project": {
        "hobby": "$_id",
        "_id": 0,
        "count": 1,
        "maxAge": 1,
        "minAge": 1,
        "avgAge": 1
    }},
    # Sort the results by the "count" field in descending order
    {"$sort": {"count": -1}},
    # Limit the results to the first 5 groups
    {"$limit": 5},
    # Output the results to a new collection (optional)
    {"$out": "aggregation_results"}
]

pipeline = [
    # Match documents where the "age" field is greater than 30
    {"$match": {"age": {"$gt": 30}}},
    # Group documents by the "hobby.name" field and calculate the average age for each hobby
    {"$group": {
        "_id": "$hobby.name",
        "avgAge": {"$avg": "$age"}
    }},
    # Sort the results by the average age in descending order
    {"$sort": {"avgAge": -1}},
    # Project the results to rename the "_id" field to "hobby" and exclude "_id"
    {"$project": {
        "hobby": "$_id",
        "_id": 0,
        "avgAge": 1
    }},
    # Add a new field "isElderly" based on the average age
    {"$addFields": {
        "isElderly": {"$gte": ["$avgAge", 50]}
    }},
    # Limit the results to the first 5 documents
    {"$limit": 5},
    # Output the results to a new collection (optional).
    # $out has to be the last stage of a pipeline, so no stage may follow it.
    {"$out": "aggregation_results"}
]

# Count the number of documents in the output collection.
# This is a separate pipeline, meant to be run on "aggregation_results"
# after the pipeline above has written its output there.
count_pipeline = [
    {"$count": "totalDocuments"}
]

In [None]:
# UPDATE DOCUMENTS
# To set a new field or update an existing field, you can use the $set operator
dbm.coll.update_one(
    {"name": "Mike"},
    {"$set": {"name": "nameUpdated"}}
)
# To remove a field from a document, you can use the $unset operator
dbm.coll.update_many(
    {"name": "Mike"},
    {"$unset": {"age": 1}}
)
# To rename a field in a document, you can use the $rename operator
dbm.coll.update_many(
    {"name": "Mike"},
    {"$rename": {"numbers": "new_numbers"}}
)
# To increment a numeric field by a specified value, you can use the $inc operator
dbm.coll.update_one(
    {"name": "Mike"},
    {"$inc": {"age": 5}}
)
# To update a field with the minimum value between the existing value and a specified value, use the $min operator
dbm.coll.update_one(
    {"name": "Mike"},
    {"$min": {"age": 60}}
)
# Multiplication (Mul) Operation
dbm.coll.update_one(
    {"name": "Mike"},
    {"$mul": {"age": 1.1}}
)

In [None]:
# DELETE DOCUMENTS
# Delete a Document by its ID
dbm.coll.delete_one({"_id": document['_id']})
# Delete Documents Based on a Field Value
dbm.coll.delete_many({"age": 59})
# Delete Documents Based on Multiple Criteria
dbm.coll.delete_many({
    "name": "Mike",
    "date": "2012-05-27"
})
# Delete All Documents in a Collection
dbm.coll.delete_many({})
# Delete Fields from a Document
dbm.coll.update_one(
    {"_id": document['_id']},
    {"$unset": {"email": ""}}
)

In [None]:
# REPLACE
# Replace a Single Document Using
criteria = {"name": "Mike"}
replacement = generate_random_object()
dbm.coll.replace_one( criteria, replacement)

In [None]:
# CREATE SCHEMA in MONGO if REQUIRED
# MongoDB uses a JSON schema of BSON types
# https://www.mongodb.com/docs/manual/core/schema-validation/specify-json-schema/

# Example, schema defines a collection for storing book information

# {
#     "_id": ObjectId,          // MongoDB-generated unique identifier
#     "title": "string",        // Title of the book
#     "author": "string",       // Author's name
#     "genre": "string",        // Genre of the book
#     "published": ISODate,     // Date of publication
#     "rating": double,         // User-rated book rating
#     "reviews": [              // Array of embedded document for reviews
#         {
#             "user": "string",     // Reviewer's username
#             "comment": "string",  // Review comment
#             "rating": int         // Reviewer's rating
#         }
#     ]
# }

# Define collection schema
book_schema = {
    "validator": {
        "$jsonSchema": {
            "bsonType": "object",
            "required": ["title", "author", "genre", "published", "rating", "reviews"],
            "properties": {
                "title": {
                    "bsonType": "string",
                    "description": "Title of the book"
                },
                "author": {
                    "bsonType": "string",
                    "description": "Author's name"
                },
                "genre": {
                    "bsonType": "string",
                    "description": "Genre of the book"
                },
                "published": {
                    "bsonType": "date",
                    "description": "Date of publication"
                },
                "rating": {
                    "bsonType": "double",
                    "description": "User-rated book rating",
                    "minimum": 0,
                    "maximum": 5
                },
                "reviews": {
                    "bsonType": "array",
                    "description": "Array of embedded document for reviews",
                    "items": {
                        "bsonType": "object",
                        "required": ["user", "comment", "rating"],
                        "properties": {
                            "user": {
                                "bsonType": "string",
                                "description": "Reviewer's username"
                            },
                            "comment": {
                                "bsonType": "string",
                                "description": "Review comment"
                            },
                            "rating": {
                                "bsonType": "int",
                                "description": "Reviewer's rating",
                                "minimum": 1,
                                "maximum": 5
                            }
                        }
                    }
                }
            }
        }
    }
}


In [None]:
# generic use for setup client
client = MongoClient('localhost', 27017)
# connect to db
db_name = "mydb"
db = client[db_name]

# Create the collection with the schema.
# create_collection() takes collection options such as "validator" as keyword
# arguments; a dict passed as the second positional argument would be taken
# as codec_options instead, so the schema dict is unpacked with **.
db.create_collection("books", **book_schema)