Skip to content

Commit

Permalink
Allow chainable search in pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
VianneyMI committed Nov 3, 2023
1 parent 56a89d0 commit 176948d
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 35 deletions.
117 changes: 82 additions & 35 deletions monggregate/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from monggregate.base import BaseModel
from monggregate.stages import (
AnyStage,
Stage,
BucketAuto,
GranularityEnum,
Expand Down Expand Up @@ -104,7 +105,7 @@ class Pipeline(BaseModel): # pylint: disable=too-many-public-methods
# name of the collection to run the pipeline on
collection : str | None =None
# list of stages that compose the pipeline
stages : list[Stage] = []
stages : list[AnyStage] = []



Expand Down Expand Up @@ -204,12 +205,12 @@ def __add__(self, other:Self)->Self:
stages=self.stages + other.stages
)

def __getitem__(self, index:int)->Stage:
def __getitem__(self, index:int)->AnyStage:
"""Returns a stage from the pipeline"""
# https://realpython.com/inherit-python-list/
return self.stages[index]

def __setitem__(self, index:int, stage:Stage)->None:
def __setitem__(self, index:int, stage:AnyStage)->None:
"""Sets a stage in the pipeline"""
self.stages[index] = stage

Expand All @@ -221,15 +222,15 @@ def __len__(self)->int:
"""Returns the length of the pipeline"""
return len(self.stages)

def append(self, stage:Stage)->None:
def append(self, stage:AnyStage)->None:
"""Appends a stage to the pipeline"""
self.stages.append(stage)

def insert(self, index:int, stage:Stage)->None:
def insert(self, index:int, stage:AnyStage)->None:
"""Inserts a stage in the pipeline"""
self.stages.insert(index, stage)

def extend(self, stages:list[Stage])->None:
def extend(self, stages:list[AnyStage])->None:
"""Extends the pipeline with a list of stages"""
self.stages.extend(stages)

Expand Down Expand Up @@ -706,12 +707,14 @@ def sample(self, value:int)->Self:
self.stages.append(
Sample(value=value)
)

return self

# TODO : Factorize the below functions <VM, 03/11/2023>
def search(
self,
path:str|list[str]=None,
query:str|list[str]=None,
path:str|list[str]|None=None,
query:str|list[str]|None=None,
*,
operator_name:OperatorLiteral="text",
index:str="default",
Expand Down Expand Up @@ -756,28 +759,50 @@ def search(
- like, dict|list[dict] (allow looking for similar documents)
"""

self.stages.append(
Search.from_operator(
operator_name=operator_name,
path=path,
query=query,
index=index,
count=count,
highlight=highlight,
return_stored_source=return_stored_source,
score_details=score_details,
**kwargs
# If pipeline is empty, adds a search stage
if len(self) == 0:
self.stages.append(
Search.from_operator(
operator_name=operator_name,
path=path,
query=query,
index=index,
count=count,
highlight=highlight,
return_stored_source=return_stored_source,
score_details=score_details,
**kwargs
)
)
)


# If pipeline is not empty then the first stage must be Search stage.
# If so, adds the operator to the existing stage using Compound.
elif len(self) >= 1:
first_stage = self[0]
# If the first stage's operator is already a Compound, just adds a clause to it.
if isinstance(first_stage, Search) and isinstance(first_stage.operator, Compound):
first_stage.should(operator_name=operator_name, path=path, query=query, **kwargs)
# If first stage is Search but its operator is not Compound,
# then creates a Compound operator and adds the first stage operator as a clause
elif isinstance(first_stage, Search) and not isinstance(first_stage.operator, Compound):
new_operator = Compound(should=[first_stage.operator], minimum_should_match=1)
new_operator.should_(operator_name=operator_name, path=path, query=query, **kwargs)
first_stage.operator = new_operator
else:
# If the first stage is not a search stage, raises an error
raise TypeError("search stage has to be the first stage of the pipeline")

# The below case is theoretically impossible
else:
raise ValueError("pipeline lenght cannot be negative")

return self


def search_meta(
self,
path:str|list[str]=None,
query:str|list[str]=None,
path:str|list[str]|None=None,
query:str|list[str]|None=None,
*,
operator_name:OperatorLiteral="text",
index:str="default",
Expand Down Expand Up @@ -822,20 +847,42 @@ def search_meta(
- like, dict|list[dict] (allow looking for similar documents)
"""

self.stages.append(
SearchMeta.from_operator(
operator_name=operator_name,
path=path,
query=query,
index=index,
count=count,
highlight=highlight,
return_stored_source=return_stored_source,
score_details=score_details,
**kwargs
# If pipeline is empty, adds a SearchMeta stage
if len(self) == 0:
self.stages.append(
SearchMeta.from_operator(
operator_name=operator_name,
path=path,
query=query,
index=index,
count=count,
highlight=highlight,
return_stored_source=return_stored_source,
score_details=score_details,
**kwargs
)
)
)

# If pipeline is not empty then the first stage must be a SearchMeta stage.
# If so, adds the operator to the existing stage using Compound.
elif len(self) >= 1:
first_stage = self[0]
# If the first stage's operator is already a Compound, just adds a clause to it.
if isinstance(first_stage, SearchMeta) and isinstance(first_stage.operator, Compound):
first_stage.should(operator_name=operator_name, path=path, query=query, **kwargs)
# If first stage is SearchMeta but its operator is not Compound,
# then creates a Compound operator and adds the first stage operator as a clause
elif isinstance(first_stage, SearchMeta) and not isinstance(first_stage.operator, Compound):
new_operator = Compound(should=[first_stage.operator], minimum_should_match=1)
new_operator.should_(operator_name=operator_name, path=path, query=query, **kwargs)
first_stage.operator = new_operator
else:
# If the first stage is not a SearchMeta stage, raises an error
raise TypeError("search stage has to be the first stage of the pipeline")

# The below case is theoretically impossible
else:
raise ValueError("pipeline lenght cannot be negative")

return self

Expand Down
24 changes: 24 additions & 0 deletions monggregate/stages/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Stage Sub-package"""

from typing import Union
from monggregate.stages.stage import Stage
from monggregate.stages.bucket_auto import BucketAuto, GranularityEnum
from monggregate.stages.bucket import Bucket
Expand Down Expand Up @@ -43,3 +44,26 @@

# Custom aliases
Explode = Unwind # to match pandas equivalent operation

# Type alias: the union of the concrete stage classes listed below.
# monggregate/pipeline.py uses it (instead of the base `Stage` class) to
# annotate `Pipeline.stages` and the list-like Pipeline methods.
# NOTE(review): if this alias is validated by pydantic, the order of the
# members may influence which class a raw value is coerced to -- confirm
# before reordering or inserting new members in the middle.
AnyStage = Union[
BucketAuto,
Bucket,
Count,
Group,
Limit,
Lookup,
Match,
Out,
Project,
ReplaceRoot,
Sample,
Search,
SearchMeta,
Set,
Skip,
SortByCount,
Sort,
UnionWith,
Unwind,
Unset,
]

0 comments on commit 176948d

Please sign in to comment.