Skip to content

Commit

Permalink
[ISSUE #4420] Add Feishu Sink connector (#4522)
Browse files Browse the repository at this point in the history
* [ISSUE #4420] Add Feishu Sink connector

* [ISSUE #4420] Add Feishu Sink connector

* [ISSUE #4420] Add Feishu Sink connector

* [ISSUE #4420] Add Feishu Sink connector

* [ISSUE #4420] Add Feishu Sink connector,code style

* [ISSUE #4420] Add Feishu Sink connector,check style

* [ISSUE #4420] Add Feishu Sink connector,mock test

* [ISSUE #4420] Add Feishu Sink connector,mock test

* [ISSUE #4420] Add Feishu Sink connector,mock test
  • Loading branch information
SunnyBoy-WYH committed Nov 19, 2023
1 parent 715423c commit 5a8cd37
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,14 @@ public class Constants {

public static final String DEFAULT = "default";

public static final String FEISHU_SEND_MESSAGE_API = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=";

public static final String FEISHU_RECEIVE_ID = "receive_id";

public static final String FEISHU_MSG_TYPE = "msg_type";

public static final String FEISHU_CONTENT = "content";

public static final String FEISHU_UUID = "uuid";

}
33 changes: 33 additions & 0 deletions eventmesh-connectors/eventmesh-connector-feishu/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

List feishu = [
"com.larksuite.oapi:oapi-sdk:$feishu_version",
"com.github.rholder:guava-retrying:$guava_retrying_version",
"org.apache.httpcomponents:httpclient",
project(":eventmesh-common")
]

dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation feishu
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

testImplementation "org.mockito:mockito-core"
testImplementation "org.mockito:mockito-junit-jupiter"
}
18 changes: 18 additions & 0 deletions eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
feishu_version=2.0.28
guava_retrying_version=2.0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.feishu.server;

import org.apache.eventmesh.connector.feishu.sink.connector.FeishuSinkConnector;
import org.apache.eventmesh.openconnect.Application;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FeishuConnectServer {

public static void main(String[] args) throws Exception {

Application feishuSinkApp = new Application();
feishuSinkApp.run(FeishuSinkConnector.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.feishu.sink.config;

import org.apache.eventmesh.openconnect.api.config.SinkConfig;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class FeishuSinkConfig extends SinkConfig {

public SinkConnectorConfig connectorConfig;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.feishu.sink.config;

import lombok.Data;

@Data
public class SinkConnectorConfig {

private String connectorName;

private String receiveId;

private String appId;

private String appSecret;

private String receiveIdType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.feishu.sink.connector;

import static org.apache.eventmesh.common.Constants.FEISHU_CONTENT;
import static org.apache.eventmesh.common.Constants.FEISHU_MSG_TYPE;
import static org.apache.eventmesh.common.Constants.FEISHU_RECEIVE_ID;
import static org.apache.eventmesh.common.Constants.FEISHU_SEND_MESSAGE_API;
import static org.apache.eventmesh.common.Constants.FEISHU_UUID;

import org.apache.eventmesh.connector.feishu.sink.config.FeishuSinkConfig;
import org.apache.eventmesh.connector.feishu.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.lark.oapi.Client;
import com.lark.oapi.core.response.RawResponse;
import com.lark.oapi.core.token.AccessTokenType;
import com.lark.oapi.service.im.v1.enums.MsgTypeEnum;
import com.lark.oapi.service.im.v1.model.ext.MessageText;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FeishuSinkConnector implements Sink {

private FeishuSinkConfig sinkConfig;

private Client feishuClient;

private static final int MAX_RETRY_TIME = 3;

private static final int FIXED_WAIT_SECOND = 1;

private final Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder().retryIfException().retryIfResult(res -> !res)
.withWaitStrategy(WaitStrategies.fixedWait(FIXED_WAIT_SECOND, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIME)).withRetryListener(new RetryListener() {

@Override
public <V> void onRetry(Attempt<V> attempt) {
long times = attempt.getAttemptNumber();
log.warn("retry invoke http,times={}", times);
}
}).build();

@Override
public Class<? extends Config> configClass() {
return FeishuSinkConfig.class;
}

@Override
public void init(Config config) throws Exception {
// init config for feishu sink connector
this.sinkConfig = (FeishuSinkConfig) config;
doInit();
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
// init config for feishu sink connector
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
this.sinkConfig = (FeishuSinkConfig) sinkConnectorContext.getSinkConfig();
doInit();
}

private void doInit() {
this.feishuClient = Client.newBuilder(this.getConfig().getAppId(), this.getConfig().getAppSecret()).requestTimeout(3, TimeUnit.SECONDS)
.build();
}

@Override
public void start() {
}

@Override
public void commit(ConnectRecord record) {

}

@Override
public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void stop() {
}

@Override
public void put(List<ConnectRecord> sinkRecords) {
SinkConnectorConfig connectorConfig = getConfig();
try {
for (ConnectRecord connectRecord : sinkRecords) {
AtomicReference<RawResponse> response = new AtomicReference<>();
retryer.call(() -> {
Map<String, Object> body = new HashMap<>();
body.put(FEISHU_RECEIVE_ID, connectorConfig.getReceiveId());
body.put(FEISHU_CONTENT, MessageText.newBuilder().text(connectRecord.getData().toString()).build());
body.put(FEISHU_MSG_TYPE, MsgTypeEnum.MSG_TYPE_TEXT.getValue());
body.put(FEISHU_UUID, UUID.randomUUID().toString());
response.set(feishuClient.post(FEISHU_SEND_MESSAGE_API + connectorConfig.getReceiveIdType(), body, AccessTokenType.Tenant));
if (response.get().getStatusCode() != 200) {
log.error("request feishu open api err{}", new String(response.get().getBody()));
return false;
}
return true;
});
}
} catch (Exception e) {
log.error("failed to put message to feishu", e);
}
}

public SinkConnectorConfig getConfig() {
return this.sinkConfig.getConnectorConfig();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pubSubConfig:
meshAddress: 127.0.0.1:10000
subject: TopicTest
idc: FT
env: PRD
group: feishuSink
appId: 5031
userName: feishuSinkUser
passWord: feishuPassWord
connectorConfig:
connectorName: feishuSink
reciveId: reciveIdValue
reciveType: open_id
Loading

0 comments on commit 5a8cd37

Please sign in to comment.