## Student MongoDB Project: E-Commerce Data Analysis
This project is designed to demonstrate your understanding of MongoDB concepts, including CRUD operations, aggregation, indexing, $lookup, schema design, and more. You will work with a sample dataset to model, query, and analyze data. The project mirrors real-world scenarios to help you consolidate your knowledge.

## Project Overview
You are tasked with building a MongoDB database for an e-commerce platform and using it to answer analytical questions. The project involves:

1. Creating collections and designing a schema.
2. Importing sample data.
3. Applying advanced queries and aggregation pipelines.
4. Answering key analytical questions.
5. Documenting and presenting your work.

1. **Set Up the Environment**
    - Install MongoDB locally or use a MongoDB cloud service (e.g., MongoDB Atlas).
    - Install pymongo in your Python environment:
        - `pip install pymongo`
    - If using MongoDB Atlas, configure a connection to your cloud cluster.
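
One way to keep credentials out of the notebook is to read the connection string from an environment variable. The sketch below assumes a variable named `MONGODB_URI` (a hypothetical name, not defined by this project) and falls back to a local server on the default port.

```python
import os


def get_connection_string(default="mongodb://localhost:27017"):
    """Return the MongoDB URI from the environment, or a local default.

    MONGODB_URI is an assumed variable name chosen for this sketch.
    """
    return os.environ.get("MONGODB_URI", default)


conn_str = get_connection_string()
```

The resulting `conn_str` can then be passed to `MongoClient` in place of a hard-coded string.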


# Task 1: 
## Database Setup and Data Insertion
- Create the `customers`, `orders`, `products`, and `order_items` collections in MongoDB.
- Insert at least 20 documents into each collection using pymongo.
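
If no sample files are at hand, the documents can also be generated in code. The following is a minimal sketch for the customers collection only. The `customer_id` and `address.state` fields appear later in this notebook, while `name`, `email`, and the helper `make_sample_customers` are hypothetical additions for illustration.

```python
import random

# A few state names used only as placeholder values
STATES = ["Arizona", "Colorado", "Tennessee", "New York"]


def make_sample_customers(n=20, seed=0):
    """Build n customer documents with an embedded address."""
    rng = random.Random(seed)  # seeded so repeated runs give the same data
    return [
        {
            "customer_id": i,
            "name": f"Customer {i}",              # hypothetical field
            "email": f"customer{i}@example.com",  # hypothetical field
            "address": {"state": rng.choice(STATES)},
        }
        for i in range(1, n + 1)
    ]


sample_customers = make_sample_customers()
# With a live connection: customers.insert_many(sample_customers)
```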

In [1]:
import json
import pprint
from datetime import datetime, timedelta
from pymongo import MongoClient
from pymongo.errors import OperationFailure

In [3]:
# First connect to the server and database using the connection string
db_name = "alt_ecommerce"
conn_str = "..."
try:
    client = MongoClient(conn_str)
    # MongoClient connects lazily, so ping the server to surface errors here
    client.admin.command("ping")
    db = client[db_name]
    print(f"Connection to {db_name} done successfully ")
except Exception as e:
    raise Exception(f"The following error occurred: {e}") from e



Connection to alt_ecommerce done successfully 


In [4]:
# Get handles to the collections (MongoDB creates them on first insert)
customers = db["customers"]
orders = db["orders"]
products = db["products"]
order_items = db["order_items"]

# Drop any existing data so the notebook can be re-run from a clean state
customers.drop()
orders.drop()
products.drop()
order_items.drop()

In [5]:
# Read sample data from JSON
with open("./sample_data/customers_collection.json", "r") as json_file:
    sample_customers_list = json.load(json_file)

with open("./sample_data/orders_collection.json", "r") as json_file:
    sample_orders_list = json.load(json_file)

with open("./sample_data/products_collection.json", "r") as json_file:
    sample_products_list = json.load(json_file)

with open("./sample_data/order_items_collection.json", "r") as json_file:
    sample_order_items_list = json.load(json_file)




# Convert the datetime strings back to datetime objects
for item in sample_orders_list:
    item["order_date"] = datetime.fromisoformat(item["order_date"])
    if item.get("delivery_date"):
        item["delivery_date"] = datetime.fromisoformat(item["delivery_date"])
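
The conversion above implies that the dates were written to JSON as ISO 8601 strings. The sketch below shows one way such a round trip could work. The helpers `to_json` and `from_json` are hypothetical and not part of the original project, and the demo order is made-up data.

```python
import json
from datetime import datetime


def _encode(obj):
    """Serialize datetimes as ISO 8601 strings; reject anything else."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def to_json(docs):
    # json calls _encode only for objects it cannot serialize natively
    return json.dumps(docs, default=_encode)


def from_json(text, date_fields=("order_date", "delivery_date")):
    docs = json.loads(text)
    for doc in docs:
        for field in date_fields:
            if doc.get(field):  # skip missing or null dates
                doc[field] = datetime.fromisoformat(doc[field])
    return docs


original = [{"order_id": 5001,
             "order_date": datetime(2024, 12, 1, 9, 30),
             "delivery_date": None}]
restored = from_json(to_json(original))
```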


In [6]:
# Insert multiple documents
def insert_many_documents(collection,doc_list):
    """Insert many documents into a collection    
    """
    try:
        result = collection.insert_many(doc_list)
        return result.inserted_ids
    except Exception as e:
        print(f"Error inserting documents: {e}")
        return None


# Insert sample documents into each collection using pymongo.
customers_insertion_result = insert_many_documents(customers, sample_customers_list)
orders_insertion_result = insert_many_documents(orders, sample_orders_list)
products_insertion_result = insert_many_documents(products, sample_products_list)
order_items_insertion_result = insert_many_documents(order_items, sample_order_items_list)

# Task 2: 
## Analytical Queries
- Use MongoDB queries and aggregation pipelines to answer the following questions:

In [8]:
pp = pprint.PrettyPrinter(indent=2)
#helper function to print
def print_results(cursor):
    for doc in cursor:
        pp.pprint(doc)

#### 1. Which product categories generate the highest revenue?

- Use `$group` to calculate revenue by category.
- Sort the results in descending order.

In [10]:
pipeline = [
    {
        '$lookup': {
            'from': "products", 
            'localField': "product_id", 
            'foreignField': "product_id", 
            'as': "product_details"
        }
    },
    {
        '$unwind': "$product_details"
    },
    {
        '$group': {
            '_id': "$product_details.category",
            'total_revenue': { '$sum': { '$multiply': ["$quantity", "$price"] } }
        }
    },
    {
        '$sort': { 'total_revenue': -1 }
    }
]

result = list(order_items.aggregate(pipeline))

print_results(result)


{'_id': 'Appliances', 'total_revenue': 25272}
{'_id': 'Electronics', 'total_revenue': 24993}
{'_id': 'Wearables', 'total_revenue': 15225}
{'_id': 'Storage', 'total_revenue': 13110}
{'_id': 'Accessories', 'total_revenue': 11762}
{'_id': 'Audio', 'total_revenue': 8610}
{'_id': 'Lighting', 'total_revenue': 8173}
{'_id': 'Networking', 'total_revenue': 2589}
{'_id': 'Furniture', 'total_revenue': 844}
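
To sanity-check the pipeline, the same join and grouping can be reproduced in plain Python on a handful of documents. This is a sketch on made-up data, assuming `product_id` is unique in the products collection. The function name `revenue_by_category` is hypothetical.

```python
from collections import defaultdict


def revenue_by_category(order_items, products):
    """Mimic $lookup + $unwind + $group + $sort in memory."""
    # Equivalent of $lookup: map each product_id to its category
    category_of = {p["product_id"]: p["category"] for p in products}
    totals = defaultdict(int)
    for item in order_items:
        category = category_of.get(item["product_id"])
        if category is None:
            continue  # $unwind drops items that have no matching product
        totals[category] += item["quantity"] * item["price"]
    # Equivalent of $sort: highest revenue first
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)


products_demo = [
    {"product_id": 1, "category": "Audio"},
    {"product_id": 2, "category": "Storage"},
]
items_demo = [
    {"product_id": 1, "quantity": 2, "price": 50},
    {"product_id": 2, "quantity": 1, "price": 300},
    {"product_id": 1, "quantity": 1, "price": 40},
    {"product_id": 9, "quantity": 5, "price": 10},  # no matching product
]
ranking = revenue_by_category(items_demo, products_demo)
```

Note that revenue is computed from the `price` stored on each order item, as in the pipeline, not from the current product price.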


#### 2. What is the average delivery time for orders?
- Calculate the difference between order_date and delivery_date.
- Use `$group` to calculate the average delivery time.

In [13]:
pipeline = [
    {
        '$match': {
            'status': 'Delivered'  # Ensure only delivered orders are considered
        }
    },
    {
        '$project': {
            '_id': 0,
            'order_id': 1,
            'delivery_time_in_days': {
                '$divide': [
                    { '$subtract': ["$delivery_date", "$order_date"] }, 
                    1000 * 60 * 60 * 24  #Convert milliseconds to days
                ]
            }
        }
    },
    {
        '$group': {
            '_id': None,
            'average_delivery_time_in_days': { '$avg': "$delivery_time_in_days" }
        }
    }
]

result = list(orders.aggregate(pipeline))
print_results(result)

{'_id': None, 'average_delivery_time_in_days': 3.8774509803921573}
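
Subtracting two dates in MongoDB yields milliseconds, which is why the pipeline divides by `1000 * 60 * 60 * 24`. The same arithmetic can be checked in Python. This is a sketch on made-up orders, with hypothetical helper names.

```python
from datetime import datetime, timedelta

MS_PER_DAY = 1000 * 60 * 60 * 24


def delivery_days(order_date, delivery_date):
    """Elapsed time in days, computed via milliseconds as in the pipeline."""
    delta_ms = (delivery_date - order_date) / timedelta(milliseconds=1)
    return delta_ms / MS_PER_DAY


def average_delivery_days(orders):
    """Average over delivered orders only, mirroring the $match stage."""
    times = [
        delivery_days(o["order_date"], o["delivery_date"])
        for o in orders
        if o.get("status") == "Delivered"
    ]
    return sum(times) / len(times) if times else None


orders_demo = [
    {"status": "Delivered", "order_date": datetime(2024, 1, 1),
     "delivery_date": datetime(2024, 1, 4)},       # 3 days
    {"status": "Delivered", "order_date": datetime(2024, 1, 1),
     "delivery_date": datetime(2024, 1, 5, 12)},   # 4.5 days
    {"status": "Pending", "order_date": datetime(2024, 1, 2),
     "delivery_date": None},                        # excluded
]
average = average_delivery_days(orders_demo)
```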


#### 3. Which states have the highest number of customers?
- Use `$group` to count customers by state.
- Sort the results in descending order.

In [15]:
pipeline = [
    {
        '$group': {
            '_id': "$address.state",  # Group by state
            'customer_count': { '$sum': 1 }  # Count customers in each state
        }
    },
    {
        '$sort': { 'customer_count': -1 }  # Sort in descending order
    }
]

result = list(customers.aggregate(pipeline))
print_results(result)

{'_id': 'Arizona', 'customer_count': 4}
{'_id': 'Colorado', 'customer_count': 2}
{'_id': 'Tennessee', 'customer_count': 2}
{'_id': 'New York', 'customer_count': 2}
{'_id': 'Connecticut', 'customer_count': 1}
{'_id': 'Michigan', 'customer_count': 1}
{'_id': 'Vermont', 'customer_count': 1}
{'_id': 'Georgia', 'customer_count': 1}
{'_id': 'South Dakota', 'customer_count': 1}
{'_id': 'New Mexico', 'customer_count': 1}
{'_id': 'West Virginia', 'customer_count': 1}
{'_id': 'Alaska', 'customer_count': 1}
{'_id': 'North Dakota', 'customer_count': 1}
{'_id': 'Iowa', 'customer_count': 1}
{'_id': 'Kansas', 'customer_count': 1}
{'_id': 'Pennsylvania', 'customer_count': 1}
{'_id': 'Wyoming', 'customer_count': 1}
{'_id': 'Mississippi', 'customer_count': 1}


#### 4. What are the top 3 most expensive products sold in each order?
- Use `$lookup` and `$sort` to find the top products in each order.

In [17]:
# Use $lookup, $sort, and $group to list the three highest-priced items in each order.

pipeline = [
    {
        '$lookup': {
            'from': 'products',  # Join with the products collection
            'localField': 'product_id', 
            'foreignField': 'product_id', 
            'as': 'product_details'
        }
    },
    {
        '$unwind': '$product_details'  # Unpack the product details
    },
    {
        '$project': {
            'order_id': 1,
            'product_id': 1,
            'product_name': '$product_details.product_name',
            'category': '$product_details.category',
            'price': '$price'
        }
    },
    {
        '$sort': {
            'order_id': 1,  # Sort by order first
            'price': -1     # Then sort by price in descending order
        }
    },
    {
        '$group': {
            '_id': '$order_id',  # Group by order_id
            'top_products': { '$push': {
                'product_id': '$product_id',
                'product_name': '$product_name',
                'price': '$price'
            }},
        }
    },
    {
        '$project': {
            'top_products': { '$slice': ['$top_products', 3] }  # Keep only the top 3
        }
    }
]

result = list(order_items.aggregate(pipeline))

print_results(result)

{ '_id': 5014,
  'top_products': [ { 'price': 907,
                      'product_id': 105,
                      'product_name': 'Smartwatch'}]}
{ '_id': 5002,
  'top_products': [ { 'price': 1038,
                      'product_id': 119,
                      'product_name': 'Air Fryer'},
                    {'price': 211, 'product_id': 108, 'product_name': 'Mouse'}]}
{ '_id': 5016,
  'top_products': [ { 'price': 984,
                      'product_id': 107,
                      'product_name': 'Keyboard'},
                    { 'price': 636,
                      'product_id': 120,
                      'product_name': 'Electric Kettle'},
                    { 'price': 574,
                      'product_id': 115,
                      'product_name': 'Speakers'}]}
{ '_id': 5022,
  'top_products': [ { 'price': 743,
                      'product_id': 110,
                      'product_name': 'Desk Lamp'},
                    { 'price': 586,
                      'product_id': 106,
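
On MongoDB 5.2 or later, the `$topN` accumulator can replace the `$sort`, `$push`, and `$slice` combination. The following is a sketch under that version assumption, not the approach this notebook ran. The helper name `top_n_per_order_pipeline` is hypothetical.

```python
def top_n_per_order_pipeline(n=3):
    """Build a pipeline that keeps the n highest-priced items per order.

    Assumes a server that supports the $topN accumulator (MongoDB 5.2+).
    """
    return [
        {"$lookup": {
            "from": "products",
            "localField": "product_id",
            "foreignField": "product_id",
            "as": "product_details",
        }},
        {"$unwind": "$product_details"},
        {"$group": {
            "_id": "$order_id",
            "top_products": {
                "$topN": {
                    "n": n,
                    "sortBy": {"price": -1},  # most expensive first
                    "output": {
                        "product_id": "$product_id",
                        "product_name": "$product_details.product_name",
                        "price": "$price",
                    },
                }
            },
        }},
        {"$sort": {"_id": 1}},  # stable, readable output order
    ]


pipeline = top_n_per_order_pipeline()
# With a live connection: list(order_items.aggregate(pipeline))
```

Sorting inside the accumulator avoids a full `$sort` over all order items before grouping.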


# Task 3: 
## Schema Design and Optimization
- Choose between embedded and referenced schema designs for each collection and explain why.
- Create indexes on frequently queried fields like customer_id and product_id.

#### Schema Design Choices
1. **Customers Collection**\
**Design:** Embedded\
**Reason:** A customer's address (e.g., street, city, state) is closely related to the customer and unlikely to change frequently or be queried independently.

2. **Products Collection**\
**Design:** Referenced\
**Reason:** Products are often reused across multiple orders and have stable data. Keeping them in a separate collection ensures consistency and reduces redundancy.

3. **Orders Collection**\
**Design:** Referenced\
**Reason:** Orders involve a relationship with both customers and order items. Using references for `customer_id` allows efficient queries on customer orders without duplicating data.

4. **Order Items Collection**\
**Design:** Referenced\
**Reason:** Order items represent a many-to-many relationship between orders and products. A referenced structure avoids duplicating product details across order items.
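
The design choices above can be illustrated with one document per collection. The field names are taken from the queries and code in this notebook, but every value below is made up for illustration, and the helper `items_for_order` is hypothetical.

```python
# Embedded: the address lives inside the customer document
customer = {
    "customer_id": 4,
    "address": {"street": "1 Example St", "city": "Phoenix", "state": "Arizona"},
}

# Referenced: products stand alone and are pointed to by product_id
product = {
    "product_id": 103,
    "product_name": "Example Product",
    "category": "Electronics",
    "price": 250,
    "stock_quantity": 40,
}

# Referenced: an order points to its customer by customer_id
order = {"order_id": 5030, "customer_id": 4, "status": "Pending"}

# Referenced: an order item links one order to one product
order_item = {
    "order_item_id": 9001,
    "order_id": 5030,
    "product_id": 103,
    "quantity": 2,
    "price": 250,  # price captured at the time of purchase
}


def items_for_order(order_id, order_items):
    """Resolve the order -> order_items reference in memory."""
    return [i for i in order_items if i["order_id"] == order_id]


linked = items_for_order(order["order_id"], [order_item])
```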

In [20]:
# Index on customer_id speeds up queries like fetching customer details or joining with orders.
customers.create_index([('customer_id',1)])
# Index on product_id optimizes lookups when joining with order items or querying product details.
products.create_index([('product_id',1)])
orders.create_index([('customer_id',1)])
orders.create_index([('order_id',1)])
order_items.create_index([('order_id',1)])
order_items.create_index([('product_id',1)])

'product_id_1'
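
To confirm that a query actually uses one of these indexes, its explain output can be inspected for an `IXSCAN` stage. The sketch below assumes the classic explain layout, where the winning plan nests stages under `inputStage`. Newer server versions may arrange the plan differently, and both helper names are hypothetical.

```python
def plan_stages(plan):
    """Yield every stage name in a (possibly nested) winning plan."""
    yield plan.get("stage")
    if "inputStage" in plan:
        yield from plan_stages(plan["inputStage"])
    for child in plan.get("inputStages", []):
        yield from plan_stages(child)


def uses_index(explain_output):
    """Return True if the winning plan contains an index scan."""
    winning = explain_output["queryPlanner"]["winningPlan"]
    return "IXSCAN" in plan_stages(winning)


# Hand-written example in the assumed explain shape
explain_demo = {
    "queryPlanner": {
        "winningPlan": {
            "stage": "FETCH",
            "inputStage": {"stage": "IXSCAN", "indexName": "customer_id_1"},
        }
    }
}
indexed = uses_index(explain_demo)
# With a live connection:
# uses_index(orders.find({"customer_id": 4}).explain())
```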

# Task 4: 
## Advanced Features

#### 1. Implement Transactions:
- Simulate an order creation process that updates the orders, order_items, and products (inventory) collections atomically.

In [23]:
# Start a transaction
def create_order(customer_id, items):
    """
    Simulates an order creation process that updates the `orders`, `order_items`, 
    and `products` collections atomically using a MongoDB transaction.

    Parameters
    ----------
    customer_id : int
        The ID of the customer placing the order.
    items : dict
        A dictionary where each key is a `product_id` (int) and the corresponding 
        value is the `quantity` (int) of that product being ordered.
    """
    session = client.start_session()
    try:
        session.start_transaction()

        # Step 1: Add the order to the orders collection
        # Pass the session so this read is part of the transaction
        max_order = orders.find_one(sort=[("order_id", -1)], session=session)
        max_order_id = max_order["order_id"] if max_order else 5000  # Default initial order ID
        order_id = max_order_id + 1

        orders.insert_one({
            "order_id": order_id,
            "customer_id": customer_id,
            "order_date": datetime.now(),
            "status": "Pending"
        }, session=session)

        # Step 2: Add items to the order_items collection and update stock
        for product_id, quantity in items.items():
            # Fetch product details inside the transaction
            product = products.find_one({"product_id": product_id}, {"price": 1, "stock_quantity": 1, "_id": 0}, session=session)
            if not product:
                raise ValueError(f"Product with ID {product_id} does not exist.")
            if product["stock_quantity"] < quantity:
                raise ValueError(f"Insufficient stock for product ID {product_id}. Available: {product['stock_quantity']}, Requested: {quantity}.")

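            # Generate order_item_id; reading with the session sees items inserted
            # earlier in this transaction, so each item gets a distinct ID
            max_order_item = order_items.find_one(sort=[("order_item_id", -1)], session=session)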
            max_order_item_id = max_order_item["order_item_id"] if max_order_item else 9000  # Default initial order item ID
            order_item_id = max_order_item_id + 1

            # Insert order item
            order_items.insert_one({
                "order_item_id": order_item_id,
                "order_id": order_id,
                "product_id": product_id,
                "quantity": quantity,
                "price": product["price"]
            }, session=session)

            # Update stock quantity
            products.update_one({"product_id": product_id}, {"$inc": {"stock_quantity": -quantity}}, session=session)

        # Commit the transaction
        session.commit_transaction()
        print(f"Order {order_id} created successfully.")

    except (OperationFailure, ValueError) as e:
        # Abort the transaction in case of failure
        session.abort_transaction()
        print(f"Transaction failed: {e}")
    finally:
        session.end_session()
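
PyMongo sessions and transactions also work as context managers, which removes the explicit commit, abort, and end calls. A minimal sketch follows. The wrapper `run_in_transaction` is a hypothetical helper, and the `"Shipped"` status in the usage comment is an assumed value.

```python
def run_in_transaction(client, work):
    """Run work(session) inside a transaction and return its result.

    Relies on PyMongo's context managers: the transaction commits when the
    inner block exits normally and aborts if an exception propagates, and
    the session is ended when the outer block exits.
    """
    with client.start_session() as session:
        with session.start_transaction():
            return work(session)


# Usage with the notebook's objects (requires a replica set or Atlas):
# run_in_transaction(
#     client,
#     lambda s: orders.update_one(
#         {"order_id": 5030}, {"$set": {"status": "Shipped"}}, session=s
#     ),
# )
```

Any exception raised by `work` still reaches the caller, so validation errors such as insufficient stock can be handled outside the wrapper.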

In [24]:
# make an order
create_order(customer_id=4, items = {103:2, 104:3})

# fetch the last order
orders.find_one(sort=[("order_id", -1)])

Order 5030 created successfully.


{'_id': ObjectId('676617aec06ac745131eacde'),
 'order_id': 5030,
 'customer_id': 4,
 'order_date': datetime.datetime(2024, 12, 21, 2, 19, 42, 640000),
 'status': 'Pending'}

#### 2. Use Change Streams:
- Monitor real-time changes in the orders collection.

In [26]:
import threading

# function to monitor changes
def monitor_orders():
    with orders.watch() as stream:
        for change in stream:
            print(f"Change detected: {change}")

# start monitoring in a separate thread
monitor_thread = threading.Thread(target=monitor_orders, daemon=True)
monitor_thread.start()

In [27]:
# Add a new order to trigger a change
create_order(customer_id=2, items = {110:3})

Change detected: {'txnNumber': 1, 'lsid': {'id': Binary(b'x\xabk\x08\xcf(J\xbd\x8b\xa0\xa1\x17[o\xde\x84', 4), 'uid': b'u\xda\xa8\xb7\xaf/.\x90T>s\xae\xa8X\xf6q\xd0\xa5\xbf\xf7o\xfe\xf1}eT\xb0\x92v\x07\x12\xc1'}, '_id': {'_data': '82676617B2000000032B042C0100296E5A100405671B08A698478492A13C4AA15E4D52463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064676617B1C06AC745131EACE1000004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1734743986, 3), 'wallTime': datetime.datetime(2024, 12, 21, 1, 19, 46, 225000), 'fullDocument': {'_id': ObjectId('676617b1c06ac745131eace1'), 'order_id': 5031, 'customer_id': 2, 'order_date': datetime.datetime(2024, 12, 21, 2, 19, 45, 236000), 'status': 'Pending'}, 'ns': {'db': 'alt_ecommerce', 'coll': 'orders'}, 'documentKey': {'_id': ObjectId('676617b1c06ac745131eace1')}}
Order 5031 created successfully.
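
The raw change event is verbose. A small helper can reduce it to the fields of interest. This is a sketch based on the event shape printed above, and the function name `summarize_change` is hypothetical.

```python
def summarize_change(change):
    """Extract the operation, namespace, and key order fields from an event."""
    ns = change.get("ns", {})
    doc = change.get("fullDocument") or {}  # absent for some operation types
    return {
        "operation": change.get("operationType"),
        "collection": f"{ns.get('db')}.{ns.get('coll')}",
        "order_id": doc.get("order_id"),
        "status": doc.get("status"),
    }


# Trimmed copy of the event printed above
event = {
    "operationType": "insert",
    "ns": {"db": "alt_ecommerce", "coll": "orders"},
    "fullDocument": {"order_id": 5031, "customer_id": 2, "status": "Pending"},
}
summary = summarize_change(event)

# To receive only inserts, a pipeline can be passed to watch():
# orders.watch([{"$match": {"operationType": "insert"}}])
```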


#### 3. Apply Schema Validation:
- Ensure all documents in the products collection include a valid price.

In [29]:
schema = {
    "$jsonSchema": {
        "bsonType": "object",
        "required": ["product_id", "product_name", "category", "price", "stock_quantity"],
        "properties": {
            "product_id": {"bsonType": "int"},
            "product_name": {"bsonType": "string"},
            "category": {"bsonType": "string"},
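            "price": {
                # Accept any numeric BSON type: PyMongo stores Python ints
                # as int/long rather than double
                "bsonType": ["int", "long", "double", "decimal"],
                "minimum": 0,
                "description": "Price must be a non-negative number"
                },
            "stock_quantity": {
                "bsonType": "int",
                "minimum": 0,
                "description": "Stock quantity must be a non-negative integer"
                }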
        }
    }
}

# Apply the schema validation
db.command({
    "collMod": "products",
    "validator": schema,
    "validationLevel": "strict"
})
print("Schema validation applied to the products collection.")


Schema validation applied to the products collection.
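
Once the validator is in place, the server rejects non-conforming writes. A client-side pre-check can report problems before a write is attempted. The sketch below mirrors the intent of the validator (required fields, non-negative price and stock) and is not a substitute for it. The helper `product_violations` is hypothetical.

```python
REQUIRED = ("product_id", "product_name", "category", "price", "stock_quantity")


def _is_number(value, integer_only=False):
    """True for int (and float unless integer_only); bool is excluded."""
    if isinstance(value, bool):
        return False
    allowed = (int,) if integer_only else (int, float)
    return isinstance(value, allowed)


def product_violations(doc):
    """Return a list of human-readable problems; empty means the doc passes."""
    problems = [f"missing field: {f}" for f in REQUIRED if f not in doc]
    if "price" in doc:
        if not _is_number(doc["price"]) or doc["price"] < 0:
            problems.append("price must be a non-negative number")
    if "stock_quantity" in doc:
        stock = doc["stock_quantity"]
        if not _is_number(stock, integer_only=True) or stock < 0:
            problems.append("stock_quantity must be a non-negative integer")
    return problems


good = {"product_id": 121, "product_name": "Example", "category": "Audio",
        "price": 19.99, "stock_quantity": 5}
bad = {"product_id": 122, "product_name": "Example", "price": -1}
```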
