From ca5eb9d3ce15f9a0ae517c4ed2f84f9112870f3f Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 25 Apr 2023 18:39:15 -0500 Subject: [PATCH 1/3] Time values are truncated uniformly --- clients/destination/v0/destination.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/destination/v0/destination.go b/clients/destination/v0/destination.go index 53e9488a4f..0e1a0096a2 100644 --- a/clients/destination/v0/destination.go +++ b/clients/destination/v0/destination.go @@ -264,7 +264,7 @@ func (c *Client) Write(ctx context.Context, source string, syncTime time.Time, r if err := saveClient.Send(&pb.Write_Request{ Resource: resource, Source: source, - Timestamp: timestamppb.New(syncTime), + Timestamp: timestamppb.New(syncTime.Truncate(time.Millisecond)), }); err != nil { if err == io.EOF { // don't send write request if the channel is closed @@ -297,7 +297,7 @@ func (c *Client) Write2(ctx context.Context, sourceSpec specs.Source, tables sch if err := saveClient.Send(&pb.Write2_Request{ Tables: b, Source: sourceSpec.Name, - Timestamp: timestamppb.New(syncTime), + Timestamp: timestamppb.New(syncTime.Truncate(time.Millisecond)), SourceSpec: sourceSpecBytes, }); err != nil { return fmt.Errorf("failed to send tables: %w", err) @@ -335,7 +335,7 @@ func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source s } if _, err := c.pbClient.DeleteStale(ctx, &pb.DeleteStale_Request{ Source: source, - Timestamp: timestamppb.New(timestamp), + Timestamp: timestamppb.New(timestamp.Truncate(time.Millisecond)), Tables: b, }); err != nil { return fmt.Errorf("failed to call DeleteStale: %w", err) From fe2e64e4b2fe5487af5cfe6c117cdf9df03d4e47 Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 26 Apr 2023 08:00:05 -0500 Subject: [PATCH 2/3] Update destination.go --- clients/destination/v0/destination.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/destination/v0/destination.go b/clients/destination/v0/destination.go index 0e1a0096a2..4169f21084 100644 --- a/clients/destination/v0/destination.go +++ b/clients/destination/v0/destination.go @@ -264,7 +264,7 @@ func (c *Client) Write(ctx context.Context, source string, syncTime time.Time, r if err := saveClient.Send(&pb.Write_Request{ Resource: resource, Source: source, - Timestamp: timestamppb.New(syncTime.Truncate(time.Millisecond)), + Timestamp: timestamppb.New(syncTime.Truncate(time.Microsecond)), }); err != nil { if err == io.EOF { // don't send write request if the channel is closed @@ -297,7 +297,7 @@ func (c *Client) Write2(ctx context.Context, sourceSpec specs.Source, tables sch if err := saveClient.Send(&pb.Write2_Request{ Tables: b, Source: sourceSpec.Name, - Timestamp: timestamppb.New(syncTime.Truncate(time.Millisecond)), + Timestamp: timestamppb.New(syncTime.Truncate(time.Microsecond)), SourceSpec: sourceSpecBytes, }); err != nil { return fmt.Errorf("failed to send tables: %w", err) @@ -335,7 +335,7 @@ func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source s } if _, err := c.pbClient.DeleteStale(ctx, &pb.DeleteStale_Request{ Source: source, - Timestamp: timestamppb.New(timestamp.Truncate(time.Millisecond)), + Timestamp: timestamppb.New(timestamp.Truncate(time.Microsecond)), Tables: b, }); err != nil { return fmt.Errorf("failed to call DeleteStale: %w", err) From 8cc61d35aba7fe29027b2fc648f1ee55e5365f50 Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 26 Apr 2023 12:15:20 -0500 Subject: [PATCH 3/3] Update destination.go --- clients/destination/v0/destination.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/destination/v0/destination.go b/clients/destination/v0/destination.go index 4169f21084..6e5dac2e93 100644 --- a/clients/destination/v0/destination.go +++ b/clients/destination/v0/destination.go @@ -260,11 +260,12 @@ func (c *Client) Write(ctx context.Context, source string, syncTime time.Time, r if err != nil { return 0, fmt.Errorf("failed to call Write: %w", err) } + syncTime = syncTime.Truncate(time.Microsecond) for resource := range resources { if err := saveClient.Send(&pb.Write_Request{ Resource: resource, Source: source, - Timestamp: timestamppb.New(syncTime.Truncate(time.Microsecond)), + Timestamp: timestamppb.New(syncTime), }); err != nil { if err == io.EOF { // don't send write request if the channel is closed