
Return intermediate nodes output in pipelines #1558

Merged · 40 commits · Oct 7, 2021

Commits
db38a3e
Add rest api endpoint to delete documents by filter.
ZanSara Sep 30, 2021
5398e60
Remove parametrization of rest api test to see if they solve the CI i…
ZanSara Sep 30, 2021
e67e5c7
Make the paths in rest_api/config.py absolute
ZanSara Oct 1, 2021
e1903ce
Fix path to pipelines.yaml
ZanSara Oct 1, 2021
bbd05b2
Restructuring test_rest_api.py to be able to test only my endpoint (a…
ZanSara Oct 1, 2021
681a803
Convert DELETE /documents into POST /documents/delete_by_filters
ZanSara Oct 1, 2021
09bfef0
Merge branch 'master' into debug_pipelines
ZanSara Oct 4, 2021
a6729c1
First rough implementation
ZanSara Oct 4, 2021
1c3b9cc
Merge branch 'master' into debug_pipelines
ZanSara Oct 5, 2021
b940693
Add a flag to dump the debug logs to the console as well
ZanSara Oct 5, 2021
4468a45
Add type to the debug dictionary
ZanSara Oct 5, 2021
a714ad4
Typing run() and _dispatch_run() to please mypy
ZanSara Oct 5, 2021
42d6724
Mypy requires more types
ZanSara Oct 5, 2021
75f80f5
Clarify docstrings a bit
ZanSara Oct 5, 2021
736d3fd
Allow enable_debug and console_debug to be passed as arguments of run()
ZanSara Oct 5, 2021
be670b9
Avoid overwriting _debug, later we might want to store other objects …
ZanSara Oct 5, 2021
36f6b55
Put logs under a separate key of the _debug dictionary and add input …
ZanSara Oct 5, 2021
ce1aaf4
Introduce global arguments for pipeline.run() that get applied to eve…
ZanSara Oct 6, 2021
08092b3
Change default values of debug variables to None, otherwise their def…
ZanSara Oct 6, 2021
da9d586
Remove unused import
ZanSara Oct 6, 2021
0c77b29
more typing for mypy
ZanSara Oct 6, 2021
6f244e6
Remove a potential infinite recursion on the overridden __getattr__
ZanSara Oct 6, 2021
24ceba8
Add a simple test for the debug attributes
ZanSara Oct 6, 2021
53a582e
Add test and fix small issue on global debug=False not overriding nod…
ZanSara Oct 6, 2021
97fe6b9
Do not append the output of the last node in the _debug key, it cause…
ZanSara Oct 6, 2021
bb6b63d
Fix tests
ZanSara Oct 6, 2021
8c61e8c
Removed recursion between _debug and output and fixed tests
ZanSara Oct 7, 2021
96c1418
Apparently node_input can be None :/
ZanSara Oct 7, 2021
a03eee7
Move the input/output collection into _dispatch_run to gather only re…
ZanSara Oct 7, 2021
bfcea46
Minor cleanup
ZanSara Oct 7, 2021
4b0a28c
Add partial Pipeline.run() docstring
ZanSara Oct 7, 2021
e6503a9
Move InMemoryLogger into utils.py
ZanSara Oct 7, 2021
443146b
Add latest docstring and tutorial changes
github-actions[bot] Oct 7, 2021
9308faa
Add io import to utils.py
ZanSara Oct 7, 2021
8f17063
Merge branch 'debug_pipelines' of github.com:deepset-ai/haystack into…
ZanSara Oct 7, 2021
a5ca35b
Update docstring
tholor Oct 7, 2021
c1c7d6b
Add latest docstring and tutorial changes
github-actions[bot] Oct 7, 2021
d893cf7
Revert "Move InMemoryLogger into utils.py"
ZanSara Oct 7, 2021
d18a630
Merge branch 'debug_pipelines' of github.com:deepset-ai/haystack into…
ZanSara Oct 7, 2021
66c771b
Add latest docstring and tutorial changes
github-actions[bot] Oct 7, 2021
28 changes: 28 additions & 0 deletions docs/_src/api/api/pipelines.md
@@ -121,6 +121,34 @@ Set the component for a node in the Pipeline.
- `name`: The name of the node.
- `component`: The component object to be set at the node.

<a name="pipeline.Pipeline.run"></a>
#### run

```python
| run(query: Optional[str] = None, file_paths: Optional[List[str]] = None, labels: Optional[MultiLabel] = None, documents: Optional[List[Document]] = None, meta: Optional[dict] = None, params: Optional[dict] = None, debug: Optional[bool] = None, debug_logs: Optional[bool] = None)
```

Runs the pipeline, one node at a time.

**Arguments**:

- `query`: The search query (for query pipelines only)
- `file_paths`: The files to index (for indexing pipelines only)
- `labels`: Ground-truth labels to pass on to the nodes (for evaluation purposes)
- `documents`: A list of Document objects to pass on to the nodes
- `meta`: Metadata to attach to the files (for indexing pipelines only)
- `params`: Dictionary of parameters to be dispatched to the nodes.
If you want to pass a param to all nodes, you can just use: {"top_k":10}
If you want to pass it to targeted nodes, you can do:
{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}}
- `debug`: Whether the pipeline should instruct nodes to collect debug information
about their execution. By default, this information includes the input parameters
each node received, the output it generated, and any logs (of any severity)
it emitted. All debug information can then be found in the dict returned
by this method under the key "_debug"
- `debug_logs`: Whether all the logs of the node should also be printed to the console,
regardless of their severity and of the existing logger's settings.
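As a plain-Python illustration of the params convention described above: `resolve_params` is a hypothetical name invented for this sketch (Haystack performs this merge inside `BaseComponent._dispatch_run()`), showing how global and targeted entries are resolved for a single node.

```python
def resolve_params(node_name, run_signature_args, params):
    """Merge global params with the params targeted at `node_name`.

    In this example dict the global entries come first, so the
    targeted ones override them.
    """
    run_params = {}
    for key, value in params.items():
        if key == node_name and isinstance(value, dict):
            run_params.update(value)   # targeted params for this node
        elif key in run_signature_args:
            run_params[key] = value    # global param, valid for every node
    return run_params

params = {"top_k": 10, "Reader": {"top_k": 3, "debug": True}}
print(resolve_params("Reader", ["top_k", "debug"], params))
# {'top_k': 3, 'debug': True}
print(resolve_params("Retriever", ["top_k"], params))
# {'top_k': 10}
```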

<a name="pipeline.Pipeline.get_nodes_by_class"></a>
#### get\_nodes\_by\_class

9 changes: 9 additions & 0 deletions haystack/__init__.py
@@ -1,5 +1,14 @@
import logging

# Configure the root logger to DEBUG to allow the "debug" flag to receive the logs
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)

# Then reconfigure the StreamHandler not to display anything below WARNING by default
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.WARNING)
root_logger.addHandler(stream_handler)

# Change log-levels before modules are loaded to avoid verbose log messages.
logging.getLogger('haystack.modeling').setLevel(logging.WARNING)
logging.getLogger('haystack.modeling.utils').setLevel(logging.INFO)
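The two-level setup above (permissive root logger, strict console handler) can be demonstrated in isolation. In this sketch, `ListHandler` is an illustrative stand-in for the in-memory capture this PR attaches later; only the names from the stdlib `logging` module are real.

```python
import logging

root = logging.getLogger()
root.setLevel(logging.DEBUG)          # let every record through the logger itself...

console = logging.StreamHandler()
console.setLevel(logging.WARNING)     # ...but keep the console quiet below WARNING
root.addHandler(console)

class ListHandler(logging.Handler):
    """Stores formatted messages instead of printing them."""
    def __init__(self):
        super().__init__(level=logging.DEBUG)
        self.records = []

    def emit(self, record):
        self.records.append(record.getMessage())

# A handler set to DEBUG still receives everything, which is what
# lets a "debug" flag capture low-severity logs without console noise.
capture = ListHandler()
root.addHandler(capture)
logging.getLogger("haystack.demo").debug("only the capture handler sees this")
print(capture.records)  # ['only the capture handler sees this']
```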
33 changes: 33 additions & 0 deletions haystack/pipeline.py
@@ -260,7 +260,29 @@ def run(  # type: ignore
documents: Optional[List[Document]] = None,
meta: Optional[dict] = None,
params: Optional[dict] = None,
debug: Optional[bool] = None,
debug_logs: Optional[bool] = None
):
"""
Runs the pipeline, one node at a time.

:param query: The search query (for query pipelines only)
:param file_paths: The files to index (for indexing pipelines only)
:param labels: Ground-truth labels to pass on to the nodes (for evaluation purposes)
:param documents: A list of Document objects to pass on to the nodes
:param meta: Metadata to attach to the files (for indexing pipelines only)
:param params: Dictionary of parameters to be dispatched to the nodes.
If you want to pass a param to all nodes, you can just use: {"top_k":10}
If you want to pass it to targeted nodes, you can do:
{"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}}
:param debug: Whether the pipeline should instruct nodes to collect debug information
about their execution. By default, this information includes the input parameters
each node received, the output it generated, and any logs (of any severity)
it emitted. All debug information can then be found in the dict returned
by this method under the key "_debug"
:param debug_logs: Whether all the logs of the node should also be printed to the console,
regardless of their severity and of the existing logger's settings.
"""
node_output = None
queue = {
self.root_node: {"root_node": self.root_node, "params": params}
@@ -281,6 +303,17 @@
node_id = list(queue.keys())[i]
node_input = queue[node_id]
node_input["node_id"] = node_id

# Apply debug attributes to the node input params
# NOTE: global debug attributes will override the value specified
# in each node's params dictionary.
if debug is not None:
if node_id not in node_input["params"].keys():
node_input["params"][node_id] = {}
node_input["params"][node_id]["debug"] = debug
if debug_logs is not None:
if node_id not in node_input["params"].keys():
node_input["params"][node_id] = {}
node_input["params"][node_id]["debug_logs"] = debug_logs

predecessors = set(nx.ancestors(self.graph, node_id))
if predecessors.isdisjoint(set(queue.keys())): # only execute if predecessor nodes are executed
try:
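The flag-injection step in the hunk above can be sketched as a standalone helper. The name `apply_debug_flags` is hypothetical; using `setdefault` also covers the case where only `debug_logs` is passed without `debug`.

```python
def apply_debug_flags(node_id, node_input, debug=None, debug_logs=None):
    """Inject global debug flags into the params targeted at `node_id`.

    Global flags take precedence over any per-node value already present.
    """
    if debug is not None:
        node_input["params"].setdefault(node_id, {})["debug"] = debug
    if debug_logs is not None:
        node_input["params"].setdefault(node_id, {})["debug_logs"] = debug_logs
    return node_input

# A global debug=True overrides the node-level debug=False:
node_input = {"params": {"Reader": {"debug": False}}}
apply_debug_flags("Reader", node_input, debug=True)
print(node_input["params"])  # {'Reader': {'debug': True}}
```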
133 changes: 127 additions & 6 deletions haystack/schema.py
@@ -1,10 +1,15 @@
from typing import Any, Optional, Dict, List
from typing import Any, Optional, Dict, List, Callable, Tuple

from uuid import uuid4
from copy import deepcopy
import mmh3
import numpy as np
from abc import abstractmethod
import inspect
import logging
import io
from functools import wraps


class Document:
def __init__(
@@ -249,6 +254,72 @@ def __str__(self):
return str(self.to_dict())


class InMemoryLogger(io.TextIOBase):
"""
A file-like object that records every log line
written to it in a list called `logs`,
from where they can be accessed freely.
"""

def __init__(self, *args):
io.TextIOBase.__init__(self, *args)
self.logs = []

def write(self, x):
self.logs.append(x)


def record_debug_logs(func: Callable, node_name: str, logs: bool) -> Callable:
"""
Captures the debug logs of the wrapped function and
saves them in the `_debug` key of the output dictionary.
If `logs` is True, dumps the same logs to the console as well.

Used in `BaseComponent.__getattribute__()` to wrap `run()` functions.
This makes sure that every implementation of `run()` by a subclass will
be automagically wrapped with this decorator when requested.

:param func: the function to decorate (must be an implementation of
`BaseComponent.run()`).
:param node_name: the name of the node, used to prefix its console log lines.
:param logs: whether the captured logs should also be displayed
in the console during the execution of the pipeline.
"""
@wraps(func)
def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]:

with InMemoryLogger() as logs_container:
logger = logging.getLogger()

# Adds a handler that stores the logs in a variable
handler = logging.StreamHandler(logs_container)
handler.setLevel(logger.level or logging.DEBUG)
logger.addHandler(handler)

# Add a handler that prints log messages in the console
# to the specified level for the node
if logs:
handler_console = logging.StreamHandler()
handler_console.setLevel(logging.DEBUG)
formatter = logging.Formatter(f'[{node_name} logs] %(message)s')
handler_console.setFormatter(formatter)
logger.addHandler(handler_console)

output, stream = func(*args, **kwargs)

if "_debug" not in output:
output["_debug"] = {}
output["_debug"]["logs"] = logs_container.logs

# Remove both handlers
logger.removeHandler(handler)
if logs:
logger.removeHandler(handler_console)

return output, stream

return inner
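A condensed, runnable sketch of the capture flow above, simplified to omit console mirroring; this is an illustration of the technique, not the PR's exact implementation.

```python
import functools
import io
import logging

class InMemoryLogger(io.TextIOBase):
    """Collects everything written to it in a list (as in the diff above)."""
    def __init__(self):
        super().__init__()
        self.logs = []

    def write(self, x):
        self.logs.append(x)

def record_debug_logs(func, node_name, logs=False):
    """Capture all log records emitted while `func` runs and attach
    them under output["_debug"]["logs"]."""
    @functools.wraps(func)
    def inner(*args, **kwargs):
        container = InMemoryLogger()
        handler = logging.StreamHandler(container)  # writes to any file-like object
        handler.setLevel(logging.DEBUG)
        logger = logging.getLogger()
        logger.addHandler(handler)
        try:
            output, stream = func(*args, **kwargs)
        finally:
            logger.removeHandler(handler)           # always detach the handler
        output.setdefault("_debug", {})["logs"] = container.logs
        return output, stream
    return inner

# A toy run() implementation returning the usual (output, stream) tuple:
def run(query):
    logging.getLogger(__name__).warning("slow query: %s", query)
    return {"answers": []}, "output_1"

output, stream = record_debug_logs(run, "Retriever")(query="test")
print(output["_debug"]["logs"])  # ['slow query: test\n']
```

Note that `StreamHandler` writes the message and its newline terminator in a single `write()` call, so each captured entry keeps its trailing `\n`.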


class BaseComponent:
"""
A base class for implementing nodes in a Pipeline.
@@ -266,6 +337,37 @@ def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls.subclasses[cls.__name__] = cls

def __getattribute__(self, name):
"""
This modified `__getattribute__` method automagically decorates
every `BaseComponent.run()` implementation with the
`record_debug_logs` decorator defined above.

This decorator makes the function collect its debug logs into a
`_debug` key of the output dictionary.

The logs are collected only on demand: before applying the decorator,
this method checks the instance attribute `debug` and applies the
decorator only if that attribute is defined and True.

In addition, the value of the instance attribute `debug_logs` is
passed to the decorator. If it's True, it will print the
logs in the console as well.
"""
if name == "run" and self.debug:
func = getattr(type(self), "run")
return record_debug_logs(func=func, node_name=self.__class__.__name__, logs=self.debug_logs).__get__(self)
return object.__getattribute__(self, name)

def __getattr__(self, name):
"""
Ensures that `debug` and `debug_logs` are always defined.
"""
if name in ["debug", "debug_logs"]:
return None
raise AttributeError(name)
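A stripped-down, runnable sketch of the same `__getattribute__` trick: `with_banner` is a stand-in for `record_debug_logs`, and `Component` stands in for `BaseComponent`. The `__getattr__` fallback is what prevents the infinite recursion that `self.debug` would otherwise cause.

```python
import functools

def with_banner(func, name):
    """Toy decorator standing in for record_debug_logs."""
    @functools.wraps(func)
    def inner(*args, **kwargs):
        return f"[{name}] {func(*args, **kwargs)}"
    return inner

class Component:
    def __getattribute__(self, attr):
        # Wrap run() at lookup time, but only when the instance opted in.
        if attr == "run" and self.debug:
            func = getattr(type(self), "run")
            return with_banner(func, type(self).__name__).__get__(self)
        return object.__getattribute__(self, attr)

    def __getattr__(self, attr):
        # Called only when normal lookup fails: gives `debug` a default,
        # so the `self.debug` check above never recurses or raises.
        if attr == "debug":
            return None
        raise AttributeError(attr)

    def run(self):
        return "ok"

c = Component()
print(c.run())   # ok
c.debug = True
print(c.run())   # [Component] ok
```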

@classmethod
def get_subclass(cls, component_type: str):
if component_type not in cls.subclasses.keys():
@@ -317,7 +419,7 @@ def run(
documents: Optional[List[Document]] = None,
meta: Optional[dict] = None,
params: Optional[dict] = None,
):
) -> Tuple[Dict, str]:
"""
Method that will be executed when the node in the graph is called.

@@ -330,28 +432,37 @@ def run(
"""
pass

def _dispatch_run(self, **kwargs):
def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]:
"""
The Pipelines call this method which in turn executes the run() method of Component.

It takes care of the following:
- inspect run() signature to validate if all necessary arguments are available
- pop `debug` and `debug_logs` and sets them on the instance to control debug output
- call run() with the corresponding arguments and gather output
- collate _debug information if present
- collate `_debug` information if present
- merge component output with the preceding output and pass it on to the subsequent Component in the Pipeline
"""
arguments = deepcopy(kwargs)
params = arguments.get("params") or {}

run_signature_args = inspect.signature(self.run).parameters.keys()

run_params = {}
run_params: Dict[str, Any] = {}
for key, value in params.items():
if key == self.name: # targeted params for this node
if isinstance(value, dict):

# Extract debug attributes
if "debug" in value.keys():
self.debug = value.pop("debug")
if "debug_logs" in value.keys():
self.debug_logs = value.pop("debug_logs")

for _k, _v in value.items():
if _k not in run_signature_args:
raise Exception(f"Invalid parameter '{_k}' for the node '{self.name}'.")

run_params.update(**value)
elif key in run_signature_args: # global params
run_params[key] = value
Expand All @@ -363,9 +474,19 @@ def _dispatch_run(self, **kwargs):

output, stream = self.run(**run_inputs, **run_params)

# Collect debug information
current_debug = output.get("_debug", {})
if self.debug:
current_debug["input"] = {**run_inputs, **run_params}
current_debug["input"]["debug"] = self.debug
if self.debug_logs:
current_debug["input"]["debug_logs"] = self.debug_logs
filtered_output = {key: value for key, value in output.items() if key != "_debug"} # Exclude _debug to avoid recursion
current_debug["output"] = filtered_output

# append _debug information from nodes
all_debug = arguments.get("_debug", {})
current_debug = output.get("_debug")
if current_debug:
all_debug[self.name] = current_debug
if all_debug:
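The recursion-avoiding collation in `_dispatch_run` above can be sketched as a small helper; `collate_debug` is a hypothetical name for illustration only. The key point is that the recorded `output` excludes the `_debug` key itself, so debug dicts never nest inside each other.

```python
def collate_debug(node_name, node_output, incoming_debug=None):
    """Collect a node's `_debug` entry under its name, excluding the
    `_debug` key from the recorded output to avoid recursion."""
    all_debug = dict(incoming_debug or {})
    current = node_output.get("_debug", {})
    current["output"] = {k: v for k, v in node_output.items() if k != "_debug"}
    all_debug[node_name] = current
    return all_debug

result = collate_debug("Reader", {"answers": [1], "_debug": {"logs": ["x"]}})
print(result)  # {'Reader': {'logs': ['x'], 'output': {'answers': [1]}}}
```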
1 change: 1 addition & 0 deletions haystack/utils.py
@@ -1,3 +1,4 @@
import io
import json
from collections import defaultdict
from itertools import islice