Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streaming response #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,16 @@ Rules are defined in the rules section. Each rule consists of the following fiel

The `Respond` section contains the response for the request. The respond section may contain the following fields:

| Field | Required | Description |
|-------------|----------------------------|---------------------------------------------------------------------------------------------------------------------|
| body | optional | The body of the response. This must be a protobuf snippet that defines the response message with values to be sent. |
| metadata | optional | The metadata to be sent as a response. |
| status | optional | The gRPC status to be sent as a response. |
| status.code | true, if status is present | The gRPC status code to be sent as a response. |
| status.msg | true, if status is present | The gRPC status message to be sent as a response. |
| Field | Required | Description |
|---------------|----------------------------|---------------------------------------------------------------------------------------------------------------------------------|
| stream | optional | The stream of responses to be sent as a response. |
| stream.def | true, if stream is present | The stream definition of the response message. |
| stream.values | optional | Values of the stream message to be sent to the client. Values are just a set of YAML-specified fields, i.e. `[]map[string]any`. |
| body | optional | The body of the response. This must be a protobuf snippet that defines the response message with values to be sent. |
| metadata | optional | The metadata to be sent as a response. |
| status | optional | The gRPC status to be sent as a response. |
| status.code | true, if status is present | The gRPC status code to be sent as a response. |
| status.msg | true, if status is present | The gRPC status message to be sent as a response. |

The configuration file is being watched for changes, and the server will reload the configuration file if it changes.

Expand Down
10 changes: 10 additions & 0 deletions _example/mock.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ not-matched:
status: { code: "NOT_FOUND", message: "some custom not found" }

rules:
# next method is an example of server streaming
- match: { uri: "com.github.Semior001.groxy.example.mock.ExampleService/ServerStream" }
respond:
stream:
def: "message ServerStreamResponse { option (groxypb.target) = true; string message = 1; }"
values:
- message: "first message"
- message: "second message"
- message: "third message"

