feat: Adjust LinkContentFetcher run method, use ByteStream (#5972)
vblagoje authored Oct 10, 2023
1 parent c102b15 commit 6a50123
Showing 3 changed files with 187 additions and 65 deletions.
126 changes: 92 additions & 34 deletions haystack/preview/components/fetchers/link_content.py
@@ -1,17 +1,16 @@
-import io
import logging
from collections import defaultdict
-from datetime import datetime
-from typing import Optional, Dict, List, Callable, Any, IO
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Callable, Dict, List, Optional, Tuple

import requests
from requests import Response
from requests.exceptions import HTTPError
-from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState
-from haystack.preview import component, default_from_dict, default_to_dict
+from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from haystack import __version__
-from haystack.preview import Document
+from haystack.preview import component, default_from_dict, default_to_dict
+from haystack.preview.dataclasses import ByteStream

logger = logging.getLogger(__name__)

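For reference, the ByteStream dataclass imported above looks, at the time of this commit, roughly like the following. This is a paraphrased sketch, not a copy of haystack/preview/dataclasses; see that module for the authoritative definition.

# Hedged sketch of haystack.preview.dataclasses.ByteStream as this commit uses it.
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(frozen=True)
class ByteStream:
    data: bytes
    metadata: Dict[str, Any] = field(default_factory=dict, hash=False)

    @classmethod
    def from_string(cls, text: str, encoding: str = "utf-8") -> "ByteStream":
        # Build a ByteStream by encoding a string.
        return cls(data=text.encode(encoding))

Note that even though the dataclass is frozen, the run() method below can still call stream.metadata.update(...): freezing blocks attribute rebinding, not mutation of the metadata dict itself.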
@@ -26,26 +25,27 @@
}


-def text_content_handler(response: Response) -> Dict[str, str]:
+def text_content_handler(response: Response) -> ByteStream:
    """
    :param response: Response object from the request.
    :return: The extracted text.
    """
-    return {"text": response.text}
+    return ByteStream.from_string(response.text)


-def binary_content_handler(response: Response) -> Dict[str, IO[bytes]]:
+def binary_content_handler(response: Response) -> ByteStream:
    """
    :param response: Response object from the request.
    :return: The extracted binary file-like object.
    """
-    return {"blob": io.BytesIO(response.content)}
+    return ByteStream(data=response.content)

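A minimal sketch of what the two handlers now return, driving them with a mocked Response the same way the unit tests below do:

from unittest.mock import Mock

from haystack.preview.components.fetchers.link_content import (
    binary_content_handler,
    text_content_handler,
)

# Mock stands in for a requests.Response object.
response = Mock(text="hello world", content=b"%PDF-1.5 hello")

assert text_content_handler(response).data == b"hello world"  # from_string encodes as UTF-8
assert binary_content_handler(response).data == b"%PDF-1.5 hello"  # raw bytes, no BytesIO wrapper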

@component
class LinkContentFetcher:
"""
LinkContentFetcher fetches content from a URL link and converts it to a Document object.
LinkContentFetcher is a component for fetching and extracting content from URLs. It supports handling various
content types, retries on failures, and automatic user-agent rotation for failed web requests.
"""

    def __init__(
@@ -56,15 +56,13 @@ def __init__(
        timeout: int = 3,
    ):
        """
-        Creates a LinkContentFetcher instance.
+        Initializes a LinkContentFetcher instance.
-        :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs
-            during content extraction. If False, the error is simply logged and the program continues.
-            Defaults to False.
-        :param user_agents: A list of user agents to use when fetching content. Defaults to None, in which case a
-            default user agent is used.
-        :param retry_attempts: The number of times to retry fetching content. Defaults to 2.
-        :param timeout: The timeout in seconds for the request. Defaults to 3.
+        :param raise_on_failure: If True, raises an exception on failure when fetching a single URL.
+            For multiple URLs, errors are logged and successful fetches are returned. Default is True.
+        :param user_agents: A list of user agents for fetching content. If None, a default user agent is used.
+        :param retry_attempts: Number of retry attempts for fetching content. Default is 2.
+        :param timeout: Timeout in seconds for the request. Default is 3.
"""
self.raise_on_failure = raise_on_failure
self.user_agents = user_agents or [DEFAULT_USER_AGENT]
@@ -73,7 +71,7 @@ def __init__(
        self.timeout = timeout

        # register default content handlers that extract data from the response
-        self.handlers: Dict[str, Callable[[Response], Dict[str, Any]]] = defaultdict(lambda: text_content_handler)
+        self.handlers: Dict[str, Callable[[Response], ByteStream]] = defaultdict(lambda: text_content_handler)
        self.handlers["text/html"] = text_content_handler
        self.handlers["text/plain"] = text_content_handler
        self.handlers["application/pdf"] = binary_content_handler
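Because the registry is a defaultdict, any content type without an explicit entry falls back to the text handler. A quick sketch, assuming the registry stays wired as above:

from haystack.preview.components.fetchers.link_content import LinkContentFetcher

fetcher = LinkContentFetcher()
assert fetcher.handlers["application/pdf"].__name__ == "binary_content_handler"
# An unregistered type resolves through the defaultdict factory to the text handler.
assert fetcher.handlers["application/json"].__name__ == "text_content_handler"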
@@ -116,37 +114,96 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher":
"""
return default_from_dict(cls, data)

-    @component.output_types(documents=Optional[Document])
-    def run(self, url: str):
+    @component.output_types(streams=List[ByteStream])
+    def run(self, urls: List[str]):
        """
-        Fetches content from a URL and converts it to a Document object. If no content is extracted,
-        an empty Document object is returned (if raise_on_failure is False).
-        :param url: URL to fetch content from.
-        :param timeout: Timeout in seconds for the request.
-        :return: List of Document objects or an empty list if no content is extracted.
+        Fetches content from a list of URLs and returns a list of extracted content streams.
+        Each content stream is a ByteStream object containing the extracted content as binary data.
+        The content type of each stream is stored in the metadata of the ByteStream object under
+        the key "content_type". The URL of the fetched content is stored under the key "url".
+        :param urls: A list of URLs to fetch content from.
+        :return: A list of ByteStream objects representing the extracted content.
+        :raises: If the provided list of URLs contains only a single URL and `raise_on_failure` is set to True,
+            an exception is raised in case of an error during content retrieval. In all other scenarios, any
+            retrieval errors are logged, and a list of successfully retrieved ByteStream objects is returned.
        """
-        document_data: Dict[str, Any] = {"metadata": {"url": url, "timestamp": int(datetime.utcnow().timestamp())}}
+        streams: List[ByteStream] = []
+        if not urls:
+            return {"streams": streams}

+        # don't use multithreading if there's only one URL
+        if len(urls) == 1:
+            stream_metadata, stream = self.fetch(urls[0])
+            stream.metadata.update(stream_metadata)
+            streams.append(stream)
+        else:
+            with ThreadPoolExecutor() as executor:
+                results = executor.map(self._fetch_with_exception_suppression, urls)

+                for stream_metadata, stream in results:  # type: ignore
+                    if stream_metadata is not None and stream is not None:
+                        stream.metadata.update(stream_metadata)
+                        streams.append(stream)

+        return {"streams": streams}
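Used standalone, the new run() signature looks like this (the URLs are illustrative):

from haystack.preview.components.fetchers.link_content import LinkContentFetcher

fetcher = LinkContentFetcher(raise_on_failure=False)
result = fetcher.run(urls=["https://haystack.deepset.ai", "https://www.google.com"])

for stream in result["streams"]:
    # Each ByteStream carries the fetched bytes plus "url" and "content_type" metadata.
    print(stream.metadata["url"], stream.metadata["content_type"], len(stream.data))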

+    def fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]:
+        """
+        Fetches content from a URL and returns it as a ByteStream.
+        :param url: The URL to fetch content from.
+        :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
+            ByteStream metadata contains the URL and the content type of the fetched content.
+            The content type is a string indicating the type of content fetched (e.g., "text/html", "application/pdf").
+            The ByteStream object contains the fetched content as binary data.
+        :raises: If an error occurs during content retrieval and `raise_on_failure` is set to True, this method will
+            raise an exception. Otherwise, all fetching errors are logged, and an empty ByteStream is returned.
+        """
+        content_type: str = "text/html"
+        stream: ByteStream = ByteStream(data=b"")
        try:
            response = self._get_response(url)
            content_type = self._get_content_type(response)
-            document_data["mime_type"] = content_type
            handler: Callable = self.handlers[content_type]
-            document_data.update(handler(response))
-            return {"document": Document(**document_data)}
+            stream = handler(response)
        except Exception as e:
            if self.raise_on_failure:
                raise e
-            logger.debug("Couldn't retrieve content from %s", url)
-            return {"document": None}
+            # less verbose log as this is expected to happen often (requests failing, blocked, etc.)
+            logger.debug("Couldn't retrieve content from %s due to %s", url, str(e))
        finally:
            self.current_user_agent_idx = 0

+        return {"content_type": content_type, "url": url}, stream
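fetch() returns the metadata dict separately from the stream; run() is what merges the two. For illustration, with an illustrative URL:

from haystack.preview.components.fetchers.link_content import LinkContentFetcher

fetcher = LinkContentFetcher(raise_on_failure=False)
metadata, stream = fetcher.fetch("https://www.example.com")
stream.metadata.update(metadata)  # the same merge run() performs for each result
assert stream.metadata["url"] == "https://www.example.com"
assert "content_type" in stream.metadata  # e.g. "text/html"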

+    def _fetch_with_exception_suppression(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]:
+        """
+        Fetches content from a URL and returns it as a ByteStream.
+        If `raise_on_failure` is set to True, this method will wrap the fetch method and catch any exceptions.
+        Otherwise, it will simply call the fetch method.
+        :param url: The URL to fetch content from.
+        :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
+        """
+        if self.raise_on_failure:
+            try:
+                return self.fetch(url)
+            except Exception as e:
+                logger.warning("Error fetching %s: %s", url, str(e))
+                return {"content_type": "Unknown", "url": url}, None
+        else:
+            return self.fetch(url)
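With the default raise_on_failure=True, a failing URL in a batch is therefore swallowed by this wrapper and surfaces as a (metadata, None) pair that run() then skips. Calling the private helper directly, purely for illustration:

from haystack.preview.components.fetchers.link_content import LinkContentFetcher

fetcher = LinkContentFetcher()  # raise_on_failure defaults to True
metadata, stream = fetcher._fetch_with_exception_suppression("https://non_existent_website_dot.com/")
assert stream is None
assert metadata == {"content_type": "Unknown", "url": "https://non_existent_website_dot.com/"}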

    def _get_content_type(self, response: Response):
        """
        Get the content type of the response.
        :param response: The response object.
        :return: The content type of the response.
        """
@@ -157,6 +214,7 @@ def _switch_user_agent(self, retry_state: RetryCallState) -> None:
"""
Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents.
Used by tenacity to retry the requests with a different user agent.
:param retry_state: The retry state (unused, required by tenacity).
"""
self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
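_switch_user_agent is hooked into tenacity's retry machinery elsewhere in this file. A self-contained sketch of the pattern, independent of Haystack and with hypothetical user-agent values:

import requests
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

USER_AGENTS = ["agent-a/1.0", "agent-b/1.0"]  # hypothetical values
current_idx = 0


def switch_user_agent(retry_state: RetryCallState) -> None:
    # Rotate to the next user agent before the next attempt.
    global current_idx
    current_idx = (current_idx + 1) % len(USER_AGENTS)


@retry(
    reraise=True,
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(requests.HTTPError),
    before_sleep=switch_user_agent,
)
def get_response(url: str) -> requests.Response:
    response = requests.get(url, headers={"User-Agent": USER_AGENTS[current_idx]}, timeout=3)
    response.raise_for_status()
    return response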
@@ -1,5 +1,5 @@
---
preview:
- |
-    Adds LinkContentFetcher component to Haystack 2.0. LinkContentFetcher fetches content from a given URL and
-    converts it into a Document object, which can then be used within the Haystack 2.0 pipeline.
+    Introduced the LinkContentFetcher in Haystack 2.0. This component fetches content from specified
+    URLs and converts them into ByteStream objects for further processing in Haystack pipelines.
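In a pipeline, the fetcher's streams output is meant to feed components that accept ByteStream inputs. A sketch under the preview API of this period; the downstream converter is hypothetical:

from haystack.preview import Pipeline
from haystack.preview.components.fetchers.link_content import LinkContentFetcher

pipeline = Pipeline()
pipeline.add_component("fetcher", LinkContentFetcher())
# A hypothetical converter accepting ByteStream inputs could be attached like:
# pipeline.add_component("converter", SomeByteStreamConverter())
# pipeline.connect("fetcher.streams", "converter.sources")

result = pipeline.run({"fetcher": {"urls": ["https://haystack.deepset.ai"]}})
print(result["fetcher"]["streams"][0].metadata)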
122 changes: 93 additions & 29 deletions test/preview/components/fetchers/test_link_content_fetcher.py
@@ -1,7 +1,7 @@
-import io
from unittest.mock import patch, Mock

import pytest
+import requests

from haystack.preview.components.fetchers.link_content import (
LinkContentFetcher,
@@ -99,27 +99,29 @@ def test_from_dict(self):

    @pytest.mark.unit
    def test_run_text(self):
+        correct_response = b"Example test response"
        with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
            mock_run.get.return_value = Mock(
                status_code=200, text="Example test response", headers={"Content-Type": "text/plain"}
            )
            fetcher = LinkContentFetcher()
-            document = fetcher.run("https://www.example.com")["document"]
-            assert document.text == "Example test response"
-            assert document.metadata["url"] == "https://www.example.com"
-            assert "timestamp" in document.metadata
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]
+            first_stream = streams[0]
+            assert first_stream.data == correct_response
+            assert first_stream.metadata["content_type"] == "text/plain"

    @pytest.mark.unit
    def test_run_html(self):
+        correct_response = b"<h1>Example test response</h1>"
        with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
            mock_run.get.return_value = Mock(
                status_code=200, text="<h1>Example test response</h1>", headers={"Content-Type": "text/html"}
            )
            fetcher = LinkContentFetcher()
-            document = fetcher.run("https://www.example.com")["document"]
-            assert document.text == "<h1>Example test response</h1>"
-            assert document.metadata["url"] == "https://www.example.com"
-            assert "timestamp" in document.metadata
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]
+            first_stream = streams[0]
+            assert first_stream.data == correct_response
+            assert first_stream.metadata["content_type"] == "text/html"

    @pytest.mark.unit
    def test_run_binary(self, test_files_path):
@@ -129,42 +131,104 @@ def test_run_binary(self, test_files_path):
                status_code=200, content=file_bytes, headers={"Content-Type": "application/pdf"}
            )
            fetcher = LinkContentFetcher()
-            document = fetcher.run("https://www.example.com")["document"]
-            # casting to list to make the blobs comparable
-            assert list(document.blob) == list(io.BytesIO(file_bytes))
-            assert document.metadata["url"] == "https://www.example.com"
-            assert "timestamp" in document.metadata
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]
+            first_stream = streams[0]
+            assert first_stream.data == file_bytes
+            assert first_stream.metadata["content_type"] == "application/pdf"

    @pytest.mark.unit
    def test_run_bad_status_code(self):
+        empty_byte_stream = b""
        fetcher = LinkContentFetcher(raise_on_failure=False)
        mock_response = Mock(status_code=403)
        with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
            mock_run.get.return_value = mock_response
-            document = fetcher.run("https://www.example.com")["document"]
-            assert document is None
+            streams = fetcher.run(urls=["https://www.example.com"])["streams"]

+        # empty byte stream is returned because raise_on_failure is False
+        assert len(streams) == 1
+        first_stream = streams[0]
+        assert first_stream.data == empty_byte_stream
+        assert first_stream.metadata["content_type"] == "text/html"

    @pytest.mark.integration
    def test_link_content_fetcher_html(self):
        fetcher = LinkContentFetcher()
-        document = fetcher.run(HTML_URL)["document"]
-        assert document.mime_type == "text/html"
-        assert "Introduction to Haystack" in document.text
-        assert document.metadata["url"] == HTML_URL
+        streams = fetcher.run([HTML_URL])["streams"]
+        first_stream = streams[0]
+        assert "Haystack" in first_stream.data.decode("utf-8")
+        assert first_stream.metadata["content_type"] == "text/html"
+        assert "url" in first_stream.metadata and first_stream.metadata["url"] == HTML_URL

    @pytest.mark.integration
    def test_link_content_fetcher_text(self):
        fetcher = LinkContentFetcher()
-        document = fetcher.run(TEXT_URL)["document"]
-        assert document.mime_type == "text/plain"
-        assert "Haystack" in document.text
-        assert document.metadata["url"] == TEXT_URL
+        streams = fetcher.run([TEXT_URL])["streams"]
+        first_stream = streams[0]
+        assert "Haystack" in first_stream.data.decode("utf-8")
+        assert first_stream.metadata["content_type"] == "text/plain"
+        assert "url" in first_stream.metadata and first_stream.metadata["url"] == TEXT_URL

    @pytest.mark.integration
    def test_link_content_fetcher_pdf(self):
        fetcher = LinkContentFetcher()
-        document = fetcher.run(PDF_URL)["document"]
-        assert document.mime_type == "application/octet-stream"  # FIXME Should be "application/pdf"?
-        assert document.text is None
-        assert document.blob is not None
-        assert document.metadata["url"] == PDF_URL
+        streams = fetcher.run([PDF_URL])["streams"]
+        assert len(streams) == 1
+        first_stream = streams[0]
+        assert first_stream.metadata["content_type"] in ("application/octet-stream", "application/pdf")
+        assert "url" in first_stream.metadata and first_stream.metadata["url"] == PDF_URL

+    @pytest.mark.integration
+    def test_link_content_fetcher_multiple_different_content_types(self):
+        """
+        This test ensures that the fetcher can handle a list of URLs with different content types.
+        """
+        fetcher = LinkContentFetcher()
+        streams = fetcher.run([PDF_URL, HTML_URL])["streams"]
+        assert len(streams) == 2
+        for stream in streams:
+            assert stream.metadata["content_type"] in ("text/html", "application/pdf", "application/octet-stream")
+            if stream.metadata["content_type"] == "text/html":
+                assert "Haystack" in stream.data.decode("utf-8")
+            elif stream.metadata["content_type"] == "application/pdf":
+                assert len(stream.data) > 0

+    @pytest.mark.integration
+    def test_link_content_fetcher_multiple_html_streams(self):
+        """
+        This test ensures that the fetcher can handle a list of URLs with different content types,
+        and that two of the resulting streams are HTML.
+        """

+        fetcher = LinkContentFetcher()
+        streams = fetcher.run([PDF_URL, HTML_URL, "https://google.com"])["streams"]
+        assert len(streams) == 3
+        for stream in streams:
+            assert stream.metadata["content_type"] in ("text/html", "application/pdf", "application/octet-stream")
+            if stream.metadata["content_type"] == "text/html":
+                assert "Haystack" in stream.data.decode("utf-8") or "Google" in stream.data.decode("utf-8")
+            elif stream.metadata["content_type"] == "application/pdf":
+                assert len(stream.data) > 0

+    @pytest.mark.integration
+    def test_mix_of_good_and_failed_requests(self):
+        """
+        This test ensures that the fetcher can handle a list of URLs in which some URLs fail to be fetched.
+        In such a case, the fetcher should return the content of the successfully fetched URLs and not raise
+        an exception.
+        """
+        fetcher = LinkContentFetcher()
+        result = fetcher.run(["https://non_existent_website_dot.com/", "https://www.google.com/"])
+        assert len(result["streams"]) == 1
+        first_stream = result["streams"][0]
+        assert first_stream.metadata["content_type"] == "text/html"

+    @pytest.mark.integration
+    def test_bad_request_exception_raised(self):
+        """
+        This test ensures that the fetcher raises an exception when a single bad request is made and
+        it is configured to do so.
+        """
+        fetcher = LinkContentFetcher()
+        with pytest.raises(requests.exceptions.ConnectionError):
+            fetcher.run(["https://non_existent_website_dot.com/"])
