From 089f9e1ce528625e4c8ece4f1da8cdd59cdb5c3a Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Wed, 9 Nov 2022 11:28:26 +0000 Subject: [PATCH] Fix errors --- clients/destination.go | 4 ++-- clients/source.go | 2 +- plugins/destination.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/clients/destination.go b/clients/destination.go index e3bfd1d5d7..3fb3a67168 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -289,7 +289,7 @@ func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, so if err := saveClient.Send(&pb.Write2_Request{ Resource: resource, }); err != nil { - return fmt.Errorf("failed to call Write.Send: %w", err) + return fmt.Errorf("failed to call Write2.Send: %w", err) } } _, err = saveClient.CloseAndRecv() @@ -330,7 +330,7 @@ func (c *DestinationClient) Terminate() error { if c.grpcSocketName != "" { defer func() { - if err := os.Remove(c.grpcSocketName); err != nil { + if err := os.RemoveAll(c.grpcSocketName); err != nil { c.logger.Error().Err(err).Msg("failed to remove destination socket file") } }() diff --git a/clients/source.go b/clients/source.go index 764529161d..8adfa0b924 100644 --- a/clients/source.go +++ b/clients/source.go @@ -301,7 +301,7 @@ func (c *SourceClient) Terminate() error { if c.grpcSocketName != "" { defer func() { - if err := os.Remove(c.grpcSocketName); err != nil { + if err := os.RemoveAll(c.grpcSocketName); err != nil { c.logger.Error().Err(err).Msg("failed to remove source socket file") } }() diff --git a/plugins/destination.go b/plugins/destination.go index 5fe69775b9..c67aa66931 100644 --- a/plugins/destination.go +++ b/plugins/destination.go @@ -137,12 +137,12 @@ func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sou syncTime = syncTime.UTC() SetDestinationManagedCqColumns(tables) ch := make(chan *ClientResource) - eg, ctx := errgroup.WithContext(ctx) + eg, gctx := errgroup.WithContext(ctx) // given most destination plugins writing in batch we are using a worker pool to write in parallel // it might not generalize well and we might need to move it to each destination plugin implementation. for i := 0; i < writeWorkers; i++ { eg.Go(func() error { - return p.client.Write(ctx, tables, ch) + return p.client.Write(gctx, tables, ch) }) } sourceColumn := &schema.Text{}