Skip to content

Commit

Permalink
fix(destinations): Handle nulls in JSONs (#6466)
Browse files Browse the repository at this point in the history
Looks like we didn't handle `nulls` well in all cases.

This should close #6333
and also the recent pg report that we got.

Side note: There is some boilerplate code here but I don't think there
is a way to avoid it because we do want to give destinations an
opportunity to decide what to do on `null` value which might not always
be just send null as it can be empty `string` like in CSV or some other
value.
  • Loading branch information
yevgenypats committed Jan 7, 2023
1 parent 163a581 commit f434f00
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 32 deletions.
51 changes: 39 additions & 12 deletions plugins/destination/mongodb/client/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,31 @@ import (
)

func (*Client) TransformBool(v *schema.Bool) any {
return v.Bool
if v.Status == schema.Present {
return v.Bool
}
return nil
}

func (*Client) TransformBytea(v *schema.Bytea) any {
return v.Bytes
if v.Status == schema.Present {
return v.Bytes
}
return nil
}

func (*Client) TransformFloat8(v *schema.Float8) any {
return v.Float
if v.Status == schema.Present {
return v.Float
}
return nil
}

func (*Client) TransformInt8(v *schema.Int8) any {
return v.Int
if v.Status == schema.Present {
return v.Int
}
return nil
}

func (*Client) TransformInt8Array(v *schema.Int8Array) any {
Expand All @@ -42,7 +54,10 @@ func (*Client) TransformJSON(v *schema.JSON) any {
}

func (*Client) TransformText(v *schema.Text) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformTextArray(v *schema.TextArray) any {
Expand All @@ -54,14 +69,17 @@ func (*Client) TransformTextArray(v *schema.TextArray) any {
}

func (*Client) TransformTimestamptz(v *schema.Timestamptz) any {
if v.Status != schema.Present {
return nil
if v.Status == schema.Present {
return v.Time
}
return v.Time
return nil
}

func (*Client) TransformUUID(v *schema.UUID) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformUUIDArray(v *schema.UUIDArray) any {
Expand All @@ -73,7 +91,10 @@ func (*Client) TransformUUIDArray(v *schema.UUIDArray) any {
}

func (*Client) TransformCIDR(v *schema.CIDR) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformCIDRArray(v *schema.CIDRArray) any {
Expand All @@ -85,7 +106,10 @@ func (*Client) TransformCIDRArray(v *schema.CIDRArray) any {
}

func (*Client) TransformInet(v *schema.Inet) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformInetArray(v *schema.InetArray) any {
Expand All @@ -97,7 +121,10 @@ func (*Client) TransformInetArray(v *schema.InetArray) any {
}

func (*Client) TransformMacaddr(v *schema.Macaddr) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformMacaddrArray(v *schema.MacaddrArray) any {
Expand Down
57 changes: 42 additions & 15 deletions plugins/destination/neo4j/client/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,31 @@ import (
)

func (*Client) TransformBool(v *schema.Bool) any {
return v.Bool
if v.Status == schema.Present {
return v.Bool
}
return nil
}

func (*Client) TransformBytea(v *schema.Bytea) any {
return v.Bytes
if v.Status == schema.Present {
return v.Bytes
}
return nil
}

func (*Client) TransformFloat8(v *schema.Float8) any {
return v.Float
if v.Status == schema.Present {
return v.Float
}
return nil
}

func (*Client) TransformInt8(v *schema.Int8) any {
return v.Int
if v.Status == schema.Present {
return v.Int
}
return nil
}

func (*Client) TransformInt8Array(v *schema.Int8Array) any {
Expand All @@ -29,14 +41,17 @@ func (*Client) TransformInt8Array(v *schema.Int8Array) any {
}

func (*Client) TransformJSON(v *schema.JSON) any {
if v.Status != schema.Present {
return nil
if v.Status == schema.Present {
return string(v.Bytes)
}
return string(v.Bytes)
return nil
}

func (*Client) TransformText(v *schema.Text) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformTextArray(v *schema.TextArray) any {
Expand All @@ -48,14 +63,17 @@ func (*Client) TransformTextArray(v *schema.TextArray) any {
}

func (*Client) TransformTimestamptz(v *schema.Timestamptz) any {
if v.Status != schema.Present {
return nil
if v.Status == schema.Present {
return v.Time
}
return v.Time
return nil
}

func (*Client) TransformUUID(v *schema.UUID) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformUUIDArray(v *schema.UUIDArray) any {
Expand All @@ -67,7 +85,10 @@ func (*Client) TransformUUIDArray(v *schema.UUIDArray) any {
}

func (*Client) TransformCIDR(v *schema.CIDR) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformCIDRArray(v *schema.CIDRArray) any {
Expand All @@ -79,7 +100,10 @@ func (*Client) TransformCIDRArray(v *schema.CIDRArray) any {
}

func (*Client) TransformInet(v *schema.Inet) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformInetArray(v *schema.InetArray) any {
Expand All @@ -91,7 +115,10 @@ func (*Client) TransformInetArray(v *schema.InetArray) any {
}

func (*Client) TransformMacaddr(v *schema.Macaddr) any {
return v.String()
if v.Status == schema.Present {
return v.String()
}
return nil
}

func (*Client) TransformMacaddrArray(v *schema.MacaddrArray) any {
Expand Down
25 changes: 20 additions & 5 deletions plugins/destination/postgresql/client/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ func (*Client) TransformBool(v *schema.Bool) any {
}

func (*Client) TransformBytea(v *schema.Bytea) any {
return v.Bytes
if v.Status == schema.Present {
return v.Bytes
}
return nil
}

func (*Client) TransformFloat8(v *schema.Float8) any {
Expand All @@ -42,7 +45,10 @@ func (*Client) TransformInt8Array(v *schema.Int8Array) any {
}

func (*Client) TransformJSON(v *schema.JSON) any {
return v.Bytes
if v.Status == schema.Present {
return v.Bytes
}
return nil
}

func (*Client) TransformText(v *schema.Text) any {
Expand Down Expand Up @@ -83,7 +89,10 @@ func (*Client) TransformUUIDArray(v *schema.UUIDArray) any {
}

func (*Client) TransformCIDR(v *schema.CIDR) any {
return v.IPNet
if v.Status == schema.Present {
return v.IPNet
}
return nil
}

func (*Client) TransformCIDRArray(v *schema.CIDRArray) any {
Expand All @@ -95,7 +104,10 @@ func (*Client) TransformCIDRArray(v *schema.CIDRArray) any {
}

func (*Client) TransformInet(v *schema.Inet) any {
return v.IPNet
if v.Status == schema.Present {
return v.IPNet
}
return nil
}

func (*Client) TransformInetArray(v *schema.InetArray) any {
Expand All @@ -107,7 +119,10 @@ func (*Client) TransformInetArray(v *schema.InetArray) any {
}

func (*Client) TransformMacaddr(v *schema.Macaddr) any {
return v.Addr
if v.Status == schema.Present {
return v.Addr
}
return nil
}

func (c *Client) TransformMacaddrArray(v *schema.MacaddrArray) any {
Expand Down

0 comments on commit f434f00

Please sign in to comment.