Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,55 @@

import org.apache.inlong.sdk.sort.entity.InLongTopic;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* Builder to build single topic fetcher
* Abstract Builder of topic fetcher
*/
public abstract class SingleTopicFetcherBuilder implements TopicFetcherBuilder {
public abstract class AbstractTopicFetcherBuilder implements TopicFetcherBuilder {
protected Interceptor interceptor;
protected Deserializer deserializer;
protected ClientContext context;
protected InLongTopic topic;
protected List<InLongTopic> topics;
protected String fetchKey;

public SingleTopicFetcherBuilder interceptor(Interceptor interceptor) {
@Override
public TopicFetcherBuilder interceptor(Interceptor interceptor) {
this.interceptor = interceptor;
return this;
}

public SingleTopicFetcherBuilder topic(InLongTopic topic) {
@Override
public TopicFetcherBuilder topic(InLongTopic topic) {
this.topic = topic;
return this;
}

public SingleTopicFetcherBuilder deserializer(Deserializer deserializer) {
@Override
public TopicFetcherBuilder topic(Collection<InLongTopic> topics) {
this.topics = new ArrayList<>(topics);
return this;
}

@Override
public TopicFetcherBuilder deserializer(Deserializer deserializer) {
this.deserializer = deserializer;
return this;
}

public SingleTopicFetcherBuilder context(ClientContext context) {
@Override
public TopicFetcherBuilder context(ClientContext context) {
this.context = context;
return this;
}

@Override
public TopicFetcherBuilder fetchKey(String fetchKey) {
this.fetchKey = fetchKey;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
public abstract class MultiTopicsFetcher implements TopicFetcher {
protected final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true);
protected final ScheduledExecutorService executor;
protected final String fetchKey;
protected Map<String, InLongTopic> onlineTopics;
protected List<InLongTopic> newTopics;
protected ClientContext context;
protected Deserializer deserializer;
protected volatile Thread fetchThread;
Expand All @@ -56,14 +58,21 @@ public MultiTopicsFetcher(
List<InLongTopic> topics,
ClientContext context,
Interceptor interceptor,
Deserializer deserializer) {
Deserializer deserializer,
String fetchKey) {
this.onlineTopics = topics.stream()
.filter(Objects::nonNull)
.collect((Collectors.toMap(InLongTopic::getTopic, t -> t)));
this.context = context;
this.interceptor = interceptor;
this.deserializer = deserializer;
this.executor = Executors.newSingleThreadScheduledExecutor();
this.fetchKey = fetchKey;
}

@Override
public String getFetchKey() {
return fetchKey;
}

protected boolean needUpdate(Collection<InLongTopic> newTopics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public SingleTopicFetcher(
this.interceptor = Optional.ofNullable(interceptor).orElse(new MsgTimeInterceptor());
}

@Override
public String getFetchKey() {
return topic.getTopicKey();
}

@Override
public boolean updateTopics(List<InLongTopic> topics) {
if (topics.size() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,14 @@ public class SortClientConfig implements Serializable {
private int maxEmptyPollSleepMs = 500;
private int emptyPollTimes = 10;
private int cleanOldConsumerIntervalSec = 60;

public SortClientConfig(String sortTaskId, String sortClusterName, InLongTopicChangeListener assignmentsListener,
ConsumeStrategy consumeStrategy, String localIp) {
private int maxConsumerSize = 5;

public SortClientConfig(
String sortTaskId,
String sortClusterName,
InLongTopicChangeListener assignmentsListener,
ConsumeStrategy consumeStrategy,
String localIp) {
this.sortTaskId = sortTaskId;
this.sortClusterName = sortClusterName;
this.assignmentsListener = assignmentsListener;
Expand Down Expand Up @@ -323,6 +328,14 @@ public void setCleanOldConsumerIntervalSec(int cleanOldConsumerIntervalSec) {
this.cleanOldConsumerIntervalSec = cleanOldConsumerIntervalSec;
}

public int getMaxConsumerSize() {
return maxConsumerSize;
}

public void setMaxConsumerSize(int maxConsumerSize) {
this.maxConsumerSize = maxConsumerSize;
}

/**
* ConsumeStrategy
*/
Expand Down Expand Up @@ -367,7 +380,7 @@ public void setParameters(Map<String, String> sortSdkParams) {
this.managerApiVersion = sortSdkParams.getOrDefault("managerApiVersion", managerApiVersion);
String strConsumeStrategy = sortSdkParams.getOrDefault("consumeStrategy", consumeStrategy.name());
String strManagerType = sortSdkParams.getOrDefault("topicManagerType",
TopicType.SINGLE_TOPIC.toString());
TopicType.MULTI_TOPIC.toString());
this.consumeStrategy = ConsumeStrategy.valueOf(strConsumeStrategy);
this.topicType = TopicType.valueOf(strManagerType);

Expand All @@ -385,5 +398,6 @@ public void setParameters(Map<String, String> sortSdkParams) {
this.emptyPollSleepStepMs = NumberUtils.toInt(sortSdkParams.get("emptyPollSleepStepMs"), emptyPollSleepStepMs);
this.maxEmptyPollSleepMs = NumberUtils.toInt(sortSdkParams.get("maxEmptyPollSleepMs"), maxEmptyPollSleepMs);
this.emptyPollTimes = NumberUtils.toInt(sortSdkParams.get("emptyPollTimes"), emptyPollTimes);
this.maxConsumerSize = NumberUtils.toInt(sortSdkParams.get("maxConsumerSize"), maxConsumerSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public interface TopicFetcher {
*/
void ack(String msgOffset) throws Exception;

/**
* Get the unique fetcher key to specify the fetcher who consume this message.
* @return Message key.
*/
String getFetchKey();

/**
* Pause the consuming
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@

package org.apache.inlong.sdk.sort.api;

import org.apache.inlong.sdk.sort.fetcher.kafka.KafkaSingleTopicFetcherBuilder;
import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarSingleTopicFetcherBuilder;
import org.apache.inlong.sdk.sort.fetcher.tube.TubeSingleTopicFetcherBuilder;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.fetcher.kafka.KafkaTopicFetcherBuilderImpl;
import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarTopicFetcherBuilderImpl;
import org.apache.inlong.sdk.sort.fetcher.tube.TubeTopicFetcherBuilderImpl;
import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;

import java.util.Collection;

/**
* Interface of topic fetcher builder.
Expand All @@ -29,19 +34,88 @@ public interface TopicFetcherBuilder {

/**
* Subscribe topics and build the {@link TopicFetcher}
*
* @return The prepared topic fetcher
*/
TopicFetcher subscribe();

static KafkaSingleTopicFetcherBuilder kafkaSingleTopic() {
return new KafkaSingleTopicFetcherBuilder();
/**
* Specify the interceptor of TopicFetcher.
* The interceptor is used to filter or modify the message fetched from MQ.
* The default interceptor is {@link MessageInterceptor}.
*
* @param interceptor Interceptor
* @return TopicFetcherBuilder
*/
TopicFetcherBuilder interceptor(Interceptor interceptor);

/**
* Specify the topic to be subscribed.
* Repeated call will replace the previous topic.
*
* @param topic Topic to be subscribed.
* @return TopicFetcherBuilder
*/
TopicFetcherBuilder topic(InLongTopic topic);

/**
* Specify the topics to be subscribed.
* Repeated call will replace the previous topics.
* <p>
* This method will removed the topic which is added though TopicFetcherBuilder::topic(InLongTopic topic)
* </p>
*
* @param topics Topics to be subscribed.
* @return TopicFetcherBuilder
*/
TopicFetcherBuilder topic(Collection<InLongTopic> topics);

/**
* Specify the deserializer of fetcher.
* Deserializer is used to decode the messages fetched from MQ, and arrange them to {@link InLongMessage} format.
* The default deserializer is {@link MessageDeserializer}
*
* @param deserializer Deserializer.
* @return TopicFetcherBuilder
*/
TopicFetcherBuilder deserializer(Deserializer deserializer);

/**
* Specify the clientContext of topic fetcher
*
* @param context ClientContext.
* @return TopicFetcherBuilder
*/
TopicFetcherBuilder context(ClientContext context);

/**
* The fetchKey to specify that which one fetcher this message belongs to.
* @param fetchKey Key of fetcher.
* @return TopicFetcherBuilder
*/
TopicFetcherBuilder fetchKey(String fetchKey);

/**
* Got a kafka topic fetcher builder
* @return KafkaTopicFetcherBuilderImpl
*/
static KafkaTopicFetcherBuilderImpl newKafkaBuilder() {
return new KafkaTopicFetcherBuilderImpl();
}

static PulsarSingleTopicFetcherBuilder pulsarSingleTopic() {
return new PulsarSingleTopicFetcherBuilder();
/**
* Got a pulsar topic fetcher builder
* @return KafkaTopicFetcherBuilderImpl
*/
static PulsarTopicFetcherBuilderImpl newPulsarBuilder() {
return new PulsarTopicFetcherBuilderImpl();
}

static TubeSingleTopicFetcherBuilder tubeSingleTopic() {
return new TubeSingleTopicFetcherBuilder();
/**
* Got a tube topic fetcher builder
* @return KafkaTopicFetcherBuilderImpl
*/
static TubeTopicFetcherBuilderImpl newTubeBuilder() {
return new TubeTopicFetcherBuilderImpl();
}
}
Loading