From cfe58f1f35b5ee24eee2737ca88f512c798a1e3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Thu, 24 Apr 2025 14:59:46 +0100 Subject: [PATCH 1/2] add client-wide retries Introduces gRPC middleware to do client-side retries by default. The only flag introduced to customize this behaviour is the maximum number of attempts, which defaults to 20. The retries are enabled for bot unary and streaming APIs. --- go.mod | 2 +- internal/client/client.go | 33 ++++++++++++++++--- internal/client/client_test.go | 60 ++++++++++++++++++++++++++++++++++ internal/cmd/cmd.go | 1 + 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 1644a6ab..38d51943 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/charmbracelet/x/term v0.2.1 github.com/google/uuid v1.6.0 github.com/gookit/color v1.5.4 + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1 github.com/hamba/avro/v2 v2.28.0 github.com/jzelinskie/cobrautil/v2 v2.0.0-20240819150235-f7fe73942d0f github.com/jzelinskie/stringz v0.0.3 @@ -140,7 +141,6 @@ require ( github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 // indirect - github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.1.0 // indirect diff --git a/internal/client/client.go b/internal/client/client.go index a90e0745..0106752a 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -8,13 +8,16 @@ import ( "os" "path/filepath" "strings" + "time" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" "github.com/jzelinskie/cobrautil/v2" "github.com/mitchellh/go-homedir" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "golang.org/x/net/proxy" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" @@ -33,6 +36,12 @@ type Client interface { v1.ExperimentalServiceClient } +const ( + defaultRetryExponentialBackoff = 100 * time.Millisecond + defaultMaxRetryAttemptDuration = 2 * time.Second + defaultRetryJitterFraction = 0.5 +) + // NewClient defines an (overridable) means of creating a new client. var ( NewClient = newClientForCurrentContext @@ -191,17 +200,33 @@ func certOption(token storage.Token) (opt grpc.DialOption, err error) { // DialOptsFromFlags returns the dial options from the CLI-specified flags. func DialOptsFromFlags(cmd *cobra.Command, token storage.Token) ([]grpc.DialOption, error) { - interceptors := []grpc.UnaryClientInterceptor{ + maxRetries := cobrautil.MustGetUint(cmd, "max-retries") + retryOpts := []retry.CallOption{ + retry.WithBackoff(retry.BackoffExponentialWithJitterBounded(defaultRetryExponentialBackoff, + defaultRetryJitterFraction, defaultMaxRetryAttemptDuration)), + retry.WithCodes(codes.ResourceExhausted, codes.Unavailable, codes.Aborted, codes.Unknown, codes.Internal), + retry.WithMax(maxRetries), + retry.WithOnRetryCallback(func(_ context.Context, attempt uint, err error) { + log.Error().Err(err).Uint("attempt", attempt).Msg("retrying gRPC call") + }), + } + unaryInterceptors := []grpc.UnaryClientInterceptor{ zgrpcutil.LogDispatchTrailers, + retry.UnaryClientInterceptor(retryOpts...), + } + + streamInterceptors := []grpc.StreamClientInterceptor{ + zgrpcutil.StreamLogDispatchTrailers, + retry.StreamClientInterceptor(retryOpts...), } if !cobrautil.MustGetBool(cmd, "skip-version-check") { - interceptors = append(interceptors, zgrpcutil.CheckServerVersion) + unaryInterceptors = append(unaryInterceptors, zgrpcutil.CheckServerVersion) } opts := []grpc.DialOption{ - grpc.WithChainUnaryInterceptor(interceptors...), - grpc.WithChainStreamInterceptor(zgrpcutil.StreamLogDispatchTrailers), + grpc.WithChainUnaryInterceptor(unaryInterceptors...), + grpc.WithChainStreamInterceptor(streamInterceptors...), } proxyAddr := cobrautil.MustGetString(cmd, "proxy") diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 4d531187..ba72b7c5 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -1,11 +1,21 @@ package client_test import ( + "context" + "net" "os" "path" "testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/authzed/authzed-go/v1" + "github.com/authzed/grpcutil" "github.com/authzed/zed/internal/client" "github.com/authzed/zed/internal/storage" @@ -118,3 +128,53 @@ func TestGetCurrentTokenWithCLIOverrideWithoutSecretFile(t *testing.T) { require.Equal("e1", token.Endpoint) require.Equal(&bTrue, token.Insecure) } + +type fakeSchemaServer struct { + v1.UnimplementedSchemaServiceServer + testFunc func() +} + +func (fss *fakeSchemaServer) ReadSchema(_ context.Context, _ *v1.ReadSchemaRequest) (*v1.ReadSchemaResponse, error) { + fss.testFunc() + return nil, status.Error(codes.Unavailable, "") +} + +func TestRetries(t *testing.T) { + ctx := t.Context() + var callCount uint + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + + fakeServer := &fakeSchemaServer{testFunc: func() { + callCount++ + }} + v1.RegisterSchemaServiceServer(s, fakeServer) + + go func() { + _ = s.Serve(lis) + }() + t.Cleanup(s.Stop) + + secure := true + retries := uint(2) + cmd := zedtesting.CreateTestCobraCommandWithFlagValue(t, + zedtesting.BoolFlag{FlagName: "skip-version-check", FlagValue: true, Changed: true}, + zedtesting.UintFlag{FlagName: "max-retries", FlagValue: retries, Changed: true}, + zedtesting.StringFlag{FlagName: "proxy", FlagValue: "", Changed: true}, + zedtesting.StringFlag{FlagName: "hostname-override", FlagValue: "", Changed: true}, + zedtesting.IntFlag{FlagName: "max-message-size", FlagValue: 1000, Changed: true}, + ) + dialOpts, err := client.DialOptsFromFlags(cmd, storage.Token{Insecure: &secure}) + require.NoError(t, err) + + dialOpts = append(dialOpts, grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + })) + + c, err := authzed.NewClient("passthrough://bufnet", dialOpts...) + require.NoError(t, err) + + _, err = c.ReadSchema(ctx, &v1.ReadSchemaRequest{}) + grpcutil.RequireStatus(t, codes.Unavailable, err) + require.Equal(t, retries, callCount) +} diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index 79a3b206..99df3045 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -76,6 +76,7 @@ func InitialiseRootCmd(zl *cobrazerolog.Builder) *cobra.Command { rootCmd.PersistentFlags().String("request-id", "", "optional id to send along with SpiceDB requests for tracing") rootCmd.PersistentFlags().Int("max-message-size", 0, "maximum size *in bytes* (defaults to 4_194_304 bytes ~= 4MB) of a gRPC message that can be sent or received by zed") rootCmd.PersistentFlags().String("proxy", "", "specify a SOCKS5 proxy address") + rootCmd.PersistentFlags().Uint("max-retries", 10, "maximum number of sequential retries to attempt when a request fails") _ = rootCmd.PersistentFlags().MarkHidden("debug") // This cannot return its error. versionCmd := &cobra.Command{ From 87dfc78314abde0a528e3d5a63e1d0259c96fada Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Thu, 24 Apr 2025 17:48:44 +0100 Subject: [PATCH 2/2] fix missing `page-limit` command on the backup create alias (backup) --- internal/cmd/backup.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/cmd/backup.go b/internal/cmd/backup.go index 5e32de44..e66414a4 100644 --- a/internal/cmd/backup.go +++ b/internal/cmd/backup.go @@ -55,7 +55,7 @@ var ( Short: "Create, restore, and inspect permissions system backups", Args: cobra.MaximumNArgs(1), // Create used to be on the root, so add it here for back-compat. - RunE: backupCreateCmdFunc, + RunE: withErrorHandling(backupCreateCmdFunc), } backupCreateCmd = &cobra.Command{ @@ -116,8 +116,6 @@ func registerBackupCmd(rootCmd *cobra.Command) { backupCmd.AddCommand(backupCreateCmd) registerBackupCreateFlags(backupCreateCmd) - backupCreateCmd.Flags().Uint32("page-limit", 0, "defines the number of relationships to be read by requested page during backup") - backupCmd.AddCommand(backupRestoreCmd) registerBackupRestoreFlags(backupRestoreCmd) @@ -160,6 +158,7 @@ func registerBackupRestoreFlags(cmd *cobra.Command) { func registerBackupCreateFlags(cmd *cobra.Command) { cmd.Flags().String("prefix-filter", "", "include only schema and relationships with a given prefix") cmd.Flags().Bool("rewrite-legacy", false, "potentially modify the schema to exclude legacy/broken syntax") + cmd.Flags().Uint32("page-limit", 0, "defines the number of relationships to be read by requested page during backup") } func createBackupFile(filename string, returnIfExists bool) (*os.File, bool, error) {