High latency when monitoring Oracle data with CDC #609

Closed
TYangXiaoChen opened this issue Nov 16, 2021 · 8 comments
Labels
bug Something isn't working

Comments

@TYangXiaoChen

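After data is modified in Oracle, CDC takes roughly 3 to 4 minutes to pick up the change and emit it.

Environment: Oracle 11g running in Docker, image: registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
Versions: flink-cdc 2.1, Flink version 2.12, local environment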

@TYangXiaoChen TYangXiaoChen added the bug Something isn't working label Nov 16, 2021
@TYangXiaoChen
Author

package con.firesoon

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
import com.ververica.cdc.debezium.{DebeziumSourceFunction, JsonDebeziumDeserializationSchema}
import com.ververica.cdc.connectors.oracle.OracleSource
import com.ververica.cdc.connectors.oracle.table.StartupOptions

import java.util.Properties

/**
 * @author Yangxiaochen
 * @Date 15/11/2021 17:46
 * @Version 1.0
 */
object OracleSourceExample {
  def main(args: Array[String]): Unit = {
    // Extra properties forwarded to the embedded Debezium engine
    val properties = new Properties()
    properties.put("database.tablename.case.insensitive", "false")

    val source: DebeziumSourceFunction[String] = OracleSource.builder[String]()
      .hostname("172.16.16.242")
      .port(1521)
      .database("helowin") // database to monitor
      .schemaList("FLINKUSER") // schema to monitor
      .tableList("FLINKUSER.DIFF_INFO") // table to monitor
      .username("flinkuser")
      .password("flinkpw")
      //.startupOptions(StartupOptions.latest())
      .debeziumProperties(properties)
      .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
      .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env
      .addSource(source)
      .print().setParallelism(1) // use parallelism 1 for sink to keep message ordering

    env.execute()
  }
}
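
For anyone reproducing this delay: the `properties` object passed to `.debeziumProperties(...)` in the example is where Debezium's LogMiner options would go. The sketch below is an assumption, not something confirmed in this thread. It sets two Debezium Oracle connector options, `log.mining.strategy` and `log.mining.continuous.mine`, that are often suggested for slow LogMiner reads. The object name `LogMinerLatencyProps` and the helper `build` are hypothetical. Note that `online_catalog` does not track DDL changes, and continuous mining only applies to older Oracle releases such as the 11g used here.

```scala
import java.util.Properties

// Hypothetical helper, not part of the original post: builds the Debezium
// properties that could be handed to .debeziumProperties(...).
object LogMinerLatencyProps {
  def build(): Properties = {
    val props = new Properties()
    // Setting carried over from the original example
    props.setProperty("database.tablename.case.insensitive", "false")
    // Assumption: read the dictionary from the online catalog instead of
    // writing it to the redo logs (schema changes are not tracked in this mode)
    props.setProperty("log.mining.strategy", "online_catalog")
    // Assumption: let Oracle mine continuously (older Oracle releases only)
    props.setProperty("log.mining.continuous.mine", "true")
    props
  }

  def main(args: Array[String]): Unit = {
    // Print the resulting properties for inspection
    println(build())
  }
}
```

In the example above, `properties` would then be replaced by `LogMinerLatencyProps.build()`. Whether this shortens the 3 to 4 minute delay in this particular setup is untested.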

@A-little-bit-of-data

The latency is indeed fairly high. I am using the SQL client.

@mustang2247
Contributor

Same here.

@Rowen110

Environment: Flink 1.13.1, Oracle 11g

With the Flink SQL CDC client I also see fairly high data latency.

@gtk96
Contributor

gtk96 commented Nov 30, 2021

I also see latency when capturing directly with Debezium.

@star-brilliant

Same here.

@leonardBang
Contributor

Could you use English for better communication? Feel free to reopen once you have updated the description.
