Skip to content

Commit

Permalink
[SDESK-7156] Allow FeedingServices to provide request kwargs for asso…
Browse files Browse the repository at this point in the history
…ciation downloading (superdesk#2555)

* [SDESK-7156] Allow FeedingServices to provide request kwargs for association downloading
This is used by AP Media API to provide x-api-key header, among others

* fix: don't duplicate timeout arg to requests.get

* Use request sessions and feeding_services to download media

* Add comment to new config

* Move get_request_kwargs from FeedingService to HTTPFeedingServiceBase

* fix tests

* Create util function to download from service or directly

* fix circular imports
  • Loading branch information
MarkLark86 committed Apr 17, 2024
1 parent bd502b9 commit 39ec18c
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 80 deletions.
6 changes: 6 additions & 0 deletions superdesk/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,3 +1094,9 @@ def local_to_utc_hour(hour):
#: .. versionadded:: 2.6
#:
REBUILD_ELASTIC_ON_INIT_DATA_ERROR = strtobool(env("REBUILD_ELASTIC_ON_INIT_DATA_ERROR", "false"))

#: If disabled, SSL certificate verification is skipped for AP Media ingest feeds
#:
#: .. versionadded:: 2.6.8
#:
AP_MEDIA_API_VERIFY_SSL = strtobool(env("AP_MEDIA_API_VERIFY_SSL", "true"))
6 changes: 3 additions & 3 deletions superdesk/io/commands/update_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ def ingest_item(item, provider, feeding_service, rule_set=None, routing_scheme=N
baseImageRend = rend.get("baseImage") or next(iter(rend.values()))
if baseImageRend and not baseImageRend.get("media"): # if there is media should be processed already
href = feeding_service.prepare_href(baseImageRend["href"], rend.get("mimetype"))
update_renditions(item, href, old_item)
update_renditions(item, href, old_item, feeding_service=feeding_service)

# if the item has associated media
for key, assoc in item.get("associations", {}).items():
Expand All @@ -639,7 +639,7 @@ def ingest_item(item, provider, feeding_service, rule_set=None, routing_scheme=N
if _is_new_version(assoc, ingested) and assoc.get("renditions"): # new version
logger.info("new assoc version - re-transfer renditions for %s", assoc_name)
try:
transfer_renditions(assoc["renditions"])
transfer_renditions(assoc["renditions"], feeding_service=feeding_service)
except SuperdeskApiError:
logger.exception(
"failed to update associated item renditions",
Expand All @@ -655,7 +655,7 @@ def ingest_item(item, provider, feeding_service, rule_set=None, routing_scheme=N
if assoc.get("renditions") and has_system_renditions(assoc): # all set, just download
logger.info("new association with system renditions - transfer %s", assoc_name)
try:
transfer_renditions(assoc["renditions"])
transfer_renditions(assoc["renditions"], feeding_service=feeding_service)
except SuperdeskApiError:
logger.exception(
"failed to download renditions",
Expand Down
18 changes: 3 additions & 15 deletions superdesk/io/feed_parsers/ap_media.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def parse(self, s_json, provider=None):

if in_item.get("type") == "picture":
if in_item.get("renditions"):
self._parse_renditions(in_item["renditions"], item, provider)
self._parse_renditions(in_item["renditions"], item)

if in_item.get("description_caption"):
item["description_text"] = in_item.get("description_caption")
Expand Down Expand Up @@ -277,13 +277,13 @@ def _parse_associations(self, associations, item, provider=None):
for key, raw in associations.items():
item["associations"]["{}--{}".format(related_id, key)] = self.parse(raw, provider)

def _parse_renditions(self, renditions, item, provider=None):
def _parse_renditions(self, renditions, item):
try:
item["renditions"] = {}
for dest, src in self.RENDITIONS_MAPPING.items():
rend = renditions[src]
item["renditions"][dest] = {
"href": with_apikey(rend["href"], provider),
"href": rend["href"],
"mimetype": rend["mimetype"],
"width": rend["width"],
"height": rend["height"],
Expand All @@ -292,16 +292,4 @@ def _parse_renditions(self, renditions, item, provider=None):
pass


def with_apikey(href, provider):
    """Return *href* with the provider's ``apikey`` appended as a query parameter.

    The href is returned unchanged when it already contains "apikey" or when
    the provider does not carry a usable ``config.apikey`` value.
    """
    # NOTE: substring check — any occurrence of "apikey" in the URL short-circuits.
    if "apikey" in href:
        return href
    try:
        apikey = provider["config"]["apikey"]
    except (KeyError, TypeError):
        # No provider, or no configured apikey — nothing to append.
        return href
    joiner = "&" if "?" in href else "?"
    return "{}{}apikey={}".format(href, joiner, apikey)


register_feed_parser(APMediaFeedParser.NAME, APMediaFeedParser())
60 changes: 32 additions & 28 deletions superdesk/io/feeding_services/ap_media.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@
# at https://www.sourcefabric.org/superdesk/license


from typing import Dict, Any
import json
import requests
import superdesk
import logging
from datetime import timedelta, datetime

from superdesk.io.feed_parsers.ap_media import with_apikey
from lxml import etree
from flask import current_app as app

import superdesk
from superdesk.io.registry import register_feeding_service
from superdesk.io.feeding_services.http_base_service import HTTPFeedingServiceBase
from superdesk.errors import IngestApiError
from superdesk.io.feed_parsers import nitf
from lxml import etree
from superdesk.utc import utcnow
from datetime import timedelta, datetime


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -82,6 +85,7 @@ class APMediaFeedingService(HTTPFeedingServiceBase):
HTTP_TIMEOUT = 40

def config_test(self, provider=None):
self.provider = provider
self._get_products(provider)
original = superdesk.get_resource_service("ingest_providers").find_one(req=None, _id=provider.get("_id"))
# If there has been a change in the required products then reset the next link
Expand All @@ -98,12 +102,9 @@ def _get_products(self, provider):
:param provider:
:return:
"""
api_key = provider.get("config", {}).get("apikey")
r = requests.get(
provider.get("config", {}).get("products_url") + "?apikey={}".format(api_key),
timeout=self.HTTP_TIMEOUT,
verify=False,
allow_redirects=True,
r = self.session.get(
provider.get("config", {}).get("products_url"),
**self.get_request_kwargs(),
)
r.raise_for_status()
productList = []
Expand All @@ -113,24 +114,16 @@ def _get_products(self, provider):
productList.append("{}".format(entitlement.get("id")))
provider["config"]["availableProducts"] = ",".join(productList)

def prepare_href(self, href, mimetype=None):
    # Append the provider's apikey query parameter so rendition URLs are fetchable.
    return with_apikey(href, self.provider)

def _update(self, provider, update):
self.HTTP_URL = provider.get("config", {}).get("api_url", "")
self.provider = provider

# Set the apikey parameter we're going to use it on all calls
params = dict()
params["apikey"] = provider.get("config", {}).get("apikey")

# Use the next link if one is available in the config
if provider.get("config", {}).get("next_link"):
r = self.get_url(
url=provider.get("config", {}).get("next_link"), params=params, verify=False, allow_redirects=True
)
r = self.get_url(url=provider.get("config", {}).get("next_link"))
r.raise_for_status()
else:
params = dict()
id_list = provider.get("config", {}).get("productList", "").strip()
recovery_time = provider.get("config", {}).get("recoverytime", "1")
recovery_time = recovery_time.strip() if recovery_time else ""
Expand All @@ -149,7 +142,7 @@ def _update(self, provider, update):
params["versions"] = "all"

logger.info("AP Media Start/Recovery time: {} params {}".format(recovery_time, params))
r = self.get_url(params=params, verify=False, allow_redirects=True)
r = self.get_url(params=params)
r.raise_for_status()
try:
response = json.loads(r.text)
Expand All @@ -172,7 +165,7 @@ def _update(self, provider, update):
item.get("item", {}).get("headline"), item.get("item", {}).get("uri")
)
)
r = self.api_get(item.get("item", {}).get("uri"), provider)
r = self.api_get(item.get("item", {}).get("uri"))
complete_item = json.loads(r.text)

# Get the nitf rendition of the item
Expand All @@ -181,7 +174,7 @@ def _update(self, provider, update):
)
if nitf_ref:
logger.info("Get AP nitf : {}".format(nitf_ref))
r = self.api_get(nitf_ref, provider)
r = self.api_get(nitf_ref)
root_elt = etree.fromstring(r.content)

# If the default namespace definition is the nitf namespace then remove it
Expand All @@ -203,7 +196,7 @@ def _update(self, provider, update):
for key, assoc in associations.items():
logger.info('Get AP association "%s"', assoc.get("headline"))
try:
related_json = self.api_get(assoc["uri"], provider).json()
related_json = self.api_get(assoc["uri"]).json()
complete_item["associations"][key] = related_json
except IngestApiError:
logger.warning("Could not fetch AP association", extra=assoc)
Expand All @@ -222,12 +215,23 @@ def _update(self, provider, update):

return [parsed_items]

def api_get(self, url, provider):
resp = self.get_url(
url=url, params={"apikey": provider["config"]["apikey"]}, verify=False, allow_redirects=True
)
def api_get(self, url):
    """Fetch *url* through the feeding service, raising ``HTTPError`` on failure."""
    response = self.get_url(url=url)
    response.raise_for_status()
    return response

def get_request_kwargs(self) -> Dict[str, Any]:
    """Build the keyword arguments for AP Media HTTP requests.

    Always sets the timeout, redirect policy and the SSL-verification flag
    (from the ``AP_MEDIA_API_VERIFY_SSL`` setting); adds the ``x-api-key``
    header only when the provider config supplies an apikey.
    """
    kwargs: Dict[str, Any] = {
        "timeout": self.HTTP_TIMEOUT,
        "verify": app.config.get("AP_MEDIA_API_VERIFY_SSL", True),
        "allow_redirects": True,
    }
    try:
        api_key = self.provider["config"]["apikey"]
    except (KeyError, TypeError):
        # No provider set, or no apikey configured — send no auth header.
        pass
    else:
        kwargs["headers"] = {"x-api-key": api_key}
    return kwargs


register_feeding_service(APMediaFeedingService)
21 changes: 19 additions & 2 deletions superdesk/io/feeding_services/http_base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from typing import List, Dict, Optional, Union
from typing import List, Dict, Optional, Tuple, Any
from io import BytesIO
import traceback
import requests
from superdesk.errors import IngestApiError, SuperdeskIngestError
from superdesk.io.feeding_services import FeedingService
from superdesk.media.media_operations import download_file_from_url


class HTTPFeedingServiceBase(FeedingService):
Expand Down Expand Up @@ -111,6 +113,7 @@ class HTTPFeedingServiceBase(FeedingService):
def __init__(self):
    super().__init__()
    # No auth token until one is obtained by the service's auth handling.
    self.token = None
    # Shared HTTP session so connections are reused across requests.
    self.session = requests.Session()

@property
def auth_info(self):
Expand Down Expand Up @@ -145,6 +148,9 @@ def validate_config(self):
if url and not url.strip().startswith("http"):
raise SuperdeskIngestError.notConfiguredError(Exception("URL must be a valid HTTP link."))

def get_request_kwargs(self) -> Dict[str, Any]:
    """Return extra kwargs for HTTP requests; subclasses override to add headers etc."""
    return {}

def get_url(self, url=None, **kwargs):
"""Do an HTTP Get on URL
Expand Down Expand Up @@ -184,8 +190,13 @@ def get_url(self, url=None, **kwargs):
params.update(self.HTTP_DEFAULT_PARAMETERS)
kwargs["params"] = params

# Let the provided ``kwargs`` override the feeding service's ``kwargs``
request_kwargs = self.get_request_kwargs()
request_kwargs.update(kwargs)
request_kwargs.setdefault("timeout", self.HTTP_TIMEOUT)

try:
response = requests.get(url, timeout=self.HTTP_TIMEOUT, **kwargs)
response = self.session.get(url, **request_kwargs)
except requests.exceptions.Timeout as exception:
raise IngestApiError.apiTimeoutError(exception, self.provider)
except requests.exceptions.ConnectionError as exception:
Expand All @@ -207,6 +218,12 @@ def get_url(self, url=None, **kwargs):

return response

def download_file(self, url: str, **kwargs: Dict[str, Any]) -> Tuple[BytesIO, str, str]:
    """Download *url* using this service's session and request kwargs.

    Explicitly supplied *kwargs* override the service defaults from
    :meth:`get_request_kwargs`; a timeout is always applied.
    """
    merged = dict(self.get_request_kwargs(), **kwargs)
    merged.setdefault("timeout", self.HTTP_TIMEOUT)
    return download_file_from_url(url, merged, self.session)

def update(self, provider, update):
self.provider = provider
self.validate_config()
Expand Down
20 changes: 16 additions & 4 deletions superdesk/media/media_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license


from typing import Dict, Any, Optional, Tuple
import arrow
import magic
import base64
Expand All @@ -28,6 +28,7 @@
from superdesk.errors import SuperdeskApiError
from flask import current_app as app
from mimetypes import guess_extension
from superdesk import __version__ as superdesk_version

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,21 +56,32 @@ def fix_content_type(content_type, content):
return str(content_type)


def download_file_from_url(url, request_kwargs=None):
def download_file_from_url(
url: str, request_kwargs: Optional[Dict[str, Any]] = None, session: Optional[requests.Session] = None
) -> Tuple[BytesIO, str, str]:
"""Download file from given url.
In case url is relative it will prefix it with current host.
:param url: file url
:param request_kwargs: Additional keyword arguments to pass to requests.Session.request
:param session: requests.Session instance (one will be created if not supplied)
"""

if not request_kwargs:
request_kwargs = {}

request_kwargs.setdefault("timeout", (5, 25))
request_kwargs.setdefault("headers", {})
request_kwargs["headers"]["User-Agent"] = f"Superdesk-{superdesk_version}"

if session is None:
session = requests.Session()

try:
rv = requests.get(url, headers={"User-Agent": "Superdesk-1.0"}, timeout=(5, 25), **request_kwargs)
rv = session.get(url, **request_kwargs)
except requests.exceptions.MissingSchema: # any route will do here, we only need host
rv = requests.get(urljoin(url_for("static", filename="x", _external=True), url), timeout=15, **request_kwargs)
rv = session.get(urljoin(url_for("static", filename="x", _external=True), url), **request_kwargs)
if rv.status_code not in (200, 201):
raise SuperdeskApiError.internalError("Failed to retrieve file from URL: %s" % url)
content = BytesIO(rv.content)
Expand Down

0 comments on commit 39ec18c

Please sign in to comment.