diff --git a/docs/en/connector-v2/sink/dingtalk.md b/docs/en/connector-v2/sink/dingtalk.md new file mode 100644 index 00000000000..eb3ebef9dde --- /dev/null +++ b/docs/en/connector-v2/sink/dingtalk.md @@ -0,0 +1,31 @@ +# DingTalk + +## Description + +A sink plugin which use DingTalk robot send message + +## Options + +| name | type | required | default value | +|------------------------------| ---------- | -------- | ------------- | +| url | string | yes | - | +| secret | string | yes | - | + +### url [string] + +DingTalk robot address format is https://oapi.dingtalk.com/robot/send?access_token=XXXXXX(string) + +### secret [string] + +DingTalk robot secret (string) + +## Example + +```hocon +sink { + DingTalk { + url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890" + secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000" + } +} +``` diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 51b7b9c3ebf..16b9a08829d 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -106,3 +106,4 @@ seatunnel.sink.HdfsFile = connector-file-hadoop seatunnel.sink.LocalFile = connector-file-local seatunnel.source.Pulsar = connector-pulsar seatunnel.source.Hudi = connector-hudi +seatunnel.sink.DingTalk = connector-dingtalk diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml index fe37965a1b8..e8edcd95305 100644 --- a/seatunnel-connectors-v2-dist/pom.xml +++ b/seatunnel-connectors-v2-dist/pom.xml @@ -96,6 +96,11 @@ connector-hudi ${project.version} + + org.apache.seatunnel + connector-dingtalk + ${project.version} + diff --git a/seatunnel-connectors-v2/connector-dingtalk/pom.xml b/seatunnel-connectors-v2/connector-dingtalk/pom.xml new file mode 100644 index 00000000000..f15aa9aeeb9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/pom.xml @@ -0,0 +1,44 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-dingtalk + + + + org.apache.seatunnel + connector-common + ${project.version} + + + + com.aliyun + alibaba-dingtalk-service-sdk + 2.0.0 + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java new file mode 100644 index 00000000000..4a8d3cefa84 --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter.Context; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + + +/** + * DingTalk sink class + */ +@AutoService(SeaTunnelSink.class) +public class DingTalkSink extends AbstractSimpleSink { + + private Config pluginConfig; + private SeaTunnelRowType seaTunnelRowType; + private final String dtURL = "url"; + private final String dtSecret = "secret"; + + @Override + public String getPluginName() { + return "DingTalk"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + if (pluginConfig.getIsNull(dtURL)) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, + String.format("Config must include column : %s", dtURL)); + } + if (pluginConfig.getIsNull(dtSecret)) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, + String.format("Config must include column : %s", dtSecret)); + } + this.pluginConfig = pluginConfig; + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(Context context) throws IOException { + return new DingTalkWriter(pluginConfig.getString(dtURL), pluginConfig.getString(dtSecret)); + } +} diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java new file mode 100644 index 00000000000..ab2b3468cc1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import com.dingtalk.api.DefaultDingTalkClient; +import com.dingtalk.api.request.OapiRobotSendRequest; +import com.dingtalk.api.response.OapiRobotSendResponse; +import com.taobao.api.ApiException; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; + +import java.io.IOException; +import java.io.Serializable; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +/** + * DingTalk write class + */ +public class DingTalkWriter extends AbstractSinkWriter { + + private RobotClient robotClient; + + public DingTalkWriter(String url, String secret) { + this.robotClient = new RobotClient(url, secret); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + robotClient.send(element.toString()); + } + + @Override + public void close() throws IOException { + + } + + private static class RobotClient implements Serializable { + + private String url; + + private String secret; + + private DefaultDingTalkClient client; + + public RobotClient(String url, String secret) { + this.url = url; + this.secret = secret; + } + + public OapiRobotSendResponse send(String message) throws IOException { + if (null == client) { + client = new DefaultDingTalkClient(getUrl()); + } + OapiRobotSendRequest request = new OapiRobotSendRequest(); + request.setMsgtype("text"); + OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text(); + text.setContent(message); + request.setText(text); + try { + return this.client.execute(request); + } catch (ApiException e) { + throw new IOException(e); + } + } + + public String getUrl() throws IOException { + Long timestamp = System.currentTimeMillis(); + String sign = getSign(timestamp); + return url + "×tamp=" + timestamp + "&sign=" + sign; + } + + public String getSign(Long timestamp) throws IOException { + try { + String stringToSign = timestamp + "\n" + secret; + Mac mac = Mac.getInstance("HmacSHA256"); + mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256")); + byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)); + return URLEncoder.encode(Base64.getEncoder().encodeToString(signData), "UTF-8"); + } catch (Exception e) { + throw new IOException(e); + } + } + } + +} + diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 42f67df2c9a..325325cf0c8 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -44,6 +44,7 @@ connector-file connector-hudi connector-assert + connector-dingtalk diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml index 215a586f8a1..28756ccbf0c 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml @@ -66,6 +66,11 @@ connector-socket ${project.version} + + org.apache.seatunnel + connector-dingtalk + ${project.version} + diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java new file mode 100644 index 00000000000..d72d70b2a79 --- /dev/null +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.example.flink.v2; + +import org.apache.seatunnel.core.starter.Seatunnel; +import org.apache.seatunnel.core.starter.command.Command; +import org.apache.seatunnel.core.starter.exception.CommandException; +import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; +import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder; + +import java.io.FileNotFoundException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; + +public class SeaTunnelDingTalkApiExample { + + public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException { + String configFile = getTestConfigFile("/examples/fake_to_dingtalk.conf"); + FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs(); + flinkCommandArgs.setConfigFile(configFile); + flinkCommandArgs.setCheckConfig(false); + flinkCommandArgs.setVariables(null); + Command flinkCommand = + new FlinkCommandBuilder().buildCommand(flinkCommandArgs); + Seatunnel.run(flinkCommand); + } + + public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { + URL resource = SeaTunnelDingTalkApiExample.class.getResource(configFile); + if (resource == null) { + throw new FileNotFoundException("Can't find config file: " + configFile); + } + return Paths.get(resource.toURI()).toString(); + } +} diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf new file mode 100644 index 00000000000..aed13429043 --- /dev/null +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + field_name = "name,age" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake +} + +transform { + sql { + sql = "select name,age from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql +} + +sink { + DingTalk { + url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890" + secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console +} \ No newline at end of file