Support Avro's decimal #69

Merged · 11 commits into master · Jun 20, 2023

Conversation

@aymkhalil (Collaborator) commented May 31, 2023

This patch adds support for an Avro-only logical type: decimal.

It enables conversion from other types, like the CQL decimal type available in Cassandra (C*) CDC.

For example, the user can use an Avro-decimal-aware sink with an upstream topic that carries a CQL decimal type like the following:

{
  "name": "longitude",
  "type": [
    "null",
    {
      "type": "record",
      "name": "cql_decimal",
      "namespace": "",
      "fields": [
        {
          "name": "bigint",
          "type": "bytes"
        },
        {
          "name": "scale",
          "type": "int"
        }
      ],
      "logicalType": "cql_decimal"
    }
  ]
}

This can be converted to Avro's decimal with the following conversion:

{
    "steps": [
        {
            "type": "compute",
            "fields": [
                {
                    "name": "value.longitude",
                    "expression": "fn:decimal(value.longitude.bigint, value.longitude.scale)",
                    "type": "DECIMAL"
                }
            ]
        }
    ]
}

The output schema of the conversion would look like this:

{
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 4, // would match the BigDecimal.precision that is retrievable after building the BigDecimal the takes bytes + scale as input (but not precision)
  "scale": 2 // would match the scale in the CQL decimal type
}
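
Concretely, both numbers come straight from the built BigDecimal. A small illustration (the value 13.34 is hypothetical, chosen to match the schema above):

import java.math.BigDecimal;

BigDecimal d = new BigDecimal("13.34");
d.precision(); // 4 -- matches "precision" in the schema
d.scale();     // 2 -- matches "scale" in the schema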

In order to support the conversion, a new function is registered with the following signature:

decimal(byte[] bytes, int scale) --> BigDecimal
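
Under the hood this maps naturally onto the BigDecimal(BigInteger, int) constructor. A minimal sketch of what such a function can do (illustrative, not the PR's verbatim code):

import java.math.BigDecimal;
import java.math.BigInteger;

// The bytes are the two's-complement big-endian encoding of the unscaled value,
// which is exactly what the BigInteger(byte[]) constructor expects.
static BigDecimal decimal(byte[] unscaledBytes, int scale) {
    return new BigDecimal(new BigInteger(unscaledBytes), scale);
}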

@eolivelli (Collaborator) left a comment

Good idea

I wonder if 'toDecimal' would work better than 'decimal'.

A second thought: the table probably has many columns of this type, so we could ALSO have a specific step, 'convert Cassandra CDC logical types', that does the conversion for all the fields.

It would help a lot because it would be a no-brainer and a simple checkbox in the UI.

@aymkhalil marked this pull request as ready for review May 31, 2023 07:49
@aymkhalil requested a review from cbornet May 31, 2023 07:49
@lhotari left a comment

LGTM. Amazing work @aymkhalil!

@aymkhalil (Collaborator, Author):

I wonder if 'toDecimal' would work better than 'decimal'.

Actually, I originally had it as toBigDecimal (though toDecimal is a better name). Then I realized we have toString, which we expose to the user as str, and we don't follow a to* convention, so I thought decimal was more in line with the convention. I'm open to both.

A second thought: the table probably has many columns of this type, so we could ALSO have a specific step, 'convert Cassandra CDC logical types', that does the conversion for all the fields.
It would help a lot because it would be a no-brainer and a simple checkbox in the UI.

Yeah, you are right, this conversion could be needed many times, resulting in duplicate steps. It is a "reasonable" workaround for the CDC issue specifically. I was avoiding adding too specific a conversion step, to keep the transformation generic enough. I actually thought of adding another step like compute (but called computeSchema) that would define a schema conversion rule and is not specific to a particular field. For example:

{
    "steps": [
        {
            "type": "schema", // indicates that this step is a schema transformation step. Could be named "computeSchema" to draw analogy with the compute step 
            "fields": [
                {
                    "name": "cql_decimal", // the name of the schema to be transformed
                    "type": "record", // type of the schema. "record" it indicates the fields names are nested
                    "output": { // the desired output schema, could be named outputSchema or newSchema to be more explicit 
                        "type": "bytes", 
                        "value": "bigint", // another way to referece those is to use dot notations like cql_decimal.bigint or record.bigint. This would leverage the Expression Language for completeness
                        "logicalType": "decimal",
                        "precision": 4, // use can hard code stuff directly 
                        "scale": "scale" // again, could be cq_decimal.scale or record.scale
                    }
                }
            ]
        }
    ]
}

but it turned out more complex than I thought; however, I can pursue this direction if we see value in it.

@cbornet what are your thoughts regarding both points above?

@aymkhalil (Collaborator, Author) commented May 31, 2023

I realized that in practice, BigDecimal values have variable precision/scale. So what will happen in this patch is: if the scale changes and we reuse the schema, validation will pass only if no rounding is required (https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/Conversions.java#L122-L132); if we cache per field, then compatibility rules would apply.
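
To illustrate the failure mode (a sketch, not the Avro library source; Avro surfaces the underlying rounding error as an AvroTypeException):

import java.math.BigDecimal;
import java.math.RoundingMode;

int schemaScale = 3; // scale fixed by the first value written to the topic

// A value with a smaller scale rescales losslessly, so encoding passes:
new BigDecimal("1.3").setScale(schemaScale, RoundingMode.UNNECESSARY);     // ok -> 1.300

// A value with a larger scale cannot be rescaled without rounding, so encoding fails:
new BigDecimal("1.33499").setScale(schemaScale, RoundingMode.UNNECESSARY); // throws ArithmeticException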

I'm not sure how practical the changes in the PR would be. Yes, it conforms to Avro's logical type, but it may not be very useful to CDC customers in particular (they could still use the STRING conversion, which makes the JSTL function useful, but not necessarily the DECIMAL type itself).

@@ -239,6 +245,12 @@ protected byte[] coerceToBytes(Object value) {
    if (value instanceof byte[]) {
        return (byte[]) value;
    }
    if (value instanceof ByteBuffer) {
Collaborator:

When do we get ByteBuffer?

@aymkhalil (Collaborator, Author):

It comes from the custom CDC CQL type (the bigint part is encoded as a ByteBuffer), so I mimicked the same in the tests. The coerceToBytes conversion from ByteBuffer will kick in when expressions like value.cqlDecimalField.bigint are evaluated.
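
For reference, the ByteBuffer branch truncated in the hunk above typically looks something like this (a sketch under that assumption, not the PR's verbatim code):

import java.nio.ByteBuffer;

if (value instanceof ByteBuffer) {
    // copy the buffer's remaining bytes without disturbing its position
    ByteBuffer buffer = ((ByteBuffer) value).duplicate();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
}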

Collaborator:

So this is only for tests? Can we instead use the type that we'll get from the Avro deserializer (probably byte[]?)

@aymkhalil (Collaborator, Author):

Not only for testing. If the byte[] was encoded as a ByteBuffer, the Avro deserializer will return a ByteBuffer even if the schema type is bytes.

This is how the CDC code converts it: https://github.com/datastax/cdc-apache-cassandra/blob/cd64bdd03af7a608687b6c54aa614021c62d8027/commons/src/main/java/com/datastax/oss/cdc/CqlLogicalTypes.java#L146

Collaborator:

I don't see how the info about which structure (byte[] or ByteBuffer) was used during serialization would end up in the Avro record...

@aymkhalil (Collaborator, Author):

I just realized the conversion from ByteBuffer has been removed from coerceToBytes and is now limited to coerceToBigInteger only. However, I'm not sure how Avro recognizes ByteBuffer under the hood.

@aymkhalil (Collaborator, Author):

@cbornet I addressed your feedback, PTAL.
However, I think the DECIMAL type (although compliant with Avro) is not practical. Here is why, in the context of CDC:

create table ks1.d2 (id int primary key, v1 decimal) with cdc=true;
insert into ks1.d2(id, v1) values (1, 1.334);

Assume the following transform is in place:

{
   "steps":[
      {
         "type":"compute",
         "fields":[
            {
               "name":"value.v2",
               "expression":"fn:decimal(value.v1.bigint, value.v1.scale)",
               "type":"DECIMAL"
            }
         ]
      }
   ]
}

This works fine and results in the following schema:

{
        "name": "v2",
        "type": [
          "null",
          {
            "type": "bytes",
            "logicalType": "decimal",
            "precision": 4,
            "scale": 3
          }
        ]
      }

Now, decreasing the scale works fine as well (without changing the schema on the topic):

insert into ks1.d2(id, v1) values (1, 1.3);

However, increasing the scale is problematic:

insert into ks1.d2(id, v1) values (1, 1.33499);

will fail with:

org.apache.avro.AvroTypeException: Cannot encode decimal with scale 5 as scale 3 without rounding in field v2

So basically, depending on the very first decimal that was received, the scale will limit all future decimal values, which is impractical. Otherwise, this patch is technically correct. Let me know WDYT.

@aymkhalil requested a review from cbornet June 1, 2023 22:49
@cbornet (Collaborator) commented Jun 19, 2023

So basically, depending on the very first decimal that was received, the scale will limit all future decimal values, which is impractical.

Is this because it sets the schema on the topic with the first scale received?

@aymkhalil (Collaborator, Author):

So basically, depending on the very first decimal that was received, the scale will limit all future decimal values, which is impractical.

Is this because it sets the schema on the topic with the first scale received?

Exactly.

@cbornet (Collaborator) commented Jun 20, 2023

LGTM.
We can rebase and merge.
As discussed, it would be useful to have a decimalFromNumberWithScale(decimal, scale) function to be able to fix the scale of the output in CDC, but it can be done in a subsequent PR.
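
A possible shape for that follow-up (purely illustrative; the function is only proposed at this point, so both the body and the rounding mode are assumptions):

import java.math.BigDecimal;
import java.math.RoundingMode;

// Coerce any incoming decimal to a fixed, schema-declared scale,
// rounding when necessary so the Avro encoding never rejects the value.
static BigDecimal decimalFromNumberWithScale(BigDecimal value, int scale) {
    return value.setScale(scale, RoundingMode.HALF_UP);
}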

@aymkhalil (Collaborator, Author):

LGTM. We can rebase and merge. As discussed, it would be useful to have a decimalFromNumberWithScale(decimal, scale) function to be able to fix the scale of the output in CDC, but it can be done in a subsequent PR.

Issue for tracking: #75

@aymkhalil merged commit ff3fa27 into master Jun 20, 2023
@cbornet deleted the avro-decimal branch June 24, 2023 14:24