# MontyDB demos

In [1]:
# Load in the required packages
import pandas as pd
from jupyterlite_simple_cors_proxy import furl, xurl

# Generate the API URL pattern
dakar_api_template = "https://www.dakar.live.worldrallyraidchampionship.com/api/{path}"

# Define the year
YEAR = 2025
# Define the category
CATEGORY = "A"

# Define the API path to the withdrawal resource
# Use a Python f-string to instantiate variable values directly
withdrawal_path = f"withdrawal-{YEAR}-{CATEGORY}"

# Define the URL
withdrawal_url = dakar_api_template.format(path=withdrawal_path)
withdrawal_url

'https://www.dakar.live.worldrallyraidchampionship.com/api/withdrawal-2025-A'

In [2]:
import requests
w_json = requests.get(xurl(withdrawal_url)).json()

In [3]:
from montydb import MontyClient

db = MontyClient(":memory:").db
col = db.test
col.insert_many( w_json )

InsertManyResult(
  'withdrawal-2025-A-9',
  'withdrawal-2025-A-6',
  'withdrawal-2025-A-2',
  'withdrawal-2025-A-3',
  'withdrawal-2025-A-4',
  'withdrawal-2025-A-8',
  'withdrawal-2025-A-5',
  'withdrawal-2025-A-7',
  'withdrawal-2025-A-11',
  'withdrawal-2025-A-12',
  'withdrawal-2025-A-10'
)

In [4]:
cur = col.find({"stage":5}, {"_id":0,"list":1, "stage":1})
"""
"list", {
    "reason": "2",
    "team.w2rc": True
}
"""
next(cur)

