Skip to content

Commit

Permalink
feat: Add multiplexer, round-robin (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
erezrokah committed Aug 10, 2023
1 parent 4a5f9e8 commit 00a842a
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 6 deletions.
58 changes: 58 additions & 0 deletions src/scheduler/scheduler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import test from 'ava';

import { createTable } from '../schema/table.js';

import { getRoundRobinTableClients } from './scheduler.js';

test('getRoundRobinTableClients', (t): void => {
const client = { id: () => 'client_0' };
const tables = [
createTable({
name: 'table1',
multiplexer: (client) => {
return Array.from({ length: 2 }).map((_, index) => ({
id: () => `client_${index}`,
}));
},
}),
createTable({
name: 'table2',
multiplexer: (client) => {
return Array.from({ length: 4 }).map((_, index) => ({
id: () => `client_${index}`,
}));
},
}),
createTable({
name: 'table3',
multiplexer: (client) => {
return Array.from({ length: 1 }).map((_, index) => ({
id: () => `client_${index}`,
}));
},
}),
createTable({
name: 'table4',
multiplexer: (client) => {
return [];
},
}),
];

const tableClients = getRoundRobinTableClients(tables, client);
t.is(tableClients.length, 7);
t.is(tableClients[0].table.name, 'table1');
t.is(tableClients[0].client.id(), 'client_0');
t.is(tableClients[1].table.name, 'table2');
t.is(tableClients[1].client.id(), 'client_0');
t.is(tableClients[2].table.name, 'table3');
t.is(tableClients[2].client.id(), 'client_0');
t.is(tableClients[3].table.name, 'table1');
t.is(tableClients[3].client.id(), 'client_1');
t.is(tableClients[4].table.name, 'table2');
t.is(tableClients[4].client.id(), 'client_1');
t.is(tableClients[5].table.name, 'table2');
t.is(tableClients[5].client.id(), 'client_2');
t.is(tableClients[6].table.name, 'table2');
t.is(tableClients[6].client.id(), 'client_3');
});
78 changes: 73 additions & 5 deletions src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ export type Options = {
stream: SyncStream;
deterministicCQId: boolean;
concurrency: number;
strategy?: Strategy;
};

export enum Strategy {
dfs = 'dfs',
roundRobin = 'round-robin',
}

class TableResolverStream extends Duplex {
queue: unknown[] = [];

Expand Down Expand Up @@ -100,19 +106,81 @@ const resolveTable = async (

syncStream.write(new SyncResponse({ insert: new Insert({ record: encodeResource(resource) }) }));

await Promise.all(table.relations.map((child) => resolveTable(logger, client, child, resource, syncStream)));
await pMap(table.relations, (child) => resolveTable(logger, client, child, resource, syncStream));
}
};

export const sync = async ({ logger, client, tables, stream: syncStream, concurrency }: Options) => {
const syncDfs = async ({ logger, client, tables, stream: syncStream, concurrency }: Omit<Options, 'strategy'>) => {
const tableClients = tables.flatMap((table) => {
const clients = table.multiplexer(client);
return clients.map((client) => ({ table, client }));
});

await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), {
concurrency,
});
};

export const getRoundRobinTableClients = (tables: Table[], client: ClientMeta) => {
let tablesWithClients = tables
.map((table) => ({ table, clients: table.multiplexer(client) }))
.filter(({ clients }) => clients.length > 0);

const tableClients: { table: Table; client: ClientMeta }[] = [];
while (tablesWithClients.length > 0) {
for (const { table, clients } of tablesWithClients) {
tableClients.push({ table, client: clients.shift() as ClientMeta });
}
tablesWithClients = tablesWithClients.filter(({ clients }) => clients.length > 0);
}

return tableClients;
};

const syncRoundRobin = async ({
logger,
client,
tables,
stream: syncStream,
concurrency,
}: Omit<Options, 'strategy'>) => {
const tableClients = getRoundRobinTableClients(tables, client);
await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), {
concurrency,
});
};

export const sync = async ({
logger,
client,
tables,
stream,
concurrency,
strategy = Strategy.dfs,
deterministicCQId,
}: Options) => {
for (const table of tables) {
logger.info(`sending migrate message for table ${table.name}`);
// eslint-disable-next-line @typescript-eslint/naming-convention
syncStream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) }));
stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) }));
}

await pMap(tables, (table) => resolveTable(logger, client, table, null, syncStream), { concurrency });
switch (strategy) {
case Strategy.dfs: {
logger.debug(`using dfs strategy`);
await syncDfs({ logger, client, tables, stream, concurrency, deterministicCQId });
break;
}
case Strategy.roundRobin: {
logger.debug(`using round-robin strategy`);
await syncRoundRobin({ logger, client, tables, stream, concurrency, deterministicCQId });
break;
}
default: {
throw new Error(`unknown strategy ${strategy}`);
}
}

syncStream.end();
stream.end();
return await Promise.resolve();
};
2 changes: 1 addition & 1 deletion src/schema/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const createTable = ({
relations = [],
transform = () => {},
resolver = () => Promise.resolve(),
multiplexer = () => [],
multiplexer = (client) => [client],
postResourceResolver = () => Promise.resolve(),
preResourceResolver = () => Promise.resolve(),
isIncremental = false,
Expand Down

0 comments on commit 00a842a

Please sign in to comment.