Skip to content

Commit

Permalink
[IND-545]: Batch Process Vulcan messages (#1018)
Browse files Browse the repository at this point in the history
* [IND-545]: Batch Process Vulcan messages

* nits

* nits
  • Loading branch information
Christopher-Li authored Jan 30, 2024
1 parent a3ce076 commit 53274e2
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 32 deletions.
4 changes: 2 additions & 2 deletions indexer/packages/kafka/__tests__/consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {
consumer, startConsumer, addOnMessageFunction, stopConsumer,
consumer, startConsumer, updateOnMessageFunction, stopConsumer,
} from '../src/consumer';
import { producer } from '../src/producer';
import { createKafkaMessage } from './helpers/kafka';
Expand All @@ -26,7 +26,7 @@ describe.skip('consumer', () => {

it('is consuming message', async () => {
const onMessageFn: (topic: string, message: KafkaMessage) => Promise<void> = jest.fn();
addOnMessageFunction(onMessageFn);
updateOnMessageFunction(onMessageFn);
const kafkaMessage: KafkaMessage = createKafkaMessage(null);

await producer.send({
Expand Down
8 changes: 4 additions & 4 deletions indexer/packages/kafka/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ export const kafkaConfigSchema = {
default: 'localhost:9092',
requireInEnv: [NodeEnv.PRODUCTION, NodeEnv.STAGING],
}),
KAFKA_CONNECTION_TIMEOUT_MS: parseInteger({ default: 5000 }),
KAFKA_SESSION_TIMEOUT_MS: parseInteger({ default: 60000 }),
KAFKA_REBALANCE_TIMEOUT_MS: parseInteger({ default: 50000 }),
KAFKA_HEARTBEAT_INTERVAL_MS: parseInteger({ default: 2000 }),
KAFKA_CONNECTION_TIMEOUT_MS: parseInteger({ default: 5_000 }),
KAFKA_SESSION_TIMEOUT_MS: parseInteger({ default: 60_000 }),
KAFKA_REBALANCE_TIMEOUT_MS: parseInteger({ default: 50_000 }),
KAFKA_HEARTBEAT_INTERVAL_MS: parseInteger({ default: 5_000 }),
KAFKA_CONCURRENT_PARTITIONS: parseInteger({ default: 1 }),
// If true, consumers will have unique group ids, and SERVICE_NAME will be a common prefix for
// the consumer group ids.
Expand Down
46 changes: 33 additions & 13 deletions indexer/packages/kafka/src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import {
logger,
} from '@dydxprotocol-indexer/base';
import { Consumer, ConsumerRunConfig, KafkaMessage } from 'kafkajs';
import {
Consumer, ConsumerRunConfig, EachBatchPayload, KafkaMessage,
} from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';

import config from './config';
Expand All @@ -21,12 +23,28 @@ export const consumer: Consumer = kafka.consumer({
});

// List of functions to run per message consumed.
const onMessageFunctions: ((topic: string, message: KafkaMessage) => Promise<void>)[] = [];
let onMessageFunction: (topic: string, message: KafkaMessage) => Promise<void>;

// List of function to be run per batch consumed.
let onBatchFunction: (payload: EachBatchPayload) => Promise<void>;

export function addOnMessageFunction(
/**
* Overwrite function to be run on each kafka message
* @param onMessage
*/
export function updateOnMessageFunction(
onMessage: (topic: string, message: KafkaMessage) => Promise<void>,
): void {
onMessageFunctions.push(onMessage);
onMessageFunction = onMessage;
}

/**
* Overwrite function to be run on each kafka batch
*/
export function updateOnBatchFunction(
onBatch: (payload: EachBatchPayload) => Promise<void>,
): void {
onBatchFunction = onBatch;
}

// Whether the consumer is stopped.
Expand Down Expand Up @@ -66,20 +84,22 @@ export async function stopConsumer(): Promise<void> {
await consumer.disconnect();
}

export async function startConsumer(): Promise<void> {
export async function startConsumer(batchProcessing: boolean = false): Promise<void> {
const consumerRunConfig: ConsumerRunConfig = {
// The last offset of each batch will be committed if processing does not error.
// The commit will still happen if the number of messages in the batch < autoCommitThreshold.
eachBatchAutoResolve: true,
partitionsConsumedConcurrently: config.KAFKA_CONCURRENT_PARTITIONS,
autoCommit: true,
};

consumerRunConfig.eachMessage = async ({ topic, message }) => {
await Promise.all(
onMessageFunctions.map(
async (onMessage: (topic: string, message: KafkaMessage) => Promise<void>) => {
await onMessage(topic, message);
}),
);
};
if (batchProcessing) {
consumerRunConfig.eachBatch = onBatchFunction;
} else {
consumerRunConfig.eachMessage = async ({ topic, message }) => {
await onMessageFunction(topic, message);
};
}

await consumer.run(consumerRunConfig);

Expand Down
8 changes: 4 additions & 4 deletions indexer/services/ender/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, TO_ENDER_TOPIC, addOnMessageFunction,
consumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

Expand All @@ -20,9 +20,9 @@ export async function connect(): Promise<void> {
fromBeginning: true,
});

addOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
return onMessage(message);
});
updateOnMessageFunction(
(_topic: string, message: KafkaMessage): Promise<void> => onMessage(message),
);

logger.info({
at: 'consumers#connect',
Expand Down
4 changes: 2 additions & 2 deletions indexer/services/scripts/src/print-block.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logger, wrapBackgroundTask } from '@dydxprotocol-indexer/base';
import {
addOnMessageFunction,
updateOnMessageFunction,
consumer,
producer,
startConsumer,
Expand Down Expand Up @@ -66,7 +66,7 @@ export async function connect(height: number): Promise<void> {
fromBeginning: true,
});

addOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
updateOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
return printMessageAtHeight(message, height);
});

Expand Down
4 changes: 2 additions & 2 deletions indexer/services/socks/src/lib/message-forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
InfoObject,
safeJsonStringify,
} from '@dydxprotocol-indexer/base';
import { addOnMessageFunction } from '@dydxprotocol-indexer/kafka';
import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
import _ from 'lodash';

Expand Down Expand Up @@ -63,7 +63,7 @@ export class MessageForwarder {

// Kafkajs requires the function passed into `eachMessage` be an async function.
// eslint-disable-next-line @typescript-eslint/require-await
addOnMessageFunction(async (topic, message): Promise<void> => {
updateOnMessageFunction(async (topic, message): Promise<void> => {
return this.onMessage(topic, message);
});

Expand Down
4 changes: 4 additions & 0 deletions indexer/services/vulcan/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ export const configSchema = {
...kafkaConfigSchema,
...redisConfigSchema,

BATCH_PROCESSING_ENABLED: parseBoolean({ default: true }),
KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS: parseNumber({
default: 3_000,
}),
FLUSH_KAFKA_MESSAGES_INTERVAL_MS: parseNumber({
default: 10,
}),
Expand Down
22 changes: 18 additions & 4 deletions indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, KafkaTopics, addOnMessageFunction,
consumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

import config from '../../config';
import { onBatch } from '../../lib/on-batch';
import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
Expand All @@ -20,9 +22,21 @@ export async function connect(): Promise<void> {
fromBeginning: true,
});

addOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
return onMessage(message);
});
if (config.BATCH_PROCESSING_ENABLED) {
logger.info({
at: 'consumers#connect',
message: 'Batch processing enabled',
});
updateOnBatchFunction(onBatch);
} else {
logger.info({
at: 'consumers#connect',
message: 'Batch processing disabled. Processing each message individually',
});
updateOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
return onMessage(message);
});
}

logger.info({
at: 'consumers#connect',
Expand Down
2 changes: 1 addition & 1 deletion indexer/services/vulcan/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async function startService(): Promise<void> {
connectToRedis(),
]);

await startConsumer();
await startConsumer(config.BATCH_PROCESSING_ENABLED);

logger.info({
at: 'index#start',
Expand Down
92 changes: 92 additions & 0 deletions indexer/services/vulcan/src/lib/on-batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import {
Batch,
EachBatchPayload,
KafkaMessage,
} from 'kafkajs';

import config from '../config';
import { onMessage } from './on-message';

export async function onBatch(
payload: EachBatchPayload,
): Promise<void> {
const batch: Batch = payload.batch;
const topic: string = batch.topic;
const partition: string = batch.partition.toString();
const metricTags: Record<string, string> = { topic, partition };
if (batch.isEmpty()) {
logger.error({
at: 'on-batch#onBatch',
message: 'Empty batch',
...metricTags,
});
return;
}

const startTime: number = Date.now();
const firstMessageTimestamp: number = Number(batch.messages[0].timestamp);
const batchTimeInQueue: number = startTime - firstMessageTimestamp;
const batchInfo = {
firstMessageTimestamp: new Date(firstMessageTimestamp).toISOString(),
batchTimeInQueue,
messagesInBatch: batch.messages.length,
firstOffset: batch.firstOffset(),
lastOffset: batch.lastOffset(),
...metricTags,
};

logger.info({
at: 'on-batch#onBatch',
message: 'Received batch',
...batchInfo,
});
stats.timing(
'vulcan.batch_time_in_queue',
batchTimeInQueue,
metricTags,
);

let lastCommitTime: number = startTime;
for (let i = 0; i < batch.messages.length; i++) {
const message: KafkaMessage = batch.messages[i];
await onMessage(message);

// Commit every KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS to reduce number of roundtrips, and
// also prevent disconnecting from the broker due to inactivity.
const now: number = Date.now();
if (now - lastCommitTime > config.KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS) {
logger.info({
at: 'on-batch#onBatch',
message: 'Committing offsets and sending heart beat',
...batchInfo,
});
payload.resolveOffset(message.offset);
await Promise.all([
payload.heartbeat(),
// commitOffsetsIfNecessary will respect autoCommitThreshold and will not commit if
// fewer messages than the threshold have been processed since the last commit.
payload.commitOffsetsIfNecessary(),
]);
lastCommitTime = now;
}
}

const batchProcessingTime: number = Date.now() - startTime;
logger.info({
at: 'on-batch#onBatch',
message: 'Finished Processing Batch',
batchProcessingTime,
...batchInfo,
});
stats.timing(
'vulcan.batch_processing_time',
batchProcessingTime,
metricTags,
);
stats.timing(
'vulcan.batch_size',
batch.messages.length,
metricTags,
);
}

0 comments on commit 53274e2

Please sign in to comment.