Skip to content

Commit

Permalink
fix: add fail index to streaming sequence (#1364)
Browse files Browse the repository at this point in the history
* feat: added fail_index to AttemptSequenceRequest

* cleanup: add test, regen protos/support files

* revert changes to non sequence client

* restore generated files

* fix some comments

* fix a few more comments

* clarify comment

---------

Co-authored-by: Gal Zahavi <38544478+galz10@users.noreply.github.com>
  • Loading branch information
leahecole and galz10 committed Aug 14, 2023
1 parent 3188c8a commit 567e756
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 3 deletions.
9 changes: 8 additions & 1 deletion schema/google/showcase/v1beta1/sequence.proto
Expand Up @@ -217,14 +217,21 @@ message AttemptSequenceRequest {
(google.api.resource_reference).type = "showcase.googleapis.com/Sequence",
(google.api.field_behavior) = REQUIRED
];

}

message AttemptStreamingSequenceRequest {
string name = 1 [
(google.api.resource_reference).type = "showcase.googleapis.com/StreamingSequence",
(google.api.field_behavior) = REQUIRED
];


// used to send the index of the last failed message
// in the string "content" of an AttemptStreamingSequenceResponse
// needed for stream resumption logic testing
int32 last_fail_index = 2 [
(google.api.field_behavior) = OPTIONAL
];
}

// The response message for the Echo methods.
Expand Down
8 changes: 6 additions & 2 deletions server/services/sequence_service.go
Expand Up @@ -202,6 +202,7 @@ func (s *sequenceServerImpl) CreateStreamingSequence(ctx context.Context, in *pb
func (s *sequenceServerImpl) AttemptStreamingSequence(in *pb.AttemptStreamingSequenceRequest, stream pb.SequenceService_AttemptStreamingSequenceServer) error {
received := time.Now()
name := in.GetName()
lastFailIndex := in.GetLastFailIndex()
if name == "" {
return status.Errorf(
codes.InvalidArgument,
Expand Down Expand Up @@ -241,7 +242,11 @@ func (s *sequenceServerImpl) AttemptStreamingSequence(in *pb.AttemptStreamingSeq
st = status.New(codes.OutOfRange, "Attempt exceeded predefined responses")
}

for idx, word := range content {
if lastFailIndex < 0 {
lastFailIndex = 0
}

for idx, word := range content[lastFailIndex:] {
if idx >= respIndex {
break
}
Expand Down Expand Up @@ -282,7 +287,6 @@ func (s *sequenceServerImpl) AttemptStreamingSequence(in *pb.AttemptStreamingSeq
AttemptDelay: attDelay,
Status: st.Proto(),
})

return st.Err()

}
Expand Down
67 changes: 67 additions & 0 deletions server/services/sequence_service_test.go
Expand Up @@ -354,6 +354,73 @@ func TestStreamingSequenceRetry(t *testing.T) {
}
}

func TestStreamingSequenceWithLastFailIndex(t *testing.T) {
s := NewSequenceServer()
responses := []*pb.StreamingSequence_Response{
{
Status: status.New(codes.Unavailable, "Unavailable").Proto(),
Delay: ptypes.DurationProto(1 * time.Second),
},
{
Status: status.New(codes.Unavailable, "Unavailable").Proto(),
Delay: ptypes.DurationProto(2 * time.Second),
},
{
Status: status.New(codes.OK, "OK").Proto(),
},
}

seq, err := s.CreateStreamingSequence(context.Background(), &pb.CreateStreamingSequenceRequest{
StreamingSequence: &pb.StreamingSequence{Responses: responses, Content: "Hello World, nice to see you"},
})
if err != nil {
t.Errorf("CreateSequence(retry): unexpected err %+v", err)
}

timeout := 5 * time.Second
_, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

delay := 100 * time.Millisecond
stream := &mockStreamSequence{}

for n, r := range responses {
res := status.FromProto(r.GetStatus())
// by passing the LastFailIndex as 3, we force the response to be the 3rd index of content, which is "to"
// the number of responses will still be the same though - the length of the sequence
err = s.AttemptStreamingSequence(&pb.AttemptStreamingSequenceRequest{Name: seq.GetName(), LastFailIndex: 3}, stream)
if c := status.Code(err); c != res.Code() {
t.Errorf("%s: status #%d was %v wanted %v", t.Name(), n, c, res.Code())
}

if n != len(responses)-1 {
time.Sleep(delay)
delay *= 2
}
}

r := streamingReport(seq.GetName())
report, err := s.GetStreamingSequenceReport(context.Background(), &pb.GetStreamingSequenceReportRequest{Name: r})
if err != nil {
t.Errorf("GetSequenceReport(retry): unexpected err %+v", err)
}

attempts := report.GetAttempts()
if len(attempts) != len(responses) {
t.Errorf("%s: expected number of attempts to be %d but was %d", t.Name(), len(responses), len(attempts))
}

for n, a := range attempts {
if got, want := a.GetAttemptNumber(), int32(n); got != want {
t.Errorf("%s: expected attempt #%d but was #%d", t.Name(), want, got)
}

if got, want := a.GetStatus().GetCode(), responses[n].GetStatus().GetCode(); got != want {
t.Errorf("%s: expected response %v but was %v", t.Name(), want, got)
}
}
}

func TestStreamingSequenceOutOfRange(t *testing.T) {
s := NewSequenceServer()

Expand Down

0 comments on commit 567e756

Please sign in to comment.