diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java
new file mode 100644
index 00000000000..9f900bef4e9
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsCallback.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.tencentcloudapi.cls.producer.Callback;
+import com.tencentcloudapi.cls.producer.Result;
+import org.apache.flume.Transaction;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Implementation of CLS {@link Callback}.
+ *
+ * <p>Finishes the channel transaction of one event once the send result is reported: commit on
+ * success, rollback on failure. A send-result metric is recorded through {@link ClsSinkContext}
+ * in both cases.</p>
+ */
+public class ClsCallback implements Callback {
+
+    private static final Logger LOG = InlongLoggerFactory.getLogger(ClsCallback.class);
+
+    private final Transaction tx;
+    private final ClsSinkContext context;
+    private final ProfileEvent event;
+    private final String topicId;
+
+    /**
+     * Constructor.
+     *
+     * @param tx Transaction the event belongs to; it is committed or rolled back, then closed, by this callback.
+     * @param context Context used to record the send-result metric.
+     * @param event Related event; its {@link ClsSinkContext#KEY_TOPIC_ID} header is read here.
+     */
+    public ClsCallback(Transaction tx, ClsSinkContext context, ProfileEvent event) {
+        this.tx = tx;
+        this.context = context;
+        this.event = event;
+        this.topicId = event.getHeaders().get(ClsSinkContext.KEY_TOPIC_ID);
+    }
+
+    /**
+     * Dispatch the send result to the success or the failure handling.
+     *
+     * @param result Send result.
+     */
+    @Override
+    public void onCompletion(Result result) {
+        if (result.isSuccessful()) {
+            onSuccess();
+        } else {
+            onFailed(result);
+        }
+    }
+
+    /**
+     * If send success: record the metric, then commit and close the transaction.
+     */
+    private void onSuccess() {
+        context.addSendResultMetric(event, topicId, true, System.currentTimeMillis());
+        try {
+            tx.commit();
+        } finally {
+            // close even when commit throws, otherwise the transaction is never released
+            tx.close();
+        }
+    }
+
+    /**
+     * If send failed: roll back and close the transaction, then log the result and record the metric.
+     *
+     * @param result Send result.
+     */
+    private void onFailed(Result result) {
+        try {
+            tx.rollback();
+        } finally {
+            // close even when rollback throws, otherwise the transaction is never released
+            tx.close();
+        }
+        LOG.error("Failed to send event to cls, topicId={}, result={}", topicId, result);
+        context.addSendResultMetric(event, topicId, false, System.currentTimeMillis());
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
new file mode 100644
index 00000000000..263d83fa6d0
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.google.common.base.Preconditions;
+import com.tencentcloudapi.cls.producer.AsyncProducerClient;
+import com.tencentcloudapi.cls.producer.common.LogItem;
+import com.tencentcloudapi.cls.producer.errors.ProducerException;
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Cls channel worker, a thread which takes events from the channel and sends them to Cls until closed.
+ */
+public class ClsChannelWorker extends Thread {
+    private static final Logger LOG = InlongLoggerFactory.getLogger(ClsChannelWorker.class);
+
+    private final ClsSinkContext context;
+    private final String workerName;
+    private final Channel channel;
+    private final IEvent2LogItemHandler handler;
+    private volatile LifecycleState status; // volatile: set by close() and polled by the run() loop
+
+    /**
+     * Constructor.
+     *
+     * @param sinkName Sink name, used as prefix of the worker name.
+     * @param context Cls context, must not be null and must provide a channel and a log item handler.
+     * @param workerIndex Index of cls channel worker, used as suffix of the worker name.
+     */
+    public ClsChannelWorker(String sinkName, ClsSinkContext context, int workerIndex) {
+        this.context = Preconditions.checkNotNull(context);
+        this.workerName = sinkName + "-" + workerIndex;
+        this.channel = Preconditions.checkNotNull(context.getChannel());
+        this.handler = Preconditions.checkNotNull(context.getLogItemHandler());
+        this.status = LifecycleState.IDLE;
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Start new cls channel worker {}", this.workerName);
+        status = LifecycleState.START;
+        super.start();
+    }
+
+    /** Close cls channel worker by switching the status to STOP, which ends the loop in {@link #run()}. */
+    public void close() {
+        LOG.info("Close cls channel worker {}", this.workerName);
+        status = LifecycleState.STOP;
+    }
+
+    /** Run until status is STOP. */
+    @Override
+    public void run() {
+        LOG.info("worker {} start to run, the state is {}", this.workerName, status.name());
+        while (status != LifecycleState.STOP) {
+            doRun();
+        }
+    }
+
+    /** Do run: take one event in a new transaction and send it; roll back and sleep one interval on error. */
+    private void doRun() {
+        Transaction tx = null;
+        try {
+            tx = channel.getTransaction();
+            tx.begin();
+            Event rowEvent = channel.take();
+
+            // if event is null, close tx and sleep for a while.
+            if (rowEvent == null) {
+                this.commitTransaction(tx);
+                sleepOneInterval();
+                return;
+            }
+            // an event which is not a ProfileEvent cannot be sent; commit the transaction and skip it
+            if (!(rowEvent instanceof ProfileEvent)) {
+                this.commitTransaction(tx);
+                LOG.error("The type of row event is not compatible with ProfileEvent");
+                return;
+            }
+            // do send, the transaction is handed over to the callback created in send()
+            this.send(rowEvent, tx);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            this.rollbackTransaction(tx);
+            sleepOneInterval();
+        }
+    }
+
+    /**
+     * Send event to Cls; the transaction is finished by the {@link ClsCallback} passed to the client.
+     * Fails with a descriptive NullPointerException when the uid has no id config or no client.
+     * @param rowEvent Row event, must be a {@link ProfileEvent}.
+     * @param tx Transaction the event was taken in.
+     * @throws ProducerException Propagated from the send path.
+     * @throws InterruptedException Propagated from the send path.
+     */
+    private void send(Event rowEvent, Transaction tx) throws ProducerException, InterruptedException {
+        ProfileEvent event = (ProfileEvent) rowEvent;
+        ClsIdConfig idConfig = Preconditions.checkNotNull(context.getIdConfig(event.getUid()),
+                "No cls id config found for uid %s", event.getUid());
+        event.getHeaders().put(ClsSinkContext.KEY_TOPIC_ID, idConfig.getTopicId());
+        AsyncProducerClient client = Preconditions.checkNotNull(context.getClient(idConfig.getSecretId()),
+                "No cls client found for uid %s", event.getUid());
+        List<LogItem> record = handler.parse(context, event);
+        ClsCallback callback = new ClsCallback(tx, context, event);
+        client.putLogs(idConfig.getTopicId(), record, callback);
+    }
+
+    /** sleepOneInterval */
+    private void sleepOneInterval() {
+        try {
+            Thread.sleep(context.getProcessInterval());
+        } catch (InterruptedException e1) {
+            LOG.error(e1.getMessage(), e1);
+        }
+    }
+
+    /**
+     * Rollback transaction if it exists; the transaction is closed even when rollback throws.
+     * @param tx Transaction, may be null.
+     */
+    private void rollbackTransaction(Transaction tx) {
+        if (tx != null) {
+            try {
+                tx.rollback();
+            } finally {
+                tx.close();
+            }
+        }
+    }
+
+    /**
+     * Commit transaction if it exists; the transaction is closed even when commit throws.
+     * @param tx Transaction, may be null.
+     */
+    private void commitTransaction(Transaction tx) {
+        if (tx != null) {
+            try {
+                tx.commit();
+            } finally {
+                tx.close();
+            }
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index abcae884d80..04f67c3ebc9 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
-import lombok.Getter;
+import lombok.Data;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,7 +26,7 @@
 /**
  * Cls config of each uid.
*/
-@Getter
+@Data
 public class ClsIdConfig {
     private String inlongGroupId;
     private String inlongStreamId;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
new file mode 100644
index 00000000000..cc1e976da05
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Cls Sink implementation.
+ *
+ * <p>Responsible for the initialization of {@link ClsChannelWorker}.</p>
+ */
+public class ClsSink extends AbstractSink implements Configurable {
+    private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class);
+
+    private Context parentContext;
+    private ClsSinkContext context;
+    // created eagerly: start() adds to this list and stop() iterates it, so it must never be null
+    private final List<ClsChannelWorker> workers = new ArrayList<>();
+
+    /**
+     * Start {@link ClsChannelWorker}, one per thread configured in the sink context.
+     */
+    @Override
+    public void start() {
+        super.start();
+        try {
+            this.context = new ClsSinkContext(getName(), parentContext, getChannel());
+            this.context.start();
+            for (int i = 0; i < context.getMaxThreads(); i++) {
+                ClsChannelWorker worker = new ClsChannelWorker(getName(), context, i);
+                this.workers.add(worker);
+                worker.start();
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stop {@link ClsChannelWorker} and close the sink context.
+     */
+    @Override
+    public void stop() {
+        super.stop();
+        try {
+            // signal the workers first, so they are stopped even if closing the context fails
+            for (ClsChannelWorker worker : this.workers) {
+                worker.close();
+            }
+            this.workers.clear();
+            // context is still null when start() failed before creating it
+            if (this.context != null) {
+                this.context.close();
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Process. Events are taken from the channel by the workers, so this method only backs off.
+     * @return Always {@link Status#BACKOFF}.
+     * @throws EventDeliveryException Declared by the sink contract, never thrown here.
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        return Status.BACKOFF;
+    }
+
+    /**
+     * Config parent context, which is used to build the {@link ClsSinkContext} in {@link #start()}.
+     * @param context Parent context.
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString());
+        this.parentContext = context;
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index f0681009851..9872ab965ef 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -44,7 +44,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /**
@@ -66,6 +65,7 @@ public class ClsSinkContext extends SinkContext {
     private static final String KEY_MAX_RETRY_BACKOFF_MS = "maxRetryBackoffMs";
     private static final String KEY_MAX_KEYWORD_LENGTH = "maxKeywordLength";
     private static final String KEY_EVENT_LOG_ITEM_HANDLER = "logItemHandler";
+    public static final String KEY_TOPIC_ID = "topicId";
 
     private static final int DEFAULT_KEYWORD_MAX_LENGTH = 32 * 1024 - 1;
     private int keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
@@ -74,9 +74,6 @@ public class ClsSinkContext extends SinkContext {
     private List deletingClients;
     private Context sinkContext;
     private Map idConfigMap = new ConcurrentHashMap<>();
-    private AtomicLong offerCounter = new AtomicLong(0);
-    private AtomicLong takeCounter = new AtomicLong(0);
-    private AtomicLong backCounter = new AtomicLong(0);
     private IEvent2LogItemHandler event2LogItemHandler;
 
     /**
@@ -332,4 +329,14 @@ public int getKeywordMaxLength() {
     public IEvent2LogItemHandler getLogItemHandler() {
         return event2LogItemHandler;
     }
+
+    /**
+     * Get cls client.
+     *
+     * @param secretId ID of client, used as lookup key in clientMap.
+     * @return Client instance stored in clientMap under secretId; NOTE(review): presumably null if absent, confirm.
+     */
+    public AsyncProducerClient getClient(String secretId) {
+        return clientMap.get(secretId);
+    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index 14d76b777a1..7ce0cf30df3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -105,6 +105,8 @@ public void run() {
                     continue;
                 }
                 if (!(rowEvent instanceof ProfileEvent)) {
+                    tx.commit();
+                    tx.close();
                     LOG.error("The type of row event is not compatible with ProfileEvent");
                     continue;
                 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
new file mode 100644
index 00000000000..7e9cc8d6325
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class TestClsIdConfig {
+
+    /** Seven blank-separated field names are expected to yield a field list of size seven. */
+    @Test
+    public void testGetFieldList() {
+        final ClsIdConfig config = new ClsIdConfig();
+        config.setFieldNames("1 2 3 4 5 6 7");
+        final List<?> parsedFields = config.getFieldList();
+        Assert.assertEquals(7, parsedFields.size());
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
new file mode 100644
index 00000000000..211c8f0d1bb
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.cls;
+
+import com.tencentcloudapi.cls.producer.common.LogItem;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Tests of {@link DefaultEvent2LogItemHandler} against a mocked {@link ClsSinkContext}. */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({ClsSinkContext.class, LogItem.class})
+public class TestDefaultEvent2LogItemHandler {
+
+    private ClsIdConfig idConfig;
+    private ProfileEvent event;
+    private DefaultEvent2LogItemHandler handler;
+    private ClsSinkContext mockContext;
+
+    @Before
+    public void setUp() {
+        this.mockContext = PowerMockito.mock(ClsSinkContext.class);
+        this.handler = new DefaultEvent2LogItemHandler();
+        this.idConfig = prepareIdConfig();
+        this.event = prepareEvent();
+    }
+
+    @Test
+    public void testNoIdConfig() {
+        Assert.assertNull(this.handler.parse(this.mockContext, this.event));
+    }
+
+    // NOTE(review): not executed, the annotation below is commented out; it prints the size and asserts nothing.
+    // @Test
+    public void testNormal() {
+        PowerMockito.when(this.mockContext.getIdConfig(Mockito.anyString())).thenReturn(this.idConfig);
+        PowerMockito.when(this.mockContext.getKeywordMaxLength()).thenReturn(8 * 1024);
+        final List<LogItem> logItems = this.handler.parse(this.mockContext, this.event);
+        System.out.println(logItems.size());
+    }
+
+    private ClsIdConfig prepareIdConfig() {
+        final ClsIdConfig cfg = new ClsIdConfig();
+        cfg.setFieldNames("f1 f2 f3 f4 f5 f6 f7 f8");
+        cfg.setInlongGroupId("testGroup");
+        cfg.setInlongStreamId("testStream");
+        cfg.setSecretId("testSecretId");
+        cfg.setSecretKey("testSecretKey");
+        cfg.setEndpoint("testEndPoint");
+        cfg.setTopicId("testTopicId");
+        return cfg;
+    }
+
+    private ProfileEvent prepareEvent() {
+        final byte[] payload = "v1|v2|v3|v4|v5|v6|v7|v8".getBytes(StandardCharsets.UTF_8);
+        final Map<String, String> eventHeaders = new HashMap<>();
+        eventHeaders.put(Constants.INLONG_GROUP_ID, "testGroup");
+        eventHeaders.put(Constants.INLONG_STREAM_ID, "testStream");
+        eventHeaders.put(Constants.HEADER_KEY_MSG_TIME, "1234456");
+        return new ProfileEvent(payload, eventHeaders);
+    }
+}
\ No newline at end of file