From ead83fba84fa33a2dec811ec2d9e91334f26aa98 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Mon, 22 Jul 2024 09:38:01 -0400 Subject: [PATCH] grpcreplay: add Recorder.SetInitial (#51) Provide a separate method to set the initial state, in addition to the option. --- grpcreplay/grpcreplay.go | 41 ++++++++++++++++++++++++++--------- grpcreplay/grpcreplay_test.go | 37 +++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/grpcreplay/grpcreplay.go b/grpcreplay/grpcreplay.go index 6a5c59e..6f95d82 100644 --- a/grpcreplay/grpcreplay.go +++ b/grpcreplay/grpcreplay.go @@ -36,12 +36,14 @@ import ( // A Recorder records RPCs for later playback. type Recorder struct { - opts *RecorderOptions - mu sync.Mutex - w writer - f *os.File - next int - err error + opts *RecorderOptions + mu sync.Mutex + w writer + f *os.File + initial []byte + wroteHeader bool + next int + err error } // RecorderOptions are options for a Recorder. @@ -93,10 +95,20 @@ func NewRecorderWriter(w io.Writer, opts *RecorderOptions) (*Recorder, error) { if err := ww.writeMagic(); err != nil { return nil, err } - if err := ww.writeHeader(opts.Initial); err != nil { - return nil, err + // if err := ww.writeHeader(opts.Initial); err != nil { + // return nil, err + // } + return &Recorder{w: ww, opts: opts, initial: opts.Initial, next: 1}, nil +} + +// SetInitial is an alternative to [RecorderOptions.Initial] for providing the initial state. +func (r *Recorder) SetInitial(initial []byte) { + r.mu.Lock() + defer r.mu.Unlock() + if r.wroteHeader { + panic("grpcreplay: SetInitial called too late") } - return &Recorder{w: ww, opts: opts, next: 1}, nil + r.initial = initial } // DialOptions returns the options that must be passed to grpc.Dial @@ -171,6 +183,13 @@ func (r *Recorder) writeEntry(e *entry) (int, error) { if r.err != nil { return 0, r.err } + if !r.wroteHeader { + if err := r.w.writeHeader(r.initial); err != nil { + r.err = err + return 0, err + } + r.wroteHeader = true + } err := r.w.writeEntry(e) if err != nil { r.err = err @@ -423,7 +442,9 @@ func (rep *Replayer) Initial() []byte { return rep.initial } // Close closes the Replayer. func (rep *Replayer) Close() error { - rep.conn.Close() + if rep.conn != nil { + rep.conn.Close() + } return nil } diff --git a/grpcreplay/grpcreplay_test.go b/grpcreplay/grpcreplay_test.go index 46fd86c..b7b8bd4 100644 --- a/grpcreplay/grpcreplay_test.go +++ b/grpcreplay/grpcreplay_test.go @@ -668,3 +668,40 @@ func TestOutOfOrderStreamReplay(t *testing.T) { buf = record(t, "binary", func(t *testing.T, conn *grpc.ClientConn) { run(t, conn, 1, 2) }) replay(t, buf, func(t *testing.T, conn *grpc.ClientConn) { run(t, conn, 2, 1) }) } + +func TestSetInitial(t *testing.T) { + srv := newIntStoreServer() + defer srv.stop() + + buf := &bytes.Buffer{} + rec, err := NewRecorderWriter(buf, nil) + if err != nil { + t.Fatal(err) + } + rec.SetInitial(initialState) + conn, err := grpc.Dial(srv.Addr, + append([]grpc.DialOption{grpc.WithInsecure()}, rec.DialOptions()...)...) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + client := ipb.NewIntStoreClient(conn) + ctx := context.Background() + if _, err := client.Set(ctx, &ipb.Item{Name: "a", Value: 1}); err != nil { + t.Fatal(err) + } + + if err := rec.Close(); err != nil { + t.Fatal(err) + } + + rep, err := NewReplayerReader(buf, nil) + if err != nil { + t.Fatal(err) + } + if got, want := rep.Initial(), initialState; !bytes.Equal(got, want) { + t.Errorf("got initial state %q, want %q", got, want) + } + +}