Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return intermediate nodes output in pipelines #1558

Merged
merged 40 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
db38a3e
Add rest api endpoint to delete documents by filter.
ZanSara Sep 30, 2021
5398e60
Remove parametrization of rest api test to see if they solve the CI i…
ZanSara Sep 30, 2021
e67e5c7
Make the paths in rest_api/config.py absolute
ZanSara Oct 1, 2021
e1903ce
Fix path to pipelines.yaml
ZanSara Oct 1, 2021
bbd05b2
Restructuring test_rest_api.py to be able to test only my endpoint (a…
ZanSara Oct 1, 2021
681a803
Convert DELETE /documents into POST /documents/delete_by_filters
ZanSara Oct 1, 2021
09bfef0
Merge branch 'master' into debug_pipelines
ZanSara Oct 4, 2021
a6729c1
First rough implementation
ZanSara Oct 4, 2021
1c3b9cc
Merge branch 'master' into debug_pipelines
ZanSara Oct 5, 2021
b940693
Add a flag to dump the debug logs to the console as well
ZanSara Oct 5, 2021
4468a45
Add type to the debug dictionary
ZanSara Oct 5, 2021
a714ad4
Typing run() and _dispatch_run() to please mypy
ZanSara Oct 5, 2021
42d6724
Mypy requires more types
ZanSara Oct 5, 2021
75f80f5
Clarify docstrings a bit
ZanSara Oct 5, 2021
736d3fd
Allow enable_debug and console_debug to be passed as arguments of run()
ZanSara Oct 5, 2021
be670b9
Avoid overwriting _debug, later we might want to store other objects …
ZanSara Oct 5, 2021
36f6b55
Put logs under a separate key of the _debug dictionary and add input …
ZanSara Oct 5, 2021
ce1aaf4
Introduce global arguments for pipeline.run() that get applied to eve…
ZanSara Oct 6, 2021
08092b3
Change default values of debug variables to None, otherwise their def…
ZanSara Oct 6, 2021
da9d586
Remove unused import
ZanSara Oct 6, 2021
0c77b29
more typing for mypy
ZanSara Oct 6, 2021
6f244e6
Remove a potential infinite recursion on the overridden __getattr__
ZanSara Oct 6, 2021
24ceba8
Add a simple test for the debug attributes
ZanSara Oct 6, 2021
53a582e
Add test and fix small issue on global debug=False not overriding nod…
ZanSara Oct 6, 2021
97fe6b9
Do not append the output of the last node in the _debug key, it cause…
ZanSara Oct 6, 2021
bb6b63d
Fix tests
ZanSara Oct 6, 2021
8c61e8c
Removed recursion between _debug and output and fixed tests
ZanSara Oct 7, 2021
96c1418
Apparently node_input can be None :/
ZanSara Oct 7, 2021
a03eee7
Move the input/output collection into _dispatch_run to gather only re…
ZanSara Oct 7, 2021
bfcea46
Minor cleanup
ZanSara Oct 7, 2021
4b0a28c
Add partial Pipeline.run() docstring
ZanSara Oct 7, 2021
e6503a9
Move InMemoryLogger into utils.py
ZanSara Oct 7, 2021
443146b
Add latest docstring and tutorial changes
github-actions[bot] Oct 7, 2021
9308faa
Add io import to utils.py
ZanSara Oct 7, 2021
8f17063
Merge branch 'debug_pipelines' of github.com:deepset-ai/haystack into…
ZanSara Oct 7, 2021
a5ca35b
Update docstring
tholor Oct 7, 2021
c1c7d6b
Add latest docstring and tutorial changes
github-actions[bot] Oct 7, 2021
d893cf7
Revert "Move InMemoryLogger into utils.py"
ZanSara Oct 7, 2021
d18a630
Merge branch 'debug_pipelines' of github.com:deepset-ai/haystack into…
ZanSara Oct 7, 2021
66c771b
Add latest docstring and tutorial changes
github-actions[bot] Oct 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions haystack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import logging

# Configure the root logger to DEBUG to allow the "debug" flag to receive the logs
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)

