Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/crawlee/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ async def push_data(
**kwargs: Unpack[PushDataKwargs],
) -> None:
"""Track a call to the `push_data` context helper."""
from crawlee.storages._dataset import Dataset

await Dataset.check_and_serialize(data)

self.push_data_calls.append(
PushDataFunctionCall(
data=data,
Expand Down
11 changes: 6 additions & 5 deletions src/crawlee/storages/_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ async def push_data(self, data: JsonSerializable, **kwargs: Unpack[PushDataKwarg
"""
# Handle singular items
if not isinstance(data, list):
items = await self._check_and_serialize(data)
items = await self.check_and_serialize(data)
return await self._resource_client.push_items(items, **kwargs)

# Handle lists
payloads_generator = (await self._check_and_serialize(item, index) for index, item in enumerate(data))
payloads_generator = (await self.check_and_serialize(item, index) for index, item in enumerate(data))

# Invoke client in series to preserve the order of data
async for items in self._chunk_by_size(payloads_generator):
Expand Down Expand Up @@ -417,7 +417,8 @@ async def iterate_items(
):
yield item

async def _check_and_serialize(self, item: JsonSerializable, index: int | None = None) -> str:
@classmethod
async def check_and_serialize(cls, item: JsonSerializable, index: int | None = None) -> str:
"""Serializes a given item to JSON, checks its serializability and size against a limit.

Args:
Expand All @@ -438,8 +439,8 @@ async def _check_and_serialize(self, item: JsonSerializable, index: int | None =
raise ValueError(f'Data item{s}is not serializable to JSON.') from exc

payload_size = ByteSize(len(payload.encode('utf-8')))
if payload_size > self._EFFECTIVE_LIMIT_SIZE:
raise ValueError(f'Data item{s}is too large (size: {payload_size}, limit: {self._EFFECTIVE_LIMIT_SIZE})')
if payload_size > cls._EFFECTIVE_LIMIT_SIZE:
raise ValueError(f'Data item{s}is too large (size: {payload_size}, limit: {cls._EFFECTIVE_LIMIT_SIZE})')

return payload

Expand Down
12 changes: 12 additions & 0 deletions tests/unit/basic_crawler/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,18 @@ async def handler(context: BasicCrawlingContext) -> None:
assert exported_json_str == expected_json_str


async def test_crawler_push_data_over_limit() -> None:
    """A data item exceeding the size limit should fail the request, not crash the crawler."""
    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def request_handler(context: BasicCrawlingContext) -> None:
        # ~15.7 MB once serialized (5 bytes * 3 * 1024 * 1024 repetitions),
        # comfortably above the 9 MB per-item limit enforced by the dataset.
        oversized_item = {'hello': 'world' * 3 * 1024 * 1024}
        await context.push_data(oversized_item)

    run_stats = await crawler.run(['http://example.tld/1'])

    # The oversized push should surface as a failed request in the final statistics.
    assert run_stats.requests_failed == 1


async def test_context_update_kv_store() -> None:
crawler = BasicCrawler()

Expand Down