diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 59c36d8ab14..bee7eaa5380 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -19,14 +19,14 @@
-
+
-
+
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/dingtalk.md b/docs/en/connector-v2/sink/dingtalk.md
new file mode 100644
index 00000000000..eb3ebef9dde
--- /dev/null
+++ b/docs/en/connector-v2/sink/dingtalk.md
@@ -0,0 +1,31 @@
+# DingTalk
+
+## Description
+
+A sink plugin which use DingTalk robot send message
+
+## Options
+
+| name | type | required | default value |
+|------------------------------| ---------- | -------- | ------------- |
+| url | string | yes | - |
+| secret | string | yes | - |
+
+### url [string]
+
+DingTalk robot address format is https://oapi.dingtalk.com/robot/send?access_token=XXXXXX(string)
+
+### secret [string]
+
+DingTalk robot secret (string)
+
+## Example
+
+```hocon
+sink {
+ DingTalk {
+ url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890"
+ secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000"
+ }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 51b7b9c3ebf..16b9a08829d 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -106,3 +106,4 @@ seatunnel.sink.HdfsFile = connector-file-hadoop
seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
+seatunnel.sink.DingTalk = connector-dingtalk
diff --git a/pom.xml b/pom.xml
index 6f4eac23426..f6fa3ffae60 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
seatunnel-plugin-discoveryseatunnel-formatsseatunnel-dist
- seatunnel-server
+ seatunnel-server
diff --git a/seatunnel-connectors-v2/connector-dingtalk/pom.xml b/seatunnel-connectors-v2/connector-dingtalk/pom.xml
new file mode 100644
index 00000000000..f15aa9aeeb9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/pom.xml
@@ -0,0 +1,44 @@
+
+
+
+
+ seatunnel-connectors-v2
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ connector-dingtalk
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+
+ com.aliyun
+ alibaba-dingtalk-service-sdk
+ 2.0.0
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
new file mode 100644
index 00000000000..c509912b290
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.seatunnel.connectors.seatunnel.sink;
+
+ import org.apache.seatunnel.api.common.PrepareFailException;
+ import org.apache.seatunnel.api.sink.SeaTunnelSink;
+ import org.apache.seatunnel.api.sink.SinkWriter;
+ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+ import org.apache.seatunnel.common.constants.PluginType;
+ import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+ import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+ import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+ import com.google.auto.service.AutoService;
+
+ import java.io.IOException;
+
+ /**
+ * DingTalk sink class
+ */
+ @AutoService(SeaTunnelSink.class)
+ public class DingTalkSink extends AbstractSimpleSink {
+ private Config pluginConfig;
+ private SeaTunnelRowType seaTunnelRowType;
+ private final String CONSTANT_URL="url";
+ private final String CONSTANT_SECRET="secret";
+ @Override
+ public String getPluginName() {
+ return "DingTalk";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ if(pluginConfig.getIsNull(CONSTANT_URL)){
+ throw new PrepareFailException(getPluginName(), PluginType.SINK, String.format("Config must include column : %s", CONSTANT_URL));
+ }
+ if(pluginConfig.getIsNull(CONSTANT_SECRET)){
+ throw new PrepareFailException(getPluginName(), PluginType.SINK, String.format("Config must include column : %s", CONSTANT_SECRET));
+ }
+ this.pluginConfig = pluginConfig;
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java
new file mode 100644
index 00000000000..cf6dc136085
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.dingtalk.api.DefaultDingTalkClient;
+import com.dingtalk.api.request.OapiRobotSendRequest;
+import com.dingtalk.api.response.OapiRobotSendResponse;
+import com.taobao.api.ApiException;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.Base64;
+
+/**
+ * DingTalk write class
+ */
+public class DingTalkWriter extends AbstractSinkWriter {
+
+ private RobotClient robotClient;
+
+ public DingTalkWriter(String url, String secret) {
+ this.robotClient = new RobotClient(url, secret);
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ robotClient.send(element.toString());
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ private static class RobotClient implements Serializable {
+
+ private String url;
+
+ private String secret;
+
+ private DefaultDingTalkClient client;
+
+ public RobotClient(String url, String secret) {
+ this.url = url;
+ this.secret = secret;
+ }
+
+ public OapiRobotSendResponse send(String message) throws IOException {
+ if (null == client) {
+ client = new DefaultDingTalkClient(getUrl());
+ }
+ OapiRobotSendRequest request = new OapiRobotSendRequest();
+ request.setMsgtype("text");
+ OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
+ text.setContent(message);
+ request.setText(text);
+ try {
+ return this.client.execute(request);
+ } catch (ApiException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public String getUrl() {
+ Long timestamp = System.currentTimeMillis();
+ String sign = getSign(timestamp);
+ return url + "×tamp=" + timestamp + "&sign=" + sign;
+ }
+
+ public String getSign(Long timestamp) {
+ try {
+ String stringToSign = timestamp + "\n" + secret;
+ Mac mac = Mac.getInstance("HmacSHA256");
+ mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
+ byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
+ return URLEncoder.encode(Base64.getEncoder().encodeToString(signData), "UTF-8");
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index ba947e30e55..73e55a1db8f 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -44,6 +44,7 @@
connector-fileconnector-hudiconnector-assert
+ connector-dingtalk
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 215a586f8a1..28756ccbf0c 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -66,6 +66,11 @@
connector-socket${project.version}
+
+ org.apache.seatunnel
+ connector-dingtalk
+ ${project.version}
+
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java
new file mode 100644
index 00000000000..0512e5abbdd
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.flink.v2;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
+
+public class SeaTunnelDingTalkApiExample {
+
+ public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+ String configFile = getTestConfigFile("/examples/fake_to_dingtalk.conf");
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+ flinkCommandArgs.setConfigFile(configFile);
+ flinkCommandArgs.setCheckConfig(false);
+ flinkCommandArgs.setVariables(null);
+ Command flinkCommand =
+ new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
+ Seatunnel.run(flinkCommand);
+ }
+
+ public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+ URL resource = SeaTunnelDingTalkApiExample.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " + configFile);
+ }
+ return Paths.get(resource.toURI()).toString();
+ }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
new file mode 100644
index 00000000000..aed13429043
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ DingTalk {
+ url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890"
+ secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file