# Then reconfigure the StreamHandler not to display anything below WARNING as default
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.WARNING)
root_logger.addHandler(stream_handler)

ZanSara marked this conversation as resolved.
Show resolved Hide resolved
# Change log-levels before modules are loaded to avoid verbose log messages.
logging.getLogger('haystack.modeling').setLevel(logging.WARNING)
logging.getLogger('haystack.modeling.utils').setLevel(logging.INFO)
Expand Down
9 changes: 7 additions & 2 deletions haystack/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pickle
import urllib
from functools import wraps
from networkx.algorithms.boundary import node_boundary
ZanSara marked this conversation as resolved.
Show resolved Hide resolved

try:
from ray import serve
Expand Down Expand Up @@ -259,9 +260,10 @@ def run( # type: ignore
labels: Optional[MultiLabel] = None,
documents: Optional[List[Document]] = None,
meta: Optional[dict] = None,
params: Optional[dict] = None,
params: Optional[dict] = None
):
tholor marked this conversation as resolved.
Show resolved Hide resolved
node_output = None
node_output = {}
debug_output = {}
queue = {
self.root_node: {"root_node": self.root_node, "params": params}
} # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue
Expand All @@ -286,6 +288,7 @@ def run( # type: ignore
try:
logger.debug(f"Running node `{node_id}` with input `{node_input}`")
node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
debug_output.update(node_output.get("_debug", {}))
except Exception as e:
tb = traceback.format_exc()
raise Exception(f"Exception while running node `{node_id}` with input `{node_input}`: {e}, full stack trace: {tb}")
Expand Down Expand Up @@ -315,6 +318,8 @@ def run( # type: ignore
i = 0
else:
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors
if debug_output:
node_output["_debug"] = debug_output
return node_output

def get_next_nodes(self, node_id: str, stream_id: str):
Expand Down
107 changes: 103 additions & 4 deletions haystack/schema.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from typing import Any, Optional, Dict, List
from typing import Any, Optional, Dict, List, Callable, Tuple

from uuid import uuid4
from copy import deepcopy
import mmh3
import numpy as np
from abc import abstractmethod
import inspect
import logging
import io
from functools import wraps
import types


class Document:
def __init__(
Expand Down Expand Up @@ -249,6 +255,66 @@ def __str__(self):
return str(self.to_dict())


class InMemoryLogger(io.TextIOBase):
    """
    A writable text stream that records everything written to it.

    Every ``write()`` call appends the received text (capped at its first
    100 characters) to the ``logs`` list, from where the captured lines
    can be inspected freely afterwards.
    """

    def __init__(self, *args):
        super().__init__(*args)
        # Captured log lines, in the order they were written.
        self.logs = []

    def write(self, x):
        # Store only the first 100 characters of each line to keep
        # the in-memory log compact.
        self.logs.append(x[:100])


def record_debug_logs(func: Callable, to_console_too: Optional[bool] = False) -> Callable:
    """
    Captures the debug logs of the wrapped function and
    saves them in the `_debug` key of the output dictionary.
    If `to_console_too` is True, dumps the same logs to the console as well.

    Used in `BaseComponent.__getattribute__()` to wrap `run()` functions.
    This makes sure that every implementation of `run()` by a subclass will
    be automagically decorated with this method when requested.

    :param func: the function to decorate (must be an implementation of
        `BaseComponent.run()`, i.e. it returns an (output dict, stream id) tuple).
    :param to_console_too: whether the captured logs should also be displayed
        in the console during the execution of the pipeline.
    """
    @wraps(func)
    def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]:

        with InMemoryLogger() as logs_container:
            logger = logging.getLogger()

            # Add a handler that stores the logs in a variable
            handler = logging.StreamHandler(logs_container)
            handler.setLevel(logging.DEBUG)
            logger.addHandler(handler)

            # Add a handler that prints DEBUG messages in the console
            handler_console = None
            if to_console_too:
                handler_console = logging.StreamHandler()
                handler_console.setLevel(logging.DEBUG)
                logger.addHandler(handler_console)

            try:
                output, stream = func(*args, **kwargs)
            finally:
                # Remove both handlers even if the wrapped function raised:
                # otherwise every failed run() would leave stale handlers
                # attached to the root logger.
                logger.removeHandler(handler)
                if handler_console is not None:
                    logger.removeHandler(handler_console)

            output["_debug"] = logs_container.logs

            return output, stream

    return inner


class BaseComponent:
"""
A base class for implementing nodes in a Pipeline.
Expand All @@ -266,6 +332,30 @@ def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls.subclasses[cls.__name__] = cls

def __getattribute__(self, name):
    """
    This modified `__getattribute__` method automagically decorates
    every `BaseComponent.run()` implementation with the
    `record_debug_logs` decorator defined above.

    This decorator makes the function collect its debug logs into a
    `_debug` key of the output dictionary.

    The logs collection is not always performed. Before applying the decorator,
    it checks for an instance attribute called `enable_debug` to know
    whether it should or not. The decorator is applied if the attribute is
    defined and True.

    In addition, the value of the instance attribute `console_debug` is
    passed to the decorator. If it's defined and True, the same logs collected in
    `_debug` are also printed in the console during the execution.
    """
    if name == "run":
        # Fetch "run" from the class, not the instance, to obtain the plain
        # function object that can be re-bound below via MethodType.
        func = getattr(type(self), "run")
        # NOTE(review): getattr(self, ...) re-enters __getattribute__, but with
        # a name other than "run", so it falls through to the default branch
        # below — this appears to be what keeps the recursion bounded; confirm
        # no subclass overrides these attributes with properties named "run".
        if getattr(self, 'enable_debug', False):
            # Wrap run() in the log-capturing decorator and bind it to this
            # instance so it behaves like a normal bound method.
            return types.MethodType(record_debug_logs(func, getattr(self, 'console_debug', False)), self)
    # Default attribute lookup for everything else.
    return object.__getattribute__(self, name)

@classmethod
def get_subclass(cls, component_type: str):
if component_type not in cls.subclasses.keys():
Expand Down Expand Up @@ -317,7 +407,7 @@ def run(
documents: Optional[List[Document]] = None,
meta: Optional[dict] = None,
params: Optional[dict] = None,
):
) -> Tuple[Dict, str]:
"""
Method that will be executed when the node in the graph is called.

Expand All @@ -330,12 +420,13 @@ def run(
"""
pass

def _dispatch_run(self, **kwargs):
def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]:
"""
The Pipelines call this method which in turn executes the run() method of Component.

It takes care of the following:
- inspect run() signature to validate if all necessary arguments are available
- pop `enable_debug` and `console_debug` and sets them on the instance to control debug output
- call run() with the corresponding arguments and gather output
- collate _debug information if present
- merge component output with the preceding output and pass it on to the subsequent Component in the Pipeline
Expand All @@ -345,13 +436,21 @@ def _dispatch_run(self, **kwargs):

run_signature_args = inspect.signature(self.run).parameters.keys()

run_params = {}
run_params: Dict[str, Any] = {}
for key, value in params.items():
if key == self.name: # targeted params for this node
if isinstance(value, dict):

# Debug attributes
if "enable_debug" in value.keys():
self.enable_debug = value.pop("enable_debug")
if "console_debug" in value.keys():
self.console_debug = value.pop("console_debug")

for _k, _v in value.items():
if _k not in run_signature_args:
raise Exception(f"Invalid parameter '{_k}' for the node '{self.name}'.")

run_params.update(**value)
elif key in run_signature_args: # global params
run_params[key] = value
Expand Down