diff --git a/src/grpc/plugin.ts b/src/grpc/plugin.ts index 964318f..1c777e3 100644 --- a/src/grpc/plugin.ts +++ b/src/grpc/plugin.ts @@ -111,8 +111,16 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu this.plugin.read(call); } Write(call: WriteStream, callback: grpc.sendUnaryData): void { - this.plugin.write(call); - callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response()); + this.plugin + .write(call) + .then(() => { + // eslint-disable-next-line promise/no-callback-in-promise + return callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response()); + }) + .catch((error) => { + // eslint-disable-next-line promise/no-callback-in-promise + return callback(error, null); + }); } Close( call: grpc.ServerUnaryCall< diff --git a/src/memdb/write.ts b/src/memdb/write.ts index 6bd4366..b78a6a0 100644 --- a/src/memdb/write.ts +++ b/src/memdb/write.ts @@ -11,54 +11,45 @@ export const createWrite = ( overwrite: OverwriteFunction, deleteStale: DeleteStaleFunction, ) => { - return (stream: WriteStream): Promise => { - return new Promise((resolve, reject) => { - stream.on('data', (request: WriteRequest) => { - switch (request.message) { - case 'migrate_table': { - // Update table schema in the `tables` map - const table = decodeTable(request.migrate_table.table); - tables[table.name] = table; - break; - } - - case 'insert': { - const [tableName, batches] = decodeRecord(request.insert.record); - - if (!memoryDB[tableName]) { - memoryDB[tableName] = []; - } + return async (stream: WriteStream) => { + for await (const data of stream) { + const request = data as WriteRequest; + switch (request.message) { + case 'migrate_table': { + // Update table schema in the `tables` map + const table = decodeTable(request.migrate_table.table); + tables[table.name] = table; + break; + } - const tableSchema = tables[tableName]; - const pks = getPrimaryKeys(tableSchema); + case 'insert': { + const [tableName, batches] = decodeRecord(request.insert.record); - for (const batch of batches) { - //eslint-disable-next-line unicorn/no-array-for-each - for (const record of batch) { - overwrite(tableSchema, pks, record); - } - } - break; + if (!memoryDB[tableName]) { + memoryDB[tableName] = []; } - case 'delete': { - deleteStale(request.delete); - break; - } + const tableSchema = tables[tableName]; + const pks = getPrimaryKeys(tableSchema); - default: { - throw new Error(`Unknown request message type: ${request.message}`); + for (const batch of batches) { + //eslint-disable-next-line unicorn/no-array-for-each + for (const record of batch) { + overwrite(tableSchema, pks, record); + } } + break; } - }); - stream.on('finish', () => { - resolve(); - }); + case 'delete': { + deleteStale(request.delete); + break; + } - stream.on('error', (error) => { - reject(error); - }); - }); + default: { + throw new Error(`Unknown request message type: ${request.message}`); + } + } + } }; }; diff --git a/src/plugin/plugin.ts b/src/plugin/plugin.ts index fb07a34..cfc516c 100644 --- a/src/plugin/plugin.ts +++ b/src/plugin/plugin.ts @@ -36,7 +36,7 @@ export interface SourceClient { export interface DestinationClient { read: (stream: ReadStream) => void; - write: (stream: WriteStream) => void; + write: (stream: WriteStream) => Promise; } export interface Client extends SourceClient, DestinationClient { @@ -72,7 +72,7 @@ export const newPlugin = (name: string, version: string, newClient: NewClientFun name: () => name, version: () => version, write: (stream: WriteStream) => { - return plugin.client?.write(stream) ?? new Error('client not initialized'); + return plugin.client?.write(stream) ?? Promise.reject(new Error('client not initialized')); }, read: (stream: ReadStream) => { return plugin.client?.read(stream) ?? new Error('client not initialized');