Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-3256][SDK] Modify log printing level and method for Sort-SDK #3259

Merged
merged 3 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public boolean clean() {
return true;
} catch (Throwable th) {
logger.error("close error " + sortTaskId, th);

}
return false;
}
Expand Down Expand Up @@ -283,19 +282,19 @@ private void handleCurrentConsumeConfig(List<InLongTopic> currentConsumeConfig)
}

List<InLongTopic> newConsumeConfig = new ArrayList<>(currentConsumeConfig);
logger.info("newConsumeConfig List:{}", Arrays.toString(newConsumeConfig.toArray()));
logger.debug("newConsumeConfig List:{}", Arrays.toString(newConsumeConfig.toArray()));
List<String> newTopics = getNewTopics(newConsumeConfig);
logger.info("newTopics :{}", Arrays.toString(newTopics.toArray()));
logger.debug("newTopics :{}", Arrays.toString(newTopics.toArray()));

List<String> oldInLongTopics = new ArrayList<>(fetchers.keySet());
logger.info("oldInLongTopics :{}", Arrays.toString(oldInLongTopics.toArray()));
logger.debug("oldInLongTopics :{}", Arrays.toString(oldInLongTopics.toArray()));
//get need be offlined topics
oldInLongTopics.removeAll(newTopics);
logger.info("removed oldInLongTopics :{}", Arrays.toString(oldInLongTopics.toArray()));
logger.debug("removed oldInLongTopics :{}", Arrays.toString(oldInLongTopics.toArray()));

//get new topics
newTopics.removeAll(new ArrayList<>(fetchers.keySet()));
logger.info("really new topics :{}", Arrays.toString(newTopics.toArray()));
logger.debug("really new topics :{}", Arrays.toString(newTopics.toArray()));
//offline need be offlined topics
offlineRmovedTopic(oldInLongTopics);
//online new topics
Expand Down Expand Up @@ -338,7 +337,7 @@ private void offlineRmovedTopic(List<String> oldInLongTopics) {
private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics, List<String> reallyNewTopic) {
for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) {
logger.info("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
logger.debug("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
continue;
}
onlineTopic(inLongTopic);
Expand Down Expand Up @@ -377,7 +376,7 @@ private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
.authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken()))
.build();
pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), pulsarClient);
logger.info("create pulsar client succ {}",
logger.debug("create pulsar client succ {}",
new String[]{inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getInLongCluster().getBootstraps(),
inLongTopic.getInLongCluster().getToken()});
Expand All @@ -387,7 +386,7 @@ private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
return false;
}
} else {
logger.info("bootstrap is null {}", inLongTopic.getInLongCluster());
logger.error("bootstrap is null {}", inLongTopic.getInLongCluster());
return false;
}
}
Expand All @@ -405,7 +404,7 @@ private boolean checkAndCreateNewTubeSessionFactory(InLongTopic inLongTopic) {
TubeConsumerCreater tubeConsumerCreater = new TubeConsumerCreater(messageSessionFactory,
tubeConfig);
tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(), tubeConsumerCreater);
logger.info("create tube client succ {} {} {}",
logger.debug("create tube client succ {} {} {}",
new String[]{inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getInLongCluster().getBootstraps(),
inLongTopic.getInLongCluster().getToken()});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public void reload() {
e.getMessage());
logger.error(msg, e);
}
logger.debug("end to reload manager config.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public AckOffsetOnRebalance(KafkaConsumer<byte[], byte[]> consumer,

@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
logger.info("*- in ralance:onPartitionsRevoked");
logger.debug("*- in ralance:onPartitionsRevoked");
while (!commitOffsetMap.isEmpty()) {
consumer.commitSync(commitOffsetMap);
}
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
logger.info("*- in ralance:onPartitionsAssigned ");
logger.debug("*- in ralance:onPartitionsAssigned ");
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(new HashSet<>(collection));
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public boolean init(Object object) {
return false;
}
this.bootstrapServers = bootstrapServers;
String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date());
this.fetchThread = new Thread(new Fetcher(), threadName);
this.fetchThread.start();
} catch (Exception e) {
Expand All @@ -91,6 +91,8 @@ public void ack(String msgOffset) throws Exception {
TopicPartition topicPartition = new TopicPartition(inLongTopic.getTopic(), Integer.parseInt(offset[0]));
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(Long.parseLong(offset[1]));
commitOffsetMap.put(topicPartition, offsetAndMetadata);
} else {
throw new Exception("offset is illegal, the correct format is int:long ,the error offset is:" + msgOffset);
}
}

Expand Down Expand Up @@ -123,7 +125,7 @@ public boolean close() {

@Override
public boolean isClosed() {
return false;
return closed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,21 @@ public void ack(String msgOffset) throws Exception {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
.addAckFailTimes(1L);
logger.error("consumer == null");
logger.error("consumer == null {}", inLongTopic);
return;
}
MessageId messageId = offsetCache.get(msgOffset);
if (messageId == null) {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
.addAckFailTimes(1L);
logger.error("messageId == null");
logger.error("messageId == null {}", inLongTopic);
return;
}
consumer.acknowledgeAsync(messageId)
.thenAccept(consumer -> ackSucc(msgOffset))
.exceptionally(exception -> {
logger.error("ack fail:{}", msgOffset);
logger.error("ack fail:{} {}", inLongTopic, msgOffset);
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
.addAckFailTimes(1L);
Expand Down Expand Up @@ -162,15 +162,6 @@ private boolean createConsumer(PulsarClient client) {
return true;
}

/**
* isValidState
*/
public void isValidState() {
if (closed) {
throw new IllegalStateException(inLongTopic + " closed.");
}
}

/**
* pause
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public boolean close() {

@Override
public boolean isClosed() {
return this.closed;
return closed;
}

@Override
Expand All @@ -161,7 +161,7 @@ public void stopConsume(boolean stopConsume) {

@Override
public boolean isConsumeStop() {
return this.isStopConsume;
return isStopConsume;
}

@Override
Expand All @@ -179,15 +179,6 @@ public long getAckedOffset() {
return 0L;
}

/**
* isValidState
*/
public void isValidState() {
if (closed) {
throw new IllegalStateException(inLongTopic + " closed.");
}
}

public class Fetcher implements Runnable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static String formatDate(Date date, String format) {
* @return String
*/
public static String formatDate(Date date) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(date);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,6 @@ public void init() {
}
}

@Test
public void isValidState() {
InLongPulsarFetcherImpl inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext);
inLongTopicFetcher.isValidState();
}

@Test
public void pause() {
InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ public void setUp() throws Exception {

}

@Test
public void isValidState() {
InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
inLongTopicFetcher.isValidState();
}

@Test
public void pause() {
InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext);
Expand Down