Skip to content

Commit

Permalink
fix: Write gRPC call, use for await on write readble stream (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
erezrokah committed Aug 11, 2023
1 parent dd71c60 commit 773a0e5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 44 deletions.
12 changes: 10 additions & 2 deletions src/grpc/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,16 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
this.plugin.read(call);
}
Write(call: WriteStream, callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>): void {
this.plugin.write(call);
callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
this.plugin
.write(call)
.then(() => {
// eslint-disable-next-line promise/no-callback-in-promise
return callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
})
.catch((error) => {
// eslint-disable-next-line promise/no-callback-in-promise
return callback(error, null);
});
}
Close(
call: grpc.ServerUnaryCall<
Expand Down
71 changes: 31 additions & 40 deletions src/memdb/write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,54 +11,45 @@ export const createWrite = (
overwrite: OverwriteFunction,
deleteStale: DeleteStaleFunction,
) => {
return (stream: WriteStream): Promise<void> => {
return new Promise((resolve, reject) => {
stream.on('data', (request: WriteRequest) => {
switch (request.message) {
case 'migrate_table': {
// Update table schema in the `tables` map
const table = decodeTable(request.migrate_table.table);
tables[table.name] = table;
break;
}

case 'insert': {
const [tableName, batches] = decodeRecord(request.insert.record);

if (!memoryDB[tableName]) {
memoryDB[tableName] = [];
}
return async (stream: WriteStream) => {
for await (const data of stream) {
const request = data as WriteRequest;
switch (request.message) {
case 'migrate_table': {
// Update table schema in the `tables` map
const table = decodeTable(request.migrate_table.table);
tables[table.name] = table;
break;
}

const tableSchema = tables[tableName];
const pks = getPrimaryKeys(tableSchema);
case 'insert': {
const [tableName, batches] = decodeRecord(request.insert.record);

for (const batch of batches) {
//eslint-disable-next-line unicorn/no-array-for-each
for (const record of batch) {
overwrite(tableSchema, pks, record);
}
}
break;
if (!memoryDB[tableName]) {
memoryDB[tableName] = [];
}

case 'delete': {
deleteStale(request.delete);
break;
}
const tableSchema = tables[tableName];
const pks = getPrimaryKeys(tableSchema);

default: {
throw new Error(`Unknown request message type: ${request.message}`);
for (const batch of batches) {
//eslint-disable-next-line unicorn/no-array-for-each
for (const record of batch) {
overwrite(tableSchema, pks, record);
}
}
break;
}
});

stream.on('finish', () => {
resolve();
});
case 'delete': {
deleteStale(request.delete);
break;
}

stream.on('error', (error) => {
reject(error);
});
});
default: {
throw new Error(`Unknown request message type: ${request.message}`);
}
}
}
};
};
4 changes: 2 additions & 2 deletions src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export interface SourceClient {

export interface DestinationClient {
read: (stream: ReadStream) => void;
write: (stream: WriteStream) => void;
write: (stream: WriteStream) => Promise<void>;
}

export interface Client extends SourceClient, DestinationClient {
Expand Down Expand Up @@ -72,7 +72,7 @@ export const newPlugin = (name: string, version: string, newClient: NewClientFun
name: () => name,
version: () => version,
write: (stream: WriteStream) => {
return plugin.client?.write(stream) ?? new Error('client not initialized');
return plugin.client?.write(stream) ?? Promise.reject(new Error('client not initialized'));
},
read: (stream: ReadStream) => {
return plugin.client?.read(stream) ?? new Error('client not initialized');
Expand Down

0 comments on commit 773a0e5

Please sign in to comment.