Skip to content

Commit

Permalink
grpcreplay: add Recorder.SetInitial (#51)
Browse files Browse the repository at this point in the history
Provide a separate method to set the initial state, in addition to the
option.
  • Loading branch information
jba authored Jul 22, 2024
1 parent 2442340 commit ead83fb
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 10 deletions.
41 changes: 31 additions & 10 deletions grpcreplay/grpcreplay.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ import (

// A Recorder records RPCs for later playback.
type Recorder struct {
opts *RecorderOptions
mu sync.Mutex
w writer
f *os.File
next int
err error
opts *RecorderOptions
mu sync.Mutex
w writer
f *os.File
initial []byte
wroteHeader bool
next int
err error
}

// RecorderOptions are options for a Recorder.
Expand Down Expand Up @@ -93,10 +95,20 @@ func NewRecorderWriter(w io.Writer, opts *RecorderOptions) (*Recorder, error) {
if err := ww.writeMagic(); err != nil {
return nil, err
}
if err := ww.writeHeader(opts.Initial); err != nil {
return nil, err
// if err := ww.writeHeader(opts.Initial); err != nil {
// return nil, err
// }
return &Recorder{w: ww, opts: opts, initial: opts.Initial, next: 1}, nil
}

// SetInitial is an alternative to [RecorderOptions.Initial] for providing the initial state.
func (r *Recorder) SetInitial(initial []byte) {
r.mu.Lock()
defer r.mu.Unlock()
if r.wroteHeader {
panic("grpcreplay: SetInitial called too late")
}
return &Recorder{w: ww, opts: opts, next: 1}, nil
r.initial = initial
}

// DialOptions returns the options that must be passed to grpc.Dial
Expand Down Expand Up @@ -171,6 +183,13 @@ func (r *Recorder) writeEntry(e *entry) (int, error) {
if r.err != nil {
return 0, r.err
}
if !r.wroteHeader {
if err := r.w.writeHeader(r.initial); err != nil {
r.err = err
return 0, err
}
r.wroteHeader = true
}
err := r.w.writeEntry(e)
if err != nil {
r.err = err
Expand Down Expand Up @@ -423,7 +442,9 @@ func (rep *Replayer) Initial() []byte { return rep.initial }

// Close closes the Replayer.
func (rep *Replayer) Close() error {
rep.conn.Close()
if rep.conn != nil {
rep.conn.Close()
}
return nil
}

Expand Down
37 changes: 37 additions & 0 deletions grpcreplay/grpcreplay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,3 +668,40 @@ func TestOutOfOrderStreamReplay(t *testing.T) {
buf = record(t, "binary", func(t *testing.T, conn *grpc.ClientConn) { run(t, conn, 1, 2) })
replay(t, buf, func(t *testing.T, conn *grpc.ClientConn) { run(t, conn, 2, 1) })
}

func TestSetInitial(t *testing.T) {
srv := newIntStoreServer()
defer srv.stop()

buf := &bytes.Buffer{}
rec, err := NewRecorderWriter(buf, nil)
if err != nil {
t.Fatal(err)
}
rec.SetInitial(initialState)
conn, err := grpc.Dial(srv.Addr,
append([]grpc.DialOption{grpc.WithInsecure()}, rec.DialOptions()...)...)
if err != nil {
t.Fatal(err)
}
defer conn.Close()

client := ipb.NewIntStoreClient(conn)
ctx := context.Background()
if _, err := client.Set(ctx, &ipb.Item{Name: "a", Value: 1}); err != nil {
t.Fatal(err)
}

if err := rec.Close(); err != nil {
t.Fatal(err)
}

rep, err := NewReplayerReader(buf, nil)
if err != nil {
t.Fatal(err)
}
if got, want := rep.Initial(), initialState; !bytes.Equal(got, want) {
t.Errorf("got initial state %q, want %q", got, want)
}

}

0 comments on commit ead83fb

Please sign in to comment.