-
-
Notifications
You must be signed in to change notification settings - Fork 1
Add gRPC task service with streaming results #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds gRPC service capabilities to the worker package by defining a WorkerService proto with task registration and result streaming endpoints. The implementation provides a gRPC server that wraps the existing worker Service to handle task scheduling and stream results back to clients.
- Defines protobuf messages and service for task registration and result streaming
- Implements gRPC server wrapper around existing worker Service
- Adds generated protobuf and gRPC bindings
Reviewed Changes
Copilot reviewed 6 out of 8 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| proto/worker/v1/worker.proto | Defines WorkerService with RegisterTasks and StreamResults RPCs |
| pkg/worker/v1/worker_grpc.pb.go | Generated gRPC service bindings and client/server interfaces |
| pkg/worker/v1/worker.pb.go | Generated protobuf message types and serialization code |
| grpc.go | Implements GRPCServer that wraps worker Service for gRPC endpoints |
| go.mod | Adds gRPC and protobuf dependencies |
| buf.yaml | Configures buf for protobuf management |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| } | ||
|
|
||
| func RegisterWorkerServiceServer(s grpc.ServiceRegistrar, srv WorkerServiceServer) { | ||
| // If the following call pancis, it indicates UnimplementedWorkerServiceServer was |
Copilot
AI
Aug 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The word "pancis" should be "panics".
| // If the following call pancis, it indicates UnimplementedWorkerServiceServer was | |
| // If the following call panics, it indicates UnimplementedWorkerServiceServer was |
| for _, taskReq := range req.GetTasks() { | ||
| payload := taskReq.GetPayload() | ||
|
|
||
| task, err := NewTask(ctx, func(ctx context.Context, args ...any) (any, error) { |
Copilot
AI
Aug 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The task function always returns the payload regardless of context or arguments, which doesn't align with typical task execution patterns. Consider implementing actual task logic or storing the payload for later execution.
| task, err := NewTask(ctx, func(ctx context.Context, args ...any) (any, error) { | |
| task, err := NewTask(ctx, func(ctx context.Context, args ...any) (any, error) { | |
| if len(args) > 0 { | |
| return args[0], nil | |
| } |
| func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, stream workerpb.WorkerService_StreamResultsServer) error { | ||
| for res := range s.svc.StreamResults() { | ||
| out := &workerpb.StreamResultsResponse{ | ||
| Id: res.Task.ID.String(), |
Copilot
AI
Aug 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The StreamResults method doesn't use the request parameter. Consider adding filtering options to the StreamResultsRequest to allow clients to specify which results they want to receive.
| Id: res.Task.ID.String(), | |
| // Assume req.Ids is a slice of task IDs to filter by. If empty, stream all results. | |
| idFilter := make(map[string]struct{}) | |
| for _, id := range req.GetIds() { | |
| idFilter[id] = struct{}{} | |
| } | |
| for res := range s.svc.StreamResults() { | |
| taskID := res.Task.ID.String() | |
| // If filter is set, skip results not in the filter. | |
| if len(idFilter) > 0 { | |
| if _, ok := idFilter[taskID]; !ok { | |
| continue | |
| } | |
| } | |
| out := &workerpb.StreamResultsResponse{ | |
| Id: taskID, |
Summary
Testing
buf lintpre-commit run --files grpc.go proto/worker/v1/worker.proto buf.yaml buf.gen.yaml pkg/worker/v1/worker.pb.go pkg/worker/v1/worker_grpc.pb.go go.mod go.sum(fails: go: no such tool "covdata")go test ./...https://chatgpt.com/codex/tasks/task_e_68a0e3aadd8083309280653889c93de6