Skip to content

Commit

Permalink
Copy encodeDecimalWithScale from transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Aug 23, 2024
1 parent e0fa8ec commit 21c87a9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
41 changes: 40 additions & 1 deletion lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,50 @@ package converters

import (
"fmt"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/typing"
"github.com/cockroachdb/apd/v3"
)

// decimalWithNewExponent takes a [*apd.Decimal] and returns a new [*apd.Decimal] with a the given exponent.
// If the new exponent is less precise then the extra digits will be truncated.
func decimalWithNewExponent(decimal *apd.Decimal, newExponent int32) *apd.Decimal {
exponentDelta := newExponent - decimal.Exponent // Exponent is negative.

if exponentDelta == 0 {
return new(apd.Decimal).Set(decimal)
}

coefficient := new(apd.BigInt).Set(&decimal.Coeff)

if exponentDelta < 0 {
multiplier := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(-exponentDelta)), nil)
coefficient.Mul(coefficient, multiplier)
} else if exponentDelta > 0 {
divisor := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(exponentDelta)), nil)
coefficient.Div(coefficient, divisor)
}

return &apd.Decimal{
Form: decimal.Form,
Negative: decimal.Negative,
Exponent: newExponent,
Coeff: *coefficient,
}
}

// encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`
// using a specific scale.
func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
decimal = decimalWithNewExponent(decimal, targetExponent)
}
bytes, _ := debezium.EncodeDecimal(decimal)
return bytes
}

type decimalConverter struct {
scale uint16
precision *int
Expand Down Expand Up @@ -44,7 +83,7 @@ func (d decimalConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

return debezium.EncodeDecimalWithScale(decimal, int32(d.scale)), nil
return encodeDecimalWithScale(decimal, int32(d.scale)), nil
}

type VariableNumericConverter struct{}
Expand Down
2 changes: 1 addition & 1 deletion lib/debezium/converters/money.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ func (m MoneyConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a money value: %w`, valString, err)
}

return debezium.EncodeDecimalWithScale(decimal, int32(m.Scale())), nil
return encodeDecimalWithScale(decimal, int32(m.Scale())), nil
}

0 comments on commit 21c87a9

Please sign in to comment.