From be605617c3d36df9952963e729ac2c2ec2c714a3 Mon Sep 17 00:00:00 2001 From: Chuanyi Li Date: Thu, 5 Jul 2018 20:24:42 +0800 Subject: [PATCH] =?UTF-8?q?Parallel=E5=8F=82=E6=95=B0=E7=BB=86=E8=8A=82?= =?UTF-8?q?=E7=9A=84=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployer/src/main/resources/canal.properties | 4 +++- .../src/main/resources/spring/default-instance.xml | 2 +- deployer/src/main/resources/spring/file-instance.xml | 2 +- .../src/main/resources/spring/group-instance.xml | 4 ++-- .../src/main/resources/spring/local-instance.xml | 2 +- .../src/main/resources/spring/memory-instance.xml | 2 +- .../canal/parse/inbound/AbstractEventParser.java | 12 +++++++----- 7 files changed, 16 insertions(+), 12 deletions(-) diff --git a/deployer/src/main/resources/canal.properties b/deployer/src/main/resources/canal.properties index 90050148f8..23ec03a180 100644 --- a/deployer/src/main/resources/canal.properties +++ b/deployer/src/main/resources/canal.properties @@ -53,7 +53,9 @@ canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true -canal.instance.parser.parallelThreadSize = 16 +## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() +#canal.instance.parser.parallelThreadSize = 16 +## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 ################################################# diff --git a/deployer/src/main/resources/spring/default-instance.xml b/deployer/src/main/resources/spring/default-instance.xml index 31299e6682..c4465df1ae 100644 --- a/deployer/src/main/resources/spring/default-instance.xml +++ b/deployer/src/main/resources/spring/default-instance.xml @@ -195,7 +195,7 @@ - + diff --git a/deployer/src/main/resources/spring/file-instance.xml b/deployer/src/main/resources/spring/file-instance.xml index 494f6c87e8..340a81584e 100644 --- a/deployer/src/main/resources/spring/file-instance.xml +++ b/deployer/src/main/resources/spring/file-instance.xml @@ -180,7 +180,7 @@ - + diff --git a/deployer/src/main/resources/spring/group-instance.xml b/deployer/src/main/resources/spring/group-instance.xml index f1add0324a..60d069dcae 100644 --- a/deployer/src/main/resources/spring/group-instance.xml +++ b/deployer/src/main/resources/spring/group-instance.xml @@ -169,7 +169,7 @@ - + @@ -268,7 +268,7 @@ - + diff --git a/deployer/src/main/resources/spring/local-instance.xml b/deployer/src/main/resources/spring/local-instance.xml index 3148572d4d..0b2f96aa92 100644 --- a/deployer/src/main/resources/spring/local-instance.xml +++ b/deployer/src/main/resources/spring/local-instance.xml @@ -140,7 +140,7 @@ - + \ No newline at end of file diff --git a/deployer/src/main/resources/spring/memory-instance.xml b/deployer/src/main/resources/spring/memory-instance.xml index 7404229ced..8fec89182c 100644 --- a/deployer/src/main/resources/spring/memory-instance.xml +++ b/deployer/src/main/resources/spring/memory-instance.xml @@ -168,7 +168,7 @@ - + diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java index 39da01639f..54bb14e758 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java @@ -90,9 +90,9 @@ public void uncaughtException(Thread t, protected boolean isGTIDMode = false; // 是否是GTID模式 protected boolean parallel = true; // 是否开启并行解析模式 - protected int parallelThreadSize = Runtime.getRuntime() + protected Integer parallelThreadSize = Runtime.getRuntime() .availableProcessors() * 60 / 100; // 60%的能力跑解析,剩余部分处理网络 - protected int parallelBufferSize = 16 * parallelThreadSize; + protected int parallelBufferSize = 256; // 必须为2的幂 protected MultiStageCoprocessor multiStageCoprocessor; protected abstract BinlogParser buildParser(); @@ -595,11 +595,13 @@ public int getParallelThreadSize() { return parallelThreadSize; } - public void setParallelThreadSize(int parallelThreadSize) { - this.parallelThreadSize = parallelThreadSize; + public void setParallelThreadSize(Integer parallelThreadSize) { + if (parallelThreadSize != null) { + this.parallelThreadSize = parallelThreadSize; + } } - public int getParallelBufferSize() { + public Integer getParallelBufferSize() { return parallelBufferSize; }