[Bug] Flink CDC Schema Compatibility Check: Incorrect Argument Order in canConvert Call #5640

Description

@0dunay0

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

1.1.1

Compute Engine

Flink

Minimal reproduce step

Consider these two Avro schemas, where the first evolves into the second by promoting primitive types to wider ones (int to long, float to double).

{
  "type": "record",
  "name": "primitives_promotion",
  "namespace": "some_ns",
  "doc": "some stuff",
  "fields": [
    {
      "name": "int_and_long",
      "type": "int",
      "doc": "some stuff"
    },
    {
      "name": "float_and_double",
      "type": "float",
      "doc": "some stuff"
    }
  ]
}

{
  "type": "record",
  "name": "primitives_promotion",
  "namespace": "some_ns",
  "doc": "some stuff",
  "fields": [
    {
      "name": "int_and_long",
      "type": "long",   // <-- int promoted to long (BIGINT in Paimon)
      "doc": "some stuff"
    },
    {
      "name": "float_and_double",
      "type": "double",  // <-- float promoted to double
      "doc": "some stuff"
    }
  ]
}
# This evolution is compatible in Avro

from avro import schema
from avro.compatibility import ReaderWriterCompatibilityChecker

avsc1 = schema.from_path('s1.json')  # first (original) schema
avsc2 = schema.from_path('s2.json')  # second (evolved) schema

rwc = ReaderWriterCompatibilityChecker()
# Arguments are (reader, writer): can data written with s1 be read with s2?
result = rwc.get_compatibility(avsc2, avsc1)

print(result.compatibility)
# SchemaCompatibilityType.compatible

print(result.incompatibilities)
# []

Start an ingestion job on a Kafka topic that uses the first schema, then switch to the second schema. You should get errors like this:

2025-05-20 03:13:26,649 INFO  org.apache.paimon.flink.action.cdc.CdcActionCommonUtils      [] - Cannot convert field 'int_and_long' from source table type 'BIGINT' to Paimon type 'INT'.
2025-05-20 03:13:26,650 ERROR org.apache.flink.client.cli.CliFrontend                      [] - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Paimon schema and source table schema are not compatible.
Paimon fields are: [`int_and_long` INT, `float_and_double` FLOAT].
Source table fields are: [`int_and_long` BIGINT, `float_and_double` DOUBLE].

What doesn't meet your expectations?

There appears to be a logic error in the schema compatibility check used by CDC synchronization. The schemaCompatible method in CdcActionCommonUtils checks whether each source field type can be converted to the existing Paimon field type, when it should check whether the Paimon field type can evolve to accommodate the source field type.

This bug blocks valid schema evolution. If a Paimon table has an INT column and the source data has a BIGINT column, the current code calls canConvert(BIGINT, INT), which returns IGNORE because a wider type cannot be narrowed to a smaller one. This is a valid evolution case, though: the Paimon table should evolve from INT to BIGINT to accommodate the source data.
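To make the argument-order problem concrete, here is a minimal Python model of the check. It is only a sketch: `can_convert`, `schema_compatible`, and the rank tables are hypothetical stand-ins, not Paimon's actual implementation, and the `swapped` flag exists only to reproduce the behavior described above.

```python
# Hypothetical model of a widening check; NOT Paimon's real canConvert.
INTEGER_RANK = {"TINYINT": 0, "SMALLINT": 1, "INT": 2, "BIGINT": 3}
FLOAT_RANK = {"FLOAT": 0, "DOUBLE": 1}


def can_convert(old_type: str, new_type: str) -> bool:
    """Return True if a column of old_type can evolve to new_type without narrowing."""
    for ranks in (INTEGER_RANK, FLOAT_RANK):
        if old_type in ranks and new_type in ranks:
            return ranks[old_type] <= ranks[new_type]
    # Types outside the modeled families must match exactly.
    return old_type == new_type


def schema_compatible(paimon: dict, source: dict, swapped: bool = False) -> bool:
    """Check every source field that already exists in the Paimon table.

    swapped=True mimics the reported behavior: (source, paimon) order.
    swapped=False is the expected order: (paimon, source).
    """
    for name, source_type in source.items():
        paimon_type = paimon.get(name)
        if paimon_type is None:
            continue  # new column, nothing to compare against
        if swapped:
            ok = can_convert(source_type, paimon_type)
        else:
            ok = can_convert(paimon_type, source_type)
        if not ok:
            return False
    return True


# Field types taken from the error message in this issue.
paimon_fields = {"int_and_long": "INT", "float_and_double": "FLOAT"}
source_fields = {"int_and_long": "BIGINT", "float_and_double": "DOUBLE"}

reported = schema_compatible(paimon_fields, source_fields, swapped=True)
expected = schema_compatible(paimon_fields, source_fields, swapped=False)
print("reported order compatible:", reported)
print("expected order compatible:", expected)
```

Under this model, the (source, paimon) order rejects the INT to BIGINT and FLOAT to DOUBLE promotions, while the (paimon, source) order accepts them, which matches the evolution Avro already considers compatible.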

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
