Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/base/resource_collection_client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { PaginatedResponse, PaginationOptions } from '../utils';
import { parseDateFields, pluckData } from '../utils';
import { ApiClient } from './api_client';

Expand All @@ -18,6 +19,41 @@ export class ResourceCollectionClient extends ApiClient {
return parseDateFields(pluckData(response.data)) as R;
}

/**
* Returns an object that is both a promise of the first page of results and an async iterable over all items; the first page is requested immediately.
*/
protected _getIterablePagination<T extends PaginationOptions, Data, R extends PaginatedResponse<Data>>(
options: T = {} as T,
): AsyncIterable<Data> & Promise<R> {
const getPaginatedList = this._list.bind(this);
const paginatedListPromise = getPaginatedList<T, R>(options);

/**
 * Yields the items of the paginated listing one by one, requesting further pages only while
 * the consumer keeps iterating.
 *
 * The first page is taken from `paginatedListPromise`, so iterating does not repeat the initial
 * request. Every follow-up request moves the offset forward by the number of items received so
 * far and asks for exactly as many items as are still expected.
 */
async function* asyncGenerator() {
    let page = await paginatedListPromise;
    yield* page.items;

    const firstOffset = options.offset || 0;
    // The requested limit is capped by the total reported with the first page.
    const overallLimit = Math.min(options.limit || page.total, page.total);

    let nextOffset = firstOffset + page.items.length;
    let itemsLeft = Math.min(page.total - firstOffset, overallLimit) - page.items.length;

    for (;;) {
        // An empty page means there is nothing more to read, even if the counters say otherwise.
        const lastPageHadItems = page.items.length > 0;
        // Kept as a negated `>` so a non-numeric counter ends the loop exactly like a failed `>` test.
        if (!lastPageHadItems || !(itemsLeft > 0)) {
            return;
        }

        const newOptions = { ...options, limit: itemsLeft, offset: nextOffset };
        page = await getPaginatedList<T, R>(newOptions);
        yield* page.items;

        const received = page.items.length;
        nextOffset += received;
        itemsLeft -= received;
    }
}

return Object.defineProperty(paginatedListPromise, Symbol.asyncIterator, {
value: asyncGenerator,
}) as unknown as AsyncIterable<Data> & Promise<R>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there is a more idiomatic TypeScript way to do this without the double cast `as unknown as AsyncIterable<Data> & Promise<R>`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik you can make a better-typed defineProperty method like this:

/**
 * Type-aware wrapper around `Object.defineProperty`.
 *
 * Defines `key` on `obj` using `descriptor` and returns the very same object, typed as the
 * original type intersected with the newly added property. The property type `V` is inferred
 * from the descriptor (its `value`, or its getter/setter) instead of collapsing to `any`.
 *
 * @param obj - Object to define the property on; it is mutated and returned.
 * @param key - Name or symbol of the property to define.
 * @param descriptor - Descriptor handed over unchanged to `Object.defineProperty`.
 * @returns `obj` itself, typed as `T & { [P in K]: V }`.
 * @throws TypeError when `Object.defineProperty` rejects the target or the descriptor.
 */
function defineProperty<T, K extends PropertyKey, V = PropertyDescriptor['value']>(
    obj: T,
    key: K,
    descriptor: TypedPropertyDescriptor<V>,
): T & { [P in K]: V } {
    Object.defineProperty(obj, key, descriptor);
    // The compiler cannot follow the mutation above, so this single assertion records it.
    return obj as T & { [P in K]: V };
}

This implementation will cast the return type to the original type & { key: typeof value }.

I'm not entirely sure if it's worth the extra 8 lines. It's a rather hacky solution, so explicit cast is IMO okay here.

Copy link
Contributor Author

@Pijukatel Pijukatel Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To completely avoid any casts, I could do something like this. Is it worth it? Or should I just leave it as it is with as unknown as AsyncIterable<Data> & Promise<R>;

/**
 * A promise of the first page of results that can also be consumed with `for await`.
 *
 * All promise behaviour (`then`, `catch`, `finally`) is delegated to the wrapped promise, and
 * async iteration is delegated to the supplied iterator factory, so no type cast is needed to
 * combine `Promise<R>` with `AsyncIterable<Data>`.
 */
class IterablePromise<Data, R extends PaginatedResponse<Data>> implements AsyncIterable<Data>, Promise<R> {
    /**
     * @param promise - Promise that backs the `then`/`catch`/`finally` methods.
     * @param iteratorFactory - Called on every `[Symbol.asyncIterator]()` request to obtain an iterator.
     */
    constructor(
        private readonly promise: Promise<R>,
        private readonly iteratorFactory: () => AsyncIterator<Data>,
    ) {}

    // The delegating methods are intentionally not `async`: the wrapped promise already returns
    // a promise, and an `async` wrapper would only wrap it a second time.

    /** Delegates to `then` of the wrapped promise. */
    then<TResult1 = R, TResult2 = never>(
        onfulfilled?: ((value: R) => TResult1 | PromiseLike<TResult1>) | undefined | null,
        onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null,
    ): Promise<TResult1 | TResult2> {
        return this.promise.then(onfulfilled, onrejected);
    }

    /** Delegates to `catch` of the wrapped promise. */
    catch<TResult = never>(
        onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | undefined | null,
    ): Promise<R | TResult> {
        return this.promise.catch(onrejected);
    }

    /** Delegates to `finally` of the wrapped promise. */
    finally(onfinally?: (() => void) | undefined | null): Promise<R> {
        return this.promise.finally(onfinally);
    }

    /** Returns whatever iterator the factory produces for this call. */
    [Symbol.asyncIterator](): AsyncIterator<Data> {
        return this.iteratorFactory();
    }

    /** Makes `Object.prototype.toString` report the instance as a promise. */
    get [Symbol.toStringTag](): string {
        return 'Promise';
    }
}

and use it in a simple way: new IterablePromise<Data, R>(paginatedListPromise, asyncIterator);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...And I was doubting whether adding 8 LOC would be too much 😅

IMO it's not necessary, especially if it can be solved by one cast (admittedly, not a good practice, but here it's afaiac fine).

}

protected async _create<D, R>(resource: D): Promise<R> {
const response = await this.httpClient.call({
url: this._url(),
Expand Down
6 changes: 4 additions & 2 deletions src/resource_clients/store_collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export class StoreCollectionClient extends ResourceCollectionClient {
/**
* https://docs.apify.com/api/v2/#/reference/store/store-actors-collection/get-list-of-actors-in-store
*/
async list(options: StoreCollectionListOptions = {}): Promise<PaginatedList<ActorStoreList>> {
list(
options: StoreCollectionListOptions = {},
): Promise<PaginatedList<ActorStoreList>> & AsyncIterable<ActorStoreList> {
ow(
options,
ow.object.exactShape({
Expand All @@ -33,7 +35,7 @@ export class StoreCollectionClient extends ResourceCollectionClient {
}),
);

return this._list(options);
return this._getIterablePagination(options);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be easily applied to other similar endpoints, but maybe it is better to do it gradually to limit the size of the change?

}
}

Expand Down
16 changes: 16 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,20 @@ export interface PaginationIteratorOptions {
exclusiveStartId?: string;
}

export interface PaginationOptions {
/** Position of the first returned entry. */
offset?: number;
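/** Maximum number of entries requested. When the result is iterated, this caps the total number of entries yielded, not the size of a single page. */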
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** Maximum number of entries requested. */
/** Maximum number of entries requested for one chunk. */

not sure if page or chunk is better, but it should be clear this is a limit for the chunk and not a total limit for the async iterator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually the total limit for the whole iterator. Chunk size is limited by the length of the platform response; it is not limited by this code.

limit?: number;
}

/**
 * Minimal shape of one page of a paginated listing: the entries of the page plus the
 * overall entry count.
 */
export interface PaginatedResponse<Data> {
/** Total count of entries in the whole listing, not only the entries contained in `items`. */
total: number;
/** Entries returned in this page. */
items: Data[];
}

export interface PaginatedList<Data> {
/** Total count of entries in the dataset. */
total: number;
Expand All @@ -248,6 +262,8 @@ export interface PaginatedList<Data> {
items: Data[];
}

/**
 * A `PaginatedList` that can additionally be consumed with `for await`.
 *
 * NOTE(review): as declared, iteration yields `PaginatedList<Data>` values, whereas
 * `StoreCollectionClient.list` is typed as `AsyncIterable<ActorStoreList>`, i.e. it iterates
 * single entries. TODO confirm which element type is intended.
 */
export interface IterablePaginatedList<Data> extends PaginatedList<Data>, AsyncIterable<PaginatedList<Data>> {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we could just enhance the PaginatedList interface directly, allowing this for every place where we return it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep both types, because internally we will keep using the old PaginatedList


/**
 * Reinterprets a value of unknown type as `T`.
 *
 * This is purely a compile-time assertion: the value is handed back untouched and no runtime
 * check is performed.
 */
export function cast<T>(input: unknown): T {
    const reinterpreted = input as T;
    return reinterpreted;
}
Expand Down
84 changes: 82 additions & 2 deletions test/store.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { StoreCollectionListOptions } from 'apify-client';
import { ApifyClient } from 'apify-client';

import type { ApifyRequestConfig, ApifyResponse } from '../src/http_client';
import { PaginationOptions } from '../src/utils';
import { Browser, DEFAULT_OPTIONS, validateRequest } from './_helper';
import { mockServer } from './mock_server/server';

Expand Down Expand Up @@ -43,7 +45,6 @@ describe('Store', () => {
username: 'my username',
pricingModel: 'my pricing model',
};

const res: any = client && (await client.store().list(opts));
expect(res.id).toEqual('store-list');
validateRequest(opts);
Expand All @@ -53,7 +54,86 @@ describe('Store', () => {
opts,
);
expect(browserRes.id).toEqual('store-list');
expect(browserRes).toEqual(res);
const { [Symbol.asyncIterator]: _, ...expectedResponse } = res;
expect(browserRes).toEqual(expectedResponse);
validateRequest(opts);
});
});

describe('actor.store.list as async iterable', () => {
    // Exercises store().list() through its async-iterable side against a mocked platform.
    const client: ApifyClient = new ApifyClient();

    const testCases = [
        {
            testName: 'User offset, no limit',
            userDefinedOptions: { offset: 1000 },
            expectedItems: 1500,
        },
        {
            testName: 'No offset, no limit',
            userDefinedOptions: {},
            expectedItems: 2500,
        },
        {
            testName: 'No offset, user limit',
            userDefinedOptions: { limit: 1100 },
            expectedItems: 1100,
        },
        {
            testName: 'User offset, user limit',
            userDefinedOptions: { offset: 1000, limit: 1100 },
            expectedItems: 1100,
        },
        {
            testName: 'User out of range offset, no limit',
            userDefinedOptions: { offset: 3000 },
            expectedItems: 0,
        },
        {
            testName: 'User no offset, out of range limit',
            userDefinedOptions: { limit: 3000 },
            expectedItems: 2500,
        },
    ];

    test.each(testCases)('$testName', async ({ userDefinedOptions, expectedItems }) => {
        const mockedPlatformLogic = async (request: ApifyRequestConfig) => {
            // Simulated platform logic for pagination when there are 2500 actors in store
            // and a single response never carries more than 1000 of them.
            const maxItems = 2500;
            const maxItemsPerPage = 1000;
            const offset = request.params.offset ? request.params.offset : 0;
            const limit = request.params.limit ? request.params.limit : 0;
            if (offset < 0 || limit < 0) {
                throw new Error('Offset and limit must be non-negative');
            }

            // A limit of 0 is treated as "no limit", so the window extends to the end of the data.
            const lowerIndex = Math.min(offset, maxItems);
            const upperIndex = Math.min(offset + (limit || maxItems), maxItems);
            const returnedItemsCount = Math.min(upperIndex - lowerIndex, maxItemsPerPage);

            return {
                data: {
                    data: {
                        total: maxItems,
                        count: returnedItemsCount,
                        offset,
                        limit: returnedItemsCount,
                        desc: false,
                        items: new Array(returnedItemsCount).fill('some actor details'),
                    },
                },
            } as ApifyResponse;
        };

        const storeClient = client.store();
        const mockedClient = jest.spyOn(storeClient.httpClient, 'call').mockImplementation(mockedPlatformLogic);

        const receivedItems: unknown[] = [];
        try {
            // Iterate the very instance whose HTTP client is mocked; the iterator yields single items.
            for await (const item of storeClient.list(userDefinedOptions)) {
                receivedItems.push(item);
            }
        } finally {
            // Restore even when iteration throws, so the mock cannot leak into other test cases.
            mockedClient.mockRestore();
        }
        expect(receivedItems.length).toBe(expectedItems);
    });
});