Skip to content

Commit

Permalink
fix(MemoryStorage): handling of readable streams for key-value stores…
Browse files Browse the repository at this point in the history
… when setting records (#1852)

Fixes an internally reported bug that caused streams provided to never
be read and drained
This also corrects the validator for the value, and removes the other
redundant checks that happen when validating an object (called
constraints, which are extra validators on top of simple typeof checks)
(this is also a feature request for shapeshift!). This might actually
(finally) solve #1843 but testing needs to be done
  • Loading branch information
vladfrangu committed Apr 4, 2023
1 parent 6ad6840 commit a5ee37d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 14 deletions.
11 changes: 10 additions & 1 deletion packages/memory-storage/src/resource-clients/key-value-store.ts
Expand Up @@ -217,7 +217,16 @@ export class KeyValueStoreClient extends BaseClient {
async setRecord(record: storage.KeyValueStoreRecord): Promise<void> {
s.object({
key: s.string.lengthGreaterThan(0),
value: s.union(s.null, s.string, s.number, s.instance(Buffer), s.object({}).passthrough),
value: s.union(
s.null,
s.string,
s.number,
s.instance(Buffer),
s.instance(ArrayBuffer),
s.typedArray(),
// disabling validation will make shapeshift only check the object given is an actual object, not null, nor array
s.object({}).setValidationEnabled(false),
),
contentType: s.string.lengthGreaterThan(0).optional,
}).parse(record);

Expand Down
17 changes: 4 additions & 13 deletions packages/memory-storage/src/utils.ts
Expand Up @@ -33,9 +33,9 @@ export function uniqueKeyToRequestId(uniqueKey: string): string {
export function isBuffer(value: unknown): boolean {
try {
s.union(
s.typedArray(),
s.instance(ArrayBuffer),
s.instance(Buffer),
s.instance(ArrayBuffer),
s.typedArray(),
).parse(value);

return true;
Expand All @@ -44,17 +44,8 @@ export function isBuffer(value: unknown): boolean {
}
}

export function isStream(value: unknown): boolean {
try {
s.object({
on: s.any,
pipe: s.any,
}).passthrough.parse(value);

return true;
} catch {
return false;
}
export function isStream(value: any): boolean {
return typeof value === 'object' && value && ['on', 'pipe'].every((key) => key in value && typeof value[key] === 'function');
}

export const memoryStorageLog = defaultLog.child({ prefix: 'MemoryStorage' });
Expand Down
22 changes: 22 additions & 0 deletions packages/memory-storage/test/key-value-store-stream.test.ts
@@ -0,0 +1,22 @@
import { MemoryStorage } from '@crawlee/memory-storage';
import { Readable } from 'node:stream';

describe('KeyValueStore should drain streams when setting records', () => {
const storage = new MemoryStorage({
persistStorage: false,
});

const fsStream = Readable.from([Buffer.from('hello'), Buffer.from('world')]);

test('should drain stream', async () => {
const defaultStoreInfo = await storage.keyValueStores().getOrCreate('default');
const defaultStore = storage.keyValueStore(defaultStoreInfo.id);

await defaultStore.setRecord({ key: 'streamz', value: fsStream, contentType: 'text/plain' });

expect(fsStream.destroyed).toBeTruthy();

const record = await defaultStore.getRecord('streamz');
expect(record!.value.toString('utf8')).toEqual('helloworld');
});
});

0 comments on commit a5ee37d

Please sign in to comment.