
[Bug] [Mysql-cdc] send Mysql data to kafka #5010

Closed
2 of 3 tasks
CamilleInGitHub opened this issue Jul 3, 2023 · 2 comments
Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When using the SeaTunnel engine to send MySQL-CDC data to Kafka, the job fails with the following exception:

2023-06-30 21:01:26,785 ERROR org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation - [localhost]:5801[seatunnel-456505][5.1] cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) ~[?:1.8.0_171]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) ~[?:1.8.0_171]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2291) ~[?:1.8.0_171]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_171]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_171]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_171]
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1973) ~[?:1.8.0_171]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1565) [?:1.8.0_171]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) [?:1.8.0_171]
at org.apache.seatunnel.common.utils.SerializationUtils.deserialize(SerializationUtils.java:100) [classes/:?]
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.lambda$run$1(AssignSplitOperation.java:64) [classes/:?]
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) [classes/:?]
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.run(AssignSplitOperation.java:55) [classes/:?]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [hazelcast-5.1.jar:5.1]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [hazelcast-5.1.jar:5.1]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [hazelcast-5.1.jar:5.1]
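The telling detail in the message is that the same fully qualified name, io.debezium.relational.TableId, appears on both sides of "cannot assign instance of X to field of type X". That pattern arises when one class has been loaded by two different classloaders: the JVM then treats the two copies as distinct types during Java deserialization, commonly because a duplicate jar is visible to more than one plugin classloader. A minimal, self-contained sketch of that mechanism (the Payload class and temp-directory name are illustrative, not SeaTunnel code):

```java
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClassLoaderDemo {

    /** True when two loaders yield classes with the same name that are still distinct types. */
    public static boolean sameNameDifferentClass() {
        try {
            // Compile a trivial serializable class into a temp directory.
            Path dir = Files.createTempDirectory("cl-demo");
            Path src = dir.resolve("Payload.java");
            Files.write(src,
                    "public class Payload implements java.io.Serializable {}".getBytes("UTF-8"));
            int rc = ToolProvider.getSystemJavaCompiler()
                    .run(null, null, null, src.toString(), "-d", dir.toString());
            if (rc != 0) {
                return false;
            }
            URL[] cp = { dir.toUri().toURL() };
            // Two independent loaders, each defining its own copy of Payload.
            ClassLoader l1 = new URLClassLoader(cp, null);
            ClassLoader l2 = new URLClassLoader(cp, null);
            Class<?> a = l1.loadClass("Payload");
            Class<?> b = l2.loadClass("Payload");
            // Same fully qualified name, but distinct types to the JVM:
            // an instance of b's class is not an instance of a's class,
            // which is exactly what the deserializer complains about above.
            return a.getName().equals(b.getName())
                    && a != b
                    && !a.isInstance(b.getDeclaredConstructor().newInstance());
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(sameNameDifferentClass()); // expected: true
    }
}
```

In SeaTunnel's case this usually suggests two copies of the Debezium classes reachable through different classloaders (e.g. a connector jar present both in the engine lib directory and inside a connector bundle), so deduplicating the jars is the first thing to check.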

SeaTunnel Version

2.3.2

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    result_table_name = "table1"

    username = "root"
    password = "root"
    database-names = ["test"]
    table-names = ["test.source_teacher"]
    base-url = "jdbc:mysql://10.1.4.8:3306/test"
    "startup.mode"=INITIAL
    "stop.mode"="never"
    catalog {
       factory=MySQL
    }

    format = compatible_debezium_json
    debezium = {
       # include schema into kafka message
       key.converter.schemas.enable = false
       value.converter.schemas.enable = false
       # include DDL
       include.schema.changes = true
       # topic.prefix
       database.server.name = "mysql_cdc_1"
    }
    #compatible_debezium_json fixed schema
    schema = {
       fields = {
           topic = string
           key = string
           value = string
       }
    }
  }
  # If you would like more information about how to configure SeaTunnel and to see the full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}
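With format = compatible_debezium_json, each record delivered to the sink carries the three string fields declared in the fixed schema above. A hypothetical example of one such record (the topic and column names are illustrative; the value follows the usual Debezium change-event envelope):

```json
{
  "topic": "mysql_cdc_1.test.source_teacher",
  "key": "{\"id\":1}",
  "value": "{\"before\":null,\"after\":{\"id\":1,\"name\":\"Tom\"},\"op\":\"c\",\"source\":{\"db\":\"test\",\"table\":\"source_teacher\"}}"
}
```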

sink {
   kafka {
      source_table_name = "table1"
      topic = "BDSP_DETAIL_O7"
      bootstrap.servers = "122.42.201.124:9092"
      format = json
      kafka.request.timeout.ms = 50000
      kafka.config = {
          acks = "all"
          request.timeout.ms = 70000
          buffer.memory = 335544432
          security.protocol=SASL_PLAINTEXT
          sasl.mechanism=SCRAM-SHA-512
          sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=\"root\"\npassword=\"root\";"  
      }
   } 
}

Running Command

--master local --config D:\seaTunnelConfig\Mysql-CDCToKafka.conf

Error Exception

(Same stack trace as shown under "What happened" above.)

Flink or Spark Version

no

Java or Scala Version

JDK 1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct


github-actions bot commented Aug 3, 2023

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Aug 3, 2023
@github-actions

This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter similar problems in the future.
