Skip to content

Commit

Permalink
Move Scrapy-related code from Actor template to SDK (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdusek committed Nov 14, 2023
1 parent e238be4 commit 24f2d8c
Show file tree
Hide file tree
Showing 22 changed files with 465 additions and 36 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ filename =
per-file-ignores =
scripts/*: D
tests/*: D
**/__init__.py: F401

# Google docstring convention + D204 & D401
docstring-convention = all
Expand Down
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
Changelog
=========

[1.2.1](../../releases/tag/v1.2.1) - Unreleased
[1.3.0](../../releases/tag/v1.3.0) - Unreleased
-----------------------------------------------

...
### Added

- Added `scrapy` extra

[1.2.0](../../releases/tag/v1.2.0) - 2023-10-23
-----------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ clean:

install-dev:
python -m pip install --upgrade pip
pip install --no-cache-dir -e ".[dev]"
pip install --no-cache-dir -e ".[dev,scrapy]"
pre-commit install

build:
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ event handling.
If you just need to access the [Apify API](https://docs.apify.com/api/v2) from your Python applications,
check out the [Apify Client for Python](https://docs.apify.com/api/client/python) instead.

## Installation

The Apify SDK for Python is available on PyPI as the `apify` package.
For the default installation, using pip, run the following:

```bash
pip install apify
```

For users interested in integrating Apify with Scrapy, we provide a package extra called `scrapy`.
To install Apify with the `scrapy` extra, use the following command:

```bash
pip install "apify[scrapy]"
```

## Documentation

For usage instructions, check the documentation on [Apify Docs](https://docs.apify.com/sdk/python/).
Expand Down
6 changes: 6 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ warn_redundant_casts = True
warn_return_any = True
warn_unreachable = True
warn_unused_ignores = True

[mypy-scrapy.*]
ignore_missing_imports = True

[mypy-sortedcollections.*]
ignore_missing_imports = True
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "apify"
version = "1.2.1"
version = "1.3.0"
description = "Apify SDK for Python"
readme = "README.md"
license = {text = "Apache Software License"}
Expand Down Expand Up @@ -72,6 +72,9 @@ dev = [
"types-colorama ~= 0.4.15.11",
"types-psutil ~= 5.9.5.12",
]
scrapy = [
"scrapy ~= 2.11.0",
]

[project.urls]
"Homepage" = "https://docs.apify.com/sdk/python/"
Expand Down
6 changes: 3 additions & 3 deletions src/apify/_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def public_encrypt(value: str, *, public_key: rsa.RSAPublicKey) -> dict:
Returns:
disc: Encrypted password and value.
"""
key_bytes = _crypto_random_object_id(ENCRYPTION_KEY_LENGTH).encode('utf-8')
initialized_vector_bytes = _crypto_random_object_id(ENCRYPTION_IV_LENGTH).encode('utf-8')
key_bytes = crypto_random_object_id(ENCRYPTION_KEY_LENGTH).encode('utf-8')
initialized_vector_bytes = crypto_random_object_id(ENCRYPTION_IV_LENGTH).encode('utf-8')
value_bytes = value.encode('utf-8')

password_bytes = key_bytes + initialized_vector_bytes
Expand Down Expand Up @@ -122,7 +122,7 @@ def _load_public_key(public_key_file_base64: str) -> rsa.RSAPublicKey:
return public_key


def _crypto_random_object_id(length: int = 17) -> str:
def crypto_random_object_id(length: int = 17) -> str:
    """Python reimplementation of cryptoRandomObjectId from `@apify/utilities`."""
    # Alphabet is kept byte-for-byte identical to the JS original (including the
    # 'ABCEDF' ordering quirk) so IDs are drawn from exactly the same character set.
    chars = 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789'
    picked = [secrets.choice(chars) for _ in range(length)]
    return ''.join(picked)
Expand Down
4 changes: 2 additions & 2 deletions src/apify/_memory_storage/resource_clients/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from apify_shared.types import JSONSerializable
from apify_shared.utils import ignore_docs

from ..._crypto import _crypto_random_object_id
from ..._crypto import crypto_random_object_id
from ..._utils import _force_rename, _raise_on_duplicate_storage, _raise_on_non_existing_storage
from ...consts import _StorageTypes
from ..file_storage_utils import _update_dataset_items, _update_metadata
Expand Down Expand Up @@ -52,7 +52,7 @@ def __init__(
name: Optional[str] = None,
) -> None:
"""Initialize the DatasetClient."""
self._id = id or _crypto_random_object_id()
self._id = id or crypto_random_object_id()
self._resource_directory = os.path.join(base_storage_directory, name or self._id)
self._memory_storage_client = memory_storage_client
self._name = name
Expand Down
4 changes: 2 additions & 2 deletions src/apify/_memory_storage/resource_clients/key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from apify_shared.utils import ignore_docs, is_file_or_bytes, json_dumps

from ..._crypto import _crypto_random_object_id
from ..._crypto import crypto_random_object_id
from ..._utils import (
_force_remove,
_force_rename,
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(
name: Optional[str] = None,
) -> None:
"""Initialize the KeyValueStoreClient."""
self._id = id or _crypto_random_object_id()
self._id = id or crypto_random_object_id()
self._resource_directory = os.path.join(base_storage_directory, name or self._id)
self._memory_storage_client = memory_storage_client
self._name = name
Expand Down
6 changes: 3 additions & 3 deletions src/apify/_memory_storage/resource_clients/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from typing import TYPE_CHECKING, Dict, List, Optional

import aioshutil
from sortedcollections import ValueSortedDict # type: ignore
from sortedcollections import ValueSortedDict

from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, json_dumps

from ..._crypto import _crypto_random_object_id
from ..._crypto import crypto_random_object_id
from ..._utils import _force_rename, _raise_on_duplicate_storage, _raise_on_non_existing_storage, _unique_key_to_request_id
from ...consts import _StorageTypes
from ..file_storage_utils import _delete_request, _update_metadata, _update_request_queue_item
Expand Down Expand Up @@ -46,7 +46,7 @@ def __init__(
name: Optional[str] = None,
) -> None:
"""Initialize the RequestQueueClient."""
self._id = id or _crypto_random_object_id()
self._id = id or crypto_random_object_id()
self._resource_directory = os.path.join(base_storage_directory, name or self._id)
self._memory_storage_client = memory_storage_client
self._name = name
Expand Down
4 changes: 4 additions & 0 deletions src/apify/scrapy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .middlewares import ApifyRetryMiddleware
from .pipelines import ActorDatasetPushPipeline
from .scheduler import ApifyScheduler
from .utils import get_running_event_loop_id, open_queue_with_custom_client, to_apify_request, to_scrapy_request
100 changes: 100 additions & 0 deletions src/apify/scrapy/middlewares.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import traceback
from typing import Union

try:
from scrapy import Spider
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.exceptions import IgnoreRequest
from scrapy.http import Request, Response
from scrapy.utils.response import response_status_message
except ImportError as exc:
raise ImportError(
'To use this module, you need to install the "scrapy" extra. Run "pip install apify[scrapy]".',
) from exc

from ..actor import Actor
from ..storages import RequestQueue
from .utils import nested_event_loop, open_queue_with_custom_client, to_apify_request


class ApifyRetryMiddleware(RetryMiddleware):
    """The default Scrapy retry middleware enriched with Apify's Request Queue interaction.

    On top of Scrapy's built-in retry decisions, this middleware mirrors the request
    lifecycle into the Actor's Request Queue: finished requests are marked as handled,
    and requests that should run again are reclaimed back to the queue.
    """

    def __init__(self, *args: list, **kwargs: dict) -> None:
        """Create a new instance.

        The Request Queue is opened on a dedicated nested event loop, because Scrapy
        middleware hooks are synchronous while the Apify SDK is asynchronous.
        """
        super().__init__(*args, **kwargs)
        try:
            self._rq: RequestQueue = nested_event_loop.run_until_complete(open_queue_with_custom_client())
        except BaseException:
            traceback.print_exc()
            # Without a Request Queue this middleware cannot operate - fail fast here
            # instead of crashing later with an AttributeError on self._rq.
            raise

    def __del__(self) -> None:
        """Before deleting the instance, close the nested event loop."""
        nested_event_loop.stop()
        nested_event_loop.close()

    def process_response(self, request: Request, response: Response, spider: Spider) -> Union[Request, Response]:
        """Process the response and decide whether the request should be retried.

        Args:
            request: The request that was sent.
            response: The response that was received.
            spider: The Spider that sent the request.

        Returns:
            The response, or a new request if the request should be retried.
        """
        # Robots requests are bypassed directly, they don't go through a Scrapy Scheduler, and also through our
        # Request Queue. Check the scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware for details.
        assert isinstance(request.url, str)
        if request.url.endswith('robots.txt'):
            return response

        try:
            return nested_event_loop.run_until_complete(self._handle_retry_logic(request, response, spider))
        except BaseException:
            # Re-raise after logging: the previous version swallowed the exception and
            # then returned an unbound local, masking the real error with a NameError.
            traceback.print_exc()
            raise

    def process_exception(
        self,
        request: Request,
        exception: BaseException,
        spider: Spider,
    ) -> Union[Request, Response, None]:
        """Handle the exception and decide whether the request should be retried.

        Args:
            request: The request that caused the exception.
            exception: The exception that was raised.
            spider: The Spider that sent the request.

        Returns:
            Whatever the base RetryMiddleware decides (a retry request, a response, or None).
        """
        Actor.log.debug(f'ApifyRetryMiddleware.process_exception was called (scrapy_request={request})...')
        apify_request = to_apify_request(request, spider=spider)

        if isinstance(exception, IgnoreRequest):
            try:
                nested_event_loop.run_until_complete(self._rq.mark_request_as_handled(apify_request))
            except BaseException:
                traceback.print_exc()
        else:
            # Guard this branch the same way as the one above, so a Request Queue
            # failure is logged instead of aborting Scrapy's exception handling.
            try:
                nested_event_loop.run_until_complete(self._rq.reclaim_request(apify_request))
            except BaseException:
                traceback.print_exc()

        return super().process_exception(request, exception, spider)

    async def _handle_retry_logic(
        self,
        request: Request,
        response: Response,
        spider: Spider,
    ) -> Union[Request, Response]:
        """Handle the retry logic of the request.

        Args:
            request: The request that was sent.
            response: The response that was received.
            spider: The Spider that sent the request.

        Returns:
            The response, or a new request if the request should be retried.
        """
        Actor.log.debug(f'ApifyRetryMiddleware.handle_retry_logic was called (scrapy_request={request})...')
        apify_request = to_apify_request(request, spider=spider)

        # Requests explicitly opted out of retrying are simply marked as done.
        if request.meta.get('dont_retry', False):
            await self._rq.mark_request_as_handled(apify_request)
            return response

        # Retryable HTTP status: reclaim the request so the queue reflects the retry,
        # then delegate the actual retry decision to the base RetryMiddleware.
        if response.status in self.retry_http_codes:
            await self._rq.reclaim_request(apify_request)
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response

        await self._rq.mark_request_as_handled(apify_request)
        return response
24 changes: 24 additions & 0 deletions src/apify/scrapy/pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from itemadapter import ItemAdapter

try:
from scrapy import Item, Spider
except ImportError as exc:
raise ImportError(
'To use this module, you need to install the "scrapy" extra. Run "pip install apify[scrapy]".',
) from exc

from ..actor import Actor


class ActorDatasetPushPipeline:
    """A Scrapy pipeline for pushing items to an Actor's default dataset.

    This pipeline is designed to be enabled only when the Scrapy project is run as an Actor.
    """

    async def process_item(self, item: Item, spider: Spider) -> Item:
        """Push the provided Scrapy item to the Actor's default dataset.

        Args:
            item: The scraped item to be pushed.
            spider: The Scrapy spider that produced the item (used only for logging).

        Returns:
            The unmodified item, so subsequent pipeline stages can keep processing it.
        """
        item_dict = ItemAdapter(item).asdict()
        Actor.log.debug(f'Pushing item={item_dict} produced by spider={spider} to the dataset.')
        await Actor.push_data(item_dict)
        return item
Loading

0 comments on commit 24f2d8c

Please sign in to comment.