From db38a3e416f01b33d882876ec676f35cb8599dbb Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 30 Sep 2021 15:31:41 +0200 Subject: [PATCH 01/36] Add rest api endpoint to delete documents by filter. --- rest_api/application.py | 16 +++++++++++- rest_api/controller/document.py | 26 +++++++++++++++++++ rest_api/controller/feedback.py | 38 +++++---------------------- rest_api/controller/router.py | 3 ++- rest_api/controller/search.py | 45 ++++++-------------------------- rest_api/schema.py | 46 +++++++++++++++++++++++++++++++++ 6 files changed, 104 insertions(+), 70 deletions(-) create mode 100644 rest_api/controller/document.py create mode 100644 rest_api/schema.py diff --git a/rest_api/application.py b/rest_api/application.py index ddce374965..46047e83f4 100644 --- a/rest_api/application.py +++ b/rest_api/application.py @@ -1,19 +1,32 @@ import logging +from pathlib import Path import uvicorn from fastapi import FastAPI, HTTPException from starlette.middleware.cors import CORSMiddleware +from haystack import Pipeline +from rest_api.config import PIPELINE_YAML_PATH, QUERY_PIPELINE_NAME from rest_api.controller.errors.http_error import http_error_handler -from rest_api.controller.router import router as api_router from rest_api.config import ROOT_PATH + logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p") logger = logging.getLogger(__name__) logging.getLogger("elasticsearch").setLevel(logging.WARNING) logging.getLogger("haystack").setLevel(logging.INFO) +PIPELINE = Pipeline.load_from_yaml(Path(PIPELINE_YAML_PATH), pipeline_name=QUERY_PIPELINE_NAME) +# TODO make this generic for other pipelines with different naming +RETRIEVER = PIPELINE.get_node(name="Retriever") +DOCUMENT_STORE = RETRIEVER.document_store if RETRIEVER else None +logging.info(f"Loaded pipeline nodes: {PIPELINE.graph.nodes.keys()}") + + +from rest_api.controller.router import router as api_router + + def get_application() -> FastAPI: application = FastAPI(title="Haystack-API", debug=True, version="0.1", root_path=ROOT_PATH) @@ -32,6 +45,7 @@ def get_application() -> FastAPI: app = get_application() + logger.info("Open http://127.0.0.1:8000/docs to see Swagger API Documentation.") logger.info( """ diff --git a/rest_api/controller/document.py b/rest_api/controller/document.py new file mode 100644 index 0000000000..10181c0791 --- /dev/null +++ b/rest_api/controller/document.py @@ -0,0 +1,26 @@ +import logging + +from fastapi import APIRouter + +from rest_api.application import DOCUMENT_STORE +from rest_api.config import LOG_LEVEL +from rest_api.schema import FilterRequest + + +logging.getLogger("haystack").setLevel(LOG_LEVEL) +logger = logging.getLogger("haystack") + + +router = APIRouter() + + +@router.delete("/documents", response_model=bool) +def delete_documents(filters: FilterRequest): + """ + Can be used to delete documents from a document store. + + :param filters: Filters to narrow down the documents to delete. 
+ Example: {"name": ["some", "more"], "category": ["only_one"]} + """ + DOCUMENT_STORE.delete_documents(filters=filters.filters) + return True \ No newline at end of file diff --git a/rest_api/controller/feedback.py b/rest_api/controller/feedback.py index 8b771f849f..d5b0f924ea 100644 --- a/rest_api/controller/feedback.py +++ b/rest_api/controller/feedback.py @@ -3,41 +3,17 @@ from typing import Dict, Union, List, Optional from fastapi import APIRouter, HTTPException -from pydantic import BaseModel, Field -from rest_api.controller.search import PIPELINE +from rest_api.schema import ExtractiveQAFeedback, FilterRequest +from rest_api.application import DOCUMENT_STORE router = APIRouter() logger = logging.getLogger(__name__) -# TODO make this generic for other pipelines with different naming -retriever = PIPELINE.get_node(name="ESRetriever") -document_store = retriever.document_store if retriever else None - - -class ExtractiveQAFeedback(BaseModel): - question: str = Field(..., description="The question input by the user, i.e., the query.") - is_correct_answer: bool = Field(..., description="Whether the answer is correct or not.") - document_id: str = Field(..., description="The document in the query result for which feedback is given.") - model_id: Optional[int] = Field(None, description="The model used for the query.") - is_correct_document: bool = Field( - ..., - description="In case of negative feedback, there could be two cases; incorrect answer but correct " - "document & incorrect document. This flag denotes if the returned document was correct.", - ) - answer: str = Field(..., description="The answer string.") - offset_start_in_doc: int = Field( - ..., description="The answer start offset in the original doc. Only required for doc-qa feedback." - ) - - -class FilterRequest(BaseModel): - filters: Optional[Dict[str, Optional[Union[str, List[str]]]]] = None - @router.post("/feedback") def user_feedback(feedback: ExtractiveQAFeedback): - document_store.write_labels([{"origin": "user-feedback", **feedback.dict()}]) + DOCUMENT_STORE.write_labels([{"origin": "user-feedback", **feedback.dict()}]) @router.post("/eval-feedback") @@ -62,7 +38,7 @@ def eval_extractive_qa_feedback(filters: FilterRequest = None): else: filters = {"origin": ["user-feedback"]} - labels = document_store.get_all_labels(filters=filters) + labels = DOCUMENT_STORE.get_all_labels(filters=filters) if len(labels) > 0: answer_feedback = [1 if l.is_correct_answer else 0 for l in labels] @@ -87,9 +63,9 @@ def export_extractive_qa_feedback( The context_size param can be used to limit response size for large documents. 
""" if only_positive_labels: - labels = document_store.get_all_labels(filters={"is_correct_answer": [True], "origin": ["user-feedback"]}) + labels = DOCUMENT_STORE.get_all_labels(filters={"is_correct_answer": [True], "origin": ["user-feedback"]}) else: - labels = document_store.get_all_labels(filters={"origin": ["user-feedback"]}) + labels = DOCUMENT_STORE.get_all_labels(filters={"origin": ["user-feedback"]}) # Filter out the labels where the passage is correct but answer is wrong (in SQuAD this matches # neither a "positive example" nor a negative "is_impossible" one) labels = [l for l in labels if not (l.is_correct_document is True and l.is_correct_answer is False)] @@ -97,7 +73,7 @@ def export_extractive_qa_feedback( export_data = [] for label in labels: - document = document_store.get_document_by_id(label.document_id) + document = DOCUMENT_STORE.get_document_by_id(label.document_id) if document is None: raise HTTPException( status_code=500, detail="Could not find document with id {label.document_id} for label id {label.id}" diff --git a/rest_api/controller/router.py b/rest_api/controller/router.py index b498fdb56b..52c362da14 100644 --- a/rest_api/controller/router.py +++ b/rest_api/controller/router.py @@ -1,9 +1,10 @@ from fastapi import APIRouter -from rest_api.controller import file_upload, search, feedback +from rest_api.controller import file_upload, search, feedback, document router = APIRouter() router.include_router(search.router, tags=["search"]) router.include_router(feedback.router, tags=["feedback"]) router.include_router(file_upload.router, tags=["file-upload"]) +router.include_router(document.router, tags=["document"]) diff --git a/rest_api/controller/search.py b/rest_api/controller/search.py index e729c9ad45..01377ea5c3 100644 --- a/rest_api/controller/search.py +++ b/rest_api/controller/search.py @@ -1,48 +1,19 @@ -import json import logging import time -from pathlib import Path -from typing import Dict, List, Optional, Union, Any from fastapi import APIRouter -from pydantic import BaseModel -from haystack import Pipeline -from rest_api.config import PIPELINE_YAML_PATH, LOG_LEVEL, QUERY_PIPELINE_NAME, CONCURRENT_REQUEST_PER_WORKER +from rest_api.application import PIPELINE +from rest_api.config import LOG_LEVEL, CONCURRENT_REQUEST_PER_WORKER +from rest_api.schema import QueryRequest, QueryResponse from rest_api.controller.utils import RequestLimiter + logging.getLogger("haystack").setLevel(LOG_LEVEL) logger = logging.getLogger("haystack") -router = APIRouter() - - -class Request(BaseModel): - query: str - params: Optional[dict] = None - -class Answer(BaseModel): - answer: Optional[str] - question: Optional[str] - score: Optional[float] = None - probability: Optional[float] = None - context: Optional[str] - offset_start: Optional[int] - offset_end: Optional[int] - offset_start_in_doc: Optional[int] - offset_end_in_doc: Optional[int] - document_id: Optional[str] = None - meta: Optional[Dict[str, Any]] - - -class Response(BaseModel): - query: str - answers: List[Answer] - - -PIPELINE = Pipeline.load_from_yaml(Path(PIPELINE_YAML_PATH), pipeline_name=QUERY_PIPELINE_NAME) -logger.info(f"Loaded pipeline nodes: {PIPELINE.graph.nodes.keys()}") +router = APIRouter() concurrency_limiter = RequestLimiter(CONCURRENT_REQUEST_PER_WORKER) @@ -58,14 +29,14 @@ def initialized(): return True -@router.post("/query", response_model=Response) -def query(request: Request): +@router.post("/query", response_model=QueryResponse) +def query(request: QueryRequest): with concurrency_limiter.run(): 
+        result = _process_request(PIPELINE, request)
     return result
 
 
-def _process_request(pipeline, request) -> Response:
+def _process_request(pipeline, request) -> QueryResponse:
     start_time = time.time()
 
     params = request.params or {}
diff --git a/rest_api/schema.py b/rest_api/schema.py
new file mode 100644
index 0000000000..7dff5e0b06
--- /dev/null
+++ b/rest_api/schema.py
@@ -0,0 +1,46 @@
+from typing import Dict, List, Optional, Union, Any
+from pydantic import BaseModel, Field
+
+
+class QueryRequest(BaseModel):
+    query: str
+    params: Optional[dict] = None
+
+
+class FilterRequest(BaseModel):
+    filters: Optional[Dict[str, Optional[Union[str, List[str]]]]] = None
+
+
+class QueryAnswer(BaseModel):
+    answer: Optional[str]
+    question: Optional[str]
+    score: Optional[float] = None
+    probability: Optional[float] = None
+    context: Optional[str]
+    offset_start: Optional[int]
+    offset_end: Optional[int]
+    offset_start_in_doc: Optional[int]
+    offset_end_in_doc: Optional[int]
+    document_id: Optional[str] = None
+    meta: Optional[Dict[str, Any]]
+
+
+class QueryResponse(BaseModel):
+    query: str
+    answers: List[QueryAnswer]
+
+
+class ExtractiveQAFeedback(BaseModel):
+    question: str = Field(..., description="The question input by the user, i.e., the query.")
+    is_correct_answer: bool = Field(..., description="Whether the answer is correct or not.")
+    document_id: str = Field(..., description="The document in the query result for which feedback is given.")
+    model_id: Optional[int] = Field(None, description="The model used for the query.")
+    is_correct_document: bool = Field(
+        ...,
+        description="In case of negative feedback, there could be two cases; incorrect answer but correct "
+        "document & incorrect document. This flag denotes if the returned document was correct.",
+    )
+    answer: str = Field(..., description="The answer string.")
+    offset_start_in_doc: int = Field(
+        ..., description="The answer start offset in the original doc. Only required for doc-qa feedback."
+ ) From 5398e601cb558ca42fa044cd1f81ad06b2fd276b Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 30 Sep 2021 17:47:20 +0200 Subject: [PATCH 02/36] Remove parametrization of rest api test to see if they solve the CI issue (they now run locally) --- test/test_rest_api.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/test_rest_api.py b/test/test_rest_api.py index 8ee9297de4..ed9ac2387a 100644 --- a/test/test_rest_api.py +++ b/test/test_rest_api.py @@ -3,10 +3,12 @@ import pytest from fastapi.testclient import TestClient +from rest_api.application import DOCUMENT_STORE + def get_test_client_and_override_dependencies(): import os - os.environ["PIPELINE_YAML_PATH"] = "samples/pipeline/test_pipeline.yaml" + os.environ["PIPELINE_YAML_PATH"] = str((Path(__file__).parent / "samples"/"pipeline"/"test_pipeline.yaml").absolute()) os.environ["QUERY_PIPELINE_NAME"] = "query_pipeline" os.environ["INDEXING_PIPELINE_NAME"] = "indexing_pipeline" @@ -16,13 +18,13 @@ def get_test_client_and_override_dependencies(): @pytest.mark.slow @pytest.mark.elasticsearch -@pytest.mark.parametrize("reader", ["farm"], indirect=True) -@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) -def test_api(reader, document_store): +#@pytest.mark.parametrize("reader", ["farm"], indirect=True) +#@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) +def test_api():#reader, document_store): client = get_test_client_and_override_dependencies() # test file upload API - file_to_upload = {'files': Path("samples/pdf/sample_pdf_1.pdf").open('rb')} + file_to_upload = {'files': (Path(__file__).parent / "samples"/"pdf"/"sample_pdf_1.pdf").open('rb')} response = client.post(url="/file-upload", files=file_to_upload, data={"meta": '{"meta_key": "meta_value"}'}) assert 200 == response.status_code From e67e5c7696e386287efdc5b7e285e254c5d0e415 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Fri, 1 Oct 2021 10:30:21 +0200 Subject: [PATCH 03/36] Make the paths in rest_api/config.py absolute --- rest_api/config.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rest_api/config.py b/rest_api/config.py index 4317036b75..878349a408 100644 --- a/rest_api/config.py +++ b/rest_api/config.py @@ -1,10 +1,12 @@ import os +from pathlib import Path -PIPELINE_YAML_PATH = os.getenv("PIPELINE_YAML_PATH", "rest_api/pipeline/pipelines.yaml") + +PIPELINE_YAML_PATH = os.getenv("PIPELINE_YAML_PATH", str((Path(__file__).parent / "rest_api"/"pipeline"/"pipelines.yaml").absolute())) QUERY_PIPELINE_NAME = os.getenv("QUERY_PIPELINE_NAME", "query") INDEXING_PIPELINE_NAME = os.getenv("INDEXING_PIPELINE_NAME", "indexing") -FILE_UPLOAD_PATH = os.getenv("FILE_UPLOAD_PATH", "./file-upload") +FILE_UPLOAD_PATH = os.getenv("FILE_UPLOAD_PATH", str((Path(__file__).parent / "file-upload").absolute())) LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") ROOT_PATH = os.getenv("ROOT_PATH", "/") From e1903cea282b12d598dd08f1fa16f57eee7dc0f3 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Fri, 1 Oct 2021 11:25:49 +0200 Subject: [PATCH 04/36] Fix path to pipelines.yaml --- rest_api/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rest_api/config.py b/rest_api/config.py index 878349a408..7b3b9f1c06 100644 --- a/rest_api/config.py +++ b/rest_api/config.py @@ -2,7 +2,7 @@ from pathlib import Path -PIPELINE_YAML_PATH = os.getenv("PIPELINE_YAML_PATH", str((Path(__file__).parent / "rest_api"/"pipeline"/"pipelines.yaml").absolute())) +PIPELINE_YAML_PATH = 
os.getenv("PIPELINE_YAML_PATH", str((Path(__file__).parent / "pipeline" / "pipelines.yaml").absolute())) QUERY_PIPELINE_NAME = os.getenv("QUERY_PIPELINE_NAME", "query") INDEXING_PIPELINE_NAME = os.getenv("INDEXING_PIPELINE_NAME", "indexing") From bbd05b24f255fc7ff02a96102ebf622eab15f919 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Fri, 1 Oct 2021 12:15:22 +0200 Subject: [PATCH 05/36] Restructuring test_rest_api.py to be able to test only my endpoint (and to make the suite more structured) --- test/test_rest_api.py | 70 +++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 22 deletions(-) diff --git a/test/test_rest_api.py b/test/test_rest_api.py index ed9ac2387a..5405272b89 100644 --- a/test/test_rest_api.py +++ b/test/test_rest_api.py @@ -1,65 +1,80 @@ +import os from pathlib import Path import pytest from fastapi.testclient import TestClient +from rest_api.application import app from rest_api.application import DOCUMENT_STORE -def get_test_client_and_override_dependencies(): - import os +@pytest.fixture(scope="session") +def client() -> TestClient: os.environ["PIPELINE_YAML_PATH"] = str((Path(__file__).parent / "samples"/"pipeline"/"test_pipeline.yaml").absolute()) os.environ["QUERY_PIPELINE_NAME"] = "query_pipeline" os.environ["INDEXING_PIPELINE_NAME"] = "indexing_pipeline" - - from rest_api.application import app return TestClient(app) +@pytest.fixture(scope="session") +def populated_client(client: TestClient) -> TestClient: + file_to_upload = {'files': (Path(__file__).parent / "samples"/"pdf"/"sample_pdf_1.pdf").open('rb')} + client.post(url="/file-upload", files=file_to_upload, data={"meta": '{"meta_key": "meta_value"}'}) + yield client + client.delete(url="/documents", data={"meta_key": ["meta_value"]}) + -@pytest.mark.slow -@pytest.mark.elasticsearch -#@pytest.mark.parametrize("reader", ["farm"], indirect=True) -#@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) -def test_api():#reader, document_store): - client = get_test_client_and_override_dependencies() - # test file upload API +def test_file_upload(client: TestClient): + file_to_upload = {'files': (Path(__file__).parent / "samples"/"pdf"/"sample_pdf_1.pdf").open('rb')} + response = client.post(url="/file-upload", files=file_to_upload, data={"meta": '{"meta_key": "meta_value"}'}) + assert 200 == response.status_code + +def test_delete_documents(client: TestClient): file_to_upload = {'files': (Path(__file__).parent / "samples"/"pdf"/"sample_pdf_1.pdf").open('rb')} response = client.post(url="/file-upload", files=file_to_upload, data={"meta": '{"meta_key": "meta_value"}'}) + + filters = {"filters": {"meta_key": ["meta_value"]}} + response = client.delete(url="/documents", json=filters) assert 200 == response.status_code - # test query API +def test_query_with_no_filter(populated_client: TestClient): query_with_no_filter_value = {"query": "Who made the PDF specification?"} - response = client.post(url="/query", json=query_with_no_filter_value) + response = populated_client.post(url="/query", json=query_with_no_filter_value) assert 200 == response.status_code response_json = response.json() assert response_json["answers"][0]["answer"] == "Adobe Systems" - document_id = response_json["answers"][0]["document_id"] +def test_query_with_one_filter(populated_client: TestClient): query_with_filter = {"query": "Who made the PDF specification?", "params": {"filters": {"meta_key": "meta_value"}}} - response = client.post(url="/query", json=query_with_filter) + response = 
populated_client.post(url="/query", json=query_with_filter) assert 200 == response.status_code response_json = response.json() assert response_json["answers"][0]["answer"] == "Adobe Systems" +def test_query_with_filter_list(populated_client: TestClient): query_with_filter_list = { "query": "Who made the PDF specification?", "params": {"filters": {"meta_key": ["meta_value", "another_value"]}} } - response = client.post(url="/query", json=query_with_filter_list) + response = populated_client.post(url="/query", json=query_with_filter_list) assert 200 == response.status_code response_json = response.json() assert response_json["answers"][0]["answer"] == "Adobe Systems" +def test_query_with_invalid_filter(populated_client: TestClient): query_with_invalid_filter = { "query": "Who made the PDF specification?", "params": {"filters": {"meta_key": "invalid_value"}} } - response = client.post(url="/query", json=query_with_invalid_filter) + response = populated_client.post(url="/query", json=query_with_invalid_filter) assert 200 == response.status_code response_json = response.json() assert len(response_json["answers"]) == 0 - # test write feedback +def test_write_feedback(populated_client: TestClient): + response = populated_client.post(url="/query", json={"query": "Who made the PDF specification?"}) + response_json = response.json() + document_id = response_json["answers"][0]["document_id"] + feedback = { "question": "Who made the PDF specification?", "is_correct_answer": True, @@ -68,20 +83,31 @@ def test_api():#reader, document_store): "answer": "Adobe Systems", "offset_start_in_doc": 60 } - response = client.post(url="/feedback", json=feedback) + response = populated_client.post(url="/feedback", json=feedback) assert 200 == response.status_code - # test export feedback +def test_export_feedback(populated_client: TestClient): + response = populated_client.post(url="/query", json={"query": "Who made the PDF specification?"}) + response_json = response.json() + document_id = response_json["answers"][0]["document_id"] + + feedback = { + "question": "Who made the PDF specification?", + "is_correct_answer": True, + "document_id": document_id, + "is_correct_document": True, + "answer": "Adobe Systems", + "offset_start_in_doc": 60 + } feedback_urls = [ "/export-feedback?full_document_context=true", "/export-feedback?full_document_context=false&context_size=50", "/export-feedback?full_document_context=false&context_size=50000", ] for url in feedback_urls: - response = client.get(url=url, json=feedback) + response = populated_client.get(url=url, json=feedback) response_json = response.json() context = response_json["data"][0]["paragraphs"][0]["context"] answer_start = response_json["data"][0]["paragraphs"][0]["qas"][0]["answers"][0]["answer_start"] answer = response_json["data"][0]["paragraphs"][0]["qas"][0]["answers"][0]["text"] assert context[answer_start:answer_start+len(answer)] == answer - From 681a803e8db54e55b8e1b449ba334fcc6ad98918 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Fri, 1 Oct 2021 15:00:06 +0200 Subject: [PATCH 06/36] Convert DELETE /documents into POST /documents/delete_by_filters --- rest_api/controller/document.py | 2 +- test/test_rest_api.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rest_api/controller/document.py b/rest_api/controller/document.py index 10181c0791..ecb6cf2db7 100644 --- a/rest_api/controller/document.py +++ b/rest_api/controller/document.py @@ -14,7 +14,7 @@ router = APIRouter() -@router.delete("/documents", response_model=bool) 
+@router.post("/documents/delete_by_filters", response_model=bool)
 def delete_documents(filters: FilterRequest):
     """
     Can be used to delete documents from a document store.
diff --git a/test/test_rest_api.py b/test/test_rest_api.py
index 5405272b89..5e61647ca7 100644
--- a/test/test_rest_api.py
+++ b/test/test_rest_api.py
@@ -20,7 +20,7 @@ def populated_client(client: TestClient) -> TestClient:
     file_to_upload = {'files': (Path(__file__).parent / "samples"/"pdf"/"sample_pdf_1.pdf").open('rb')}
     client.post(url="/file-upload", files=file_to_upload, data={"meta": '{"meta_key": "meta_value"}'})
     yield client
-    client.delete(url="/documents", data={"meta_key": ["meta_value"]})
+    client.post(url="/documents/delete_by_filters", data={"meta_key": ["meta_value"]})
 
 
 
@@ -33,8 +33,7 @@ def test_delete_documents(client: TestClient):
     file_to_upload = {'files': (Path(__file__).parent / "samples"/"pdf"/"sample_pdf_1.pdf").open('rb')}
     response = client.post(url="/file-upload", files=file_to_upload, data={"meta": '{"meta_key": "meta_value"}'})
 
-    filters = {"filters": {"meta_key": ["meta_value"]}}
-    response = client.delete(url="/documents", json=filters)
+    client.post(url="/documents/delete_by_filters", data={"meta_key": ["meta_value"]})
     assert 200 == response.status_code
 
 def test_query_with_no_filter(populated_client: TestClient):

From a6729c14a29109b262240149c202f4c5b4f8a31f Mon Sep 17 00:00:00 2001
From: ZanSara
Date: Mon, 4 Oct 2021 18:08:09 +0200
Subject: [PATCH 07/36] First rough implementation

---
 haystack/__init__.py |  9 +++++++
 haystack/pipeline.py |  9 ++++++-
 haystack/schema.py   | 60 +++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/haystack/__init__.py b/haystack/__init__.py
index 5dad0c9bb7..10227eb803 100644
--- a/haystack/__init__.py
+++ b/haystack/__init__.py
@@ -1,5 +1,14 @@
 import logging
 
+# Configure the root logger to DEBUG to allow the "debug" flag to receive the logs
+root_logger = logging.getLogger()
+root_logger.setLevel(logging.DEBUG)
+
+# Then reconfigure the StreamHandler not to display anything below WARNING by default
+stream_handler = logging.StreamHandler()
+stream_handler.setLevel(logging.WARNING)
+root_logger.addHandler(stream_handler)
+
 # Change log-levels before modules are loaded to avoid verbose log messages.
logging.getLogger('haystack.modeling').setLevel(logging.WARNING) logging.getLogger('haystack.modeling.utils').setLevel(logging.INFO) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 01d72e7e43..927d5f56b4 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -10,6 +10,7 @@ import pickle import urllib from functools import wraps +from networkx.algorithms.boundary import node_boundary try: from ray import serve @@ -259,9 +260,10 @@ def run( # type: ignore labels: Optional[MultiLabel] = None, documents: Optional[List[Document]] = None, meta: Optional[dict] = None, - params: Optional[dict] = None, + params: Optional[dict] = None ): node_output = None + debug_output = {} queue = { self.root_node: {"root_node": self.root_node, "params": params} } # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue @@ -286,6 +288,7 @@ def run( # type: ignore try: logger.debug(f"Running node `{node_id}` with input `{node_input}`") node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input) + debug_output.update(node_output.get("_debug", {})) except Exception as e: tb = traceback.format_exc() raise Exception(f"Exception while running node `{node_id}` with input `{node_input}`: {e}, full stack trace: {tb}") @@ -315,6 +318,8 @@ def run( # type: ignore i = 0 else: i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors + if debug_output: + node_output["_debug"] = debug_output return node_output def get_next_nodes(self, node_id: str, stream_id: str): @@ -1029,6 +1034,8 @@ def __init__( self.top_k_join = top_k_join def run(self, inputs: List[dict]): # type: ignore + logging.debug("Test debug message for JoinDocuments nodes") + if self.join_mode == "concatenate": document_map = {} for input_from_node in inputs: diff --git a/haystack/schema.py b/haystack/schema.py index 191905f80e..6b5a86f834 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -1,10 +1,16 @@ -from typing import Any, Optional, Dict, List +from typing import Any, Optional, Dict, List, Callable, Tuple, Optional + from uuid import uuid4 from copy import deepcopy import mmh3 import numpy as np from abc import abstractmethod import inspect +import logging +import io +from functools import wraps +import types + class Document: def __init__( @@ -249,6 +255,42 @@ def __str__(self): return str(self.to_dict()) +class InMemoryLogger(io.TextIOBase): + + def __init__(self, *args): + io.TextIOBase.__init__(self, *args) + self.logs = [] + + def write(self, x): + self.logs.append(x[:100]) + + +def supports_debug(func: Callable, debug: Optional[bool] = False): + """ + Captures the debug logs of the wrapped functions and + saves it in a _debug key of the output dictionary. + """ + @wraps(func) + def inner(*args, **kwargs): + if not debug: + return func(*args, **kwargs) + + with InMemoryLogger() as logs_container: + logger = logging.getLogger() + + handler = logging.StreamHandler(logs_container) + handler.setLevel(logging.DEBUG) + logger.addHandler(handler) + + output, stream = func(*args, **kwargs) + + output["_debug"] = logs_container.logs + logger.removeHandler(handler) + return output, stream + + return inner + + class BaseComponent: """ A base class for implementing nodes in a Pipeline. 
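
The hunk above is the core mechanism of the whole series: while a node's run() executes, an extra
logging handler writes every record into an in-memory buffer, and the collected lines can then be
returned next to the node's regular output. A minimal, self-contained sketch of that pattern
(the names below are invented for illustration and are not code from this PR):

    # Illustrative sketch only, not part of the patch series.
    import io
    import logging
    from functools import wraps

    class InMemoryLog(io.TextIOBase):
        """Collects every line written to it in a plain Python list."""
        def __init__(self):
            super().__init__()
            self.lines = []

        def write(self, text):
            self.lines.append(text)
            return len(text)

    def capture_debug_logs(func):
        """Run `func` and return (its result, the log lines emitted while it ran)."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            buffer = InMemoryLog()
            handler = logging.StreamHandler(buffer)
            handler.setLevel(logging.DEBUG)
            root = logging.getLogger()
            root.addHandler(handler)
            try:
                result = func(*args, **kwargs)
            finally:
                root.removeHandler(handler)  # never leave the temporary handler behind
            return result, buffer.lines
        return wrapper

    @capture_debug_logs
    def run_node():
        logging.getLogger(__name__).debug("retrieved 10 candidate documents")
        return {"answers": []}

    logging.getLogger().setLevel(logging.DEBUG)  # same idea as the haystack/__init__.py change above
    output, logs = run_node()
    print(output, logs)

This is the pattern that InMemoryLogger and the supports_debug() wrapper added above implement for
BaseComponent.run(), and Pipeline.run() is what gathers the per-node results under a _debug key.
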
@@ -266,6 +308,22 @@ def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) cls.subclasses[cls.__name__] = cls + def __getattribute__(self, name): + """ + This modified `__getattribute__` automagically decorates + every `BaseComponent.run()` implementation with the + `@supports_debug` decorator, which makes the function + dump its debug logs into a `_debug` key of the output + dictionary. + + Relies on a class attribute called `enable_debug` to know + if it should actually populate the `_debug` key or not. + """ + if name == "run": + func = getattr(type(self), "run") + return types.MethodType(supports_debug(func, getattr(self, 'enable_debug', False)), self) + return object.__getattribute__(self, name) + @classmethod def get_subclass(cls, component_type: str): if component_type not in cls.subclasses.keys(): From b940693c51cf03ef684508623a937b38853af6ef Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 09:56:41 +0200 Subject: [PATCH 08/36] Add a flag to dump the debug logs to the console as well --- haystack/schema.py | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/haystack/schema.py b/haystack/schema.py index 6b5a86f834..fcf802ca73 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -265,27 +265,37 @@ def write(self, x): self.logs.append(x[:100]) -def supports_debug(func: Callable, debug: Optional[bool] = False): +def record_debug_logs(func: Callable, to_console_too: Optional[bool] = False): """ Captures the debug logs of the wrapped functions and - saves it in a _debug key of the output dictionary. + saves it in a _debug key of the output dictionary. If `to_console_too` + is True, dumps the same logs to the console as well. + + Used in BaseComponent.__getattribute__() to wrap run() functions. + This makes sure that every implementation of run() by a subclass will + be automagically decorated """ @wraps(func) def inner(*args, **kwargs): - if not debug: - return func(*args, **kwargs) - with InMemoryLogger() as logs_container: logger = logging.getLogger() handler = logging.StreamHandler(logs_container) handler.setLevel(logging.DEBUG) - logger.addHandler(handler) + logger.addHandler(handler) - output, stream = func(*args, **kwargs) + if to_console_too: + handler_console = logging.StreamHandler() + handler_console.setLevel(logging.DEBUG) + logger.addHandler(handler_console) + output, stream = func(*args, **kwargs) output["_debug"] = logs_container.logs - logger.removeHandler(handler) + + logger.removeHandler(handler) + if to_console_too: + logger.removeHandler(handler_console) + return output, stream return inner @@ -316,12 +326,20 @@ def __getattribute__(self, name): dump its debug logs into a `_debug` key of the output dictionary. - Relies on a class attribute called `enable_debug` to know - if it should actually populate the `_debug` key or not. + Relies on a class attribute called `enable_debug_in_output` to know + if it should actually decorate the function or not. Therefore + the `enable_debug_in_output` flag turns on and off this functionality. + + `enable_debug_in_output` does not have to be defined, default behavior + is False (do not enable debug dump). + + if `enable_debug_in_console`, the same logs collected in `_debug` + are also printed in the console during the execution. 
""" if name == "run": func = getattr(type(self), "run") - return types.MethodType(supports_debug(func, getattr(self, 'enable_debug', False)), self) + if getattr(self, 'enable_debug_in_output', False): + return types.MethodType(record_debug_logs(func, getattr(self, 'enable_debug_in_console', False)), self) return object.__getattribute__(self, name) @classmethod From 4468a45272cb21b53dab50c058b07dea3c119843 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 10:11:42 +0200 Subject: [PATCH 09/36] Add type to the debug dictionary --- haystack/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 927d5f56b4..f2c25fc0e4 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -263,7 +263,7 @@ def run( # type: ignore params: Optional[dict] = None ): node_output = None - debug_output = {} + debug_output: Dict[str, List[str]] = {} queue = { self.root_node: {"root_node": self.root_node, "params": params} } # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue From a714ad4d37c2fdda96b097bbd9940ff452ec38bb Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 10:16:33 +0200 Subject: [PATCH 10/36] Typing run() and _dispatch_run() to please mypy --- haystack/pipeline.py | 4 ++-- haystack/schema.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index f2c25fc0e4..4bcc0142dd 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -262,8 +262,8 @@ def run( # type: ignore meta: Optional[dict] = None, params: Optional[dict] = None ): - node_output = None - debug_output: Dict[str, List[str]] = {} + node_output = {} + debug_output = {} queue = { self.root_node: {"root_node": self.root_node, "params": params} } # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue diff --git a/haystack/schema.py b/haystack/schema.py index fcf802ca73..2677260178 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -393,7 +393,7 @@ def run( documents: Optional[List[Document]] = None, meta: Optional[dict] = None, params: Optional[dict] = None, - ): + ) -> Tuple[Dict, str]: """ Method that will be executed when the node in the graph is called. @@ -406,7 +406,7 @@ def run( """ pass - def _dispatch_run(self, **kwargs): + def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]: """ The Pipelines call this method which in turn executes the run() method of Component. 
From 42d67249c249c2fa28e06ce41ff570de676a273c Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 10:21:08 +0200 Subject: [PATCH 11/36] Mypy requires more types --- haystack/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/schema.py b/haystack/schema.py index 2677260178..243f8d88a9 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -421,7 +421,7 @@ def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]: run_signature_args = inspect.signature(self.run).parameters.keys() - run_params = {} + run_params: Dict[str, Any] = {} for key, value in params.items(): if key == self.name: # targeted params for this node if isinstance(value, dict): From 75f80f5e6e71ec8ffe67dd1e31c540a7d729fd87 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 11:08:02 +0200 Subject: [PATCH 12/36] Clarify docstrings a bit --- haystack/pipeline.py | 2 -- haystack/schema.py | 62 +++++++++++++++++++++++++++----------------- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 4bcc0142dd..a084eaa42f 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -1034,8 +1034,6 @@ def __init__( self.top_k_join = top_k_join def run(self, inputs: List[dict]): # type: ignore - logging.debug("Test debug message for JoinDocuments nodes") - if self.join_mode == "concatenate": document_map = {} for input_from_node in inputs: diff --git a/haystack/schema.py b/haystack/schema.py index 243f8d88a9..6de7b5ccbe 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -256,6 +256,11 @@ def __str__(self): class InMemoryLogger(io.TextIOBase): + """ + Implementation of a logger that keeps track + of the log lines in a list called `logs`, + from where they can be accessed freely. + """ def __init__(self, *args): io.TextIOBase.__init__(self, *args) @@ -265,25 +270,33 @@ def write(self, x): self.logs.append(x[:100]) -def record_debug_logs(func: Callable, to_console_too: Optional[bool] = False): +def record_debug_logs(func: Callable, to_console_too: Optional[bool] = False) -> Callable: """ - Captures the debug logs of the wrapped functions and - saves it in a _debug key of the output dictionary. If `to_console_too` - is True, dumps the same logs to the console as well. - - Used in BaseComponent.__getattribute__() to wrap run() functions. - This makes sure that every implementation of run() by a subclass will - be automagically decorated + Captures the debug logs of the wrapped function and + saves them in the `_debug` key of the output dictionary. + If `to_console_too` is True, dumps the same logs to the console as well. + + Used in `BaseComponent.__getattribute__()` to wrap `run()` functions. + This makes sure that every implementation of `run()` by a subclass will + be automagically decorated with this method when requested. + + :param func: the function to decorate (must be an implementation of + `BaseComponent.run()`). + :param to_console_too: whether the captured logs should also be displayed + in the console during the execution of the pipeline. 
""" @wraps(func) - def inner(*args, **kwargs): + def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]: + with InMemoryLogger() as logs_container: logger = logging.getLogger() + # Adds a handler that stores the logs in a variable handler = logging.StreamHandler(logs_container) handler.setLevel(logging.DEBUG) logger.addHandler(handler) + # Add a handler that prints DEBUG messages in the console if to_console_too: handler_console = logging.StreamHandler() handler_console.setLevel(logging.DEBUG) @@ -291,12 +304,13 @@ def inner(*args, **kwargs): output, stream = func(*args, **kwargs) output["_debug"] = logs_container.logs - + + # Remove both handlers logger.removeHandler(handler) if to_console_too: - logger.removeHandler(handler_console) + logger.removeHandler(handler_console) - return output, stream + return output, stream return inner @@ -320,21 +334,21 @@ def __init_subclass__(cls, **kwargs): def __getattribute__(self, name): """ - This modified `__getattribute__` automagically decorates + This modified `__getattribute__` method automagically decorates every `BaseComponent.run()` implementation with the - `@supports_debug` decorator, which makes the function - dump its debug logs into a `_debug` key of the output - dictionary. - - Relies on a class attribute called `enable_debug_in_output` to know - if it should actually decorate the function or not. Therefore - the `enable_debug_in_output` flag turns on and off this functionality. + `record_debug_logs` decorator defined above. + + This decorator makes the function collect its debug logs into a + `_debug` key of the output dictionary. - `enable_debug_in_output` does not have to be defined, default behavior - is False (do not enable debug dump). + The logs collection is not always performed. Before applying the decorator, + it checks for an instance attribute called `enable_debug_in_output` to know + whether it should or not. The decorator is applied if the attribute is + defined and True. - if `enable_debug_in_console`, the same logs collected in `_debug` - are also printed in the console during the execution. + In addition, the value of the instance attribute `enable_debug_in_console` is + passed to the decorator. If it's defined and True, the same logs collected in + `_debug` are also printed in the console during the execution. """ if name == "run": func = getattr(type(self), "run") From 736d3fdefd28a29df62a0ff41c9deb90064ca488 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 12:13:53 +0200 Subject: [PATCH 13/36] Allow enable_debug and console_debug to be passed as arguments of run() --- haystack/schema.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/haystack/schema.py b/haystack/schema.py index 6de7b5ccbe..4be3b08425 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -342,18 +342,18 @@ def __getattribute__(self, name): `_debug` key of the output dictionary. The logs collection is not always performed. Before applying the decorator, - it checks for an instance attribute called `enable_debug_in_output` to know + it checks for an instance attribute called `enable_debug` to know whether it should or not. The decorator is applied if the attribute is defined and True. - In addition, the value of the instance attribute `enable_debug_in_console` is + In addition, the value of the instance attribute `console_debug` is passed to the decorator. If it's defined and True, the same logs collected in `_debug` are also printed in the console during the execution. 
""" if name == "run": func = getattr(type(self), "run") - if getattr(self, 'enable_debug_in_output', False): - return types.MethodType(record_debug_logs(func, getattr(self, 'enable_debug_in_console', False)), self) + if getattr(self, 'enable_debug', False): + return types.MethodType(record_debug_logs(func, getattr(self, 'console_debug', False)), self) return object.__getattribute__(self, name) @classmethod @@ -426,6 +426,7 @@ def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]: It takes care of the following: - inspect run() signature to validate if all necessary arguments are available + - pop `enable_debug` and `console_debug` and sets them on the instance to control debug output - call run() with the corresponding arguments and gather output - collate _debug information if present - merge component output with the preceding output and pass it on to the subsequent Component in the Pipeline @@ -439,9 +440,17 @@ def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]: for key, value in params.items(): if key == self.name: # targeted params for this node if isinstance(value, dict): + + # Debug attributes + if "enable_debug" in value.keys(): + self.enable_debug = value.pop("enable_debug") + if "console_debug" in value.keys(): + self.console_debug = value.pop("console_debug") + for _k, _v in value.items(): if _k not in run_signature_args: raise Exception(f"Invalid parameter '{_k}' for the node '{self.name}'.") + run_params.update(**value) elif key in run_signature_args: # global params run_params[key] = value From be670b9f752ff9a3c611464ae859ea9746c0f225 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 15:44:08 +0200 Subject: [PATCH 14/36] Avoid overwriting _debug, later we might want to store other objects in it --- haystack/pipeline.py | 10 +++++++++- haystack/schema.py | 23 +++++++++++++++++------ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index a084eaa42f..4952090f8d 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -288,7 +288,15 @@ def run( # type: ignore try: logger.debug(f"Running node `{node_id}` with input `{node_input}`") node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input) - debug_output.update(node_output.get("_debug", {})) + + # Collect all debug information + if self.graph.nodes[node_id]["component"].enable_debug: + debug_output[node_id] = {} + if "_debug" in node_output.keys(): + debug_output[node_id]["run"] = node_output.pop("_debug")[node_id] + #debug_output[node_id]["input"] = node_input + #debug_output[node_id]["output"] = node_output + except Exception as e: tb = traceback.format_exc() raise Exception(f"Exception while running node `{node_id}` with input `{node_input}`: {e}, full stack trace: {tb}") diff --git a/haystack/schema.py b/haystack/schema.py index 4be3b08425..e413b868e2 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -267,7 +267,7 @@ def __init__(self, *args): self.logs = [] def write(self, x): - self.logs.append(x[:100]) + self.logs.append(x) def record_debug_logs(func: Callable, to_console_too: Optional[bool] = False) -> Callable: @@ -297,15 +297,18 @@ def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]: logger.addHandler(handler) # Add a handler that prints DEBUG messages in the console - if to_console_too: + if to_console_too and logger.level != logging.DEBUG: handler_console = logging.StreamHandler() handler_console.setLevel(logging.DEBUG) logger.addHandler(handler_console) output, stream = func(*args, **kwargs) - 
output["_debug"] = logs_container.logs - # Remove both handlers + if not "_debug" in output.keys(): + output["_debug"] = {} + output["_debug"]["logs"] = logs_container.logs + + # Remove both handlers logger.removeHandler(handler) if to_console_too: logger.removeHandler(handler_console) @@ -352,10 +355,18 @@ def __getattribute__(self, name): """ if name == "run": func = getattr(type(self), "run") - if getattr(self, 'enable_debug', False): - return types.MethodType(record_debug_logs(func, getattr(self, 'console_debug', False)), self) + if self.enable_debug: + return types.MethodType(record_debug_logs(func, self.console_debug), self) return object.__getattribute__(self, name) + def __getattr__(self, name): + """ + Ensures that `enable_debug` and `console_debug` are always defined. + """ + if name in ["enable_debug", "console_debug"]: + return False + raise AttributeError(f"{self.__class__.__name__} object has no attribute {name}") + @classmethod def get_subclass(cls, component_type: str): if component_type not in cls.subclasses.keys(): From 36f6b55f572d6dbfc3e4c14fc383af845ba0b881 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Tue, 5 Oct 2021 17:15:37 +0200 Subject: [PATCH 15/36] Put logs under a separate key of the _debug dictionary and add input and output of the node alongside it --- haystack/pipeline.py | 6 +++--- haystack/schema.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 4952090f8d..3db37f6c1d 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -293,9 +293,9 @@ def run( # type: ignore if self.graph.nodes[node_id]["component"].enable_debug: debug_output[node_id] = {} if "_debug" in node_output.keys(): - debug_output[node_id]["run"] = node_output.pop("_debug")[node_id] - #debug_output[node_id]["input"] = node_input - #debug_output[node_id]["output"] = node_output + debug_output[node_id] = node_output.pop("_debug")[node_id] + debug_output[node_id]["input"] = node_input + debug_output[node_id]["output"] = node_output except Exception as e: tb = traceback.format_exc() diff --git a/haystack/schema.py b/haystack/schema.py index e413b868e2..ca1fca1a91 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -307,7 +307,7 @@ def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]: if not "_debug" in output.keys(): output["_debug"] = {} output["_debug"]["logs"] = logs_container.logs - + # Remove both handlers logger.removeHandler(handler) if to_console_too: From ce1aaf4ddcb5114ff1ea10ca7752b9b235a1b522 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 12:25:32 +0200 Subject: [PATCH 16/36] Introduce global arguments for pipeline.run() that get applied to every node when defined --- haystack/pipeline.py | 15 +++++++++++-- haystack/schema.py | 52 +++++++++++++++++++++++--------------------- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 3db37f6c1d..f26850c30b 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -260,7 +260,9 @@ def run( # type: ignore labels: Optional[MultiLabel] = None, documents: Optional[List[Document]] = None, meta: Optional[dict] = None, - params: Optional[dict] = None + params: Optional[dict] = None, + debug: Optional[bool] = False, + debug_logs: Optional[bool] = False ): node_output = {} debug_output = {} @@ -283,6 +285,14 @@ def run( # type: ignore node_id = list(queue.keys())[i] node_input = queue[node_id] node_input["node_id"] = node_id + + # Apply debug attributes to the node input params + if 
debug: + if node_id not in node_input["params"].keys(): + node_input["params"][node_id] = {} + node_input["params"][node_id]["debug"] = debug + node_input["params"][node_id]["debug_logs"] = debug_logs + predecessors = set(nx.ancestors(self.graph, node_id)) if predecessors.isdisjoint(set(queue.keys())): # only execute if predecessor nodes are executed try: @@ -290,7 +300,7 @@ def run( # type: ignore node_output, stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input) # Collect all debug information - if self.graph.nodes[node_id]["component"].enable_debug: + if debug: debug_output[node_id] = {} if "_debug" in node_output.keys(): debug_output[node_id] = node_output.pop("_debug")[node_id] @@ -300,6 +310,7 @@ def run( # type: ignore except Exception as e: tb = traceback.format_exc() raise Exception(f"Exception while running node `{node_id}` with input `{node_input}`: {e}, full stack trace: {tb}") + queue.pop(node_id) next_nodes = self.get_next_nodes(node_id, stream_id) for n in next_nodes: # add successor nodes with corresponding inputs to the queue diff --git a/haystack/schema.py b/haystack/schema.py index ca1fca1a91..349531141a 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -270,11 +270,11 @@ def write(self, x): self.logs.append(x) -def record_debug_logs(func: Callable, to_console_too: Optional[bool] = False) -> Callable: +def record_debug_logs(func: Callable, node_name: str, logs: Optional[bool] = False) -> Callable: """ Captures the debug logs of the wrapped function and saves them in the `_debug` key of the output dictionary. - If `to_console_too` is True, dumps the same logs to the console as well. + If `logs` is True, dumps the same logs to the console as well. Used in `BaseComponent.__getattribute__()` to wrap `run()` functions. This makes sure that every implementation of `run()` by a subclass will @@ -282,8 +282,8 @@ def record_debug_logs(func: Callable, to_console_too: Optional[bool] = False) -> :param func: the function to decorate (must be an implementation of `BaseComponent.run()`). - :param to_console_too: whether the captured logs should also be displayed - in the console during the execution of the pipeline. + :param logs: whether the captured logs should also be displayed + in the console during the execution of the pipeline. """ @wraps(func) def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]: @@ -293,13 +293,16 @@ def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]: # Adds a handler that stores the logs in a variable handler = logging.StreamHandler(logs_container) - handler.setLevel(logging.DEBUG) + handler.setLevel(logger.level or logging.DEBUG) logger.addHandler(handler) - # Add a handler that prints DEBUG messages in the console - if to_console_too and logger.level != logging.DEBUG: + # Add a handler that prints log messages in the console + # to the specified level for the node + if logs: handler_console = logging.StreamHandler() handler_console.setLevel(logging.DEBUG) + formatter = logging.Formatter(f'[{node_name} logs] %(message)s') + handler_console.setFormatter(formatter) logger.addHandler(handler_console) output, stream = func(*args, **kwargs) @@ -310,7 +313,7 @@ def inner(*args, **kwargs) -> Tuple[Dict[str, Any], str]: # Remove both handlers logger.removeHandler(handler) - if to_console_too: + if logs: logger.removeHandler(handler_console) return output, stream @@ -345,26 +348,25 @@ def __getattribute__(self, name): `_debug` key of the output dictionary. The logs collection is not always performed. 
Before applying the decorator, - it checks for an instance attribute called `enable_debug` to know + it checks for an instance attribute called `debug` to know whether it should or not. The decorator is applied if the attribute is defined and True. - In addition, the value of the instance attribute `console_debug` is - passed to the decorator. If it's defined and True, the same logs collected in - `_debug` are also printed in the console during the execution. + In addition, the value of the instance attribute `debug_logs` is + passed to the decorator. If it's True, it will print the + logs in the console as well. """ - if name == "run": + if name == "run" and self.debug: func = getattr(type(self), "run") - if self.enable_debug: - return types.MethodType(record_debug_logs(func, self.console_debug), self) + return types.MethodType(record_debug_logs(func=func, node_name=self.__class__.__name__, logs=self.debug_logs), self) return object.__getattribute__(self, name) def __getattr__(self, name): """ - Ensures that `enable_debug` and `console_debug` are always defined. + Ensures that `debug` and `debug_logs` are always defined. """ - if name in ["enable_debug", "console_debug"]: - return False + if name in ["debug", "debug_logs"]: + return None raise AttributeError(f"{self.__class__.__name__} object has no attribute {name}") @classmethod @@ -437,9 +439,9 @@ def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]: It takes care of the following: - inspect run() signature to validate if all necessary arguments are available - - pop `enable_debug` and `console_debug` and sets them on the instance to control debug output + - pop `debug` and `debug_logs` and sets them on the instance to control debug output - call run() with the corresponding arguments and gather output - - collate _debug information if present + - collate `_debug` information if present - merge component output with the preceding output and pass it on to the subsequent Component in the Pipeline """ arguments = deepcopy(kwargs) @@ -452,11 +454,11 @@ def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]: if key == self.name: # targeted params for this node if isinstance(value, dict): - # Debug attributes - if "enable_debug" in value.keys(): - self.enable_debug = value.pop("enable_debug") - if "console_debug" in value.keys(): - self.console_debug = value.pop("console_debug") + # Extract debug attributes + if "debug" in value.keys(): + self.debug = value.pop("debug") + if "debug_logs" in value.keys(): + self.debug_logs = value.pop("debug_logs") for _k, _v in value.items(): if _k not in run_signature_args: From 08092b3e14fe8175004ce980f40ec4fd5d87b490 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 13:21:29 +0200 Subject: [PATCH 17/36] Change default values of debug variables to None, otherwise their default would override the params values --- haystack/pipeline.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index f26850c30b..96632c918c 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -261,8 +261,8 @@ def run( # type: ignore documents: Optional[List[Document]] = None, meta: Optional[dict] = None, params: Optional[dict] = None, - debug: Optional[bool] = False, - debug_logs: Optional[bool] = False + debug: Optional[bool] = None, + debug_logs: Optional[bool] = None ): node_output = {} debug_output = {} @@ -291,7 +291,8 @@ def run( # type: ignore if node_id not in node_input["params"].keys(): node_input["params"][node_id] = {} 
node_input["params"][node_id]["debug"] = debug - node_input["params"][node_id]["debug_logs"] = debug_logs + if debug_logs: + node_input["params"][node_id]["debug_logs"] = debug_logs predecessors = set(nx.ancestors(self.graph, node_id)) if predecessors.isdisjoint(set(queue.keys())): # only execute if predecessor nodes are executed From da9d5869a2687b0da3875549930f54b338d215f1 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 13:36:26 +0200 Subject: [PATCH 18/36] Remove unused import --- haystack/pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 96632c918c..a2cf908ff6 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -10,7 +10,6 @@ import pickle import urllib from functools import wraps -from networkx.algorithms.boundary import node_boundary try: from ray import serve From 0c77b2955dbe0d13d332f4a0685bc7c27ff49d80 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 13:37:56 +0200 Subject: [PATCH 19/36] more typing for mypy --- haystack/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index a2cf908ff6..04f9b0511a 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -264,7 +264,7 @@ def run( # type: ignore debug_logs: Optional[bool] = None ): node_output = {} - debug_output = {} + debug_output: Dict[str, Dict[str, Any]] = {} queue = { self.root_node: {"root_node": self.root_node, "params": params} } # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue From 6f244e628ee86c719ed596bbb75e1e08691db99b Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 13:55:39 +0200 Subject: [PATCH 20/36] Remove a potential infinite recursion on the overridden __getattr__ --- haystack/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/schema.py b/haystack/schema.py index 349531141a..f54a525947 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -367,7 +367,7 @@ def __getattr__(self, name): """ if name in ["debug", "debug_logs"]: return None - raise AttributeError(f"{self.__class__.__name__} object has no attribute {name}") + raise AttributeError(name) @classmethod def get_subclass(cls, component_type: str): From 24ceba8efd1815a68ea81de95309a11b522df43b Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 14:37:19 +0200 Subject: [PATCH 21/36] Add a simple test for the debug attributes --- test/test_pipeline.py | 77 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 6 deletions(-) diff --git a/test/test_pipeline.py b/test/test_pipeline.py index 8e29663999..377f49ee3b 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -26,29 +26,28 @@ def test_load_and_save_yaml(document_store, tmp_path): # test correct load of indexing pipeline from yaml pipeline = Pipeline.load_from_yaml( - Path("samples/pipeline/test_pipeline.yaml"), pipeline_name="indexing_pipeline" + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" ) pipeline.run( - file_paths=Path("samples/pdf/sample_pdf_1.pdf"), + file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, ) - # test correct load of query pipeline from yaml pipeline = Pipeline.load_from_yaml( - Path("samples/pipeline/test_pipeline.yaml"), pipeline_name="query_pipeline" + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" ) prediction = 
pipeline.run( query="Who made the PDF specification?", params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}} ) assert prediction["query"] == "Who made the PDF specification?" assert prediction["answers"][0]["answer"] == "Adobe Systems" + assert "_debug" not in prediction.keys() # test invalid pipeline name with pytest.raises(Exception): Pipeline.load_from_yaml( - path=Path("samples/pipeline/test_pipeline.yaml"), pipeline_name="invalid" + path=Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="invalid" ) - # test config export pipeline.save_to_yaml(tmp_path / "test.yaml") with open(tmp_path / "test.yaml", "r", encoding="utf-8") as stream: @@ -86,6 +85,72 @@ def test_load_and_save_yaml(document_store, tmp_path): ).replace("\n", "") +@pytest.mark.elasticsearch +@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) +def test_debug_attributes(document_store, tmp_path): + # test correct load of pipelines from yaml + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" + ) + pipeline.run( + file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, + ) + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" + ) + prediction = pipeline.run( + query="Who made the PDF specification?", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True, "debug_logs": True}}, + debug=True, + debug_logs=True + ) + assert prediction["query"] == "Who made the PDF specification?" + assert prediction["answers"][0]["answer"] == "Adobe Systems" + assert "_debug" in prediction.keys() + assert any(node in prediction["_debug"].keys() for node in ["Query", "ESRetriever", "Reader"]) + assert any(key in prediction["_debug"]["Query"].keys() for key in ["input", "output"]) + assert any(key in prediction["_debug"]["ESRetriever"].keys() for key in ["input", "output"]) + assert any(key in prediction["_debug"]["Reader"].keys() for key in ["input", "output"]) + assert prediction["_debug"]["Reader"]["output"]["answers"][0]["answer"] == "Adobe Systems" + + # test config export does not contain debug attributes + pipeline.save_to_yaml(tmp_path / "test.yaml") + with open(tmp_path / "test.yaml", "r", encoding="utf-8") as stream: + saved_yaml = stream.read() + expected_yaml = """ + components: + - name: ESRetriever + params: + document_store: ElasticsearchDocumentStore + type: ElasticsearchRetriever + - name: ElasticsearchDocumentStore + params: + index: haystack_test + label_index: haystack_test_label + type: ElasticsearchDocumentStore + - name: Reader + params: + model_name_or_path: deepset/roberta-base-squad2 + no_ans_boost: -10 + type: FARMReader + pipelines: + - name: query + nodes: + - inputs: + - Query + name: ESRetriever + - inputs: + - ESRetriever + name: Reader + type: Pipeline + version: '0.8' + """ + assert saved_yaml.replace(" ", "").replace("\n", "") == expected_yaml.replace( + " ", "" + ).replace("\n", "") + + # @pytest.mark.slow # @pytest.mark.elasticsearch # @pytest.mark.parametrize( From 53a582eb2c5fbfb15650d173b041932121313ad9 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 15:28:10 +0200 Subject: [PATCH 22/36] Add test and fix small issue on global debug=False not overriding node specific settings --- haystack/pipeline.py | 8 ++-- haystack/schema.py | 2 +- test/test_pipeline.py | 94 
++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 98 insertions(+), 6 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 04f9b0511a..98cedc3a14 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -286,11 +286,13 @@ def run( # type: ignore node_input["node_id"] = node_id # Apply debug attributes to the node input params - if debug: + # NOTE: global debug attributes will override the value specified + # in each node's params dictionary. + if debug is not None: if node_id not in node_input["params"].keys(): node_input["params"][node_id] = {} node_input["params"][node_id]["debug"] = debug - if debug_logs: + if debug_logs is not None: node_input["params"][node_id]["debug_logs"] = debug_logs predecessors = set(nx.ancestors(self.graph, node_id)) @@ -337,7 +339,7 @@ def run( # type: ignore i = 0 else: i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors - if debug_output: + if debug: node_output["_debug"] = debug_output return node_output diff --git a/haystack/schema.py b/haystack/schema.py index f54a525947..fb798af2d7 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -270,7 +270,7 @@ def write(self, x): self.logs.append(x) -def record_debug_logs(func: Callable, node_name: str, logs: Optional[bool] = False) -> Callable: +def record_debug_logs(func: Callable, node_name: str, logs: bool) -> Callable: """ Captures the debug logs of the wrapped function and saves them in the `_debug` key of the output dictionary. diff --git a/test/test_pipeline.py b/test/test_pipeline.py index 377f49ee3b..3b109cf049 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -87,7 +87,7 @@ def test_load_and_save_yaml(document_store, tmp_path): @pytest.mark.elasticsearch @pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) -def test_debug_attributes(document_store, tmp_path): +def test_debug_attributes_global(document_store, tmp_path): # test correct load of pipelines from yaml pipeline = Pipeline.load_from_yaml( Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" @@ -99,9 +99,10 @@ def test_debug_attributes(document_store, tmp_path): pipeline = Pipeline.load_from_yaml( Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" ) + # Test global arguments prediction = pipeline.run( query="Who made the PDF specification?", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True, "debug_logs": True}}, + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, debug=True, debug_logs=True ) @@ -151,6 +152,95 @@ def test_debug_attributes(document_store, tmp_path): ).replace("\n", "") +@pytest.mark.elasticsearch +@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) +def test_debug_attributes_per_node(document_store, tmp_path): + # test correct load of pipelines from yaml + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" + ) + pipeline.run( + file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, + ) + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" + ) + # Test node-specific arguments + prediction = pipeline.run( + query="Who made the PDF specification?", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, 
"debug": True, "debug_logs": True}}, + ) + assert prediction["query"] == "Who made the PDF specification?" + assert prediction["answers"][0]["answer"] == "Adobe Systems" + assert "_debug" in prediction.keys() + assert "Query" not in prediction["_debug"].keys() + assert "ESRetriever" not in prediction["_debug"].keys() + assert "Reader" in prediction["_debug"].keys() + assert any(key in prediction["_debug"]["Reader"].keys() for key in ["input", "output"]) + assert prediction["_debug"]["Reader"]["output"]["answers"][0]["answer"] == "Adobe Systems" + + # test config export does not contain debug attributes + pipeline.save_to_yaml(tmp_path / "test.yaml") + with open(tmp_path / "test.yaml", "r", encoding="utf-8") as stream: + saved_yaml = stream.read() + expected_yaml = """ + components: + - name: ESRetriever + params: + document_store: ElasticsearchDocumentStore + type: ElasticsearchRetriever + - name: ElasticsearchDocumentStore + params: + index: haystack_test + label_index: haystack_test_label + type: ElasticsearchDocumentStore + - name: Reader + params: + model_name_or_path: deepset/roberta-base-squad2 + no_ans_boost: -10 + type: FARMReader + pipelines: + - name: query + nodes: + - inputs: + - Query + name: ESRetriever + - inputs: + - ESRetriever + name: Reader + type: Pipeline + version: '0.8' + """ + assert saved_yaml.replace(" ", "").replace("\n", "") == expected_yaml.replace( + " ", "" + ).replace("\n", "") + + +@pytest.mark.elasticsearch +@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) +def test_global_debug_attributes_override_node_ones(document_store, tmp_path): + # test correct load of pipelines from yaml + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" + ) + pipeline.run( + file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, + ) + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" + ) + # Test node-specific arguments + prediction = pipeline.run( + query="Who made the PDF specification?", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}},#, "debug_logs": True}}, + debug=False, + #debug_logs=False + ) + assert "_debug" not in prediction.keys() + + # @pytest.mark.slow # @pytest.mark.elasticsearch # @pytest.mark.parametrize( From 97fe6b9767581b52231635b8b76b585b1fb66eb6 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 16:31:25 +0200 Subject: [PATCH 23/36] Do not append the output of the last node in the _debug key, it causes infinite recursion --- haystack/pipeline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 98cedc3a14..98f40f2f4a 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -307,7 +307,8 @@ def run( # type: ignore if "_debug" in node_output.keys(): debug_output[node_id] = node_output.pop("_debug")[node_id] debug_output[node_id]["input"] = node_input - debug_output[node_id]["output"] = node_output + if len(queue) > 1: # Exclude the output of the last node to avoid infinite recursion + debug_output[node_id]["output"] = node_output except Exception as e: tb = traceback.format_exc() From bb6b63da2042574b7eb30fd3dca274dc8698204c Mon Sep 17 00:00:00 2001 From: ZanSara Date: Wed, 6 Oct 2021 16:37:41 +0200 Subject: [PATCH 24/36] Fix tests --- test/test_pipeline.py | 34 
+++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/test/test_pipeline.py b/test/test_pipeline.py index 3b109cf049..a2f3eb3120 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -169,15 +169,15 @@ def test_debug_attributes_per_node(document_store, tmp_path): # Test node-specific arguments prediction = pipeline.run( query="Who made the PDF specification?", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True, "debug_logs": True}}, + params={"Retriever": {"top_k": 10, "debug": True, "debug_logs": True}, "Reader": {"top_k": 3, "debug": True,}}, ) assert prediction["query"] == "Who made the PDF specification?" assert prediction["answers"][0]["answer"] == "Adobe Systems" assert "_debug" in prediction.keys() assert "Query" not in prediction["_debug"].keys() - assert "ESRetriever" not in prediction["_debug"].keys() + assert "ESRetriever" in prediction["_debug"].keys() assert "Reader" in prediction["_debug"].keys() - assert any(key in prediction["_debug"]["Reader"].keys() for key in ["input", "output"]) + assert any(key in prediction["_debug"]["ESRetriever"].keys() for key in ["input", "output"]) assert prediction["_debug"]["Reader"]["output"]["answers"][0]["answer"] == "Adobe Systems" # test config export does not contain debug attributes @@ -234,13 +234,37 @@ def test_global_debug_attributes_override_node_ones(document_store, tmp_path): # Test node-specific arguments prediction = pipeline.run( query="Who made the PDF specification?", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}},#, "debug_logs": True}}, + params={"Retriever": {"top_k": 10, "debug": True}, "Reader": {"top_k": 3}}, debug=False, - #debug_logs=False ) assert "_debug" not in prediction.keys() +@pytest.mark.elasticsearch +@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) +def test_last_node_debug_does_not_contain_output(document_store, tmp_path): + # test correct load of pipelines from yaml + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" + ) + pipeline.run( + file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, + ) + pipeline = Pipeline.load_from_yaml( + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" + ) + # Test node-specific arguments + prediction = pipeline.run( + query="Who made the PDF specification?", + params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True,}}, + ) + assert "Reader" in prediction["_debug"].keys() + assert "input" in prediction["_debug"]["ESRetriever"].keys() + assert "output" not in prediction["_debug"]["ESRetriever"].keys() + + + # @pytest.mark.slow # @pytest.mark.elasticsearch # @pytest.mark.parametrize( From 8c61e8c0b70e0985add5476340a2b78f346b03a8 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 15:21:43 +0200 Subject: [PATCH 25/36] Removed recursion between _debug and output and fixed tests --- haystack/pipeline.py | 24 +++-- test/test_pipeline.py | 239 +++++++++++++++--------------------------- 2 files changed, 96 insertions(+), 167 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 98f40f2f4a..bfdb06ec2a 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -300,16 +300,6 @@ def run( # type: ignore try: logger.debug(f"Running node `{node_id}` with input `{node_input}`") node_output, 
stream_id = self.graph.nodes[node_id]["component"]._dispatch_run(**node_input) - - # Collect all debug information - if debug: - debug_output[node_id] = {} - if "_debug" in node_output.keys(): - debug_output[node_id] = node_output.pop("_debug")[node_id] - debug_output[node_id]["input"] = node_input - if len(queue) > 1: # Exclude the output of the last node to avoid infinite recursion - debug_output[node_id]["output"] = node_output - except Exception as e: tb = traceback.format_exc() raise Exception(f"Exception while running node `{node_id}` with input `{node_input}`: {e}, full stack trace: {tb}") @@ -337,10 +327,22 @@ def run( # type: ignore queue[n] = updated_input else: queue[n] = node_output + + # Collect all debug information + #if "_debug" in node_output.keys(): + if node_input.get("params", {}).get(node_id, {}).get("debug", False): + debug_output[node_id] = node_output.get("_debug", {}).get(node_id, {}) + debug_output[node_id]["input"] = node_input + # Exclude the _debug key from the output to avoid infinite recursion + node_output_without_debug = {key: value for key, value in node_output.items() if key != "_debug"} + node_output_without_debug["_debug"] = "" + debug_output[node_id]["output"] = node_output_without_debug + i = 0 else: i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors - if debug: + + if debug_output: node_output["_debug"] = debug_output return node_output diff --git a/test/test_pipeline.py b/test/test_pipeline.py index a2f3eb3120..4a551ab260 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -1,5 +1,6 @@ from pathlib import Path +import json import math import pytest @@ -16,6 +17,7 @@ TransformersQueryClassifier, MostSimilarDocumentsPipeline, ) +from haystack.reader import FARMReader from haystack.retriever.dense import DensePassageRetriever from haystack.retriever.sparse import ElasticsearchRetriever from haystack.schema import Document @@ -86,182 +88,107 @@ def test_load_and_save_yaml(document_store, tmp_path): @pytest.mark.elasticsearch -@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) -def test_debug_attributes_global(document_store, tmp_path): - # test correct load of pipelines from yaml - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" - ) - pipeline.run( - file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, - ) - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" - ) - # Test global arguments +@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True) +def test_debug_attributes_global(document_store_with_docs, tmp_path): + + es_retriever = ElasticsearchRetriever(document_store=document_store_with_docs) + reader = FARMReader(model_name_or_path="deepset/roberta-base-squad2") + + pipeline = Pipeline() + pipeline.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"]) + pipeline.add_node(component=reader, name="Reader", inputs=["ESRetriever"]) + prediction = pipeline.run( - query="Who made the PDF specification?", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, + query="Who lives in Berlin?", + params={"ESRetriever": {"top_k": 10}, "Reader": {"top_k": 3}}, debug=True, debug_logs=True ) - assert prediction["query"] == "Who made the PDF specification?" 
- assert prediction["answers"][0]["answer"] == "Adobe Systems" assert "_debug" in prediction.keys() - assert any(node in prediction["_debug"].keys() for node in ["Query", "ESRetriever", "Reader"]) - assert any(key in prediction["_debug"]["Query"].keys() for key in ["input", "output"]) - assert any(key in prediction["_debug"]["ESRetriever"].keys() for key in ["input", "output"]) - assert any(key in prediction["_debug"]["Reader"].keys() for key in ["input", "output"]) - assert prediction["_debug"]["Reader"]["output"]["answers"][0]["answer"] == "Adobe Systems" + assert "ESRetriever" in prediction["_debug"].keys() + assert "Reader" in prediction["_debug"].keys() + assert "input" in prediction["_debug"]["ESRetriever"].keys() + assert "output" in prediction["_debug"]["ESRetriever"].keys() + assert "input" in prediction["_debug"]["Reader"].keys() + assert "output" in prediction["_debug"]["Reader"].keys() + assert prediction["_debug"]["ESRetriever"]["input"] + assert prediction["_debug"]["ESRetriever"]["output"] + assert prediction["_debug"]["Reader"]["input"] + assert prediction["_debug"]["Reader"]["output"] + + # Avoid circular reference: easiest way to detect those is to use json.dumps + json.dumps(prediction, default=str) - # test config export does not contain debug attributes - pipeline.save_to_yaml(tmp_path / "test.yaml") - with open(tmp_path / "test.yaml", "r", encoding="utf-8") as stream: - saved_yaml = stream.read() - expected_yaml = """ - components: - - name: ESRetriever - params: - document_store: ElasticsearchDocumentStore - type: ElasticsearchRetriever - - name: ElasticsearchDocumentStore - params: - index: haystack_test - label_index: haystack_test_label - type: ElasticsearchDocumentStore - - name: Reader - params: - model_name_or_path: deepset/roberta-base-squad2 - no_ans_boost: -10 - type: FARMReader - pipelines: - - name: query - nodes: - - inputs: - - Query - name: ESRetriever - - inputs: - - ESRetriever - name: Reader - type: Pipeline - version: '0.8' - """ - assert saved_yaml.replace(" ", "").replace("\n", "") == expected_yaml.replace( - " ", "" - ).replace("\n", "") +@pytest.mark.elasticsearch +@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True) +def test_debug_attributes_per_node(document_store_with_docs, tmp_path): + + es_retriever = ElasticsearchRetriever(document_store=document_store_with_docs) + reader = FARMReader(model_name_or_path="deepset/roberta-base-squad2") + pipeline = Pipeline() + pipeline.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"]) + pipeline.add_node(component=reader, name="Reader", inputs=["ESRetriever"]) -@pytest.mark.elasticsearch -@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) -def test_debug_attributes_per_node(document_store, tmp_path): - # test correct load of pipelines from yaml - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" - ) - pipeline.run( - file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, - ) - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" - ) - # Test node-specific arguments prediction = pipeline.run( - query="Who made the PDF specification?", - params={"Retriever": {"top_k": 10, "debug": True, "debug_logs": True}, "Reader": {"top_k": 3, "debug": True,}}, + query="Who lives in Berlin?", + 
params={ + "ESRetriever": {"top_k": 10, "debug": True, "debug_logs":True}, + "Reader": {"top_k": 3} + }, ) - assert prediction["query"] == "Who made the PDF specification?" - assert prediction["answers"][0]["answer"] == "Adobe Systems" assert "_debug" in prediction.keys() - assert "Query" not in prediction["_debug"].keys() assert "ESRetriever" in prediction["_debug"].keys() - assert "Reader" in prediction["_debug"].keys() - assert any(key in prediction["_debug"]["ESRetriever"].keys() for key in ["input", "output"]) - assert prediction["_debug"]["Reader"]["output"]["answers"][0]["answer"] == "Adobe Systems" + assert "Reader" not in prediction["_debug"].keys() + assert "input" in prediction["_debug"]["ESRetriever"].keys() + assert "output" in prediction["_debug"]["ESRetriever"].keys() + assert prediction["_debug"]["ESRetriever"]["input"] + assert prediction["_debug"]["ESRetriever"]["output"] - # test config export does not contain debug attributes - pipeline.save_to_yaml(tmp_path / "test.yaml") - with open(tmp_path / "test.yaml", "r", encoding="utf-8") as stream: - saved_yaml = stream.read() - expected_yaml = """ - components: - - name: ESRetriever - params: - document_store: ElasticsearchDocumentStore - type: ElasticsearchRetriever - - name: ElasticsearchDocumentStore - params: - index: haystack_test - label_index: haystack_test_label - type: ElasticsearchDocumentStore - - name: Reader - params: - model_name_or_path: deepset/roberta-base-squad2 - no_ans_boost: -10 - type: FARMReader - pipelines: - - name: query - nodes: - - inputs: - - Query - name: ESRetriever - - inputs: - - ESRetriever - name: Reader - type: Pipeline - version: '0.8' - """ - assert saved_yaml.replace(" ", "").replace("\n", "") == expected_yaml.replace( - " ", "" - ).replace("\n", "") + # Avoid circular reference: easiest way to detect those is to use json.dumps + json.dumps(prediction, default=str) @pytest.mark.elasticsearch -@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) -def test_global_debug_attributes_override_node_ones(document_store, tmp_path): - # test correct load of pipelines from yaml - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" - ) - pipeline.run( - file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, - ) - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" - ) - # Test node-specific arguments +@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True) +def test_global_debug_attributes_override_node_ones(document_store_with_docs, tmp_path): + + es_retriever = ElasticsearchRetriever(document_store=document_store_with_docs) + reader = FARMReader(model_name_or_path="deepset/roberta-base-squad2") + + pipeline = Pipeline() + pipeline.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"]) + pipeline.add_node(component=reader, name="Reader", inputs=["ESRetriever"]) + prediction = pipeline.run( - query="Who made the PDF specification?", - params={"Retriever": {"top_k": 10, "debug": True}, "Reader": {"top_k": 3}}, - debug=False, + query="Who lives in Berlin?", + params={ + "ESRetriever": {"top_k": 10, "debug": True, "debug_logs":True}, + "Reader": {"top_k": 3, "debug": True} + }, + debug=False ) assert "_debug" not in prediction.keys() - -@pytest.mark.elasticsearch 
-@pytest.mark.parametrize("document_store", ["elasticsearch"], indirect=True) -def test_last_node_debug_does_not_contain_output(document_store, tmp_path): - # test correct load of pipelines from yaml - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="indexing_pipeline" - ) - pipeline.run( - file_paths=Path(__file__).parent/"samples"/"pdf"/"sample_pdf_1.pdf", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}}, - ) - pipeline = Pipeline.load_from_yaml( - Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" - ) - # Test node-specific arguments prediction = pipeline.run( - query="Who made the PDF specification?", - params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True,}}, + query="Who lives in Berlin?", + params={ + "ESRetriever": {"top_k": 10, "debug": False}, + "Reader": {"top_k": 3, "debug": False} + }, + debug=True ) + assert "_debug" in prediction.keys() + assert "ESRetriever" in prediction["_debug"].keys() assert "Reader" in prediction["_debug"].keys() - assert "input" in prediction["_debug"]["ESRetriever"].keys() - assert "output" not in prediction["_debug"]["ESRetriever"].keys() + assert "input" in prediction["_debug"]["ESRetriever"].keys() + assert "output" in prediction["_debug"]["ESRetriever"].keys() + assert "input" in prediction["_debug"]["Reader"].keys() + assert "output" in prediction["_debug"]["Reader"].keys() + assert prediction["_debug"]["ESRetriever"]["input"] + assert prediction["_debug"]["ESRetriever"]["output"] + assert prediction["_debug"]["Reader"]["input"] + assert prediction["_debug"]["Reader"]["output"] @@ -317,7 +244,7 @@ def test_graph_creation(retriever_with_docs, document_store_with_docs): def test_invalid_run_args(): pipeline = Pipeline.load_from_yaml( - Path("samples/pipeline/test_pipeline.yaml"), pipeline_name="query_pipeline" + Path(__file__).parent/"samples"/"pipeline"/"test_pipeline.yaml", pipeline_name="query_pipeline" ) with pytest.raises(Exception) as exc: pipeline.run(params={"ESRetriever": {"top_k": 10}}) @@ -551,7 +478,7 @@ def test_debug_info_propagation(): class A(RootNode): def run(self): test = "A" - return {"test": test, "_debug": "debug_from_a"}, "output_1" + return {"test": test, "_debug": {"debug_key_a": "debug_value_a"}}, "output_1" class B(RootNode): def run(self, test): @@ -575,7 +502,7 @@ def run(self, test, _debug): pipeline.add_node(name="C", component=C(), inputs=["B"]) pipeline.add_node(name="D", component=D(), inputs=["C"]) output = pipeline.run(query="test") - assert output["_debug"]["A"] == "debug_from_a" + assert output["_debug"]["A"]["debug_key_a"] == "debug_value_a" assert output["_debug"]["B"]["debug_key_b"] == "debug_value_b" From 96c141805c3afcbbcc974b9e2d0587fd4b15a429 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 15:32:43 +0200 Subject: [PATCH 26/36] Apparently node_input can be None :/ --- haystack/pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index bfdb06ec2a..bda13bbb0c 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -329,8 +329,7 @@ def run( # type: ignore queue[n] = node_output # Collect all debug information - #if "_debug" in node_output.keys(): - if node_input.get("params", {}).get(node_id, {}).get("debug", False): + if node_input and node_input.get("params", {}).get(node_id, {}).get("debug", False): debug_output[node_id] = node_output.get("_debug", {}).get(node_id, {}) 
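# Illustrative comment (added for clarity, not part of the original patch):
# `node_input` can be None at this point, so without the `node_input and ...`
# guard above, `None.get("params", {})` would raise
# "AttributeError: 'NoneType' object has no attribute 'get'"; with the guard,
# nodes that received no input simply skip debug collection.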
debug_output[node_id]["input"] = node_input # Exclude the _debug key from the output to avoid infinite recursion From a03eee737c2f9056fcfc61ad2c23d88d4c837b72 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 16:23:52 +0200 Subject: [PATCH 27/36] Move the input/output collection into _dispatch_run to gather only relevant info --- haystack/pipeline.py | 16 +--------------- haystack/schema.py | 12 +++++++++++- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index bda13bbb0c..45528269b2 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -264,7 +264,6 @@ def run( # type: ignore debug_logs: Optional[bool] = None ): node_output = {} - debug_output: Dict[str, Dict[str, Any]] = {} queue = { self.root_node: {"root_node": self.root_node, "params": params} } # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue @@ -303,7 +302,7 @@ def run( # type: ignore except Exception as e: tb = traceback.format_exc() raise Exception(f"Exception while running node `{node_id}` with input `{node_input}`: {e}, full stack trace: {tb}") - + queue.pop(node_id) next_nodes = self.get_next_nodes(node_id, stream_id) for n in next_nodes: # add successor nodes with corresponding inputs to the queue @@ -327,22 +326,9 @@ def run( # type: ignore queue[n] = updated_input else: queue[n] = node_output - - # Collect all debug information - if node_input and node_input.get("params", {}).get(node_id, {}).get("debug", False): - debug_output[node_id] = node_output.get("_debug", {}).get(node_id, {}) - debug_output[node_id]["input"] = node_input - # Exclude the _debug key from the output to avoid infinite recursion - node_output_without_debug = {key: value for key, value in node_output.items() if key != "_debug"} - node_output_without_debug["_debug"] = "" - debug_output[node_id]["output"] = node_output_without_debug - i = 0 else: i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors - - if debug_output: - node_output["_debug"] = debug_output return node_output def get_next_nodes(self, node_id: str, stream_id: str): diff --git a/haystack/schema.py b/haystack/schema.py index fb798af2d7..05f050fe95 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -475,9 +475,19 @@ def _dispatch_run(self, **kwargs) -> Tuple[Dict, str]: output, stream = self.run(**run_inputs, **run_params) + # Collect debug information + current_debug = output.get("_debug", {}) + if self.debug: + current_debug["input"] = {**run_inputs, **run_params} + if self.debug: + current_debug["input"]["debug"] = self.debug + if self.debug_logs: + current_debug["input"]["debug_logs"] = self.debug_logs + filtered_output = {key: value for key, value in output.items() if key != "_debug"} # Exclude _debug to avoid recursion + current_debug["output"] = filtered_output + # append _debug information from nodes all_debug = arguments.get("_debug", {}) - current_debug = output.get("_debug") if current_debug: all_debug[self.name] = current_debug if all_debug: From bfcea463d63fa7a75a3b94786ca0e0630d620c35 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 16:48:32 +0200 Subject: [PATCH 28/36] Minor cleanup --- haystack/pipeline.py | 3 +-- haystack/schema.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 45528269b2..719a952218 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -263,7 +263,7 @@ def run( # type: ignore debug: Optional[bool] = None, 
debug_logs: Optional[bool] = None ): - node_output = {} + node_output = None queue = { self.root_node: {"root_node": self.root_node, "params": params} } # ordered dict with "node_id" -> "input" mapping that acts as a FIFO queue @@ -302,7 +302,6 @@ def run( # type: ignore except Exception as e: tb = traceback.format_exc() raise Exception(f"Exception while running node `{node_id}` with input `{node_input}`: {e}, full stack trace: {tb}") - queue.pop(node_id) next_nodes = self.get_next_nodes(node_id, stream_id) for n in next_nodes: # add successor nodes with corresponding inputs to the queue diff --git a/haystack/schema.py b/haystack/schema.py index 05f050fe95..555da3d198 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -9,7 +9,6 @@ import logging import io from functools import wraps -import types class Document: @@ -358,7 +357,7 @@ def __getattribute__(self, name): """ if name == "run" and self.debug: func = getattr(type(self), "run") - return types.MethodType(record_debug_logs(func=func, node_name=self.__class__.__name__, logs=self.debug_logs), self) + return record_debug_logs(func=func, node_name=self.__class__.__name__, logs=self.debug_logs).__get__(self) return object.__getattribute__(self, name) def __getattr__(self, name): From 4b0a28ce7f8e3a1be38139c8c3dde2a9fd64f3db Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 18:19:55 +0200 Subject: [PATCH 29/36] Add partial Pipeline.run() docstring --- haystack/pipeline.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 719a952218..1d9f9d3906 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -263,6 +263,23 @@ def run( # type: ignore debug: Optional[bool] = None, debug_logs: Optional[bool] = None ): + """ + Runs the pipeline, one note at a time. + + :param query: The question, for question answering pipelines + :param file_paths: The files to index, for indexing pipelines + :param labels: + :param documents: + :param meta: + :param params: Dictionary of parameters to be dispatched to every node. Example: + {"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}} + :param debug: Whether the pipeline should instruct nodes to collect debug information + about their execution. By default these include the input parameters + they received, the output they generated, and eventual logs (of any severity) + emitted. + :param debug_logs: Whether all the logs of the node should be printed in the console, + regardless of their severity and of the existing logger's settings. + """ node_output = None queue = { self.root_node: {"root_node": self.root_node, "params": params} From e6503a991df66ef72a5ffd5c0600a3445d7e6a81 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 18:28:35 +0200 Subject: [PATCH 30/36] Move InMemoryLogger into utils.py --- haystack/pipeline.py | 2 +- haystack/schema.py | 16 +--------------- haystack/utils.py | 15 +++++++++++++++ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 1d9f9d3906..7618dddef7 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -269,7 +269,7 @@ def run( # type: ignore :param query: The question, for question answering pipelines :param file_paths: The files to index, for indexing pipelines :param labels: - :param documents: + :param documents: The documents to create question on, for question-generating pipelines :param meta: :param params: Dictionary of parameters to be dispatched to every node. 
Example: {"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}} diff --git a/haystack/schema.py b/haystack/schema.py index 555da3d198..347c9b8048 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -9,6 +9,7 @@ import logging import io from functools import wraps +from haystack.utils import InMemoryLogger class Document: @@ -254,21 +255,6 @@ def __str__(self): return str(self.to_dict()) -class InMemoryLogger(io.TextIOBase): - """ - Implementation of a logger that keeps track - of the log lines in a list called `logs`, - from where they can be accessed freely. - """ - - def __init__(self, *args): - io.TextIOBase.__init__(self, *args) - self.logs = [] - - def write(self, x): - self.logs.append(x) - - def record_debug_logs(func: Callable, node_name: str, logs: bool) -> Callable: """ Captures the debug logs of the wrapped function and diff --git a/haystack/utils.py b/haystack/utils.py index 60dca8b1c7..9a31d4fc5e 100644 --- a/haystack/utils.py +++ b/haystack/utils.py @@ -14,6 +14,21 @@ logger = logging.getLogger(__name__) +class InMemoryLogger(io.TextIOBase): + """ + Implementation of a logger that keeps track + of the log lines in a list called `logs`, + from where they can be accessed freely. + """ + + def __init__(self, *args): + io.TextIOBase.__init__(self, *args) + self.logs = [] + + def write(self, x): + self.logs.append(x) + + def launch_es(sleep=15): # Start an Elasticsearch server via Docker From 443146b77558356f6e0755740e00513fc36bd160 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 7 Oct 2021 16:29:29 +0000 Subject: [PATCH 31/36] Add latest docstring and tutorial changes --- docs/_src/api/api/pipelines.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index d1c7c3204f..b909292eeb 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -121,6 +121,31 @@ Set the component for a node in the Pipeline. - `name`: The name of the node. - `component`: The component object to be set at the node. + +#### run + +```python + | run(query: Optional[str] = None, file_paths: Optional[List[str]] = None, labels: Optional[MultiLabel] = None, documents: Optional[List[Document]] = None, meta: Optional[dict] = None, params: Optional[dict] = None, debug: Optional[bool] = None, debug_logs: Optional[bool] = None) +``` + +Runs the pipeline, one note at a time. + +**Arguments**: + +- `query`: The question, for question answering pipelines +- `file_paths`: The files to index, for indexing pipelines +- `labels`: +- `documents`: The documents to create question on, for question-generating pipelines +- `meta`: +- `params`: Dictionary of parameters to be dispatched to every node. Example: + {"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}} +- `debug`: Whether the pipeline should instruct nodes to collect debug information + about their execution. By default these include the input parameters + they received, the output they generated, and eventual logs (of any severity) + emitted. +- `debug_logs`: Whether all the logs of the node should be printed in the console, + regardless of their severity and of the existing logger's settings. 
+ #### get\_nodes\_by\_class From 9308faa606a9e6011edb5bdec372f778210be410 Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 18:29:58 +0200 Subject: [PATCH 32/36] Add io import to utils.py --- haystack/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/haystack/utils.py b/haystack/utils.py index 9a31d4fc5e..a654e295c8 100644 --- a/haystack/utils.py +++ b/haystack/utils.py @@ -1,3 +1,4 @@ +import io import json from collections import defaultdict from itertools import islice From a5ca35b903c1048782210092d0a0e56b9bf6fcd6 Mon Sep 17 00:00:00 2001 From: Malte Pietsch Date: Thu, 7 Oct 2021 18:49:31 +0200 Subject: [PATCH 33/36] Update docstring --- haystack/pipeline.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 7618dddef7..663c001193 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -264,19 +264,22 @@ def run( # type: ignore debug_logs: Optional[bool] = None ): """ - Runs the pipeline, one note at a time. + Runs the pipeline, one node at a time. - :param query: The question, for question answering pipelines - :param file_paths: The files to index, for indexing pipelines + :param query: The search query (for query pipelines only) + :param file_paths: The files to index (for indexing pipelines only) :param labels: :param documents: The documents to create question on, for question-generating pipelines :param meta: - :param params: Dictionary of parameters to be dispatched to every node. Example: + :param params: Dictionary of parameters to be dispatched to the nodes. + If you want to pass a param to all nodes, you can just use: {"top_k":10} + If you want to pass it to targeted nodes, you can do: {"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}} :param debug: Whether the pipeline should instruct nodes to collect debug information about their execution. By default these include the input parameters they received, the output they generated, and eventual logs (of any severity) - emitted. + emitted. All debug information can then be found in the dict returned + by this method under the key "_debug" :param debug_logs: Whether all the logs of the node should be printed in the console, regardless of their severity and of the existing logger's settings. """ From c1c7d6b0fc6bea069286ded92bf89d925ab49ffb Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 7 Oct 2021 16:50:19 +0000 Subject: [PATCH 34/36] Add latest docstring and tutorial changes --- docs/_src/api/api/pipelines.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index b909292eeb..3d98a072d8 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -128,21 +128,24 @@ Set the component for a node in the Pipeline. | run(query: Optional[str] = None, file_paths: Optional[List[str]] = None, labels: Optional[MultiLabel] = None, documents: Optional[List[Document]] = None, meta: Optional[dict] = None, params: Optional[dict] = None, debug: Optional[bool] = None, debug_logs: Optional[bool] = None) ``` -Runs the pipeline, one note at a time. +Runs the pipeline, one node at a time. 
**Arguments**: -- `query`: The question, for question answering pipelines -- `file_paths`: The files to index, for indexing pipelines +- `query`: The search query (for query pipelines only) +- `file_paths`: The files to index (for indexing pipelines only) - `labels`: - `documents`: The documents to create question on, for question-generating pipelines - `meta`: -- `params`: Dictionary of parameters to be dispatched to every node. Example: +- `params`: Dictionary of parameters to be dispatched to the nodes. + If you want to pass a param to all nodes, you can just use: {"top_k":10} + If you want to pass it to targeted nodes, you can do: {"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}} - `debug`: Whether the pipeline should instruct nodes to collect debug information about their execution. By default these include the input parameters they received, the output they generated, and eventual logs (of any severity) - emitted. + emitted. All debug information can then be found in the dict returned + by this method under the key "_debug" - `debug_logs`: Whether all the logs of the node should be printed in the console, regardless of their severity and of the existing logger's settings. From d893cf70eb128670e18f1476b4f50ca96cb943bc Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 7 Oct 2021 19:11:28 +0200 Subject: [PATCH 35/36] Revert "Move InMemoryLogger into utils.py" This reverts commit e6503a991df66ef72a5ffd5c0600a3445d7e6a81. --- haystack/pipeline.py | 2 +- haystack/schema.py | 16 +++++++++++++++- haystack/utils.py | 15 --------------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 7618dddef7..1d9f9d3906 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -269,7 +269,7 @@ def run( # type: ignore :param query: The question, for question answering pipelines :param file_paths: The files to index, for indexing pipelines :param labels: - :param documents: The documents to create question on, for question-generating pipelines + :param documents: :param meta: :param params: Dictionary of parameters to be dispatched to every node. Example: {"Retriever": {"top_k": 10}, "Reader": {"top_k": 3, "debug": True}} diff --git a/haystack/schema.py b/haystack/schema.py index 347c9b8048..555da3d198 100644 --- a/haystack/schema.py +++ b/haystack/schema.py @@ -9,7 +9,6 @@ import logging import io from functools import wraps -from haystack.utils import InMemoryLogger class Document: @@ -255,6 +254,21 @@ def __str__(self): return str(self.to_dict()) +class InMemoryLogger(io.TextIOBase): + """ + Implementation of a logger that keeps track + of the log lines in a list called `logs`, + from where they can be accessed freely. + """ + + def __init__(self, *args): + io.TextIOBase.__init__(self, *args) + self.logs = [] + + def write(self, x): + self.logs.append(x) + + def record_debug_logs(func: Callable, node_name: str, logs: bool) -> Callable: """ Captures the debug logs of the wrapped function and diff --git a/haystack/utils.py b/haystack/utils.py index a654e295c8..5362662025 100644 --- a/haystack/utils.py +++ b/haystack/utils.py @@ -15,21 +15,6 @@ logger = logging.getLogger(__name__) -class InMemoryLogger(io.TextIOBase): - """ - Implementation of a logger that keeps track - of the log lines in a list called `logs`, - from where they can be accessed freely. 
- """ - - def __init__(self, *args): - io.TextIOBase.__init__(self, *args) - self.logs = [] - - def write(self, x): - self.logs.append(x) - - def launch_es(sleep=15): # Start an Elasticsearch server via Docker From 66c771bf9d7836557630709259a93f4befb2d419 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 7 Oct 2021 17:15:22 +0000 Subject: [PATCH 36/36] Add latest docstring and tutorial changes --- docs/_src/api/api/pipelines.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index 3d98a072d8..747e54cf94 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -135,7 +135,7 @@ Runs the pipeline, one node at a time. - `query`: The search query (for query pipelines only) - `file_paths`: The files to index (for indexing pipelines only) - `labels`: -- `documents`: The documents to create question on, for question-generating pipelines +- `documents`: - `meta`: - `params`: Dictionary of parameters to be dispatched to the nodes. If you want to pass a param to all nodes, you can just use: {"top_k":10}