# The next rule will respond with a predefined message.
- match:
uri: "com.github.Semior001.groxy.example.mock.ExampleService/Stub"
Expand Down
1 change: 1 addition & 0 deletions _example/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ service ExampleService {
rpc Stub(StubRequest) returns (SomeOtherResponse);
rpc Error(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc NotFound(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc ServerStream(google.protobuf.Empty) returns (stream SomeOtherResponse);
}

message Dependency {
Expand Down
34 changes: 12 additions & 22 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package discovery

import (
"context"
"fmt"
"regexp"
"slices"
"strconv"
"strings"

"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)

// Provider provides routing rules for the Service.
Expand All @@ -29,10 +27,16 @@ type Provider interface {

// Mock contains the details of how the handler should reply to the downstream.
type Mock struct {
Header metadata.MD
Trailer metadata.MD
Body proto.Message
Status *status.Status
Header metadata.MD
Trailer metadata.MD
Messages []proto.Message
Status *status.Status
}

// String returns the string representation of the mock.
func (m Mock) String() string {
return fmt.Sprintf("mock{header: %d; trailer: %d; messages: %d; status: %q}",
len(m.Header), len(m.Trailer), len(m.Messages), m.Status)
}

// Rule is a routing rule for the Service.
Expand All @@ -49,21 +53,7 @@ type Rule struct {
}

// String returns the name of the rule.
func (r *Rule) String() string {
sb := &strings.Builder{}
_, _ = sb.WriteString("(")
_, _ = sb.WriteString(r.Name)
_, _ = sb.WriteString("; ")
_, _ = sb.WriteString(strconv.Itoa(len(r.Match.IncomingMetadata)))
_, _ = sb.WriteString(" metadata")
if r.Match.Message != nil {
_, _ = sb.WriteString("; with body: {")
_, _ = sb.WriteString(protoadapt.MessageV1Of(r.Match.Message).String())
_, _ = sb.WriteString("}")
}
_, _ = sb.WriteString(")")
return sb.String()
}
func (r *Rule) String() string { return fmt.Sprintf("rule{name: %s}", r.Name) }

// RequestMatcher defines parameters to match the request to the rule.
type RequestMatcher struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestRule_String(t *testing.T) {
// for some reason, the proto message string sometimes generates two spaces between fields
got = strings.Replace(got, " ", " ", -1)

assert.Equal(t, "(name; 2 metadata; with body: {request_id:\"request-id\" serving_data:\"serving-data\"})", got)
assert.Equal(t, "rule{name: name}", got)
}

func TestRequestMatcher_Matches(t *testing.T) {
Expand Down
15 changes: 11 additions & 4 deletions pkg/discovery/fileprovider/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,27 @@ type Rule struct {
Match struct {
URI string `yaml:"uri"`
Header map[string]string `yaml:"header"`
Body *string `yaml:"body"`
Body *string `yaml:"body,omitempty"`
} `yaml:"match"`
Respond Respond `yaml:"respond"`
}

// Values is an arbitrary structure.
type Values map[string]interface{}

// Respond specifies how the service should respond to the request.
type Respond struct {
Body *string `yaml:"body"`
Stream *struct {
Def string `yaml:"def"`
Values []Values `yaml:"values"`
} `yaml:"stream"`
Body *string `yaml:"body,omitempty"`
Metadata *struct {
Header map[string]string `yaml:"header"`
Trailer map[string]string `yaml:"trailer"`
} `yaml:"metadata"`
} `yaml:"metadata,omitempty"`
Status *struct {
Code string `yaml:"code"`
Message string `yaml:"message"`
} `yaml:"status"`
} `yaml:"status,omitempty"`
}
23 changes: 19 additions & 4 deletions pkg/discovery/fileprovider/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -112,22 +113,36 @@ func (d *File) Rules(context.Context) ([]*discovery.Rule, error) {
}

switch {
case r.Status != nil && r.Body != nil:
return nil, fmt.Errorf("can't set both status and body in rule")
case r.Status != nil && r.Body != nil || r.Stream != nil && r.Body != nil || r.Status != nil && r.Stream != nil:
return nil, fmt.Errorf("can't set have multiple response variants in rule")
case r.Stream != nil:
tmpl, err := protodef.BuildMessage(r.Stream.Def)
if err != nil {
return nil, fmt.Errorf("parse stream message template: %w", err)
}

for _, v := range r.Stream.Values {
msg, err := protodef.SetValues(tmpl, v)
if err != nil {
return nil, fmt.Errorf("set values to stream message: %w", err)
}
result.Messages = append(result.Messages, msg)
}
case r.Status != nil:
var code codes.Code
if err = code.UnmarshalJSON([]byte(fmt.Sprintf("%q", r.Status.Code))); err != nil {
return nil, fmt.Errorf("unmarshal status code: %w", err)
}
result.Status = status.New(code, r.Status.Message)
case r.Body != nil:
if result.Body, err = protodef.BuildMessage(*r.Body); err != nil {
msg, err := protodef.BuildMessage(*r.Body)
if err != nil {
return nil, fmt.Errorf("build respond message: %w", err)
}
result.Messages = []proto.Message{msg}
default:
return nil, fmt.Errorf("empty response in rule")
}

return result, nil
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/discovery/fileprovider/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,16 @@ func TestFile_Rules(t *testing.T) {

require.Len(t, rules, 5)
assert.NotNil(t, rules[0].Match.Message)
assert.NotNil(t, rules[0].Mock.Body)
assert.NotNil(t, rules[0].Mock.Messages)
assert.NotNil(t, rules[1].Match.Message)
assert.NotNil(t, rules[1].Mock.Body)
assert.NotNil(t, rules[2].Mock.Body)
assert.NotNil(t, rules[1].Mock.Messages)
assert.NotNil(t, rules[2].Mock.Messages)

rules[0].Match.Message = nil
rules[0].Mock.Body = nil
rules[0].Mock.Messages = nil
rules[1].Match.Message = nil
rules[1].Mock.Body = nil
rules[2].Mock.Body = nil
rules[1].Mock.Messages = nil
rules[2].Mock.Messages = nil

assert.Equal(t, []*discovery.Rule{
{
Expand Down
11 changes: 4 additions & 7 deletions pkg/protodef/definer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ func (b *Definer) BuildTarget(def string) (proto.Message, error) {
}

func (b *Definer) enrich(def string) string {
sb := &strings.Builder{}
_, _ = fmt.Fprintln(sb, `syntax = "proto3";`)
_, _ = fmt.Fprintln(sb, `import "groxypb/annotations.proto";`)
_, _ = fmt.Fprintln(sb)
_, _ = fmt.Fprintln(sb, def)
return sb.String()
const header = `syntax = "proto3"; import "groxypb/annotations.proto";
` // intentional newline to index the line correctly
return header + def
}

func (b *Definer) parseDefinition(def string) (*desc.FileDescriptor, error) {
Expand Down Expand Up @@ -96,7 +93,7 @@ func (b *Definer) parseDefinition(def string) (*desc.FileDescriptor, error) {
if errors.As(err, &esp) {
pos := esp.GetPosition()
return nil, errSyntax{
Line: pos.Line - 3, // sub 3 lines to remove enriched parts
Line: pos.Line - 1, // sub 1 line to remove enriched parts
Col: pos.Col, Err: esp.Unwrap().Error(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/protodef/definer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestBuildMessage(t *testing.T) {
{
name: "syntax error",
def: "message StubResponse {",
wantErr: errSyntax{Line: 2, Col: 1, Err: "syntax error: unexpected $end"},
wantErr: errSyntax{Line: 1, Col: 23, Err: "syntax error: unexpected $end"},
},
{
name: "multiple target messages",
Expand Down
26 changes: 26 additions & 0 deletions pkg/protodef/protodef.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package protodef

import (
"encoding/json"
"fmt"

"github.com/jhump/protoreflect/dynamic"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)

var globalDefiner = NewDefiner()
Expand All @@ -15,3 +20,24 @@ type Option func(*Definer)

// LoadOS sets definer to load the OS files, if they were requested in the definition.
func LoadOS(d *Definer) { d.loadFromOS = true }

// SetValues sets a set of arbitrary values to the message.
func SetValues(tmpl proto.Message, vals map[string]any) (proto.Message, error) {
// FIXME: this is ugly, we need to set values explicitly for better performance
tmpl = proto.Clone(tmpl)
dmsg, err := dynamic.AsDynamicMessage(protoadapt.MessageV1Of(tmpl))
if err != nil {
return nil, fmt.Errorf("convert to dynamic message: %w", err)
}

bts, err := json.Marshal(vals)
if err != nil {
return nil, fmt.Errorf("marshal values: %w", err)
}

if err = dmsg.UnmarshalJSON(bts); err != nil {
return nil, fmt.Errorf("unmarshal values: %w", err)
}

return protoadapt.MessageV2Of(dmsg), nil
}
61 changes: 61 additions & 0 deletions pkg/protodef/protodef_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package protodef

import (
"testing"

"github.com/Semior001/groxy/pkg/protodef/testdata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

func TestSetValues(t *testing.T) {
type want struct {
msg proto.Message
err error
}
tests := []struct {
name string
tmpl proto.Message
vals map[string]any
want want
}{
{
name: "simple message",
tmpl: &testdata.Response{},
vals: map[string]any{"value": "test"},
want: want{msg: &testdata.Response{Value: "test"}},
},
{
name: "nested message",
tmpl: &testdata.Response{},
vals: map[string]any{"nested": map[string]any{"nested_value": "test"}},
want: want{msg: &testdata.Response{Nested: &testdata.Nested{NestedValue: "test"}}},
},
{
name: "nested message with repeated field",
tmpl: &testdata.Response{},
vals: map[string]any{"nesteds": []any{
map[string]any{"nested_value": "test1"},
map[string]any{"nested_value": "test2"},
}},
want: want{msg: &testdata.Response{Nesteds: []*testdata.Nested{
{NestedValue: "test1"},
{NestedValue: "test2"},
}}},
},
{
name: "enum",
tmpl: &testdata.Response{},
vals: map[string]any{"enum": "STUB_ENUM_SECOND"},
want: want{msg: &testdata.Response{Enum: testdata.Enum_STUB_ENUM_SECOND}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := SetValues(tt.tmpl, tt.vals)
require.ErrorIs(t, err, tt.want.err)
assert.Equal(t, mustProtoMarshal(t, tt.want.msg), mustProtoMarshal(t, got))
})
}
}
10 changes: 7 additions & 3 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (s *Server) handle(_ any, stream grpc.ServerStream) error {
func (s *Server) mock(stream grpc.ServerStream, reply *discovery.Mock) error {
ctx := stream.Context()

slog.DebugContext(ctx, "mocking", slog.String("reply", reply.String()))

if len(reply.Header) > 0 {
if err := stream.SetHeader(reply.Header); err != nil {
slog.WarnContext(ctx, "failed to set header to the client", slogx.Error(err))
Expand All @@ -140,9 +142,11 @@ func (s *Server) mock(stream grpc.ServerStream, reply *discovery.Mock) error {
}

switch {
case reply.Body != nil:
if err := stream.SendMsg(reply.Body); err != nil {
return status.Errorf(codes.Internal, "{groxy} failed to send message: %v", err)
case len(reply.Messages) > 0:
for _, msg := range reply.Messages {
if err := stream.SendMsg(msg); err != nil {
return status.Errorf(codes.Internal, "{groxy} failed to send message: %v", err)
}
}
case reply.Status != nil:
return reply.Status.Err()
Expand Down
Loading