Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-2651][Sort] Add CLS sink, and UT #2919

Merged
merged 3 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.sink.cls;

import com.tencentcloudapi.cls.producer.Callback;
import com.tencentcloudapi.cls.producer.Result;
import org.apache.flume.Transaction;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/**
* Implementation of CLS {@link Callback}.
*/
public class ClsCallback implements Callback {

private static final Logger LOG = InlongLoggerFactory.getLogger(ClsCallback.class);

private final Transaction tx;
private final ClsSinkContext context;
private final ProfileEvent event;
private final String topicId;

/**
* Constructor.
*
* @param tx Transaction
* @param context Context.
* @param event Related event.
*/
public ClsCallback(Transaction tx, ClsSinkContext context, ProfileEvent event) {
this.tx = tx;
this.context = context;
this.event = event;
this.topicId = event.getHeaders().get(ClsSinkContext.KEY_TOPIC_ID);
}

@Override
public void onCompletion(Result result) {
if (!result.isSuccessful()) {
onFailed(result);
return;
}
onSuccess();
}

/**
* If send success.
*/
private void onSuccess() {
context.addSendResultMetric(event, topicId, true, System.currentTimeMillis());
tx.commit();
tx.close();
}

/**
* If send failed.
*
* @param result Send result.
*/
private void onFailed(Result result) {
tx.rollback();
tx.close();
LOG.error(result.toString());
context.addSendResultMetric(event, topicId, false, System.currentTimeMillis());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.sink.cls;

import com.google.common.base.Preconditions;
import com.tencentcloudapi.cls.producer.AsyncProducerClient;
import com.tencentcloudapi.cls.producer.common.LogItem;
import com.tencentcloudapi.cls.producer.errors.ProducerException;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

import java.util.List;

/**
* Cls channel worker.
*/
public class ClsChannelWorker extends Thread {
private static final Logger LOG = InlongLoggerFactory.getLogger(ClsChannelWorker.class);

private final ClsSinkContext context;
private final String workerName;
private final Channel channel;
private final IEvent2LogItemHandler handler;
private LifecycleState status;

/**
* Constructor.
*
* @param sinkName Sink name.
* @param context Cls context.
* @param workerIndex Index of cls channel worker.
*/
public ClsChannelWorker(String sinkName, ClsSinkContext context, int workerIndex) {
this.context = Preconditions.checkNotNull(context);
this.workerName = sinkName + "-" + workerIndex;
this.channel = Preconditions.checkNotNull(context.getChannel());
this.handler = Preconditions.checkNotNull(context.getLogItemHandler());
this.status = LifecycleState.IDLE;
}

@Override
public void start() {
LOG.info("Start new cls channel worker {}", this.workerName);
status = LifecycleState.START;
super.start();
}

/**
* Close cls channel worker.
*/
public void close() {
LOG.info("Close cls channel worker {}", this.workerName);
status = LifecycleState.STOP;
}

/**
* Run until status is STOP.
*/
@Override
public void run() {
LOG.info("worker {} start to run, the state is {}", this.workerName, status.name());
while (status != LifecycleState.STOP) {
doRun();
}
}

/**
* Do run.
*/
private void doRun() {
Transaction tx = null;
try {
tx = channel.getTransaction();
tx.begin();
Event rowEvent = channel.take();

// if event is null, close tx and sleep for a while.
if (rowEvent == null) {
this.commitTransaction(tx);
sleepOneInterval();
return;
}
// if is the instanceof ProfileEvent
if (!(rowEvent instanceof ProfileEvent)) {
this.commitTransaction(tx);
LOG.error("The type of row event is not compatible with ProfileEvent");
return;
}
// do send
this.send(rowEvent, tx);

} catch (Exception e) {
LOG.error(e.getMessage(), e);
this.rollbackTransaction(tx);
sleepOneInterval();
}
}

/**
* Send event to Cls
*
* @param rowEvent Row event.
* @param tx Transaction
* @throws ProducerException
* @throws InterruptedException
*/
private void send(Event rowEvent, Transaction tx) throws ProducerException, InterruptedException {
ProfileEvent event = (ProfileEvent) rowEvent;
ClsIdConfig idConfig = context.getIdConfig(event.getUid());
event.getHeaders().put(ClsSinkContext.KEY_TOPIC_ID, idConfig.getTopicId());
AsyncProducerClient client = context.getClient(idConfig.getSecretId());
List<LogItem> record = handler.parse(context, event);
ClsCallback callback = new ClsCallback(tx, context, event);
client.putLogs(idConfig.getTopicId(), record, callback);
}

/** sleepOneInterval */
private void sleepOneInterval() {
try {
Thread.sleep(context.getProcessInterval());
} catch (InterruptedException e1) {
LOG.error(e1.getMessage(), e1);
}
}

/**
* Rollback transaction if it exists.
* @param tx Transaction
*/
private void rollbackTransaction(Transaction tx) {
if (tx != null) {
tx.rollback();
tx.close();
}
}

/**
* Commit transaction if it exists.
* @param tx Transaction
*/
private void commitTransaction(Transaction tx) {
if (tx != null) {
tx.commit();
tx.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.sort.standalone.sink.cls;

import lombok.Getter;
import lombok.Data;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -26,7 +26,7 @@
/**
* Cls config of each uid.
*/
@Getter
@Data
public class ClsIdConfig {
private String inlongGroupId;
private String inlongStreamId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.sink.cls;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* Cls Sink implementation.
*
* <p>
* Response for initialization of {@link ClsChannelWorker}.
* </p>
*/
public class ClsSink extends AbstractSink implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class);

private Context parentContext;
private ClsSinkContext context;
private List<ClsChannelWorker> workers;

/**
* Start {@link ClsChannelWorker}.
*/
@Override
public void start() {
super.start();
try {
this.context = new ClsSinkContext(getName(), parentContext, getChannel());
this.context.start();
for (int i = 0; i < context.getMaxThreads(); i++) {
ClsChannelWorker worker = new ClsChannelWorker(getName(), context, i);
this.workers.add(worker);
worker.start();
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

/**
* Stop {@link ClsChannelWorker}.
*/
@Override
public void stop() {
super.stop();
try {
this.context.close();
for (ClsChannelWorker worker : this.workers) {
worker.close();
}
this.workers.clear();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

/**
* Process.
* @return Status
* @throws EventDeliveryException
*/
@Override
public Status process() throws EventDeliveryException {
return Status.BACKOFF;
}

/**
* Config parent context.
* @param context Parent context.
*/
@Override
public void configure(Context context) {
LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString());
this.parentContext = context;
}
}
Loading