-
Notifications
You must be signed in to change notification settings - Fork 0
Separate API and Broker layer for decoupling #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…messages. Changed message state enum to integer one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Worth considering though. View full project report here.
return state.name | ||
return f"UNKNOWN({value})" | ||
|
||
@dataclass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dataclass | |
@dataclass(frozen=True) |
Use frozen=True
to make the dataclasses
immutable and hashable. More info.
"enqueued_at": self.enqueued_at | ||
} | ||
|
||
@dataclass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dataclass | |
@dataclass(frozen=True) |
Likewise, Use frozen=True
.
if _version_not_supported: | ||
raise RuntimeError( | ||
f'The grpc package installed is at version {GRPC_VERSION},' | ||
+ f' but the generated code in broker_pb2_grpc.py depends on' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+ f' but the generated code in broker_pb2_grpc.py depends on' | |
+ ' but the generated code in broker_pb2_grpc.py depends on' |
f-string is unnecessary here. This can just be a string. More.
) | ||
|
||
|
||
class BrokerStub(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class BrokerStub(object): | |
class BrokerStub: |
BrokerStub
inherits from object
by default, so explicitly inheriting from object is redundant. Removing it keeps the code simpler. More details.
_registered_method=True) | ||
|
||
|
||
class BrokerServicer(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class BrokerServicer(object): | |
class BrokerServicer: |
Similarly, Explicitly inheriting from object is redundant.
|
||
|
||
# This class is part of an EXPERIMENTAL API. | ||
class Broker(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class Broker(object): | |
class Broker: |
Again, Explicitly inheriting from object is redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Separates the messaging system into decoupled API and broker layers using gRPC communication. The broker layer manages message queuing and state while API nodes communicate with the broker via gRPC instead of direct method calls.
- Introduced gRPC protocol definitions for broker-client communication with message streaming, publishing, and acknowledgment capabilities
- Created dedicated broker service that runs independently on port 50051 with the message service
- Refactored API layer to act as a client that communicates with the broker via gRPC calls instead of direct service access
Reviewed Changes
Copilot reviewed 13 out of 16 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
run.sh | Launch script to start both broker and API node services with auto-reload |
proto/broker.proto | gRPC service definition for broker communication protocol |
proto/broker_pb2.py | Generated Python protobuf message classes |
proto/broker_pb2_grpc.py | Generated gRPC client and server stubs |
proto/api_node.proto | gRPC service definition for API node status (optional) |
models.py | Removed original models file |
broker/models.py | Moved and updated message models with integer-based enum values |
broker/persistence_service.py | Updated imports and enum handling for new model structure |
broker/message_storage.py | Updated import path for moved models |
broker/message_service.py | Updated import paths for broker module structure |
broker/broker.py | New gRPC broker service implementation |
app.py | Removed original Flask app |
api_node/app.py | New Flask API that communicates with broker via gRPC |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
"payload": msg.payload, | ||
"timestamp": msg.timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The BrokerMessage proto doesn't have 'payload' or 'timestamp' fields. Based on the proto definition, these should be 'data' and 'enqueued_at' respectively.
Copilot uses AI. Check for mistakes.
"payload": m.payload, | ||
"timestamp": m.timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The BrokerMessage proto doesn't have 'payload' or 'timestamp' fields. These should be 'data' and 'enqueued_at' to match the proto definition.
Copilot uses AI. Check for mistakes.
data: object | ||
enqueued_at: float = None | ||
retries: int = 0 | ||
state: MessageState = MessageState.ENQUEUED.value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value should be the enum member MessageState.ENQUEUED
, not its integer value. Using .value
makes the field an integer instead of the enum type.
state: MessageState = MessageState.ENQUEUED.value | |
state: MessageState = MessageState.ENQUEUED |
Copilot uses AI. Check for mistakes.
Created separate API and broker layers. Message service will run with the broker and any API node can access it using gRPC.