{'list': [{'bib': 208,
   'reason': '1',
   'team': {'bib': 208,
    'brand': 'TOYOTA',
    'model': 'HILUX',
    'vehicle': 'GURTAM TOYOTA GAZOO RACING BALTICS',
    'vehicleImg': 'https://img.aso.fr/core_app/img-motorSports-dak-png/208/194454/0:0,600:500-0-0-70/1280a',
    'clazz': '96c0869600e0013dbf5f86f60e5c4da4',
    'w2rc': False,
    'competitors': [{'name': 'B. VANAGAS',
      'firstName': 'BENEDIKTAS',
      'lastName': 'VANAGAS',
      'role': 'P',
      'gender': 'm',
      'nationality': 'ltu',
      'profil': 'https://img.aso.fr/core_app/img-motorSports-dak-png/208-p/195817/0:0,800:800-0-0-70/07130',
      'profil_sm': 'https://img.aso.fr/core_app/img-motorSports-dak-png/208-p/195817/0:0,800:800-200-0-70/aa4f1',
      'podium': 'https://img.aso.fr/core_app/img-motorSports-dak-png/208-p/192806/0:0,532:690-0-0-70/5ffab',
      'aid': '77735d6e-4d5a-421d-8a32-0b3f21629f04'},
     {'name': 'S. GOSPODARCZYK',
      'firstName': 'SZYMON',
      'lastName': 'GOSPODARCZYK',
     

In [5]:
# This breaks  on list.team.competitors.name
#cur = col.find({}, {"_id": 0, "list.reason": 1, "list.team.w2rc":1, "stage": 1, list.team.competitors.name:1})
"""
"list", {
    "reason": "2",
    "team.w2rc": True
}
"""
#next(cur)

'\n"list", {\n    "reason": "2",\n    "team.w2rc": True\n}\n'

In [19]:
[r for r in col.find({"stage": 4}, {"_id":0,"list":1, "stage":1, } )]
# Unsupported projection:  "$elemMatch": { "list.competitors.lastName": "LOEB" }
#  "list.team.competitors" :{"$elemMatch":{"role":"P"}}
#  "list":{"$elemMatch": {"competitors":{"$elemMatch": {"lastName":"LOEB"}}}}

OperationFailure: Cannot use $elemMatch projection on a nested field.

In [6]:
[r for r in col.find({"list.team.brand": 'DACIA'}, {"_id": 0})]

[{'_bind': 'withdrawal-2025-A',
  '_updatedAt': 1736877547536,
  '_parent': 'stage-2025-A:8002af597f2d091dc91450266fab52d6',
  'list': [{'bib': 219,
    'reason': '2',
    'team': {'bib': 219,
     'brand': 'DACIA',
     'model': 'SANDRIDER',
     'vehicle': 'THE DACIA SANDRIDERS',
     'vehicleImg': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219/194474/0:0,600:500-0-0-70/a91f6',
     'clazz': '96c0869600e0013dbf5f86f60e5c4da4',
     'w2rc': True,
     'competitors': [{'name': 'S. LOEB',
       'firstName': 'SEBASTIEN',
       'lastName': 'LOEB',
       'role': 'P',
       'gender': 'm',
       'nationality': 'fra',
       'profil': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/195610/0:0,800:800-0-0-70/58e9a',
       'profil_sm': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/195610/0:0,800:800-200-0-70/2340d',
       'podium': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/192891/0:0,532:690-0-0-70/cc2b4',
       'aid': '5dd0184f-5f90-4b

__TO DO__ start to build up pipeline operator from scratch; need $unwind , $match, $project

In [7]:
from typing import List, Dict, Any, Union, Callable
import operator
from datetime import datetime

# ??DEPRECATE THIS ONE????

class MontyFilter:
    """
    A class to extend MontyDB with MongoDB-style filtering capabilities.
    """

    def __init__(self, collection):
        self.collection = collection
        self.operators = {
            "$eq": operator.eq,
            "$ne": operator.ne,
            "$gt": operator.gt,
            "$gte": operator.ge,
            "$lt": operator.lt,
            "$lte": operator.le,
            "$in": lambda x, y: x in y,
            "$nin": lambda x, y: x not in y,
            "$exists": lambda x, y: (x is not None) == y,
            "$type": lambda x, y: isinstance(x, self._get_type(y)),
            "$regex": lambda x, y: bool(re.search(y, x)) if isinstance(x, str) else False
        }

    def _get_type(self, type_name: str) -> type:
        """Map MongoDB type names to Python types."""
        type_map = {
            "string": str,
            "number": (int, float),
            "boolean": bool,
            "date": datetime,
            "array": list,
            "object": dict,
            "null": type(None)
        }
        return type_map.get(type_name, object)

    def _get_nested_value(self, obj: Dict, path: str) -> Any:
        """Get value from nested dictionary using dot notation."""
        parts = path.split('.')
        current = obj
        for part in parts:
            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list) and part.isdigit():
                current = current[int(part)] if int(
                    part) < len(current) else None
            else:
                return None
            if current is None:
                return None
        return current

    def _apply_operator(self, value: Any, condition: Dict) -> bool:
        """Apply MongoDB-style operators to a value."""
        for op, comparison in condition.items():
            if op not in self.operators:
                raise ValueError(f"Unsupported operator: {op}")
            if not self.operators[op](value, comparison):
                return False
        return True

    def _evaluate_condition(self, doc: Dict, condition: Dict) -> bool:
        """Evaluate a condition against a document."""
        for field, value in condition.items():
            if field.startswith('$'):
                # Handle logical operators
                if field == '$and':
                    if not all(self._evaluate_condition(doc, cond) for cond in value):
                        return False
                elif field == '$or':
                    if not any(self._evaluate_condition(doc, cond) for cond in value):
                        return False
                elif field == '$not':
                    if self._evaluate_condition(doc, value):
                        return False
            else:
                # Handle field conditions
                actual_value = self._get_nested_value(doc, field)
                if isinstance(value, dict):
                    if not self._apply_operator(actual_value, value):
                        return False
                else:
                    if actual_value != value:
                        return False
        return True

    def filter_array(self, array_field: str, conditions: Dict) -> List[Dict]:
        """
        Filter array elements within documents based on conditions.
        
        Args:
            array_field: The field containing the array to filter
            conditions: MongoDB-style query conditions
            
        Returns:
            List of documents with filtered arrays
        """
        results = []
        for doc in self.collection.find():
            array_data = self._get_nested_value(doc, array_field)
            if not isinstance(array_data, list):
                continue

            filtered_array = [
                item for item in array_data
                if self._evaluate_condition(item, conditions)
            ]

            if filtered_array:
                new_doc = doc.copy()
                current = new_doc
                parts = array_field.split('.')
                for part in parts[:-1]:
                    current = current[part]
                current[parts[-1]] = filtered_array
                results.append(new_doc)

        return results

    def find(self, conditions: Dict) -> List[Dict]:
        """
        Find documents matching the given conditions.
        
        Args:
            conditions: MongoDB-style query conditions
            
        Returns:
            List of matching documents
        """
        return [
            doc for doc in self.collection.find()
            if self._evaluate_condition(doc, conditions)
        ]

# Example usage class for demonstration


class FilterExample:
    def __init__(self, collection):
        self.filter = MontyFilter(collection)

    def filter_by_reason(self, reason: str) -> List[Dict]:
        """Filter list items by reason."""
        return self.filter.filter_array("list", {"reason": reason})

    def filter_by_brand(self, brand: str) -> List[Dict]:
        """Filter list items by team brand."""
        return self.filter.filter_array("list", {"team.brand": brand})

    def complex_filter(self) -> List[Dict]:
        """Example of a complex filter."""
        conditions = {
            "$and": [
                {"team.w2rc": True},
                {"$or": [
                    {"team.brand": "DACIA"},
                    {"team.brand": "TOYOTA"}
                ]}
            ]
        }
        return self.filter.filter_array("list", conditions)


# Usage example:
"""
# Initialize
client = montydb.MontyClient('path/to/db')
db = client.your_database
collection = db.your_collection
filter_helper = FilterExample(collection)

# Simple filtering
dacia_entries = filter_helper.filter_by_brand("DACIA")

# Complex filtering
w2rc_entries = filter_helper.complex_filter()

# Custom filtering using MontyFilter directly
monty_filter = MontyFilter(collection)
custom_filtered = monty_filter.filter_array("list", {
    "reason": "2",
    "team.w2rc": True,
    "team.competitors": {
        "$exists": True
    }
})
"""

'\n# Initialize\nclient = montydb.MontyClient(\'path/to/db\')\ndb = client.your_database\ncollection = db.your_collection\nfilter_helper = FilterExample(collection)\n\n# Simple filtering\ndacia_entries = filter_helper.filter_by_brand("DACIA")\n\n# Complex filtering\nw2rc_entries = filter_helper.complex_filter()\n\n# Custom filtering using MontyFilter directly\nmonty_filter = MontyFilter(collection)\ncustom_filtered = monty_filter.filter_array("list", {\n    "reason": "2",\n    "team.w2rc": True,\n    "team.competitors": {\n        "$exists": True\n    }\n})\n'

In [8]:
# DEPRECATED
filter_helper = FilterExample(col)

# Simple filtering
dacia_entries = filter_helper.filter_by_brand("DACIA")
dacia_entries

[{'_id': 'withdrawal-2025-A-4',
  '_bind': 'withdrawal-2025-A',
  '_updatedAt': 1736877547536,
  '_parent': 'stage-2025-A:8002af597f2d091dc91450266fab52d6',
  'list': [{'bib': 219,
    'reason': '2',
    'team': {'bib': 219,
     'brand': 'DACIA',
     'model': 'SANDRIDER',
     'vehicle': 'THE DACIA SANDRIDERS',
     'vehicleImg': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219/194474/0:0,600:500-0-0-70/a91f6',
     'clazz': '96c0869600e0013dbf5f86f60e5c4da4',
     'w2rc': True,
     'competitors': [{'name': 'S. LOEB',
       'firstName': 'SEBASTIEN',
       'lastName': 'LOEB',
       'role': 'P',
       'gender': 'm',
       'nationality': 'fra',
       'profil': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/195610/0:0,800:800-0-0-70/58e9a',
       'profil_sm': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/195610/0:0,800:800-200-0-70/2340d',
       'podium': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/192891/0:0,532:690-0-0-70/cc2b4',

In [9]:
# DEPRECATED
monty_filter = MontyFilter(col)
custom_filtered = monty_filter.filter_array("list", {
    "reason": "1",
    "team.w2rc": True,
    "team.competitors": {
        "$exists": True
    }
})
custom_filtered

[{'_id': 'withdrawal-2025-A-5',
  '_bind': 'withdrawal-2025-A',
  '_updatedAt': 1736877542722,
  '_parent': 'stage-2025-A:e2b95b95a4ae9063eb4924fc2d7cf7e7',
  'list': [{'bib': 309,
    'reason': '1',
    'team': {'bib': 309,
     'brand': 'TAURUS',
     'model': 'T3 MAX',
     'vehicle': 'NASSER RACING',
     'vehicleImg': 'https://img.aso.fr/core_app/img-motorSports-dak-png/309/194406/0:0,600:500-0-0-70/b16a2',
     'clazz': 'a0a6386a4b9a61b73b036a50966345c0',
     'w2rc': True,
     'competitors': [{'name': 'E. PONS',
       'firstName': 'EDUARD',
       'lastName': 'PONS',
       'role': 'P',
       'gender': 'm',
       'nationality': 'esp',
       'profil': 'https://img.aso.fr/core_app/img-motorSports-dak-png/309-p/195921/0:0,800:800-0-0-70/760ed',
       'profil_sm': 'https://img.aso.fr/core_app/img-motorSports-dak-png/309-p/195921/0:0,800:800-200-0-70/d21d9',
       'podium': 'https://img.aso.fr/core_app/img-motorSports-dak-png/309-p/192526/0:0,532:690-0-0-70/b071c',
       'aid

TO DO - in the following, `filter_array()` first queries everything using `self.collection.find()`; really, we should have a step before that with an actual query, eg as a pipleline stage, and then pass that in?

In [5]:
from typing import List, Dict, Any, Union, Callable, Optional
import operator
from datetime import datetime
import copy


class MontyFilter:
    def __init__(self, collection):
        self.collection = collection
        self.operators = {
            "$eq": operator.eq,
            "$ne": operator.ne,
            "$gt": operator.gt,
            "$gte": operator.ge,
            "$lt": operator.lt,
            "$lte": operator.le,
            "$in": lambda x, y: x in y,
            "$nin": lambda x, y: x not in y,
            "$exists": lambda x, y: (x is not None) == y,
            "$type": lambda x, y: isinstance(x, self._get_type(y)),
            "$regex": lambda x, y: bool(re.search(y, x)) if isinstance(x, str) else False
        }

    def _get_type(self, type_name: str) -> type:
        type_map = {
            "string": str,
            "number": (int, float),
            "boolean": bool,
            "date": datetime,
            "array": list,
            "object": dict,
            "null": type(None)
        }
        return type_map.get(type_name, object)

    def _get_nested_value(self, obj: Dict, path: str) -> Any:
        parts = path.split('.')
        current = obj
        for part in parts:
            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list) and part.isdigit():
                current = current[int(part)] if int(
                    part) < len(current) else None
            else:
                return None
            if current is None:
                return None
        return current

    def _apply_operator(self, value: Any, condition: Dict) -> bool:
        for op, comparison in condition.items():
            if op not in self.operators:
                raise ValueError(f"Unsupported operator: {op}")
            if not self.operators[op](value, comparison):
                return False
        return True

    def _evaluate_condition(self, doc: Dict, condition: Dict) -> bool:
        for field, value in condition.items():
            if field.startswith('$'):
                if field == '$and':
                    if not all(self._evaluate_condition(doc, cond) for cond in value):
                        return False
                elif field == '$or':
                    if not any(self._evaluate_condition(doc, cond) for cond in value):
                        return False
                elif field == '$not':
                    if self._evaluate_condition(doc, value):
                        return False
            else:
                actual_value = self._get_nested_value(doc, field)
                if isinstance(value, dict):
                    if not self._apply_operator(actual_value, value):
                        return False
                else:
                    if actual_value != value:
                        return False
        return True

    def _keep_fields(self, doc: Dict, fields: List[str]) -> Dict:
        """Simple field filtering - keep only specified fields and their parents"""
        if not fields:
            return doc

        result = {'_id': doc.get('_id')}  # Always keep _id

        # For each field we want to keep
        for field in fields:
            parts = field.split('.')
            value = doc  # Start at the root
            temp_dict = result

            # Walk through the parts
            for i, part in enumerate(parts):
                if value is None or not isinstance(value, dict):
                    break

                # Get the value for this part
                value = value.get(part)

                # If it's not the last part, ensure the path exists
                if i < len(parts) - 1:
                    if part not in temp_dict:
                        temp_dict[part] = {}
                    temp_dict = temp_dict[part]
                # If it's the last part, set the value
                else:
                    temp_dict[part] = value

        return result

    def filter_array(self, array_field: str, conditions: Dict, fields: List[str] = None) -> List[Dict]:
        """
        Filter array elements within documents based on conditions.
        
        Args:
            array_field: The field containing the array to filter
            conditions: MongoDB-style query conditions
            fields: Optional list of fields to keep in the result
            
        Returns:
            List of documents with filtered arrays
        """
        results = []
        # First do the filtering
        for doc in self.collection.find():
            array_data = self._get_nested_value(doc, array_field)
            if not isinstance(array_data, list):
                continue

            filtered_array = [
                item for item in array_data
                if self._evaluate_condition(item, conditions)
            ]

            if filtered_array:
                new_doc = doc.copy()
                current = new_doc
                parts = array_field.split('.')
                for part in parts[:-1]:
                    current = current[part]
                current[parts[-1]] = filtered_array

                # If fields are specified, filter them
                if fields:
                    # Make sure to include the array field in the fields list
                    if array_field not in fields:
                        fields.append(array_field)
                    new_doc = self._keep_fields(new_doc, fields)

                results.append(new_doc)

        return results


# Example usage:
"""
monty_filter = MontyFilter(collection)

# Basic filtering (original working version)
filtered_docs = monty_filter.filter_array("list", {
    "reason": "2",
    "team.w2rc": True
})

# With field projection
filtered_docs = monty_filter.filter_array(
    array_field="list",
    conditions={"reason": "2"},
    fields=["list", "list.reason", "list.team.brand"]  # Note: 'list' is included automatically
)
"""

'\nmonty_filter = MontyFilter(collection)\n\n# Basic filtering (original working version)\nfiltered_docs = monty_filter.filter_array("list", {\n    "reason": "2",\n    "team.w2rc": True\n})\n\n# With field projection\nfiltered_docs = monty_filter.filter_array(\n    array_field="list",\n    conditions={"reason": "2"},\n    fields=["list", "list.reason", "list.team.brand"]  # Note: \'list\' is included automatically\n)\n'

In [6]:
monty_filter = MontyFilter(col)

# Basic filtering (original working version)
filtered_docs = monty_filter.filter_array("list", {
    "reason": "2",
    "team.w2rc": True
})

filtered_docs

[{'_id': 'withdrawal-2025-A-4',
  '_bind': 'withdrawal-2025-A',
  '_updatedAt': 1736877547536,
  '_parent': 'stage-2025-A:8002af597f2d091dc91450266fab52d6',
  'list': [{'bib': 219,
    'reason': '2',
    'team': {'bib': 219,
     'brand': 'DACIA',
     'model': 'SANDRIDER',
     'vehicle': 'THE DACIA SANDRIDERS',
     'vehicleImg': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219/194474/0:0,600:500-0-0-70/a91f6',
     'clazz': '96c0869600e0013dbf5f86f60e5c4da4',
     'w2rc': True,
     'competitors': [{'name': 'S. LOEB',
       'firstName': 'SEBASTIEN',
       'lastName': 'LOEB',
       'role': 'P',
       'gender': 'm',
       'nationality': 'fra',
       'profil': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/195610/0:0,800:800-0-0-70/58e9a',
       'profil_sm': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/195610/0:0,800:800-200-0-70/2340d',
       'podium': 'https://img.aso.fr/core_app/img-motorSports-dak-png/219-p/192891/0:0,532:690-0-0-70/cc2b4',

In [None]:
# TO DO  - use this - doesn't select everything in find
# ALSO - need to be able to use find in pipleline and then pass to this?

    def filter_array(self, array_field: str, conditions: Dict, base_query: Dict = None, fields: List[str] = None) -> List[Dict]:
        """
        Filter array elements within documents based on conditions.
        
        Args:
            array_field: The field containing the array to filter
            conditions: MongoDB-style query conditions for array elements
            base_query: Optional base query to pre-filter documents before array filtering
            fields: Optional list of fields to keep in the result
            
        Returns:
            List of documents with filtered arrays
        """
        results = []
        # Use the base query if provided, otherwise get all documents
        base_query = base_query or {}
        
        # First do the filtering
        for doc in self.collection.find(base_query):
            array_data = self._get_nested_value(doc, array_field)
            if not isinstance(array_data, list):
                continue

            filtered_array = [
                item for item in array_data
                if self._evaluate_condition(item, conditions)
            ]

            if filtered_array:
                new_doc = doc.copy()
                current = new_doc
                parts = array_field.split('.')
                for part in parts[:-1]:
                    current = current[part]
                current[parts[-1]] = filtered_array

                # If fields are specified, filter them
                if fields:
                    # Make sure to include the array field in the fields list
                    if array_field not in fields:
                        fields.append(array_field)
                    new_doc = self._keep_fields(new_doc, fields)

                results.append(new_doc)

        return results


"""
monty_filter = MontyFilter(collection)

# Basic filtering with base query
filtered_docs = monty_filter.filter_array(
    array_field="list",
    conditions={"reason": "2"},
    base_query={"status": "active"}  # Pre-filter documents
)

# With field projection and base query
filtered_docs = monty_filter.filter_array(
    array_field="list",
    conditions={"reason": "2"},
    base_query={"status": "active"},
    fields=["list.reason", "list.team.brand"]
)
"""

In [21]:
# DEPRECATE AND USE BELOW project_fields2 
def project_fields(doc: Dict, projection: Dict) -> Dict:
    """
    Project specific fields from a document based on a MongoDB-style projection.
    
    Args:
        doc: The document to project fields from
        projection: Dict with field paths as keys and 1 as values for inclusion
        
    Returns:
        Dict containing only the specified fields with their original structure
    """
    def _project_value(obj: Any, field_paths: Dict[str, List[str]]) -> Any:
        if obj is None:
            return None

        if isinstance(obj, list):
            return [_project_value(item, field_paths) for item in obj]

        if not isinstance(obj, dict):
            return obj

        result = {}
        for prefix, remaining_paths in field_paths.items():
            if prefix not in obj:
                continue

            if not remaining_paths:
                result[prefix] = obj[prefix]
            else:
                nested_paths = {}
                for path in remaining_paths:
                    parts = path.split('.', 1)
                    if len(parts) == 1:
                        nested_paths[parts[0]] = []
                    else:
                        if parts[0] not in nested_paths:
                            nested_paths[parts[0]] = []
                        nested_paths[parts[0]].append(parts[1])

                projected = _project_value(obj[prefix], nested_paths)
                if projected is not None:
                    result[prefix] = projected

        return result if result else None

    # Organize projection paths
    field_paths = {}
    for path, include in projection.items():
        if include != 1:
            continue

        parts = path.split('.', 1)
        if parts[0] not in field_paths:
            field_paths[parts[0]] = []
        if len(parts) > 1:
            field_paths[parts[0]].append(parts[1])

    # Start projection from root
    result = _project_value(doc, field_paths)

    # Always include _id
    if isinstance(result, dict):
        result['_id'] = doc.get('_id')
    else:
        result = {'_id': doc.get('_id')}

    return result

In [22]:
# project specific fields
projection = {
    "list.team.competitors.name": 1,
    "list.team.bib": 1
}

# Apply projection to each filtered document
projected_docs = [project_fields(doc, projection) for doc in filtered_docs]
projected_docs

[{'list': [{'team': {'competitors': [{'name': 'S. LOEB'},
      {'name': 'F. LURQUIN'}],
     'bib': 219}}],
  '_id': 'withdrawal-2025-A-4'}]

try with exclusion operator also

> Exclusion projections are the opposite of inclusion projections - instead of specifying which fields you want to keep (with 1), you specify which fields you want to exclude (with 0).
>
> Key features of exclusion projections:
>
> - You can specify which fields to remove instead of which to keep
> - Useful when you want to keep most fields and only remove a few
> - Cannot be mixed with inclusion projections (except for _id field)
> - The _id field is always included by default unless explicitly excluded
> - In exclusion mode, unspecified fields are kept (opposite of inclusion mode)

In [28]:
#Need to add self when used in class
def project_fields2( doc: Dict, projection: Dict) -> Dict:
    """
    Project specific fields from a document based on a MongoDB-style projection.
    
    Args:
        doc: The document to project fields from
        projection: Dict with field paths as keys and 1/0 as values for inclusion/exclusion
        
    Returns:
        Dict containing only the specified fields with their original structure
    """
    def _get_nested_value(obj: Dict, path: List[str]) -> Any:
        """Get value at nested path"""
        current = obj
        for part in path:
            if isinstance(current, dict):
                if part not in current:
                    return None
                current = current[part]
            elif isinstance(current, list):
                current = [_get_nested_value(item, [part]) for item in current]
                current = [item for item in current if item is not None]
                if not current:
                    return None
            else:
                return None
        return current

    def _set_nested_value(result: Dict, path: List[str], value: Any) -> None:
        """Set value at nested path, creating intermediate dicts as needed"""
        current = result
        for part in path[:-1]:
            if part not in current:
                current[part] = {}
            current = current[part]
        current[path[-1]] = value

    # Start with empty result
    result = {}
    
    # Handle non-_id fields
    for field_path, include in projection.items():
        if field_path == '_id':
            continue
            
        if include != 1:  # Skip if not inclusion
            continue
            
        path_parts = field_path.split('.')
        value = _get_nested_value(doc, path_parts)
        
        if value is not None:
            _set_nested_value(result, path_parts, value)
    
    # Handle _id separately
    if projection.get('_id', 1) == 1:
        result['_id'] = doc.get('_id')
        
    return result

In [29]:
# project specific fields
projection = {
    "list.team.competitors.name": 1,
    "list.team.bib": 1,"_id":0
}

# Apply projection to each filtered document
projected_docs = [project_fields2(doc, projection) for doc in filtered_docs]
projected_docs

[{'list': {'team': {'competitors': {'name': [['S. LOEB', 'F. LURQUIN']]},
    'bib': [219]}}}]

In [None]:
# Not tried yet - pipeline
from typing import List, Dict, Any, Callable, Union
from functools import reduce


class MontyPipeline:
    """Pipeline operator for chaining MontyFilter operations"""

    def __init__(self, collection):
        self.collection = collection
        self.filter = MontyFilter(collection)

    class Operation:
        def __init__(self, func: Callable, args: tuple, kwargs: dict):
            self.func = func
            self.args = args
            self.kwargs = kwargs

        def __call__(self, docs):
            return self.func(*self.args, **self.kwargs)

    def filter_array(self, array_field: str, conditions: Dict) -> 'Operation':
        """Create a filter array operation"""
        def _filter(docs=None):
            if docs is None:
                return self.filter.filter_array(array_field, conditions)
            # If we have incoming docs, create a temporary collection wrapper
            temp_filter = MontyFilter(
                type('TempCollection', (), {'find': lambda: docs})())
            return temp_filter.filter_array(array_field, conditions)
        return self.Operation(_filter, (), {})

    def project(self, projection: Dict) -> 'Operation':
        """Create a project operation"""
        def _project(docs):
            return [self.filter.project_fields(doc, projection) for doc in docs]
        return self.Operation(_project, (), {})

    def execute(self, operations: List[Operation]) -> List[Dict]:
        """Execute a pipeline of operations"""
        def _apply_operation(docs, operation):
            return operation(docs)

        # Start with None for the first operation to handle initial collection access
        return reduce(_apply_operation, operations, None)

# Helper function to make pipeline creation more readable


def pipeline(collection, operations: List[Operation]) -> List[Dict]:
    """Create and execute a pipeline of operations"""
    return MontyPipeline(collection).execute(operations)

# Simple usage

```python
result = pipeline(collection, [
    MontyPipeline(collection).filter_array("list", {
        "reason": "2",
        "team.w2rc": True
    }),
    MontyPipeline(collection).project({
        "list.team.competitors.name": 1,
        "list.team.bib": 1,
        "_id": 0
    })
])

# Or create a pipeline instance for multiple uses
mp = MontyPipeline(collection)
result = pipeline(collection, [
    mp.filter_array("list", {"reason": "2", "team.w2rc": True}),
    mp.project({
        "list.team.competitors.name": 1,
        "list.team.bib": 1,
        "_id": 0
    })
])
```