
[Feature]support CDC in framework (not only flink cdc, also for spark) #963

Open · Tracked by #720
CalvinKirs opened this issue Jan 6, 2022 · 8 comments

CalvinKirs (Member) commented Jan 6, 2022

Support change data capture (CDC) in SeaTunnel on both the Flink and Spark backends:

  • for Flink, we can integrate Flink CDC directly, which is itself built on Debezium (see the sketch below).
  • for Spark, maybe we can use Debezium directly.
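
A minimal sketch of the direct Flink CDC integration, assuming the com.ververica flink-connector-mysql-cdc dependency; the hostname, credentials, and table names are placeholders:

```scala
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object FlinkCdcSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings; Flink CDC embeds Debezium under the hood.
    val source = MySqlSource.builder[String]()
      .hostname("localhost")
      .port(3306)
      .databaseList("test")
      .tableList("test.orders")
      .username("root")
      .password("123456")
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "MySQL CDC Source")
      .print()
    env.execute("flink-cdc-sketch")
  }
}
```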
CalvinKirs changed the title from "support CDC in framework (not only flink cdc, maybe others)" to "[Feature]support CDC in framework (not only flink cdc, maybe others)" on Jan 6, 2022
CalvinKirs added this to discussion in 2.0 RoadMap on Jan 6, 2022
xtr1993 (Contributor) commented Jan 7, 2022

CDC is for real-time computation, and Flink supports CDC through its Debezium compatibility.
Regarding "maybe others": my understanding is that you want to achieve it in other ways, without Debezium?
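
For context, Flink's Debezium compatibility also surfaces as the debezium-json format on a Kafka table. A minimal sketch, assuming the flink-sql-connector-kafka dependency; the topic name and column list are placeholders:

```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

object DebeziumJsonSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Consume Debezium change events from a Kafka topic; the topic name and
    // columns are placeholders for illustration.
    tableEnv.executeSql(
      """CREATE TABLE orders (
        |  id BIGINT,
        |  amount DECIMAL(10, 2)
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'dbserver1.test.orders',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'properties.group.id' = 'cdc-test',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'debezium-json'
        |)""".stripMargin)

    // Flink interprets the Debezium envelope as INSERT/UPDATE/DELETE changes.
    tableEnv.executeSql("SELECT * FROM orders").print()
  }
}
```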

garyelephant changed the title from "[Feature]support CDC in framework (not only flink cdc, maybe others)" to "[Feature]support CDC in framework (not only flink cdc, also for spark)" on Jan 8, 2022
chenhu (Contributor) commented Jan 10, 2022

It would be wow ~ for Spark!

yuangjiang commented:

Spark CDC is feasible; we only need to define a Spark source to do a simple test. CDC can be developed based on Debezium, similar to this:
```scala
import java.util

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends StreamSourceProvider with DataSourceRegister with Logging {

  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
                            providerName: String, parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.get)

  override def createSource(sqlContext: SQLContext, metadataPath: String, schema: Option[StructType],
                            providerName: String, parameters: Map[String, String]): Source = {
    // Wire up the embedded Debezium engine: the Handover hands change events
    // from the Debezium consumer thread over to the Spark source.
    val debeziumOffset: DebeziumOffset = new DebeziumOffset
    val handover: Handover = new Handover
    val changeConsumer: DebeziumChangeConsumer = new DebeziumChangeConsumer(handover)
    val debeziumEngine: SparkDebeziumEngine = new SparkDebeziumEngine(debeziumOffset, handover, changeConsumer)

    // Debezium expects its connector properties as a java.util.Map.
    val javaParameters: java.util.Map[String, String] = new util.HashMap[String, String]()
    for ((key, value) <- parameters) {
      javaParameters.put(key, value)
    }
    debeziumEngine.EngineInit(javaParameters)
    new DebeziumSource(sqlContext, schema.get, debeziumOffset, debeziumEngine)
  }

  override def shortName(): String = "debezium"
}
```
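
For completeness, the DebeziumSource returned above would implement Spark's org.apache.spark.sql.execution.streaming.Source trait. A minimal sketch; DebeziumOffset and SparkDebeziumEngine are the custom classes from the snippet above, and the latestOffset(), pollBatch(), and shutdown() helpers are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType

// Sketch only: latestOffset(), pollBatch() and shutdown() are assumed helpers
// on the custom classes from the snippet above.
class DebeziumSource(sqlContext: SQLContext,
                     sourceSchema: StructType,
                     offset: DebeziumOffset,
                     engine: SparkDebeziumEngine) extends Source {

  override def schema: StructType = sourceSchema

  // Report the latest position the embedded Debezium engine has reached.
  override def getOffset: Option[Offset] = Option(offset.latestOffset())

  // Materialize the change records buffered between the two offsets as a DataFrame.
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val rows: java.util.List[Row] = engine.pollBatch(start, end)
    sqlContext.createDataFrame(rows, sourceSchema)
  }

  override def stop(): Unit = engine.shutdown()
}
```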

obobj commented Feb 21, 2022

Is there any progress?

UUIDUsername commented:

Can we introduce a BloomFilter, given the time-ordered nature of the data?

yuangjiang commented:

This is a Spark data source that can be used by a SeaTunnel connector; test code looks like this:

```scala
import java.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType}

object DebeziumTest {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .master("local[3]")
      .appName("debezium")
      .getOrCreate()

    // Schema of the change records emitted by the Debezium source,
    // including the Debezium metadata columns `op` and `ts_ms`.
    val structType = StructType(
      StructField("a", StringType, nullable = true) ::
      StructField("b", DecimalType(6, 2), nullable = true) ::
      StructField("c", DecimalType(22, 0), nullable = true) ::
      StructField("d", DecimalType(6, 0), nullable = true) ::
      StructField("e", DecimalType(22, 0), nullable = true) ::
      StructField("f", DecimalType(6, 0), nullable = true) ::
      StructField("g", DecimalType(22, 0), nullable = true) ::
      StructField("h", StringType, nullable = true) ::
      StructField("op", StringType, nullable = true) ::
      StructField("ts_ms", StringType, nullable = true) ::
      Nil
    )

    // Debezium MySQL connector properties.
    val parameters = new util.HashMap[String, String]()
    parameters.put("database.hostname", "localhost")
    parameters.put("database.port", "3306")
    parameters.put("database.user", "root")
    parameters.put("database.password", "123456")
    parameters.put("database.include.list", "test")
    parameters.put("snapshot.mode", "schema_only")
    parameters.put("table.include.list", "temp.ts_cdc_test")
    parameters.put("connector.class", "io.debezium.connector.mysql.MySqlConnector")

    val dataFrame = sparkSession
      .readStream
      .options(parameters)
      .schema(structType)
      .format("org.apache.spark.sql.execution.streaming.debezium.DefaultSource")
      .load()

    dataFrame.createOrReplaceTempView("test")
    sparkSession.sql("select * from test")
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()
  }
}
```

This could support all Debezium connectors via SeaTunnel's stream mode.
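
For instance, pointing the same source at PostgreSQL should only require swapping the Debezium connector properties; a sketch, with placeholder values:

```scala
// Same readStream pipeline as above; only the Debezium connector properties change.
parameters.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
parameters.put("database.dbname", "test")                  // Postgres-specific property
parameters.put("plugin.name", "pgoutput")                  // logical decoding plugin
parameters.put("table.include.list", "public.ts_cdc_test")
```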

yuangjiang commented Feb 23, 2022 via email

dijiekstra (Contributor) commented:

I think we need to upgrade all `Row` to `RowData` in the Flink module first.
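
For reference, a minimal sketch of what that migration means, using Flink's table runtime classes (GenericRowData, StringData); the field values are placeholders:

```scala
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.flink.types.Row

object RowMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Old external representation used by the legacy connector stack.
    val row: Row = Row.of("user_1", Integer.valueOf(42))

    // Internal representation used by the new table runtime; note that
    // strings must be wrapped in StringData.
    val rowData: RowData = GenericRowData.of(StringData.fromString("user_1"), Integer.valueOf(42))

    println(row)
    println(rowData.getString(0))
  }
}
```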

Labels: discussion
Projects: 2.0 RoadMap (Status: Discuss)
Development: No branches or pull requests
8 participants