Apply adapter design pattern #10
Conversation
Removed the adapter abstraction and related files for event handling. The event client now directly manages in-memory (socket.io) and Kafka backends, simplifying initialization, publishing, and subscription logic. Updated type definitions and streamlined the API. Also improved metrics tracking in the server event implementation. Updated package.json to reflect new entry points and removed obsolete scripts.
Introduced various Prometheus metrics using prom-client to track event publishing, subscriptions, payload sizes, errors, callback durations, and throughput. Metrics are updated throughout the event publishing and subscription lifecycle to improve observability and monitoring.
Added 'client' to the exports in Event.ts to make it available for import in other modules.
Introduces an async disconnect() method to the event API for both in-memory and Kafka transports, ensuring proper cleanup of resources. Refactors Kafka consumer logic to use a shared consumer and topic tracking, improving subscription management and resource usage. Updates type definitions and bumps package version to 1.1.8.
Improved event publishing by validating event types, tracking metrics, and refactoring Kafka and socket handling. Enhanced subscription management with better metric updates and streamlined callback execution. Cleaned up formatting and removed unnecessary code.
Introduces periodic monitoring of Kafka topic backlogs, exposing a Prometheus gauge for the number of unprocessed events per topic. Adds methods to start, stop, and manually check backlog monitoring, and ensures metrics are updated on topic subscription and disconnect. Also refactors some Kafka producer/consumer usage for clarity.
Replaces per-topic Kafka consumers with a single shared consumer that dynamically subscribes to topics and restarts as needed. Adds Prometheus metrics for event publishing, subscriptions, callback processing, payload size, error rates, and throughput. Updates API to export Prometheus client and adds restartKafkaConsumer method. Bumps package version to 1.1.11.
Wraps subscribedTopics in Array.from() to ensure proper iteration, likely addressing cases where subscribedTopics is a Set or non-array iterable.
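A minimal sketch of why that wrapper matters when `subscribedTopics` is a Set (the variable name mirrors the commit message; the surrounding code is illustrative, not the actual diff) — a Set is iterable with `for...of`, but has no array methods such as `.map()`:

```typescript
// subscribedTopics as a Set, as the commit message suggests it may be.
const subscribedTopics: Set<string> = new Set(["orders", "payments"]);

// A Set has no .map(); Array.from() materializes a real array first,
// so array methods and spread-style iteration work reliably.
const topicList = Array.from(subscribedTopics).map((t) => t.toUpperCase());
```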
Introduces functions to monitor Kafka backlog size, including periodic and manual checks, and exposes metrics for events waiting to be processed. Also adjusts consumer concurrency and updates package version.
Replaces monolithic Event implementation with modular EventManager, KafkaAdapter, and SocketAdapter. Event logic and metrics are separated into dedicated files, improving maintainability and extensibility. Updates package.json exports to use new entry points. Removes legacy Event files.
Introduces a modular event system supporting Kafka and Socket.io adapters, including event publishing, subscription management, metrics collection, and backlog monitoring. Updates build scripts and TypeScript configuration for proper compilation and type declarations.
Pull Request Overview
This PR refactors the event system to implement the adapter design pattern, creating separate adapters for different transport mechanisms (Socket.IO and Kafka). The refactoring introduces a unified interface through an EventAdapter interface and consolidates event management into an EventManager class.
- Implements adapter pattern with EventAdapter interface for different transport types
- Creates new EventManager class to handle event publishing, subscription, and metrics
- Refactors existing code to use the new adapter-based architecture
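The overview suggests a contract along these lines. The `EventAdapter` and `EventManager` names come from the PR summary; the method signatures and the in-memory adapter below are assumptions for illustration, not code from the diff:

```typescript
// Sketch of the adapter contract described in the PR overview.
interface EventAdapter {
  publish(topic: string, payload: object): Promise<void>;
  subscribe(topic: string, callback: (payload: object) => void): Promise<void>;
  disconnect(): Promise<void>;
}

// Minimal in-memory adapter showing the pattern; the real PR wires up
// Socket.IO and Kafka implementations behind the same interface.
class InMemoryAdapter implements EventAdapter {
  private handlers = new Map<string, Array<(payload: object) => void>>();

  async publish(topic: string, payload: object): Promise<void> {
    for (const cb of this.handlers.get(topic) ?? []) cb(payload);
  }

  async subscribe(topic: string, callback: (payload: object) => void): Promise<void> {
    const list = this.handlers.get(topic) ?? [];
    list.push(callback);
    this.handlers.set(topic, list);
  }

  async disconnect(): Promise<void> {
    this.handlers.clear();
  }
}

// EventManager delegates to whichever adapter it was constructed with,
// so transport can be swapped without touching callers.
class EventManager {
  constructor(private adapter: EventAdapter) {}
  publish(topic: string, payload: object) { return this.adapter.publish(topic, payload); }
  subscribe(topic: string, cb: (payload: object) => void) { return this.adapter.subscribe(topic, cb); }
  disconnect() { return this.adapter.disconnect(); }
}
```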
Reviewed Changes
Copilot reviewed 19 out of 30 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| src/Event.ts | Updates import statement and adds metric tracking comments |
| package.json | Updates package name, version, and build configuration |
| client/types/types.ts | Adds EventAdapter interface definition |
| client/metrics.ts | Creates new EventMetrics class for centralized metrics handling |
| client/index.ts | New main client entry point with adapter exports |
| client/eventManager.ts | Core EventManager class implementing adapter pattern |
| client/adapters/SocketAdapter.ts | Socket.IO adapter implementation |
| client/adapters/KafkaAdapter.ts | Kafka adapter implementation with improved consumer management |
client/adapters/KafkaAdapter.ts (outdated)
```ts
const producer = this.kafka.producer();
await producer.connect();
try {
  await producer.send({
    topic: type,
    messages: [
      {
        value: JSON.stringify(payload),
        timestamp: Date.now().toString(),
      },
    ],
  });
} catch (error: any) {
  console.error("Failed to publish to Kafka:", error);
  if (error.message?.includes("disconnected")) {
    try {
      await producer.connect();
      await this.publish(...args);
    } catch (reconnectError: any) {
      throw new Error(`Failed to reconnect producer: ${reconnectError.message}`);
    }
  } else {
    throw error;
  }
} finally {
  await producer.disconnect();
}
```
Copilot AI · Sep 27, 2025
Creating and connecting a new producer for each publish call is inefficient. Consider reusing the producer instance that's already created in the constructor or connect method to avoid connection overhead.
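The reviewer's suggestion can be sketched as a lazily connected, shared producer. `KafkaPublisher`, `getProducer`, and the `Producer` stub below are hypothetical names standing in for the kafkajs producer the adapter actually uses:

```typescript
// Stub mirroring the producer surface used in the diff.
interface Producer {
  connect(): Promise<void>;
  send(record: { topic: string; messages: { value: string }[] }): Promise<void>;
  disconnect(): Promise<void>;
}

class KafkaPublisher {
  private producer: Producer | null = null;

  constructor(private createProducer: () => Producer) {}

  // Create and connect the shared producer exactly once, on first use.
  private async getProducer(): Promise<Producer> {
    if (!this.producer) {
      this.producer = this.createProducer();
      await this.producer.connect();
    }
    return this.producer;
  }

  async publish(topic: string, payload: object): Promise<void> {
    const producer = await this.getProducer();
    await producer.send({
      topic,
      messages: [{ value: JSON.stringify(payload) }],
    });
  }

  // Disconnect once at shutdown -- not after every publish.
  async disconnect(): Promise<void> {
    if (this.producer) {
      await this.producer.disconnect();
      this.producer = null;
    }
  }
}
```

With this shape, repeated publishes pay the connection cost once rather than per call, which is the overhead the review comment flags.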
```ts
setTimeout(() => {
  const endTimer = this.metrics.recordCallback(type);
  try {
    callback(payload);
  } catch (error) {
    console.error(`Error in callback for ${type}:`, error);
  }
  endTimer();
}, 0);
```
Copilot AI · Sep 27, 2025
Using setTimeout with 0 delay for async callback execution may cause issues with error handling and could lead to unexpected execution order. Consider using Promise.resolve().then() or making this properly async if the intent is to defer execution.
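A sketch of the microtask-based alternative the reviewer mentions. `deferCallback` and the `recordCallback` parameter are illustrative names, not the adapter's actual API; the point is that `Promise.resolve().then()` defers on the microtask queue (ahead of any pending `setTimeout(0)`) and returns a promise callers can await or attach error handling to:

```typescript
// Defer callback execution on the microtask queue instead of a timer.
// recordCallback stands in for the metrics helper seen in the diff:
// it returns an end-timer function to call when the callback finishes.
function deferCallback(
  type: string,
  callback: (payload: object) => void,
  payload: object,
  recordCallback: (type: string) => () => void,
): Promise<void> {
  return Promise.resolve().then(() => {
    const endTimer = recordCallback(type);
    try {
      callback(payload);
    } catch (error) {
      console.error(`Error in callback for ${type}:`, error);
    } finally {
      // finally ensures the timer ends even if the callback throws.
      endTimer();
    }
  });
}
```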
```ts
const consumerOffset = parseInt(partitionOffset.offset);
const latestOffsetValue = parseInt(latestOffset.offset);
totalLag += Math.max(0, latestOffsetValue - consumerOffset);
```
Copilot AI · Sep 27, 2025
Using parseInt on Kafka offsets may cause precision loss for large offset values. Kafka offsets are typically handled as strings or BigInt values. Consider using BigInt or proper string-based arithmetic for offset calculations.
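The BigInt version of the lag calculation might look like this (`partitionLag` is an illustrative helper, not code from the diff):

```typescript
// Kafka offsets arrive as strings; BigInt avoids the precision loss
// parseInt suffers above Number.MAX_SAFE_INTEGER (2^53 - 1).
function partitionLag(latestOffset: string, consumerOffset: string): bigint {
  const latest = BigInt(latestOffset);
  const committed = BigInt(consumerOffset);
  const lag = latest - committed;
  return lag > 0n ? lag : 0n;
}
```

For example, with offsets `"9007199254740993"` and `"9007199254740992"` (just past 2^53), `parseInt` rounds both to the same number and reports a lag of 0, while the BigInt version correctly yields `1n`.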
Updated type signatures for publish and subscribe methods in EventManager and event exports to require payloads to extend object, improving type safety. Also updated package version to 1.1.20.
Changed event type handling in publish to merge multiple types into a single string separated by underscores, ensuring consistent event naming. Updated all relevant type signatures to use object instead of any for payloads and callbacks, improving type safety. Also updated the EventAdapter interface and related adapters, and incremented the package version.
Introduced a 'test' script in package.json that outputs a placeholder message. This prepares the project for future test integration.
Introduces Pushgateway support to EventMetrics, allowing metrics to be periodically pushed to a Prometheus Pushgateway. Adds related methods to EventManager and exposes PushgatewayConfig type in the public API.
KafkaAdapter now receives a static list of topics at construction and no longer manages dynamic topic subscriptions. EventManager provides the topic list and handles callback registration/removal in memory. This simplifies consumer management and improves separation of concerns.
… into none-adapter
Simplifies KafkaAdapter by removing per-topic subscription logic and managing a single consumer for all topics except internal ones. Updates eventManager to use a static list of Kafka topics for backlog monitoring and adapts to the new KafkaAdapter API. This improves efficiency and maintainability by centralizing topic management and reducing redundant consumer restarts.
… into none-adapter
Introduces Pushgateway integration for metrics reporting, including methods to start, stop, and push metrics to a Prometheus Pushgateway. Refactors KafkaAdapter to remove internal topic tracking, use regex for topic subscription, and improve backlog calculation. Updates type definitions and public API to expose Pushgateway configuration and control methods.
Introduces TxEventQAdapter for Oracle Advanced Queuing event transport, updates EventManager and types to support 'txeventq', and exposes the adapter in the public API. Adds docker-compose-oracle.yml for local Oracle DB setup and includes 'oracledb' as a dependency.
Replaces usage of the EventMessage type with a generic payload structure in TxEventQAdapter. Updates queue and message handling logic to use 'topic' instead of 'eventType', simplifying type usage and removing dependency on EventMessage.
Introduces TxEventQAdapter to support Oracle Advanced Queuing (AQ) as an event adapter. Updates EventManager, types, and exports to allow initialization and usage of TxEventQAdapter. Also increases KafkaAdapter's partitionsConsumedConcurrently to 160 and sets a default consumer name for TxEventQAdapter. Updates package version and dependencies.
Removed static queueName from configuration and refactored TxEventQAdapter to use dynamic queue names based on event type for publish/subscribe. Updated related types and EventManager initialization. Added autoCommit option and improved message consumption logic. Also added pub_test.js and test.js for usage examples.
Simplified message dequeuing logic in TxEventQAdapter by removing batch processing and always using deqOne with a fixed wait time of 5000ms. Improved error handling and message processing, and updated package version to 1.1.38.
Eliminated a console.log statement used for debugging payloads in the message handler. This cleans up console output during normal operation.
Introduces a dedicated client.Registry for EventMetrics and registers all metrics to it. This improves encapsulation and avoids polluting the global registry, ensuring metrics are scoped to the EventMetrics instance.
EventMetrics now uses its own prom-client Registry for metric registration and export, improving isolation and compatibility with multiple metric sources. Also removed a debug log from TxEventQAdapter and bumped package version to 1.1.39.
Introduces getHistory to TxEventQAdapter for retrieving recent event history per topic. EventManager now supports backlog/history monitoring for both Kafka and TxEventQ adapters, updating a unified backlog metric. Metrics are refactored to use a generic eventBacklog gauge instead of Kafka-specific naming, and Pushgateway integration is improved for periodic metric pushes.
Implemented backlog calculation for TxEventQAdapter and integrated it into EventManager's backlog monitoring. Refactored metrics to generalize backlog tracking (renamed kafkaBacklog to eventBacklog) and updated related methods to support multiple adapters. Backlog monitoring now supports both Kafka and TxEventQ adapters.
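The unified backlog monitoring described above might be structured roughly like this. `BacklogMonitor`, `BacklogSource`, and the gauge setter are assumed shapes for illustration; the real code records to the `eventBacklog` prom-client gauge:

```typescript
// Assumed common surface for the Kafka and TxEventQ adapters: each can
// report how many events are waiting to be processed for a topic.
interface BacklogSource {
  name: string;
  getBacklog(topic: string): Promise<number>;
}

class BacklogMonitor {
  constructor(
    private sources: BacklogSource[],
    // Stand-in for eventBacklog.set({ adapter, topic }, value).
    private setGauge: (labels: { adapter: string; topic: string }, value: number) => void,
  ) {}

  // One monitoring pass: query every adapter for every topic and record
  // the result on the shared backlog gauge.
  async check(topics: string[]): Promise<void> {
    for (const source of this.sources) {
      for (const topic of topics) {
        const backlog = await source.getBacklog(topic);
        this.setGauge({ adapter: source.name, topic }, backlog);
      }
    }
  }
}
```

Labelling the single gauge by adapter keeps one metric name (`eventBacklog`) while still distinguishing Kafka from TxEventQ in dashboards, which matches the rename away from Kafka-specific naming.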