From 66771e05ec0aa180251ad6f8031c9bd15decbb03 Mon Sep 17 00:00:00 2001
From: Christopher Li
Date: Wed, 24 Jan 2024 13:16:56 -0500
Subject: [PATCH] [IND-545]: Batch Process Vulcan messages

---
 .../packages/kafka/__tests__/consumer.test.ts |  4 +-
 indexer/packages/kafka/src/config.ts          |  2 +-
 indexer/packages/kafka/src/consumer.ts        | 46 +++++++---
 .../src/helpers/kafka/kafka-controller.ts     |  4 +-
 indexer/services/scripts/src/print-block.ts   |  4 +-
 .../socks/src/lib/message-forwarder.ts        |  4 +-
 indexer/services/vulcan/src/config.ts         |  8 ++
 .../src/helpers/kafka/kafka-controller.ts     | 22 ++++-
 indexer/services/vulcan/src/index.ts          |  2 +-
 indexer/services/vulcan/src/lib/on-batch.ts   | 92 +++++++++++++++++++
 10 files changed, 161 insertions(+), 27 deletions(-)
 create mode 100644 indexer/services/vulcan/src/lib/on-batch.ts

diff --git a/indexer/packages/kafka/__tests__/consumer.test.ts b/indexer/packages/kafka/__tests__/consumer.test.ts
index f2d330b611..de801b2dfe 100644
--- a/indexer/packages/kafka/__tests__/consumer.test.ts
+++ b/indexer/packages/kafka/__tests__/consumer.test.ts
@@ -1,5 +1,5 @@
 import {
-  consumer, startConsumer, addOnMessageFunction, stopConsumer,
+  consumer, startConsumer, updateOnMessageFunction, stopConsumer,
 } from '../src/consumer';
 import { producer } from '../src/producer';
 import { createKafkaMessage } from './helpers/kafka';
@@ -26,7 +26,7 @@ describe.skip('consumer', () => {
   it('is consuming message', async () => {
     const onMessageFn: (topic: string, message: KafkaMessage) => Promise<void> = jest.fn();

-    addOnMessageFunction(onMessageFn);
+    updateOnMessageFunction(onMessageFn);

     const kafkaMessage: KafkaMessage = createKafkaMessage(null);
     await producer.send({
diff --git a/indexer/packages/kafka/src/config.ts b/indexer/packages/kafka/src/config.ts
index 366aa64db0..ecfa90d29e 100644
--- a/indexer/packages/kafka/src/config.ts
+++ b/indexer/packages/kafka/src/config.ts
@@ -21,7 +21,7 @@ export const kafkaConfigSchema = {
   KAFKA_CONNECTION_TIMEOUT_MS: parseInteger({ default: 5000 }),
   KAFKA_SESSION_TIMEOUT_MS: parseInteger({ default: 60000 }),
   KAFKA_REBALANCE_TIMEOUT_MS: parseInteger({ default: 50000 }),
-  KAFKA_HEARTBEAT_INTERVAL_MS: parseInteger({ default: 2000 }),
+  KAFKA_HEARTBEAT_INTERVAL_MS: parseInteger({ default: 5000 }),
   KAFKA_CONCURRENT_PARTITIONS: parseInteger({ default: 1 }),
   // If true, consumers will have unique group ids, and SERVICE_NAME will be a common prefix for
   // the consumer group ids.
diff --git a/indexer/packages/kafka/src/consumer.ts b/indexer/packages/kafka/src/consumer.ts
index f691df5712..b366bd433a 100644
--- a/indexer/packages/kafka/src/consumer.ts
+++ b/indexer/packages/kafka/src/consumer.ts
@@ -1,7 +1,9 @@
 import {
   logger,
 } from '@dydxprotocol-indexer/base';
-import { Consumer, ConsumerRunConfig, KafkaMessage } from 'kafkajs';
+import {
+  Consumer, ConsumerRunConfig, EachBatchPayload, KafkaMessage,
+} from 'kafkajs';
 import { v4 as uuidv4 } from 'uuid';

 import config from './config';
@@ -21,12 +23,28 @@ export const consumer: Consumer = kafka.consumer({
 });

 // List of functions to run per message consumed.
-const onMessageFunctions: ((topic: string, message: KafkaMessage) => Promise<void>)[] = [];
+let onMessageFunction: (topic: string, message: KafkaMessage) => Promise<void>;
+
+// Function to be run per batch consumed.
+let onBatchFunction: (payload: EachBatchPayload) => Promise<void>;

-export function addOnMessageFunction(
+/**
+ * Overwrite the function to be run on each Kafka message.
+ * @param onMessage
+ */
+export function updateOnMessageFunction(
   onMessage: (topic: string, message: KafkaMessage) => Promise<void>,
 ): void {
-  onMessageFunctions.push(onMessage);
+  onMessageFunction = onMessage;
+}
+
+/**
+ * Overwrite the function to be run on each Kafka batch.
+ */
+export function updateOnBatchFunction(
+  onBatch: (payload: EachBatchPayload) => Promise<void>,
+): void {
+  onBatchFunction = onBatch;
 }

 // Whether the consumer is stopped.
@@ -66,20 +84,22 @@ export async function stopConsumer(): Promise<void> {
   await consumer.disconnect();
 }

-export async function startConsumer(): Promise<void> {
+export async function startConsumer(batchProcessing: boolean = false): Promise<void> {
   const consumerRunConfig: ConsumerRunConfig = {
+    // The last offset of each batch will be committed if processing does not error. The commit
+    // still happens even if the batch contains fewer messages than autoCommitThreshold.
+    eachBatchAutoResolve: true,
     partitionsConsumedConcurrently: config.KAFKA_CONCURRENT_PARTITIONS,
     autoCommit: true,
   };

-  consumerRunConfig.eachMessage = async ({ topic, message }) => {
-    await Promise.all(
-      onMessageFunctions.map(
-        async (onMessage: (topic: string, message: KafkaMessage) => Promise<void>) => {
-          await onMessage(topic, message);
-        }),
-    );
-  };
+  if (batchProcessing) {
+    consumerRunConfig.eachBatch = onBatchFunction;
+  } else {
+    consumerRunConfig.eachMessage = async ({ topic, message }) => {
+      await onMessageFunction(topic, message);
+    };
+  }

   await consumer.run(consumerRunConfig);
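Usage sketch (illustrative, not part of the patch): with this change a service registers exactly one handler, and opts into batch delivery through the new startConsumer flag. processBatch and the topic name below are hypothetical placeholders; vulcan's real wiring appears in the kafka-controller changes further down.

import { EachBatchPayload } from 'kafkajs';

import {
  consumer, startConsumer, updateOnBatchFunction,
} from '@dydxprotocol-indexer/kafka';

// Hypothetical batch handler; see on-batch.ts below for the full production pattern.
async function processBatch(payload: EachBatchPayload): Promise<void> {
  for (const message of payload.batch.messages) {
    // ... process each message, calling payload.resolveOffset(message.offset) as you go ...
  }
}

export async function start(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); // hypothetical topic
  updateOnBatchFunction(processBatch);
  // Passing true routes the registered handler to consumerRunConfig.eachBatch
  // instead of eachMessage.
  await startConsumer(true);
}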
diff --git a/indexer/services/ender/src/helpers/kafka/kafka-controller.ts b/indexer/services/ender/src/helpers/kafka/kafka-controller.ts
index 3744316fa6..c3d910b29e 100644
--- a/indexer/services/ender/src/helpers/kafka/kafka-controller.ts
+++ b/indexer/services/ender/src/helpers/kafka/kafka-controller.ts
@@ -1,6 +1,6 @@
 import { logger } from '@dydxprotocol-indexer/base';
 import {
-  consumer, producer, TO_ENDER_TOPIC, addOnMessageFunction,
+  consumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
 } from '@dydxprotocol-indexer/kafka';
 import { KafkaMessage } from 'kafkajs';

@@ -20,7 +20,7 @@ export async function connect(): Promise<void> {
     fromBeginning: true,
   });

-  addOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
+  updateOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
     return onMessage(message);
   });
diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts
index b4eee6d106..05855229ca 100644
--- a/indexer/services/scripts/src/print-block.ts
+++ b/indexer/services/scripts/src/print-block.ts
@@ -1,6 +1,6 @@
 import { logger, wrapBackgroundTask } from '@dydxprotocol-indexer/base';
 import {
-  addOnMessageFunction,
+  updateOnMessageFunction,
   consumer,
   producer,
   startConsumer,
@@ -66,7 +66,7 @@ export async function connect(height: number): Promise<void> {
     fromBeginning: true,
   });

-  addOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
+  updateOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
     return printMessageAtHeight(message, height);
   });
diff --git a/indexer/services/socks/src/lib/message-forwarder.ts b/indexer/services/socks/src/lib/message-forwarder.ts
index 26b58e444e..1df0b17a2a 100644
--- a/indexer/services/socks/src/lib/message-forwarder.ts
+++ b/indexer/services/socks/src/lib/message-forwarder.ts
@@ -4,7 +4,7 @@ import {
   InfoObject,
   safeJsonStringify,
 } from '@dydxprotocol-indexer/base';
-import { addOnMessageFunction } from '@dydxprotocol-indexer/kafka';
+import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
 import { KafkaMessage } from 'kafkajs';
 import _ from 'lodash';

@@ -63,7 +63,7 @@ export class MessageForwarder {

     // Kafkajs requires the function passed into `eachMessage` be an async function.
     // eslint-disable-next-line @typescript-eslint/require-await
-    addOnMessageFunction(async (topic, message): Promise<void> => {
+    updateOnMessageFunction(async (topic, message): Promise<void> => {
       return this.onMessage(topic, message);
     });
diff --git a/indexer/services/vulcan/src/config.ts b/indexer/services/vulcan/src/config.ts
index 82e21bc18b..5e13b09a92 100644
--- a/indexer/services/vulcan/src/config.ts
+++ b/indexer/services/vulcan/src/config.ts
@@ -7,6 +7,7 @@ import {
   parseSchema,
   baseConfigSchema,
   parseBoolean,
+  ONE_SECOND_IN_MILLISECONDS,
 } from '@dydxprotocol-indexer/base';
 import {
   kafkaConfigSchema,
@@ -20,6 +21,13 @@ export const configSchema = {
   ...kafkaConfigSchema,
   ...redisConfigSchema,

+  BATCH_PROCESSING_ENABLED: parseBoolean({ default: true }),
+  KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY: parseNumber({
+    default: 20,
+  }),
+  KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS: parseNumber({
+    default: 3 * ONE_SECOND_IN_MILLISECONDS,
+  }),
   FLUSH_KAFKA_MESSAGES_INTERVAL_MS: parseNumber({
     default: 10,
   }),
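For intuition on the new knobs (the arithmetic below is illustrative; the per-message latency is an assumption, not a measurement): the time-based threshold caps how long a batch can run without committing progress or heartbeating the broker, comfortably below the 60 s KAFKA_SESSION_TIMEOUT_MS and compatible with the 5 s KAFKA_HEARTBEAT_INTERVAL_MS above. Note that the count-based KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY is defined here but not referenced by on-batch.ts in this patch.

// Illustrative only: assumed 2 ms per message, default 3 s commit threshold.
const COMMIT_FREQUENCY_MS: number = 3_000; // KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS default
const ASSUMED_MS_PER_MESSAGE: number = 2;

// Offsets are committed (and a heartbeat sent) roughly every ~1,500 messages,
// so a partition rebalance would replay at most ~3 s of work.
const messagesPerCommit: number = Math.floor(COMMIT_FREQUENCY_MS / ASSUMED_MS_PER_MESSAGE); // 1500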
diff --git a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts
index 6e156276d1..7c71f72af8 100644
--- a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts
+++ b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts
@@ -1,9 +1,11 @@
 import { logger } from '@dydxprotocol-indexer/base';
 import {
-  consumer, producer, KafkaTopics, addOnMessageFunction,
+  consumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
 } from '@dydxprotocol-indexer/kafka';
 import { KafkaMessage } from 'kafkajs';

+import config from '../../config';
+import { onBatch } from '../../lib/on-batch';
 import { onMessage } from '../../lib/on-message';

 export async function connect(): Promise<void> {
@@ -20,9 +22,21 @@ export async function connect(): Promise<void> {
     fromBeginning: true,
   });

-  addOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
-    return onMessage(message);
-  });
+  if (config.BATCH_PROCESSING_ENABLED) {
+    logger.info({
+      at: 'consumers#connect',
+      message: 'Batch processing enabled',
+    });
+    updateOnBatchFunction(onBatch);
+  } else {
+    logger.info({
+      at: 'consumers#connect',
+      message: 'Batch processing disabled. Processing each message individually',
+    });
+    updateOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
+      return onMessage(message);
+    });
+  }

   logger.info({
     at: 'consumers#connect',
diff --git a/indexer/services/vulcan/src/index.ts b/indexer/services/vulcan/src/index.ts
index 72d0472d93..be4429a4e4 100644
--- a/indexer/services/vulcan/src/index.ts
+++ b/indexer/services/vulcan/src/index.ts
@@ -37,7 +37,7 @@ async function startService(): Promise<void> {
     connectToRedis(),
   ]);

-  await startConsumer();
+  await startConsumer(config.BATCH_PROCESSING_ENABLED);

   logger.info({
     at: 'index#start',
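A test sketch for the flag wiring (hypothetical, not part of the patch; assumes jest module mocking, and the KafkaTopics value shown is a stand-in):

import { updateOnBatchFunction, updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';

import config from '../../config';
import { connect } from './kafka-controller';

jest.mock('@dydxprotocol-indexer/kafka', () => ({
  consumer: { connect: jest.fn(), subscribe: jest.fn() },
  producer: { connect: jest.fn() },
  KafkaTopics: { TO_VULCAN: 'to-vulcan' }, // hypothetical enum value
  updateOnBatchFunction: jest.fn(),
  updateOnMessageFunction: jest.fn(),
}));

it('registers the batch handler when BATCH_PROCESSING_ENABLED is true', async () => {
  config.BATCH_PROCESSING_ENABLED = true;
  await connect();
  expect(updateOnBatchFunction).toHaveBeenCalled();
  expect(updateOnMessageFunction).not.toHaveBeenCalled();
});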
diff --git a/indexer/services/vulcan/src/lib/on-batch.ts b/indexer/services/vulcan/src/lib/on-batch.ts
new file mode 100644
index 0000000000..13b8d95b0b
--- /dev/null
+++ b/indexer/services/vulcan/src/lib/on-batch.ts
@@ -0,0 +1,92 @@
+import { logger, stats } from '@dydxprotocol-indexer/base';
+import {
+  Batch,
+  EachBatchPayload,
+  KafkaMessage,
+} from 'kafkajs';
+
+import config from '../config';
+import { onMessage } from './on-message';
+
+export async function onBatch(
+  payload: EachBatchPayload,
+): Promise<void> {
+  const batch: Batch = payload.batch;
+  const topic: string = batch.topic;
+  const partition: string = batch.partition.toString();
+  const metricTags: Record<string, string> = { topic, partition };
+  if (batch.isEmpty()) {
+    logger.error({
+      at: 'on-batch#onBatch',
+      message: 'Empty batch',
+      ...metricTags,
+    });
+    return;
+  }
+
+  const startTime: number = Date.now();
+  const firstMessageTimestamp: number = Number(batch.messages[0].timestamp);
+  const batchTimeInQueue: number = startTime - firstMessageTimestamp;
+  const batchInfo = {
+    firstMessageTimestamp: new Date(firstMessageTimestamp).toISOString(),
+    batchTimeInQueue,
+    messagesInBatch: batch.messages.length,
+    firstOffset: batch.firstOffset(),
+    lastOffset: batch.lastOffset(),
+    ...metricTags,
+  };
+
+  logger.info({
+    at: 'on-batch#onBatch',
+    message: 'Received batch',
+    ...batchInfo,
+  });
+  stats.timing(
+    'vulcan.batch_time_in_queue',
+    batchTimeInQueue,
+    metricTags,
+  );
+
+  let lastCommitTime: number = startTime;
+  for (let i = 0; i < batch.messages.length; i++) {
+    const message: KafkaMessage = batch.messages[i];
+    await onMessage(message);
+
+    // Commit offsets every KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS to reduce the number of
+    // round trips to the broker, and to avoid being disconnected due to inactivity.
+    const now: number = Date.now();
+    if (now - lastCommitTime > config.KAFKA_BATCH_PROCESSING_COMMIT_FREQUENCY_MS) {
+      logger.info({
+        at: 'on-batch#onBatch',
+        message: 'Committing offsets and sending heartbeat',
+        ...batchInfo,
+      });
+      payload.resolveOffset(message.offset);
+      await Promise.all([
+        payload.heartbeat(),
+        // commitOffsetsIfNecessary will respect autoCommitThreshold and will not commit if
+        // fewer messages than the threshold have been processed since the last commit.
+        payload.commitOffsetsIfNecessary(),
+      ]);
+      lastCommitTime = now;
+    }
+  }
+
+  const batchProcessingTime: number = Date.now() - startTime;
+  logger.info({
+    at: 'on-batch#onBatch',
+    message: 'Finished processing batch',
+    batchProcessingTime,
+    ...batchInfo,
+  });
+  stats.timing(
+    'vulcan.batch_processing_time',
+    batchProcessingTime,
+    metricTags,
+  );
+  stats.gauge(
+    'vulcan.batch_size',
+    batch.messages.length,
+    metricTags,
+  );
+}
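To exercise onBatch in isolation (for example, to verify the commit cadence), the payload can be hand-rolled. A minimal sketch, assuming ./on-message is stubbed so onMessage resolves immediately; the topic name is a stand-in, and the double cast is needed because EachBatchPayload carries more fields (pause, isRunning, isStale, ...) than onBatch touches:

import { EachBatchPayload, KafkaMessage } from 'kafkajs';

import { onBatch } from './on-batch';

let lastResolved: string | undefined;

// Hand-rolled message; the cast papers over the optional KafkaMessage fields.
function fakeMessage(offset: number): KafkaMessage {
  return {
    key: null,
    value: null,
    timestamp: Date.now().toString(),
    attributes: 0,
    offset: offset.toString(),
  } as KafkaMessage;
}

const payload: EachBatchPayload = {
  batch: {
    topic: 'to-vulcan', // hypothetical topic
    partition: 0,
    messages: [fakeMessage(0), fakeMessage(1)],
    isEmpty: () => false,
    firstOffset: () => '0',
    lastOffset: () => '1',
  },
  resolveOffset: (offset: string): void => { lastResolved = offset; },
  heartbeat: async (): Promise<void> => {},
  commitOffsetsIfNecessary: async (): Promise<void> => {},
} as unknown as EachBatchPayload;

async function demo(): Promise<void> {
  // Two fast messages never cross the 3 s threshold, so onBatch performs no mid-batch
  // commit and lastResolved stays undefined; kafkajs resolves and commits the batch
  // afterwards because startConsumer sets eachBatchAutoResolve and autoCommit.
  await onBatch(payload);
}

void demo();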