From 198f1ab814a25a7ba719a37183a282e6bf944385 Mon Sep 17 00:00:00 2001 From: Kartikay Date: Sun, 15 Jun 2025 17:43:10 +0530 Subject: [PATCH] use ImportBulk and ExportBulk instead Signed-off-by: Kartikay --- internal/cmd/backup.go | 6 +++--- internal/cmd/backup_test.go | 4 ++-- internal/cmd/restorer.go | 12 ++++++------ internal/cmd/restorer_test.go | 16 ++++++++-------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/internal/cmd/backup.go b/internal/cmd/backup.go index 57fa83c8..00f1641f 100644 --- a/internal/cmd/backup.go +++ b/internal/cmd/backup.go @@ -328,7 +328,7 @@ func backupCreateCmdFunc(cmd *cobra.Command, args []string) (err error) { return errors.New("malformed existing backup, consider recreating it") } - req := &v1.BulkExportRelationshipsRequest{ + req := &v1.ExportBulkRelationshipsRequest{ OptionalLimit: pageLimit, OptionalCursor: cursor, } @@ -343,7 +343,7 @@ func backupCreateCmdFunc(cmd *cobra.Command, args []string) (err error) { } ctx := cmd.Context() - relationshipStream, err := c.BulkExportRelationships(ctx, req) + relationshipStream, err := c.ExportBulkRelationships(ctx, req) if err != nil { return fmt.Errorf("error exporting relationships: %w", err) } @@ -467,7 +467,7 @@ func encoderForNewBackup(cmd *cobra.Command, c client.Client, backupFile *os.Fil return encoder, zedToken, nil } -func writeProgress(progressFile *os.File, relsResp *v1.BulkExportRelationshipsResponse) error { +func writeProgress(progressFile *os.File, relsResp *v1.ExportBulkRelationshipsResponse) error { err := progressFile.Truncate(0) if err != nil { return fmt.Errorf("unable to truncate backup progress file: %w", err) diff --git a/internal/cmd/backup_test.go b/internal/cmd/backup_test.go index f6d19205..886d46c0 100644 --- a/internal/cmd/backup_test.go +++ b/internal/cmd/backup_test.go @@ -364,7 +364,7 @@ func TestBackupCreateCmdFunc(t *testing.T) { }) t.Run("truncates progress marker if it existed but backup did not", func(t *testing.T) { - streamClient, err := c.BulkExportRelationships(ctx, &v1.BulkExportRelationshipsRequest{ + streamClient, err := c.ExportBulkRelationships(ctx, &v1.ExportBulkRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ AtExactSnapshot: resp.WrittenAt, @@ -392,7 +392,7 @@ func TestBackupCreateCmdFunc(t *testing.T) { }) t.Run("resumes backup if marker file exists", func(t *testing.T) { - streamClient, err := c.BulkExportRelationships(ctx, &v1.BulkExportRelationshipsRequest{ + streamClient, err := c.ExportBulkRelationships(ctx, &v1.ExportBulkRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ AtExactSnapshot: resp.WrittenAt, diff --git a/internal/cmd/restorer.go b/internal/cmd/restorer.go index 468064fb..c39dc047 100644 --- a/internal/cmd/restorer.go +++ b/internal/cmd/restorer.go @@ -111,7 +111,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error { return fmt.Errorf("unable to write schema: %w", err) } - relationshipWriter, err := r.client.BulkImportRelationships(ctx) + relationshipWriter, err := r.client.ImportBulkRelationships(ctx) if err != nil { return fmt.Errorf("error creating writer stream: %w", err) } @@ -134,7 +134,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error { if uint(len(batch))%r.batchSize == 0 { batchesToBeCommitted = append(batchesToBeCommitted, batch) - err := relationshipWriter.Send(&v1.BulkImportRelationshipsRequest{ + err := relationshipWriter.Send(&v1.ImportBulkRelationshipsRequest{ Relationships: batch, }) if err != nil { @@ -145,7 +145,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error { } // after an error - relationshipWriter, err = r.client.BulkImportRelationships(ctx) + relationshipWriter, err = r.client.ImportBulkRelationships(ctx) if err != nil { return fmt.Errorf("error creating new writer stream: %w", err) } @@ -168,7 +168,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error { return fmt.Errorf("error committing batches: %w", err) } - relationshipWriter, err = r.client.BulkImportRelationships(ctx) + relationshipWriter, err = r.client.ImportBulkRelationships(ctx) if err != nil { return fmt.Errorf("error creating new writer stream: %w", err) } @@ -184,7 +184,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error { // underlying error that caused Send() to fail. It also gives us the opportunity to retry it // in case it failed. batchesToBeCommitted = append(batchesToBeCommitted, batch) - _ = relationshipWriter.Send(&v1.BulkImportRelationshipsRequest{Relationships: batch}) + _ = relationshipWriter.Send(&v1.ImportBulkRelationshipsRequest{Relationships: batch}) } if err := r.commitStream(ctx, relationshipWriter, batchesToBeCommitted); err != nil { @@ -210,7 +210,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error { return nil } -func (r *restorer) commitStream(ctx context.Context, bulkImportClient v1.ExperimentalService_BulkImportRelationshipsClient, +func (r *restorer) commitStream(ctx context.Context, bulkImportClient v1.PermissionsService_ImportBulkRelationshipsClient, batchesToBeCommitted [][]*v1.Relationship, ) error { var numLoaded, expectedLoaded, retries uint diff --git a/internal/cmd/restorer_test.go b/internal/cmd/restorer_test.go index b5c2a487..f0816347 100644 --- a/internal/cmd/restorer_test.go +++ b/internal/cmd/restorer_test.go @@ -185,7 +185,7 @@ func TestRestorer(t *testing.T) { type mockClient struct { client.Client - v1.ExperimentalService_BulkImportRelationshipsClient + v1.PermissionsService_ImportBulkRelationshipsClient t *testing.T schema string remainderBatch bool @@ -204,11 +204,7 @@ type mockClient struct { touchErrors []error } -func (m *mockClient) BulkImportRelationships(_ context.Context, _ ...grpc.CallOption) (v1.ExperimentalService_BulkImportRelationshipsClient, error) { - return m, nil -} - -func (m *mockClient) Send(req *v1.BulkImportRelationshipsRequest) error { +func (m *mockClient) Send(req *v1.ImportBulkRelationshipsRequest) error { m.receivedBatches++ m.receivedRels += uint(len(req.Relationships)) m.lastReceivedBatch = req.Relationships @@ -241,7 +237,7 @@ func (m *mockClient) WriteRelationships(_ context.Context, in *v1.WriteRelations return &v1.WriteRelationshipsResponse{}, nil } -func (m *mockClient) CloseAndRecv() (*v1.BulkImportRelationshipsResponse, error) { +func (m *mockClient) CloseAndRecv() (*v1.ImportBulkRelationshipsResponse, error) { m.receivedCommits++ lastBatch := m.lastReceivedBatch defer func() { m.lastReceivedBatch = nil }() @@ -250,7 +246,11 @@ func (m *mockClient) CloseAndRecv() (*v1.BulkImportRelationshipsResponse, error) return nil, m.commitErrors[m.receivedCommits-1] } - return &v1.BulkImportRelationshipsResponse{NumLoaded: uint64(len(lastBatch))}, nil + return &v1.ImportBulkRelationshipsResponse{NumLoaded: uint64(len(lastBatch))}, nil +} + +func (m *mockClient) ImportBulkRelationships(_ context.Context, _ ...grpc.CallOption) (v1.PermissionsService_ImportBulkRelationshipsClient, error) { + return m, nil } func (m *mockClient) WriteSchema(_ context.Context, wsr *v1.WriteSchemaRequest, _ ...grpc.CallOption) (*v1.WriteSchemaResponse, error) {