diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java index 7b1721b3..dd6b8bce 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -230,7 +230,7 @@ public void run() { throw e; } catch (Throwable e) { state.set(WorkerTaskState.ERROR); - log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e); + log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e); connectStatsManager.incSinkRecordPutTotalFailNums(); connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); } @@ -373,7 +373,7 @@ private void pullMessageFromQueues() throws InterruptedException { pullMsgErrorCount = 0; } catch (MQClientException e) { pullMsgErrorCount++; - log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); + log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); connectStatsManager.incSinkRecordReadTotalFailNums(); connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp; @@ -381,7 +381,7 @@ private void pullMessageFromQueues() throws InterruptedException { connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT); } catch (RemotingException e) { pullMsgErrorCount++; - log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); + log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); connectStatsManager.incSinkRecordReadTotalFailNums(); connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp; @@ -389,7 +389,7 @@ private void pullMessageFromQueues() throws InterruptedException { connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT); } catch (MQBrokerException e) { pullMsgErrorCount++; - log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); + log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); connectStatsManager.incSinkRecordReadTotalFailNums(); connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp; @@ -397,7 +397,7 @@ private void pullMessageFromQueues() throws InterruptedException { connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT); } catch (InterruptedException e) { pullMsgErrorCount++; - log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); + log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e); connectStatsManager.incSinkRecordReadTotalFailNums(); connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp; @@ -406,7 +406,7 @@ private void pullMessageFromQueues() throws InterruptedException { throw e; } catch (Throwable e) { pullMsgErrorCount++; - log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e); + log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e); connectStatsManager.incSinkRecordReadTotalFailNums(); connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp; @@ -430,7 +430,7 @@ private void pullMessageFromQueues() throws InterruptedException { if (messageQueuesOffsetMap.containsKey(entry.getKey())) { messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset()); } else { - log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey())); + log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey())); } try { consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset()); @@ -570,7 +570,7 @@ private ConnectRecord convertToSinkDataEntry(MessageExt message) { for (Map.Entry entry : properties.entrySet()) { if (MQ_SYS_KEYS.contains(entry.getKey())) { keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue()); - } else if (entry.getKey().startsWith("connect-ext-")){ + } else if (entry.getKey().startsWith("connect-ext-")) { keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue()); } else { keyValue.put(entry.getKey(), entry.getValue());