Skip to content

Commit

Permalink
[Feature][Connector2] Add DingTalk Source apache#2684
Browse files Browse the repository at this point in the history
  • Loading branch information
MRYOG committed Sep 8, 2022
1 parent 18b7db8 commit 18a3a20
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 1 deletion.
54 changes: 54 additions & 0 deletions docs/en/connector-v2/source/dingtalk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# DingTalk

> DinkTalk source connector
## Description

A source plugin which use DingTalk

## Options

| name | type | required | default value |
|-----------| ---------- | -------- | ------------- |
| api_client | string | yes | - |
| access_token | string | yes | - |
| app_key | string | yes | - |
| app_secret | string | yes | - |


### url [string]

DingTalk API address like : https://oapi.dingtalk.com/topapi/v2/department/listsub(string)

### access_token [string]

DingTalk access token [DingTalk Doc](https://open.dingtalk.com/document/orgapp-server/obtain-the-access_token-of-an-internal-app) , the valid period of access token is 7200 seconds , if access token expired , can use app_key and app_secret get new message (string)

### app_key [string]

DingTalk app key (string)

### app_secret [string]

DingTalk app secret (string)

## Example

```hocon
source {
DingTalk {
api_client="https://oapi.dingtalk.com/topapi/v2/department/listsub"
access_token="8c61c395035c37c7812b9b1b1dbecb20"
}
}
or
source {
DingTalk {
api_client="https://oapi.dingtalk.com/topapi/v2/department/listsub"
app_key="dingpsi2nsmw2v"
app_secret="qv5pLes-M9JvHXjaBqkPFdAyk9WKEjDjEZOpLZKHi"
}
}
```
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ seatunnel.source.OssFile = connector-file-oss
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.source.DingTalk = connector-dingtalk
seatunnel.sink.elasticsearch = connector-elasticsearch
seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
Expand Down
14 changes: 14 additions & 0 deletions seatunnel-connectors-v2/connector-dingtalk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>connector-dingtalk</artifactId>
Expand All @@ -39,6 +40,19 @@
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dingtalk</artifactId>
<version>1.4.26</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-http-base</artifactId>
<version>2.1.3-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.seatunnel.connectors.seatunnel.common;

/**
* @program: incubator-seatunnel
* @description: DingTalk contant
**/

public class DingTalkConstant {
public static final String APP_KEY = "app_key";
public static final String APP_SECRET = "app_secret";
public static final String ACCESS_TOKEN = "access_token";
public static final String API_CLIENT = "api_client";
public static final String DEFAULT_FORMAT = "json";
public static final String SCHEMA = "schema";
public static final String STATUS_OK = "ok";
public static final String BODY_RESULT = "result";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.seatunnel.connectors.seatunnel.common;

import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @program: incubator-seatunnel
* @description: Ding Talk Parameter
**/

@Data
public class DingTalkParameter implements Serializable {

private String appKey;
private String appSecret;
private String accessToken;
private String apiClient;
private Map<String, String> params;

public void buildParameter(Config pluginConfig, Boolean hasToken) {
if (!hasToken) {
// DingTalk app key
this.setAppKey(pluginConfig.getString(DingTalkConstant.APP_KEY));
// DingTalk app secret
this.setAppSecret(pluginConfig.getString(DingTalkConstant.APP_SECRET));
} else {
// DingTalk app token
this.setAccessToken(pluginConfig.getString(DingTalkConstant.ACCESS_TOKEN));
}
// DingTalk api client
this.setApiClient(pluginConfig.getString(DingTalkConstant.API_CLIENT));
// set params
if (pluginConfig.hasPath(HttpConfig.PARAMS)) {
this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.apache.seatunnel.connectors.seatunnel.common;

import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenResponse;

/**
* @program: incubator-seatunnel
* @description: Ding Talk util
**/

public class DingTalkUtil {

/**
* @return Client
*/
public static com.aliyun.dingtalkoauth2_1_0.Client createClient() throws Exception {
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
config.protocol = "https";
config.regionId = "central";
return new com.aliyun.dingtalkoauth2_1_0.Client(config);
}

public static String getAppToken(String appKey, String appSecret) {
String appToken = null;
try {
com.aliyun.dingtalkoauth2_1_0.Client client = DingTalkUtil.createClient();
com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest getAccessTokenRequest = new com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest()
.setAppKey(appKey).setAppSecret(appSecret);
GetAccessTokenResponse res = client.getAccessToken(getAccessTokenRequest);
appToken = res.getBody().getAccessToken();
} catch (Exception e) {
e.printStackTrace();
}
return appToken;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.source;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkConstant;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkParameter;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkUtil;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSource.class)
public class DingTalkSource extends AbstractSingleSplitSource<SeaTunnelRow> {

protected final DingTalkParameter dtParameter = new DingTalkParameter();
protected SeaTunnelRowType rowType;
protected SeaTunnelContext seaTunnelContext;
protected DeserializationSchema<SeaTunnelRow> deserializationSchema;

@Override
public String getPluginName() {
return "DingTalk";
}

@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult hasClient = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.API_CLIENT);
if (!hasClient.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasClient.getMsg());
}
CheckResult hasToken = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.ACCESS_TOKEN);
if (!hasToken.isSuccess()) {
CheckResult hasKey = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.APP_KEY);
if (!hasKey.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasKey.getMsg());
}
CheckResult hasSecret = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.APP_SECRET);
if (!hasSecret.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasSecret.getMsg());
}
String appToken = DingTalkUtil.getAppToken(pluginConfig.getString(DingTalkConstant.APP_KEY), pluginConfig.getString(DingTalkConstant.APP_SECRET));
if (null == appToken) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Get App Token Error!");
}
this.dtParameter.setAccessToken(appToken);
}
this.dtParameter.buildParameter(pluginConfig, hasToken.isSuccess());

