# Advanced Queries with gdmongolite

Master complex MongoDB operations including joins, aggregations, statistical analysis, and query optimization.

## What you'll learn:
- Join collections like SQL databases
- Build complex aggregation pipelines
- Perform statistical analysis
- Analyze time series data
- Analyze data by geography (country and city)
- Optimize queries and choose indexes

## Setup and Sample Data

In [None]:
from gdmongolite import DB, Schema, Email, FieldTypes
from gdmongolite.advanced_queries import JoinBuilder, AggregationBuilder, CommonAggregations
from datetime import datetime, timedelta
import random

# Initialize database
db = DB()

print("Advanced queries module loaded!")

In [None]:
# Define related schemas for complex queries
from typing import Optional

class User(Schema):
    name: FieldTypes.Name
    email: Email
    age: FieldTypes.Age
    city: str
    country: str
    registration_date: datetime
    is_premium: bool = False

class Category(Schema):
    name: str
    description: str
    parent_id: Optional[str] = None  # None for top-level categories

class Product(Schema):
    name: FieldTypes.Title
    description: FieldTypes.Description
    price: FieldTypes.Price
    category_id: str
    tags: list[str] = []
    rating: FieldTypes.Rating = 0.0
    reviews_count: int = 0
    created_at: datetime

class Order(Schema):
    user_id: str
    product_id: str
    quantity: int
    unit_price: FieldTypes.Price
    total_amount: FieldTypes.Price
    order_date: datetime
    status: str = "pending"  # pending, confirmed, shipped, delivered
    shipping_address: dict = {}

class Review(Schema):
    user_id: str
    product_id: str
    rating: FieldTypes.Rating
    comment: str
    helpful_votes: int = 0
    created_at: datetime

# Register all schemas
for schema in [User, Category, Product, Order, Review]:
    db.register_schema(schema)

print("Schemas registered!")

In [None]:
# Create sample data for complex queries
import random
from datetime import datetime, timedelta

# Sample categories
categories_data = [
    {"name": "Electronics", "description": "Electronic devices and gadgets"},
    {"name": "Books", "description": "Books and literature"},
    {"name": "Clothing", "description": "Apparel and fashion"},
    {"name": "Home", "description": "Home and garden items"},
    {"name": "Sports", "description": "Sports and outdoor equipment"}
]

category_response = await db.Category.insert(categories_data)
categories = await db.Category.find().to_list()
category_ids = [str(cat["_id"]) for cat in categories]

print(f"Created {len(categories)} categories")

# Sample users
cities = ["New York", "London", "Tokyo", "Paris", "Sydney", "Toronto", "Berlin", "Mumbai"]
countries = ["USA", "UK", "Japan", "France", "Australia", "Canada", "Germany", "India"]

users_data = []
for i in range(50):
    city_country = random.choice(list(zip(cities, countries)))
    users_data.append({
        "name": f"User {i+1}",
        "email": f"user{i+1}@example.com",
        "age": random.randint(18, 65),
        "city": city_country[0],
        "country": city_country[1],
        "registration_date": datetime.now() - timedelta(days=random.randint(1, 365)),
        "is_premium": random.choice([True, False])
    })

user_response = await db.User.insert(users_data)
users = await db.User.find().to_list()
user_ids = [str(user["_id"]) for user in users]

print(f"Created {len(users)} users")

# Sample products
product_names = [
    "Laptop Pro", "Smartphone X", "Wireless Headphones", "Smart Watch", "Tablet Ultra",
    "Python Programming Book", "Data Science Guide", "Fiction Novel", "Cookbook", "Biography",
    "T-Shirt", "Jeans", "Sneakers", "Jacket", "Dress",
    "Coffee Maker", "Vacuum Cleaner", "Lamp", "Chair", "Desk",
    "Tennis Racket", "Basketball", "Yoga Mat", "Dumbbells", "Bicycle"
]

products_data = []
for i, name in enumerate(product_names):
    products_data.append({
        "name": name,
        "description": f"High quality {name.lower()} for everyday use",
        "price": round(random.uniform(10, 1000), 2),
        "category_id": random.choice(category_ids),
        "tags": random.sample(["popular", "new", "sale", "premium", "bestseller"], k=random.randint(1, 3)),
        "rating": round(random.uniform(3.0, 5.0), 1),
        "reviews_count": random.randint(0, 100),
        "created_at": datetime.now() - timedelta(days=random.randint(1, 180))
    })

product_response = await db.Product.insert(products_data)
products = await db.Product.find().to_list()
product_ids = [str(product["_id"]) for product in products]

print(f"Created {len(products)} products")

# Sample orders
orders_data = []
for i in range(200):
    product = random.choice(products)
    quantity = random.randint(1, 5)
    unit_price = product["price"]
    
    orders_data.append({
        "user_id": random.choice(user_ids),
        "product_id": str(product["_id"]),
        "quantity": quantity,
        "unit_price": unit_price,
        "total_amount": round(unit_price * quantity, 2),  # round away float noise
        "order_date": datetime.now() - timedelta(days=random.randint(1, 90)),
        "status": random.choice(["pending", "confirmed", "shipped", "delivered"]),
        "shipping_address": {
            "street": f"{random.randint(1, 999)} Main St",
            "city": random.choice(cities),
            "country": random.choice(countries)
        }
    })

order_response = await db.Order.insert(orders_data)
print(f"Created {len(orders_data)} orders")

# Sample reviews
reviews_data = []
for i in range(150):
    reviews_data.append({
        "user_id": random.choice(user_ids),
        "product_id": random.choice(product_ids),
        "rating": random.randint(1, 5),
        "comment": random.choice([
            "Great product, highly recommended!",
            "Good value for money",
            "Could be better",
            "Excellent quality",
            "Not what I expected",
            "Perfect for my needs",
            "Fast shipping, good product"
        ]),
        "helpful_votes": random.randint(0, 20),
        "created_at": datetime.now() - timedelta(days=random.randint(1, 60))
    })

review_response = await db.Review.insert(reviews_data)
print(f"Created {len(reviews_data)} reviews")

print("\nSample data created successfully!")

## 1. JOIN Operations (Like SQL JOINs)

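Join collections to pull related documents into a single result. In the examples below, `join` returns the matches as a list, while `join_one` embeds a single matching document: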

In [None]:
# Join orders with user information
orders_with_users = await (
    db.Order.join(db.User, "user_id", "_id", "user_info")
    .where(status="delivered")
    .select("total_amount", "order_date", "user_info.name", "user_info.email", "user_info.city")
    .sort(order_date=-1)
    .limit(10)
    .execute()
)

print("Orders with User Information:")
for order in orders_with_users:
    # join() returns matches as a list; fall back to an empty dict when nothing matched
    user = (order.get("user_info") or [{}])[0]
    print(f"  ${order['total_amount']:.2f} - {user.get('name', 'Unknown')} ({user.get('city', 'Unknown')})")
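The sample data stores `user_id` as a string (`str(user["_id"])`), while `_id` in MongoDB is normally an `ObjectId`. A raw `$lookup` matches values strictly by type, so a string never equals an `ObjectId`. Whether `join` converts the key for you is not shown in this notebook. If a join comes back empty, a raw pipeline along these lines is one way to check. This is a sketch only: `lookup_by_string_id` is a hypothetical helper, the collection name `users` is taken from the `lookup` calls later in this notebook, and `$toObjectId` requires MongoDB 4.0 or newer.

```python
def lookup_by_string_id(from_collection, local_field, as_field):
    """Build raw stages that join a string foreign key to ObjectId `_id` values."""
    converted = f"{local_field}_oid"  # temporary field name, chosen for illustration
    return [
        # Convert the stored string into an ObjectId so the types match
        {"$addFields": {converted: {"$toObjectId": f"${local_field}"}}},
        {"$lookup": {
            "from": from_collection,
            "localField": converted,
            "foreignField": "_id",
            "as": as_field,
        }},
        # Drop the temporary field from the output
        {"$project": {converted: 0}},
    ]

stages = lookup_by_string_id("users", "user_id", "user_info")
```

Note that `$toObjectId` raises an error for strings that are not valid ObjectId hex values, so this only suits collections where every key was produced with `str(_id)`.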

In [None]:
# Join products with category information (one-to-one join)
products_with_categories = await (
    db.Product.join_one(db.Category, "category_id", "_id", "category")
    .where(rating__gte=4.0)
    .select("name", "price", "rating", "category.name")
    .sort(rating=-1)
    .limit(10)
    .execute()
)

print("\nTop Rated Products with Categories:")
for product in products_with_categories:
    category_name = product.get("category", {}).get("name", "Unknown")
    print(f"  {product['name']} - ${product['price']:.2f} ({category_name}) - {product['rating']} stars")
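The difference in result shape between a list-valued join and a single-document join can be emulated in plain Python. The sketch below assumes `join_one` behaves like `$lookup` followed by a plain `$unwind`, which drops rows without a match. The notebook does not state whether unmatched rows are kept, so treat that as an assumption. Both function names are hypothetical.

```python
def lookup_many(left, right, local_field, foreign_field, as_field):
    """Emulate $lookup: attach a list of all matching right-hand documents."""
    index = {}
    for doc in right:
        index.setdefault(doc[foreign_field], []).append(doc)
    return [{**doc, as_field: index.get(doc.get(local_field), [])} for doc in left]

def lookup_one(left, right, local_field, foreign_field, as_field):
    """Emulate $lookup + $unwind: one row per match, unmatched rows dropped."""
    rows = []
    for doc in lookup_many(left, right, local_field, foreign_field, as_field):
        for match in doc[as_field]:
            rows.append({**doc, as_field: match})
    return rows

categories = [{"_id": "c1", "name": "Books"}]
products = [
    {"name": "Cookbook", "category_id": "c1"},
    {"name": "Lamp", "category_id": "missing"},
]
many = lookup_many(products, categories, "category_id", "_id", "category")
one = lookup_one(products, categories, "category_id", "_id", "category")
```

Here `many` keeps both products (the lamp gets an empty list), while `one` keeps only the cookbook with its category embedded as a dict.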

In [None]:
# Complex multi-join: Orders with User and Product information
complex_join = await (
    db.Order.join_one(db.User, "user_id", "_id", "user")
    .join_one(db.Product, "product_id", "_id", "product")
    .where(status="delivered", total_amount__gte=100)
    .select(
        "total_amount", "quantity", "order_date",
        "user.name", "user.city", "user.is_premium",
        "product.name", "product.price", "product.rating"
    )
    .sort(total_amount=-1)
    .limit(5)
    .execute()
)

print("\nHigh Value Orders (Multi-Join):")
for order in complex_join:
    user = order.get("user", {})
    product = order.get("product", {})
    premium = "Premium" if user.get("is_premium") else "Regular"
    print(f"  ${order['total_amount']:.2f} - {user.get('name')} ({premium}) bought {product.get('name')}")

## 2. Aggregation Pipelines

Build complex data processing pipelines:

In [None]:
# Sales by category
sales_by_category = await (
    db.Order.aggregate()
    .match(status="delivered")
    .lookup("products", "product_id", "_id", "product_info")
    .unwind("product_info")
    .lookup("categories", "product_info.category_id", "_id", "category_info")
    .unwind("category_info")
    .group(
        "$category_info.name",
        total_sales={"$sum": "$total_amount"},
        order_count={"$sum": 1},
        avg_order_value={"$avg": "$total_amount"}
    )
    .sort(total_sales=-1)
    .execute()
)

print("Sales by Category:")
for category in sales_by_category:
    print(f"  {category['_id']}: ${category['total_sales']:.2f} ({category['order_count']} orders, avg: ${category['avg_order_value']:.2f})")
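To see what the `group` stage computes, here is the same sum, count, and average done in plain Python on rows that already carry a category name. It is a minimal sketch for intuition, not how gdmongolite or MongoDB implement grouping; `group_sales` is a hypothetical name.

```python
from collections import defaultdict

def group_sales(rows, key):
    """Sum, count, and average total_amount per distinct value of `key`."""
    totals = defaultdict(lambda: {"total_sales": 0.0, "order_count": 0})
    for row in rows:
        acc = totals[row[key]]
        acc["total_sales"] += row["total_amount"]
        acc["order_count"] += 1
    result = [
        {"_id": group_key, **acc,
         "avg_order_value": acc["total_sales"] / acc["order_count"]}
        for group_key, acc in totals.items()
    ]
    # Same ordering as .sort(total_sales=-1)
    return sorted(result, key=lambda r: r["total_sales"], reverse=True)

rows = [
    {"category": "Books", "total_amount": 20.0},
    {"category": "Books", "total_amount": 40.0},
    {"category": "Home", "total_amount": 100.0},
]
summary = group_sales(rows, "category")
```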

In [None]:
# Customer segmentation by purchase behavior
customer_segments = await (
    db.Order.aggregate()
    .match(status="delivered")
    .group(
        "$user_id",
        total_spent={"$sum": "$total_amount"},
        order_count={"$sum": 1},
        avg_order_value={"$avg": "$total_amount"},
        first_order={"$min": "$order_date"},
        last_order={"$max": "$order_date"}
    )
    .add_fields(
        customer_lifetime_days={
            "$divide": [
                {"$subtract": ["$last_order", "$first_order"]},
                86400000  # milliseconds in a day
            ]
        },
        segment={
            "$switch": {
                "branches": [
                    {"case": {"$gte": ["$total_spent", 500]}, "then": "VIP"},
                    {"case": {"$gte": ["$total_spent", 200]}, "then": "Premium"},
                    {"case": {"$gte": ["$order_count", 5]}, "then": "Loyal"},
                ],
                "default": "Regular"
            }
        }
    )
    .group(
        "$segment",
        customer_count={"$sum": 1},
        avg_total_spent={"$avg": "$total_spent"},
        avg_orders={"$avg": "$order_count"}
    )
    .sort(avg_total_spent=-1)
    .execute()
)

print("\nCustomer Segments:")
for segment in customer_segments:
    print(f"  {segment['_id']}: {segment['customer_count']} customers, avg spent: ${segment['avg_total_spent']:.2f}, avg orders: {segment['avg_orders']:.1f}")
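`$switch` returns the first branch whose case is true, so branch order decides the segment. The plain-Python mirror below uses the same thresholds as the pipeline above and makes that visible: a customer with 10 orders who spent $250 is "Premium", not "Loyal", because the spending check runs first.

```python
def segment(total_spent, order_count):
    """Mirror the $switch above: the first matching branch wins."""
    if total_spent >= 500:
        return "VIP"
    if total_spent >= 200:
        return "Premium"
    if order_count >= 5:
        return "Loyal"
    return "Regular"

examples = [segment(600, 1), segment(250, 10), segment(50, 5), segment(50, 1)]
```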

In [None]:
# Product performance analysis
product_performance = await (
    db.Order.aggregate()
    .match(status="delivered")
    .group(
        "$product_id",
        total_revenue={"$sum": "$total_amount"},
        units_sold={"$sum": "$quantity"},
        order_count={"$sum": 1}
    )
    .lookup("products", "_id", "_id", "product_info")
    .unwind("product_info")
    .project(
        product_name="$product_info.name",
        product_price="$product_info.price",
        total_revenue=1,
        units_sold=1,
        order_count=1,
        revenue_per_unit={"$divide": ["$total_revenue", "$units_sold"]}
    )
    .sort(total_revenue=-1)
    .limit(10)
    .execute()
)

print("\nTop Products by Revenue:")
for product in product_performance:
    print(f"  {product['product_name']}: ${product['total_revenue']:.2f} ({product['units_sold']} units, {product['order_count']} orders)")

## 3. Statistical Analysis

Perform statistical operations on your data:

In [None]:
# Order value statistics
order_stats = await (
    db.Order.aggregate()
    .match(status="delivered")
    .stats("total_amount")
    .execute()
)

stats = order_stats[0] if order_stats else {}
print("Order Value Statistics:")
print(f"  Total Orders: {stats.get('count', 0)}")
print(f"  Average Order: ${stats.get('avg', 0):.2f}")
print(f"  Min Order: ${stats.get('min', 0):.2f}")
print(f"  Max Order: ${stats.get('max', 0):.2f}")
print(f"  Total Revenue: ${stats.get('sum', 0):.2f}")
print(f"  Standard Deviation: ${stats.get('stdDev', 0):.2f}")
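MongoDB offers two standard deviation accumulators, `$stdDevPop` (population) and `$stdDevSamp` (sample). Which one `stats` reports as `stdDev` is not stated in this notebook. The sketch below computes both with Python's `statistics` module on a small made-up list, so you can compare them against a result you pull from the database.

```python
import statistics

amounts = [2, 4, 4, 4, 5, 5, 7, 9]  # made-up values for illustration

summary = {
    "count": len(amounts),
    "sum": sum(amounts),
    "avg": statistics.fmean(amounts),
    "min": min(amounts),
    "max": max(amounts),
    "stdDevPop": statistics.pstdev(amounts),   # divides by n
    "stdDevSamp": statistics.stdev(amounts),   # divides by n - 1
}
```

The sample version is always the larger of the two, and the gap shrinks as the number of documents grows.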

In [None]:
# Age distribution histogram
age_histogram = await (
    db.User.aggregate()
    .histogram("age", buckets=5)
    .execute()
)

print("\nAge Distribution:")
for bucket in age_histogram:
    min_age = bucket['_id']['min']
    max_age = bucket['_id']['max']
    count = bucket['count']
    print(f"  Ages {min_age}-{max_age}: {count} users")

In [None]:
# Product rating analysis
rating_analysis = await (
    db.Product.aggregate()
    .bucket(
        "rating",
        boundaries=[0, 2, 3, 4, 5.1],  # upper bound is exclusive; 5.1 keeps 5.0 ratings in the top bucket
        default="unrated",
        output={
            "count": {"$sum": 1},
            "avg_price": {"$avg": "$price"},
            "products": {"$push": "$name"}
        }
    )
    .execute()
)

print("\nProduct Rating Distribution:")
rating_labels = {0: "Poor (0-2)", 2: "Fair (2-3)", 3: "Good (3-4)", 4: "Excellent (4-5)"}
for bucket in rating_analysis:
    label = rating_labels.get(bucket['_id'], f"Rating {bucket['_id']}")
    print(f"  {label}: {bucket['count']} products, avg price: ${bucket['avg_price']:.2f}")
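MongoDB's `$bucket` stage treats each pair of adjacent boundaries as a half-open range `[lower, upper)`, and a value equal to the last boundary goes to `default`. The sketch below reproduces that rule with `bisect`. It assumes `bucket` passes its arguments straight through to `$bucket`; `bucket_for` is a hypothetical helper.

```python
from bisect import bisect_right

def bucket_for(value, boundaries, default="unrated"):
    """Return the lower bound of the [lower, upper) bucket containing value."""
    if value < boundaries[0] or value >= boundaries[-1]:
        return default
    return boundaries[bisect_right(boundaries, value) - 1]

edges = [0, 2, 3, 4, 5]
placed = {rating: bucket_for(rating, edges) for rating in (0, 2, 4.9, 5.0)}
```

With these edges a rating of 4.9 lands in bucket `4`, while 5.0 falls through to `"unrated"`.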

## 4. Time Series Analysis

Analyze data over time periods:

In [None]:
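# Daily sales trend
# Group explicitly so revenue is summed while each order's total_amount is still available
daily_sales = await (
    db.Order.aggregate()
    .match(status="delivered")
    .group(
        {"$dateToString": {"format": "%Y-%m-%d", "date": "$order_date"}},
        count={"$sum": 1},
        total_revenue={"$sum": "$total_amount"}
    )
    .sort(_id=-1)  # newest day first
    .limit(30)  # Most recent 30 days that have delivered orders
    .execute()
)

print("Daily Sales (most recent 10 days with orders):")
for day in reversed(daily_sales[:10]):  # Print oldest to newest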
    print(f"  {day['_id']}: {day['count']} orders, ${day.get('total_revenue', 0):.2f} revenue")
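Per-day revenue has to be accumulated while the individual orders are still visible, because after grouping only the grouped fields remain. The plain-Python sketch below does the same bucketing by calendar day; `daily_totals` is a hypothetical name and the orders are made-up.

```python
from collections import defaultdict
from datetime import datetime

def daily_totals(orders):
    """Count orders and sum revenue per calendar day, sorted by day."""
    days = defaultdict(lambda: {"count": 0, "total_revenue": 0.0})
    for order in orders:
        day = order["order_date"].strftime("%Y-%m-%d")
        days[day]["count"] += 1
        days[day]["total_revenue"] += order["total_amount"]
    return dict(sorted(days.items()))

orders = [
    {"order_date": datetime(2024, 1, 2, 9, 30), "total_amount": 10.0},
    {"order_date": datetime(2024, 1, 2, 18, 0), "total_amount": 15.0},
    {"order_date": datetime(2024, 1, 1, 12, 0), "total_amount": 5.0},
]
trend = daily_totals(orders)
```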

In [None]:
# Monthly user registration trend
# Count premium and regular users inside the group stage, where is_premium is still available
monthly_registrations = await (
    db.User.aggregate()
    .group(
        {"$dateToString": {"format": "%Y-%m", "date": "$registration_date"}},
        count={"$sum": 1},
        premium_users={"$sum": {"$cond": ["$is_premium", 1, 0]}},
        regular_users={"$sum": {"$cond": ["$is_premium", 0, 1]}}
    )
    .sort(_id=1)
    .execute()
)

print("\nMonthly User Registrations:")
for month in monthly_registrations:
    total = month['count']
    premium = month.get('premium_users', 0)
    regular = month.get('regular_users', 0)
    print(f"  {month['_id']}: {total} users ({premium} premium, {regular} regular)")

In [None]:
# Cohort analysis - user retention by registration month
cohort_analysis = await (
    db.Order.aggregate()
    .lookup("users", "user_id", "_id", "user_info")
    .unwind("user_info")
    .add_fields(
        registration_month={
            "$dateToString": {
                "format": "%Y-%m",
                "date": "$user_info.registration_date"
            }
        },
        order_month={
            "$dateToString": {
                "format": "%Y-%m",
                "date": "$order_date"
            }
        }
    )
    .group(
        {"registration_month": "$registration_month", "order_month": "$order_month"},
        active_users={"$addToSet": "$user_id"},
        total_orders={"$sum": 1}
    )
    .add_fields(active_user_count={"$size": "$active_users"})
    .sort(_id=1)  # _id holds registration_month then order_month, so this orders by both
    .limit(20)
    .execute()
)

print("\nCohort Analysis (Registration vs Order Month):")
for cohort in cohort_analysis[:10]:
    reg_month = cohort['_id']['registration_month']
    order_month = cohort['_id']['order_month']
    active_users = cohort['active_user_count']
    print(f"  Users from {reg_month} active in {order_month}: {active_users} users")
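Cohort tables are usually read as "months since registration" rather than as pairs of calendar months. A minimal sketch of that offset, which you could apply to each row of the result; `months_between` is a hypothetical helper, not part of gdmongolite.

```python
from datetime import datetime

def months_between(registered, ordered):
    """Whole calendar months from the registration month to the order month."""
    return (ordered.year - registered.year) * 12 + (ordered.month - registered.month)

offset = months_between(datetime(2023, 11, 20), datetime(2024, 2, 3))
same_month = months_between(datetime(2024, 2, 1), datetime(2024, 2, 28))
```

An offset of 0 means the order was placed in the month the user registered.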

## 5. Geographic Analysis

Analyze data by geographic dimensions:

In [None]:
# Sales by country
sales_by_country = await (
    db.Order.aggregate()
    .match(status="delivered")
    .lookup("users", "user_id", "_id", "user_info")
    .unwind("user_info")
    .group(
        "$user_info.country",
        total_sales={"$sum": "$total_amount"},
        order_count={"$sum": 1},
        unique_customers={"$addToSet": "$user_id"}
    )
    .add_fields(customer_count={"$size": "$unique_customers"})
    .project(
        total_sales=1,
        order_count=1,
        customer_count=1,
        avg_order_value={"$divide": ["$total_sales", "$order_count"]},
        sales_per_customer={"$divide": ["$total_sales", "$customer_count"]}
    )
    .sort(total_sales=-1)
    .execute()
)

print("Sales by Country:")
for country in sales_by_country:
    name = country['_id']
    sales = country['total_sales']
    customers = country['customer_count']
    avg_order = country['avg_order_value']
    print(f"  {name}: ${sales:.2f} ({customers} customers, avg order: ${avg_order:.2f})")

In [None]:
# City-wise user demographics
city_demographics = await (
    db.User.aggregate()
    .group(
        "$city",
        user_count={"$sum": 1},
        avg_age={"$avg": "$age"},
        premium_users={"$sum": {"$cond": ["$is_premium", 1, 0]}},
        age_groups={
            "$push": {
                "$switch": {
                    "branches": [
                        {"case": {"$lt": ["$age", 25]}, "then": "18-24"},
                        {"case": {"$lt": ["$age", 35]}, "then": "25-34"},
                        {"case": {"$lt": ["$age", 45]}, "then": "35-44"},
                        {"case": {"$lt": ["$age", 55]}, "then": "45-54"},
                    ],
                    "default": "55+"
                }
            }
        }
    )
    .add_fields(
        premium_percentage={"$multiply": [{"$divide": ["$premium_users", "$user_count"]}, 100]}
    )
    .sort(user_count=-1)
    .execute()
)

print("\nCity Demographics:")
for city in city_demographics:
    name = city['_id']
    count = city['user_count']
    avg_age = city['avg_age']
    premium_pct = city['premium_percentage']
    print(f"  {name}: {count} users, avg age: {avg_age:.1f}, premium: {premium_pct:.1f}%")

## 6. Advanced Text Analysis

Analyze text data and reviews:

In [None]:
# Review sentiment analysis (simple keyword-based)
review_sentiment = await (
    db.Review.aggregate()
    .add_fields(
        sentiment={
            "$switch": {
                "branches": [
                    {
                        "case": {
                            "$or": [
                                {"$regexMatch": {"input": "$comment", "regex": "great|excellent|amazing|perfect", "options": "i"}},
                                {"$gte": ["$rating", 4]}
                            ]
                        },
                        "then": "positive"
                    },
                    {
                        "case": {
                            "$or": [
                                {"$regexMatch": {"input": "$comment", "regex": "bad|terrible|awful|worst", "options": "i"}},
                                {"$lte": ["$rating", 2]}
                            ]
                        },
                        "then": "negative"
                    }
                ],
                "default": "neutral"
            }
        }
    )
    .group(
        "$sentiment",
        count={"$sum": 1},
        avg_rating={"$avg": "$rating"},
        avg_helpful_votes={"$avg": "$helpful_votes"}
    )
    .sort(count=-1)
    .execute()
)

print("Review Sentiment Analysis:")
for sentiment in review_sentiment:
    name = sentiment['_id']
    count = sentiment['count']
    avg_rating = sentiment['avg_rating']
    avg_helpful = sentiment['avg_helpful_votes']
    print(f"  {name.title()}: {count} reviews, avg rating: {avg_rating:.1f}, avg helpful votes: {avg_helpful:.1f}")
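The classification rule is easier to reason about in plain Python. The mirror below uses the same keywords and thresholds as the pipeline above. Because the positive branch is checked first, a review containing "great" counts as positive even with a 1-star rating, which is worth remembering since the sample data picks comments and ratings independently.

```python
import re

POSITIVE = re.compile(r"great|excellent|amazing|perfect", re.IGNORECASE)
NEGATIVE = re.compile(r"bad|terrible|awful|worst", re.IGNORECASE)

def classify(comment, rating):
    """Mirror the $switch above: the positive branch is evaluated first."""
    if POSITIVE.search(comment) or rating >= 4:
        return "positive"
    if NEGATIVE.search(comment) or rating <= 2:
        return "negative"
    return "neutral"

labels = [
    classify("Great product, highly recommended!", 1),
    classify("Not what I expected", 2),
    classify("Could be better", 3),
]
```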

In [None]:
# Most helpful reviews by product
helpful_reviews = await (
    db.Review.aggregate()
    .match(helpful_votes__gte=5)
    .lookup("products", "product_id", "_id", "product_info")
    .unwind("product_info")
    .lookup("users", "user_id", "_id", "user_info")
    .unwind("user_info")
    .project(
        product_name="$product_info.name",
        user_name="$user_info.name",
        rating=1,
        comment=1,
        helpful_votes=1,
        created_at=1
    )
    .sort(helpful_votes=-1)
    .limit(5)
    .execute()
)

print("\nMost Helpful Reviews:")
for review in helpful_reviews:
    product = review['product_name']
    user = review['user_name']
    rating = review['rating']
    helpful = review['helpful_votes']
    comment = review['comment'][:50] + "..." if len(review['comment']) > 50 else review['comment']
    print(f"  {product} - {user} ({rating} stars, {helpful} helpful): {comment}")

## 7. Common Business Analytics

Use pre-built aggregation patterns:

In [None]:
# Top customers analysis
top_customers = await (
    CommonAggregations.top_customers(
        db.Order,
        customer_field="user_id",
        amount_field="total_amount",
        limit=10
    )
    .lookup("users", "_id", "_id", "user_info")
    .unwind("user_info")
    .project(
        customer_name="$user_info.name",
        customer_email="$user_info.email",
        customer_city="$user_info.city",
        is_premium="$user_info.is_premium",
        total_spent="$total",
        order_count="$count",
        avg_order_value={"$divide": ["$total", "$count"]}
    )
    .execute()
)

print("Top Customers by Total Spending:")
for customer in top_customers:
    name = customer['customer_name']
    total = customer['total_spent']
    orders = customer['order_count']
    avg = customer['avg_order_value']
    premium = "Premium" if customer['is_premium'] else "Regular"
    city = customer['customer_city']
    print(f"  {name} ({city}, {premium}): ${total:.2f} ({orders} orders, avg: ${avg:.2f})")
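The internals of `CommonAggregations.top_customers` are not shown here. Judging by the `$total` and `$count` fields used in the `project` call above, it presumably groups by customer, sums the amount, and keeps the largest totals. A plain-Python sketch of that idea, with a function name reused purely for illustration:

```python
import heapq
from collections import defaultdict

def top_customers(orders, limit):
    """Sum spending per customer and return the `limit` largest totals."""
    totals = defaultdict(lambda: {"total": 0.0, "count": 0})
    for order in orders:
        acc = totals[order["user_id"]]
        acc["total"] += order["total_amount"]
        acc["count"] += 1
    ranked = heapq.nlargest(limit, totals.items(), key=lambda item: item[1]["total"])
    return [{"_id": user_id, **acc} for user_id, acc in ranked]

orders = [
    {"user_id": "a", "total_amount": 50.0},
    {"user_id": "b", "total_amount": 30.0},
    {"user_id": "a", "total_amount": 25.0},
    {"user_id": "c", "total_amount": 10.0},
]
best = top_customers(orders, 2)
```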

In [None]:
# Category performance statistics
category_stats = await (
    CommonAggregations.category_stats(
        db.Product,
        category_field="category_id",
        value_field="price"
    )
    .lookup("categories", "_id", "_id", "category_info")
    .unwind("category_info")
    .project(
        category_name="$category_info.name",
        product_count="$count",
        total_value="$total",
        avg_price="$avg",
        min_price="$min",
        max_price="$max"
    )
    .sort(product_count=-1)
    .execute()
)

print("\nCategory Statistics:")
for category in category_stats:
    name = category['category_name']
    count = category['product_count']
    avg_price = category['avg_price']
    min_price = category['min_price']
    max_price = category['max_price']
    print(f"  {name}: {count} products, avg: ${avg_price:.2f}, range: ${min_price:.2f}-${max_price:.2f}")

## 8. Query Optimization

Analyze and optimize your queries:

In [None]:
# Analyze query performance
analyzer = db.Order.analyze()

# Explain a query
explanation = await analyzer.explain_query(status="delivered", total_amount__gte=100)

print("Query Explanation:")
print(f"  Execution time: {explanation.get('executionTimeMillis', 'N/A')} ms")
print(f"  Documents examined: {explanation.get('totalDocsExamined', 'N/A')}")
print(f"  Documents returned: {explanation.get('totalDocsReturned', 'N/A')}")
print(f"  Index used: {explanation.get('indexName', 'No index')}")
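A useful number to derive from an explain result is documents examined per document returned: a value near 1 suggests the index is selective, a large value suggests a scan. The sketch below works on a raw MongoDB explain document in `executionStats` mode, where the counters are named `nReturned` and `totalDocsExamined`. It assumes the classic `winningPlan` layout, which varies across server versions, and the keys returned by `explain_query` may differ. `summarize_explain` is a hypothetical helper.

```python
def summarize_explain(explain):
    """Pull the key counters out of a raw explain document."""
    stats = explain.get("executionStats", {})
    examined = stats.get("totalDocsExamined", 0)
    returned = stats.get("nReturned", 0)

    # Walk down the winning plan looking for an index scan stage
    stage = explain.get("queryPlanner", {}).get("winningPlan", {})
    index_name = None
    while stage:
        if stage.get("stage") == "IXSCAN":
            index_name = stage.get("indexName")
            break
        stage = stage.get("inputStage")

    return {
        "examined": examined,
        "returned": returned,
        "examined_per_returned": examined / returned if returned else None,
        "index": index_name,
    }

sample = {  # hand-written sample for illustration, not real server output
    "queryPlanner": {"winningPlan": {"stage": "COLLSCAN"}},
    "executionStats": {"nReturned": 20, "totalDocsExamined": 200, "executionTimeMillis": 1},
}
report = summarize_explain(sample)
```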

In [None]:
# Get existing indexes
indexes = await analyzer.get_indexes()

print("\nExisting Indexes:")
for index in indexes:
    name = index.get('name', 'Unknown')
    keys = index.get('key', {})
    print(f"  {name}: {keys}")

In [None]:
# Suggest indexes for common queries
common_queries = [
    {"status": "delivered"},
    {"user_id": "some_id", "status": "delivered"},
    {"order_date__gte": datetime.now(), "total_amount__gte": 100},
    {"product_id": "some_id"}
]

suggestions = await analyzer.suggest_indexes(common_queries)

print("\nIndex Suggestions:")
for suggestion in suggestions:
    index_type = suggestion['type']
    fields = suggestion['fields']
    query = suggestion['query']
    print(f"  {index_type.title()} index on {fields} for query: {query}")
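When turning a suggestion into a compound index, MongoDB's documentation recommends the ESR guideline: equality fields first, then sort fields, then range fields. How `suggest_indexes` orders fields is not shown here, so the sketch below is an independent illustration. `esr_index` is a hypothetical helper that returns keys in the list-of-tuples form PyMongo's `create_index` accepts.

```python
def esr_index(equality=(), sort=(), range_=()):
    """Order compound index keys as Equality, Sort, Range."""
    keys = [(field, 1) for field in equality]
    keys += [(field, direction) for field, direction in sort]
    # Skip range fields that already appear as equality or sort keys
    keys += [(field, 1) for field in range_ if field not in dict(keys)]
    return keys

# Query: status == "delivered", total_amount >= 100, newest orders first
index_keys = esr_index(
    equality=["status"],
    sort=[("order_date", -1)],
    range_=["total_amount"],
)
```

For this query the resulting key order is `status`, then `order_date` descending, then `total_amount`.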

## Congratulations!

You've mastered advanced MongoDB operations with gdmongolite! You now know how to:

### Joins and Relationships:
- Join collections like SQL databases
- Perform one-to-one and one-to-many joins
- Create complex multi-collection queries

### Aggregation Pipelines:
- Build complex data processing pipelines
- Group and transform data
- Calculate business metrics

### Statistical Analysis:
- Calculate statistics (min, max, avg, std dev)
- Create histograms and distributions
- Perform cohort analysis

### Time Series Analysis:
- Analyze trends over time
- Create date-based histograms
- Track user behavior patterns

### Geographic Analysis:
- Analyze data by location
- Create demographic reports
- Understand regional patterns

### Text Analysis:
- Analyze review sentiment
- Find patterns in text data
- Rank reviews by helpful votes

### Query Optimization:
- Analyze query performance
- Get index suggestions
- Optimize database operations

## Next Steps:

1. Apply these techniques to your real data
2. Create custom aggregation pipelines for your business needs
3. Set up monitoring for query performance
4. Build dashboards using these analytics
5. Explore machine learning with your aggregated data

You're now equipped to build joins, aggregation pipelines, and analytics queries with gdmongolite!