Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func backupCreateCmdFunc(cmd *cobra.Command, args []string) (err error) {
return errors.New("malformed existing backup, consider recreating it")
}

req := &v1.BulkExportRelationshipsRequest{
req := &v1.ExportBulkRelationshipsRequest{
OptionalLimit: pageLimit,
OptionalCursor: cursor,
}
Expand All @@ -343,7 +343,7 @@ func backupCreateCmdFunc(cmd *cobra.Command, args []string) (err error) {
}

ctx := cmd.Context()
relationshipStream, err := c.BulkExportRelationships(ctx, req)
relationshipStream, err := c.ExportBulkRelationships(ctx, req)
if err != nil {
return fmt.Errorf("error exporting relationships: %w", err)
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func encoderForNewBackup(cmd *cobra.Command, c client.Client, backupFile *os.Fil
return encoder, zedToken, nil
}

func writeProgress(progressFile *os.File, relsResp *v1.BulkExportRelationshipsResponse) error {
func writeProgress(progressFile *os.File, relsResp *v1.ExportBulkRelationshipsResponse) error {
err := progressFile.Truncate(0)
if err != nil {
return fmt.Errorf("unable to truncate backup progress file: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/cmd/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestBackupCreateCmdFunc(t *testing.T) {
})

t.Run("truncates progress marker if it existed but backup did not", func(t *testing.T) {
streamClient, err := c.BulkExportRelationships(ctx, &v1.BulkExportRelationshipsRequest{
streamClient, err := c.ExportBulkRelationships(ctx, &v1.ExportBulkRelationshipsRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_AtExactSnapshot{
AtExactSnapshot: resp.WrittenAt,
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestBackupCreateCmdFunc(t *testing.T) {
})

t.Run("resumes backup if marker file exists", func(t *testing.T) {
streamClient, err := c.BulkExportRelationships(ctx, &v1.BulkExportRelationshipsRequest{
streamClient, err := c.ExportBulkRelationships(ctx, &v1.ExportBulkRelationshipsRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_AtExactSnapshot{
AtExactSnapshot: resp.WrittenAt,
Expand Down
12 changes: 6 additions & 6 deletions internal/cmd/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
return fmt.Errorf("unable to write schema: %w", err)
}

relationshipWriter, err := r.client.BulkImportRelationships(ctx)
relationshipWriter, err := r.client.ImportBulkRelationships(ctx)
if err != nil {
return fmt.Errorf("error creating writer stream: %w", err)
}
Expand All @@ -134,7 +134,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {

if uint(len(batch))%r.batchSize == 0 {
batchesToBeCommitted = append(batchesToBeCommitted, batch)
err := relationshipWriter.Send(&v1.BulkImportRelationshipsRequest{
err := relationshipWriter.Send(&v1.ImportBulkRelationshipsRequest{
Relationships: batch,
})
if err != nil {
Expand All @@ -145,7 +145,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
}

// after an error
relationshipWriter, err = r.client.BulkImportRelationships(ctx)
relationshipWriter, err = r.client.ImportBulkRelationships(ctx)
if err != nil {
return fmt.Errorf("error creating new writer stream: %w", err)
}
Expand All @@ -168,7 +168,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
return fmt.Errorf("error committing batches: %w", err)
}

relationshipWriter, err = r.client.BulkImportRelationships(ctx)
relationshipWriter, err = r.client.ImportBulkRelationships(ctx)
if err != nil {
return fmt.Errorf("error creating new writer stream: %w", err)
}
Expand All @@ -184,7 +184,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
// underlying error that caused Send() to fail. It also gives us the opportunity to retry it
// in case it failed.
batchesToBeCommitted = append(batchesToBeCommitted, batch)
_ = relationshipWriter.Send(&v1.BulkImportRelationshipsRequest{Relationships: batch})
_ = relationshipWriter.Send(&v1.ImportBulkRelationshipsRequest{Relationships: batch})
}

if err := r.commitStream(ctx, relationshipWriter, batchesToBeCommitted); err != nil {
Expand All @@ -210,7 +210,7 @@ func (r *restorer) restoreFromDecoder(ctx context.Context) error {
return nil
}

func (r *restorer) commitStream(ctx context.Context, bulkImportClient v1.ExperimentalService_BulkImportRelationshipsClient,
func (r *restorer) commitStream(ctx context.Context, bulkImportClient v1.PermissionsService_ImportBulkRelationshipsClient,
batchesToBeCommitted [][]*v1.Relationship,
) error {
var numLoaded, expectedLoaded, retries uint
Expand Down
16 changes: 8 additions & 8 deletions internal/cmd/restorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestRestorer(t *testing.T) {

type mockClient struct {
client.Client
v1.ExperimentalService_BulkImportRelationshipsClient
v1.PermissionsService_ImportBulkRelationshipsClient
t *testing.T
schema string
remainderBatch bool
Expand All @@ -204,11 +204,7 @@ type mockClient struct {
touchErrors []error
}

func (m *mockClient) BulkImportRelationships(_ context.Context, _ ...grpc.CallOption) (v1.ExperimentalService_BulkImportRelationshipsClient, error) {
return m, nil
}

func (m *mockClient) Send(req *v1.BulkImportRelationshipsRequest) error {
func (m *mockClient) Send(req *v1.ImportBulkRelationshipsRequest) error {
m.receivedBatches++
m.receivedRels += uint(len(req.Relationships))
m.lastReceivedBatch = req.Relationships
Expand Down Expand Up @@ -241,7 +237,7 @@ func (m *mockClient) WriteRelationships(_ context.Context, in *v1.WriteRelations
return &v1.WriteRelationshipsResponse{}, nil
}

func (m *mockClient) CloseAndRecv() (*v1.BulkImportRelationshipsResponse, error) {
func (m *mockClient) CloseAndRecv() (*v1.ImportBulkRelationshipsResponse, error) {
m.receivedCommits++
lastBatch := m.lastReceivedBatch
defer func() { m.lastReceivedBatch = nil }()
Expand All @@ -250,7 +246,11 @@ func (m *mockClient) CloseAndRecv() (*v1.BulkImportRelationshipsResponse, error)
return nil, m.commitErrors[m.receivedCommits-1]
}

return &v1.BulkImportRelationshipsResponse{NumLoaded: uint64(len(lastBatch))}, nil
return &v1.ImportBulkRelationshipsResponse{NumLoaded: uint64(len(lastBatch))}, nil
}

func (m *mockClient) ImportBulkRelationships(_ context.Context, _ ...grpc.CallOption) (v1.PermissionsService_ImportBulkRelationshipsClient, error) {
return m, nil
}

func (m *mockClient) WriteSchema(_ context.Context, wsr *v1.WriteSchemaRequest, _ ...grpc.CallOption) (*v1.WriteSchemaResponse, error) {
Expand Down
Loading