if (pluginConfig.hasPath(DingTalkConstant.SCHEMA)) {
Config schema = pluginConfig.getConfig(DingTalkConstant.SCHEMA);
this.rowType = SeatunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
} else {
this.rowType = SeatunnelSchema.buildSimpleTextSchema();
}
}

@Override
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
this.seaTunnelContext = seaTunnelContext;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.rowType;
}

@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
return new DingTalkSourceReader(this.dtParameter, readerContext, this.deserializationSchema);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkConstant;
import org.apache.seatunnel.connectors.seatunnel.common.DingTalkParameter;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiV2DepartmentListsubRequest;
import com.dingtalk.api.response.OapiV2DepartmentListsubResponse;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class DingTalkSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private static final Logger LOGGER = LoggerFactory.getLogger(DingTalkSourceReader.class);
protected final SingleSplitReaderContext context;
protected final DingTalkParameter dtParameter;
protected DingTalkClient dtClient;
protected OapiV2DepartmentListsubRequest dtRequest;
protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;

public DingTalkSourceReader(DingTalkParameter dtParameter, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
this.context = context;
this.dtParameter = dtParameter;
this.deserializationSchema = deserializationSchema;
}

@Override
public void open() {
dtClient = new DefaultDingTalkClient(dtParameter.getApiClient());
dtRequest = new OapiV2DepartmentListsubRequest();
LOGGER.info("Ding Talk Access Token is :" + dtParameter.getAccessToken());
}

@Override
public void close() throws IOException {
}

@Override
public void pollNext(Collector<SeaTunnelRow> output) {
try {
OapiV2DepartmentListsubResponse response = dtClient.execute(dtRequest, dtParameter.getAccessToken());
if (DingTalkConstant.STATUS_OK.equals(response.getErrmsg())) {
String tmpContent = response.getBody();
JsonNode bodyJson = JsonUtils.stringToJsonNode(tmpContent);
JsonNode resJson = bodyJson.get(DingTalkConstant.BODY_RESULT);
if (resJson.isArray()) {
for (JsonNode tmpJson : resJson) {
output.collect(new SeaTunnelRow(new Object[]{tmpJson.toString()}));
}
}
}
LOGGER.error("Ding Talk client execute exception, response status code:[{}], content:[{}]", response.getErrorCode(), response.getBody());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
LOGGER.info("Closed the bounded http source");
context.signalNoMoreElement();
}
}
}
}
Loading

0 comments on commit 18a3a20

Please sign in to comment.