Conversation
packages/memory-storage/src/utils.ts
Outdated
| let currentPage = await firstPagePromise; | ||
| yield* currentPage.items; | ||
|
|
||
| while (currentPage.isTruncated && currentPage.nextExclusiveStartKey) { |
There was a problem hiding this comment.
isTruncated is true until the very end of the KVS.
This has consequences - e.g. the following snippet will always return 10 items (discarding the limit: 2 param).
const storage = new MemoryStorage().keyValueStore('test');
for (let i = 0; i < 10; i++) {
await storage.setRecord({ key: `key-${i}`, value: `value-${i}` });
}
for await (const item of storage.listKeys({ limit: 2 })) {
console.log(item);
}edit: note that apify-client respects the limit param, only printing two items in the example above.
There was a problem hiding this comment.
this is great catch, I add test for this scenario and fix in 828f5cc
| * | ||
| * @param options Options for the iteration. | ||
| */ | ||
| async *keys(options: KeyValueStoreIteratorOptions = {}): AsyncGenerator<string, void, undefined> { |
There was a problem hiding this comment.
Should we make all the new methods also awaitable? So e.g. this works?
const keys = await kvs.keys();
// keys == ['a', 'b', 'c', ...];Array.fromAsync is a fairly recent addition to Node and might not be the best DX. On the other hand, the AsyncIterable Promise might also be an unexpected twist for many.
wdyt? :)
There was a problem hiding this comment.
As Array.fromAsync is a thing I would follow the Map and Set native approach where this functions always return iterator 🤔 But I don't have a problem with AsyncIterable Promise way
There was a problem hiding this comment.
I also don't have a horse in this race, let's hear other's ideas then 👍
One note is that Set and Map methods all return synchronous Iterators, so you can do spreads ([...Set().keys]), map, reduce etc. - users won't even notice they are not using native Arrays. AsyncIterators are much understandably more limited, so the DX might suffer.
There was a problem hiding this comment.
I'd probably support both AsyncIterable and Promise so that we provide a more consistent API. No strong opinion though.
There was a problem hiding this comment.
@B4nan any chance you have strong opinion on this?
There was a problem hiding this comment.
speed is one part of issue, other is users ends with results in memory 🤔 Frankly I can't find problem I would solve with this solution (await entries()).
There was a problem hiding this comment.
users end with results in memory
I think this is the expected outcome if they call KVS.entries(), and e.g. hitting the potential memory limit is understandable for the end-user.
Ofc I also like the iterative approach better (and we still should educate the user through the docs), but I would still like to have the Promise option, just to have the API predictable among the storages.
There was a problem hiding this comment.
Yeah, I can imagine a legit use case for the Promise option, too - such as loading a bunch of small JSON files and doing some aggregations on them.
There was a problem hiding this comment.
ok lets do it if it is valuable for us, with my current context, I can't evaluate this.
|
Sorry, I dropped the main comment for the review 😅 I'm mostly pro-merging here (good job!), please treat most of the comments above as discussion points rather than directly actionable commands. Thanks! |
Co-authored-by: Jindřich Bär <jindrichbar@gmail.com>
…tors-in-crawleememory-storage' into 3338-implement-the-missing-iterators-in-crawleememory-storage
janbuchar
left a comment
There was a problem hiding this comment.
Looks mostly fine 🙂 The PR title feels a bit off - the changes are not limited to memory-storage.
| * | ||
| * @param options Options for the iteration. | ||
| */ | ||
| async *values<T = unknown>(options: KeyValueStoreIteratorOptions = {}): AsyncGenerator<T, void, undefined> { |
There was a problem hiding this comment.
The T type parameter is just trust-me-bro-typing. I believe we should improve this as a part of #3082
| * | ||
| * @param options Options for the iteration. | ||
| */ | ||
| async *keys(options: KeyValueStoreIteratorOptions = {}): AsyncGenerator<string, void, undefined> { |
There was a problem hiding this comment.
I'd probably support both AsyncIterable and Promise so that we provide a more consistent API. No strong opinion though.
| listItems(options?: DatasetClientListOptions): AsyncIterable<Data> & Promise<PaginatedList<Data>>; | ||
| listEntries( | ||
| options?: DatasetClientListOptions, | ||
| ): AsyncIterable<[number, Data]> & Promise<PaginatedList<[number, Data]>>; |
There was a problem hiding this comment.
@B4nan now that I think of it, isn't this a BC break? Not that I'm aware of any 3rd party storage implementations, but still, this would require them to implement additional methods.
There was a problem hiding this comment.
Good point, we have at least the old sql storage, which I think some people still use. So let's make those optional to be sure?
There was a problem hiding this comment.
Sounds good to me. And an exception in the "storage frontends" if the method is not implemented?
There was a problem hiding this comment.
new functions optional now, what do you mean by "storage frontends" ?
There was a problem hiding this comment.
Yeah, if we can't use a simple fallback, let's throw.
There was a problem hiding this comment.
do a runtime check and throw in resource-clients? I am not following probably
There was a problem hiding this comment.
storage frontends means the Dataset, KeyValueStore and RequestQueue classes from @crawlee/core, i.e., the ones that delegate to a resource client which may or may not have the new, optional iteration methods implemented.
There was a problem hiding this comment.
@B4nan @janbuchar something like this 1464b60 ?
There was a problem hiding this comment.
Exactly. Just a bunch of nits:
- it's a method, not a function
- missing "the" before " function"
- it's good to delimit identifiers so that the reader can easily distinguish them from natural language - "missing the
keysmethod" would be much better
| this.listItemsPage({ | ||
| desc, | ||
| offset: pageOffset, | ||
| limit: Math.min(pageLimit, LIST_ITEMS_LIMIT), |
There was a problem hiding this comment.
Maybe it's too late for this, but do we really want to cap the limit without telling the user? In their place, I would prefer the library to tell me, instead of quietly changing behavior.
There was a problem hiding this comment.
default limit was there also before and it is quite a number 999_999_999_999
There was a problem hiding this comment.
that leaves me wondering what the purpose of this check even is... but it's not important enough to be resolved here and now
| delete(): Promise<void>; | ||
| downloadItems(...args: unknown[]): Promise<Buffer>; | ||
| listItems(options?: DatasetClientListOptions): Promise<PaginatedList<Data>>; | ||
| listItems(options?: DatasetClientListOptions): AsyncIterable<Data> & Promise<PaginatedList<Data>>; |
There was a problem hiding this comment.
In a similar vein to #3352 (comment), this is breaking. Can we use Partial<AsyncIterable<Data>> in the interface and use the isAsyncIterable helper to check the return value in the storage frontends?
| if (!isAsyncIterable(result)) { | ||
| throw new Error('Resource client "listItems" method does not return an async iterable.'); | ||
| } |
There was a problem hiding this comment.
I'm afraid that achieving full BC will be trickier than that - you'll need to return something that will behave normally when await-ed and throw when async-iterated.
There was a problem hiding this comment.
🤔 any specific idea how to do so? I am getting a feeling that these combined Promise + Iterator types would introduce this BC problems everywhere
There was a problem hiding this comment.
If the result is not AsyncIterable, just Object.defineProperty a Symbol.asyncIterator that throws an error. Not sure about the exact symbol, perhaps it's the other one.
Well, that's my idea of it, anyways. Let's see if that works.
There was a problem hiding this comment.
This is about some (legacy) storage clients not having the async iterable interface, right?
Any chance we can add a compat layer here in Crawlee, something like:
if (!(Symbol.asyncIterator in maybeAsyncIterable)) {
Object.defineProperty(maybeAsyncIterable, Symbol.asyncIterator, {
value: async function* () {
yield* await maybeAsyncIterable;
}
});
}and provide all Crawlee methods to everyone, regardless of the storage implementation?
Or am I understanding the problem wrong?
There was a problem hiding this comment.
@barjin I think you are right, but I moved all this Object.defineProperty out of storage frontends to the memory-storage (so everything is handled there) and now I need to redefine object here anyway. 🤔 but I can't see any other option
There was a problem hiding this comment.
got it 515dd77 but for Iterator it returns only first page which is not the same as iterator returned from memory-storage. I can reimplement it with something like
Object.defineProperty(result, Symbol.asyncIterator, {
async *value() {
let offset = opts.offset ?? 0;
const limit = opts.limit ?? DATASET_ITERATORS_DEFAULT_LIMIT;
while (true) {
const page = await client.listItems({ ...opts, offset, limit });
yield* page.items;
if (offset + page.count >= page.total) break;
offset += limit;
}
},
});
but then we are back with solution 10 commits ago with iterator in storage frontend (and we are doing same thing twice on 2 different places).
There was a problem hiding this comment.
Right, nice catch 👍 maybe BC for the new iterable interface is not
really necessary (given the near-100% market share that memory-storage
and apify have) and we can throw from the Symbol.asyncIterator, if
it's not specified by the storage backend.
Sorry about the detour, my bad here.
There was a problem hiding this comment.
@barjin no problem I guess 👍
@janbuchar what do you say?
- keep throwing the error
- return iterator for first page (current state, but different behaviour than iterator from memory storage)
- reimplement iterator again in fronted for this method (but the same logic is applied in memory storage)
- some other idea I cannot see right now
1 seems to me less noise in a code and flow, if we break BC for ~1% of users it is not big deal.
There was a problem hiding this comment.
1, but throw it only in Symbol.asyncIterator
| const firstPagePromise = (async () => { | ||
| const firstPageKeys = await self.keys(options); | ||
| const entries: [string, unknown][] = []; | ||
| for (const item of firstPageKeys.items) { | ||
| const record = await self.getRecord(item.key); | ||
| if (record) { | ||
| entries.push([item.key, record.value]); | ||
| } | ||
| } | ||
| return entries; | ||
| })(); | ||
|
|
||
| async function* asyncGenerator(): AsyncGenerator<[string, unknown]> { | ||
| for await (const key of self.keys(options)) { | ||
| const record = await self.getRecord(key); | ||
| if (record) { | ||
| yield [key, record.value]; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
This will fetch each KVS record two times if we use the async iterator. The firstPagePromise IIFE is executing immediately, but its result is unused if we for await the returned object (and the asyncGenerator will fetch all the results again).
| // eslint-disable-next-line @typescript-eslint/no-this-alias | ||
| const self = this; |
There was a problem hiding this comment.
ESLint knows (usually) better, why this? 😅
If it's for the this binding in async function* asyncGenerator, can't we use value: asyncGenerator.bind(this) instead?
B4nan
left a comment
There was a problem hiding this comment.
I mostly checked the tests and it looks good to me. Lets address the double fetching Jindra mentioned, otherwise lets ship it finally :]
| test('respects limit option when iterating', async () => { | ||
| const values: unknown[] = []; |
There was a problem hiding this comment.
Why dropping the tests? The limit option should still work as before this commit, right?
|
|
||
| // Yield first page entries | ||
| for (const item of firstPageKeys.items) { | ||
| const record = await getRecord(item.key); |
There was a problem hiding this comment.
This will still double-fetch each record's value on for await... but since this is memory-storage, let's merge this now and keep optimizations for later.
Can you please create an issue about this once merged @l2ysho ?
PR Summary: Implement Async Iterators for Storage Classes
Overview
This PR implements async iterators for KeyValueStore and Dataset storage classes, allowing users to iterate over storage items using
for await...ofloops. The implementation follows the pattern established in apify-client-js.Changes
@crawlee/types (packages/types/src/storages.ts)
DatasetClient.listItems()return type toPartial<AsyncIterable<Data>> & Promise<PaginatedList<Data>>DatasetClient.listEntries()optional method returningAsyncIterable<[number, Data]> & Promise<PaginatedList<[number, Data]>>KeyValueStoreClient.listKeys()return type toPartial<AsyncIterable<KeyValueStoreItemData>> & Promise<KeyValueStoreClientListData>KeyValueStoreClient.keys()optional method returningAsyncIterable<string> & Promise<KeyValueStoreClientListData>KeyValueStoreClient.values()optional method returningAsyncIterable<unknown> & Promise<unknown[]>KeyValueStoreClient.entries()optional method returningAsyncIterable<[string, unknown]> & Promise<[string, unknown][]>@crawlee/memory-storage
packages/memory-storage/src/utils.ts
createPaginatedList()helper for offset-based pagination (Dataset)createPaginatedEntryList()helper for offset-based pagination with index-value entries (Dataset)createKeyList()helper for cursor-based pagination (KeyValueStore)createKeyStringList()helper for cursor-based pagination yielding key stringsfor await...ofpackages/memory-storage/src/resource-clients/dataset.ts
listItems()to usecreatePaginatedListhelperlistEntries()method usingcreatePaginatedEntryListhelperlistItemsPage()method for fetching individual pagespackages/memory-storage/src/resource-clients/key-value-store.ts
listKeys()to usecreateKeyListhelperkeys()method usingcreateKeyStringListhelpervalues()method yielding record values (not full records)entries()method yielding[key, value]tupleslistKeysPage()method for fetching individual pages@crawlee/core
packages/core/src/storages/key_value_store.ts
keys(options?)- async generator yielding all keysvalues<T>(options?)- returns hybridAsyncIterable<T> & Promise<T[]>yielding all valuesentries<T>(options?)- returns hybridAsyncIterable<[string, T]> & Promise<[string, T][]>yielding[key, value]tuples[Symbol.asyncIterator]()- makes KeyValueStore directly iterable (yields entries)KeyValueStoreIteratorOptionsinterfacepackages/core/src/storages/dataset.ts
values(options?)- returns hybridAsyncIterable<Data> & Promise<PaginatedList<Data>>yielding all itemsentries(options?)- returns hybridAsyncIterable<[number, Data]> & Promise<PaginatedList<[number, Data]>>yielding[index, item]tuples[Symbol.asyncIterator]()- makes Dataset directly iterable (yields items)DatasetIteratorOptionsinterfaceTests
packages/memory-storage/test/async-iteration.test.ts (new file)
listItems,listKeys,keys,values, andentriesmethodstest/core/storages/key_value_store.test.ts
keys,values,entries,Symbol.asyncIterator)test/core/storages/dataset.test.ts
values,entries,Symbol.asyncIterator)Usage Examples
Backward Compatibility
All existing code continues to work unchanged. The
listItems()andlistKeys()methods can still be awaited directly to get the paginated response object.Closes #3338