
Commit f41489c
pr feedback
jwomeara committed May 1, 2024
1 parent 034755b commit f41489c
Showing 4 changed files with 40 additions and 27 deletions.
File 1 of 4: Hazelcast claim check implementation (package datawave.microservice.query.messaging.hazelcast)
@@ -1,6 +1,7 @@
package datawave.microservice.query.messaging.hazelcast;

import static datawave.microservice.query.messaging.hazelcast.HazelcastQueryResultsManager.HAZELCAST;
+import static datawave.microservice.query.messaging.hazelcast.HazelcastQueryResultsManager.QUEUE_PREFIX;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,7 +45,7 @@ public <T> void check(String queryId, T data) throws InterruptedException, JsonProcessingException
}

String stringData = objectMapper.writeValueAsString(new DataWrapper<>(data));
-getQueueForQueryId(queryId).put(stringData);
+getQueueForQueryId(QUEUE_PREFIX + queryId).put(stringData);
}

@Override
@@ -53,15 +54,15 @@ public <T> T claim(String queryId) throws InterruptedException, JsonProcessingException
log.trace("Claiming large payload for query {}", queryId);
}

-String stringData = getQueueForQueryId(queryId).take();
+String stringData = getQueueForQueryId(QUEUE_PREFIX + queryId).take();
DataWrapper<T> wrapper = objectMapper.readerFor(DataWrapper.class).readValue(stringData);
return wrapper.data;
}

private IQueue<String> getQueueForQueryId(String queryId) {
// @formatter:off
return claimCheckMap.computeIfAbsent(
-queryId,
+QUEUE_PREFIX + queryId,
key -> HazelcastMessagingUtils.getOrCreateQueue(
hazelcastInstance,
messagingProperties.getHazelcast().getBackupCount(),
@@ -74,7 +75,7 @@ public void empty(String queryId) {
log.trace("Emptying claim check queue for query {}", queryId);
}

-IQueue<?> queue = claimCheckMap.remove(queryId);
+IQueue<?> queue = claimCheckMap.remove(QUEUE_PREFIX + queryId);
if (queue != null) {
queue.clear();
}
@@ -85,7 +86,7 @@ public void delete(String queryId) {
log.trace("Deleting claim check queue for query {}", queryId);
}

-IQueue<?> queue = claimCheckMap.remove(queryId);
+IQueue<?> queue = claimCheckMap.remove(QUEUE_PREFIX + queryId);
if (queue != null) {
queue.destroy();
}
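Reviewer note: the prefix has to be applied identically at every claimCheckMap access (check, claim, empty, delete); a queue created under the prefixed key but removed under the bare queryId would leak. A minimal standalone sketch of the namespacing the prefix buys (assumes Hazelcast 4.x or later on the classpath; the class name and query id below are illustrative, not part of this change):

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class QueuePrefixDemo {
    static final String QUEUE_PREFIX = "queryResults.";

    public static void main(String[] args) throws InterruptedException {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        String queryId = "query-123"; // illustrative id

        // With the prefix, the results queue cannot collide with any other
        // Hazelcast structure that happens to be named after the bare query id.
        IQueue<String> results = hz.getQueue(QUEUE_PREFIX + queryId);
        results.put("payload");

        // A lookup on the bare queryId resolves to a distinct, empty queue.
        System.out.println(hz.getQueue(queryId).size()); // 0
        System.out.println(results.take());              // payload

        hz.shutdown();
    }
}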
File 2 of 4: HazelcastQueryResultsManager.java
@@ -22,6 +22,8 @@ public class HazelcastQueryResultsManager implements QueryResultsManager {

public static final String HAZELCAST = "hazelcast";

+static final String QUEUE_PREFIX = "queryResults.";

static final String SPLIT_BRAIN_PROTECTION_NAME = "splitBrainDefault";

private final MessagingProperties messagingProperties;
@@ -35,22 +37,20 @@ public HazelcastQueryResultsManager(MessagingProperties messagingProperties, HazelcastInstance

@Override
public QueryResultsListener createListener(String listenerId, String queryId) {
-return new HazelcastQueryResultsListener(
-HazelcastMessagingUtils.getOrCreateQueue(hazelcastInstance, messagingProperties.getHazelcast().getBackupCount(), queryId), objectMapper,
-listenerId);
+return new HazelcastQueryResultsListener(HazelcastMessagingUtils.getOrCreateQueue(hazelcastInstance,
+messagingProperties.getHazelcast().getBackupCount(), QUEUE_PREFIX + queryId), objectMapper, listenerId);
}

@Override
public QueryResultsPublisher createPublisher(String queryId) {
-return new HazelcastQueryResultsPublisher(
-HazelcastMessagingUtils.getOrCreateQueue(hazelcastInstance, messagingProperties.getHazelcast().getBackupCount(), queryId),
-objectMapper);
+return new HazelcastQueryResultsPublisher(HazelcastMessagingUtils.getOrCreateQueue(hazelcastInstance,
+messagingProperties.getHazelcast().getBackupCount(), QUEUE_PREFIX + queryId), objectMapper);
}

@Override
public void deleteQuery(String queryId) {
try {
-hazelcastInstance.getQueue(queryId).destroy();
+hazelcastInstance.getQueue(QUEUE_PREFIX + queryId).destroy();
} catch (Exception e) {
log.error("Failed to delete queue {}", queryId, e);
}
@@ -59,14 +59,14 @@ public void deleteQuery(String queryId) {
@Override
public void emptyQuery(String queryId) {
try {
-hazelcastInstance.getQueue(queryId).clear();
+hazelcastInstance.getQueue(QUEUE_PREFIX + queryId).clear();
} catch (Exception e) {
log.error("Unable to empty queue {}", queryId, e);
}
}

@Override
public int getNumResultsRemaining(String queryId) {
-return hazelcastInstance.getQueue(queryId).size();
+return hazelcastInstance.getQueue(QUEUE_PREFIX + queryId).size();
}
}
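HazelcastMessagingUtils.getOrCreateQueue itself is not part of this diff; a plausible reconstruction of what such a helper does with the backup count it is passed and the SPLIT_BRAIN_PROTECTION_NAME constant above (a hypothetical sketch, not the project's actual code):

import com.hazelcast.collection.IQueue;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.HazelcastInstance;

public final class HazelcastQueueSketch {
    // Hypothetical stand-in for HazelcastMessagingUtils.getOrCreateQueue:
    // register a per-queue config dynamically, then look the queue up by name.
    static <T> IQueue<T> getOrCreateQueue(HazelcastInstance hz, int backupCount, String name) {
        QueueConfig config = new QueueConfig(name)
                .setBackupCount(backupCount)
                .setSplitBrainProtectionName("splitBrainDefault");
        // Re-adding an identical dynamic config is accepted by Hazelcast 4+.
        hz.getConfig().addQueueConfig(config);
        return hz.getQueue(name);
    }
}

Whatever the real helper looks like, the invariant this commit enforces is that the name handed to it and the name used by hazelcastInstance.getQueue(...) in deleteQuery, emptyQuery, and getNumResultsRemaining must agree, which the shared QUEUE_PREFIX constant now guarantees.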
File 3 of 4: KafkaQueryResultsListener.java
@@ -2,6 +2,7 @@

import static datawave.microservice.query.messaging.AcknowledgementCallback.Status.ACK;
import static datawave.microservice.query.messaging.AcknowledgementCallback.Status.NACK;
+import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.TOPIC_PREFIX;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -45,12 +46,12 @@ public KafkaQueryResultsListener(MessagingProperties messagingProperties, ConsumerFactory
log.trace("Creating kafka query results listener for " + queryId + " with listenerId " + listenerId);
}
this.queryId = queryId;
-ContainerProperties containerProps = new ContainerProperties(queryId);
+ContainerProperties containerProps = new ContainerProperties(TOPIC_PREFIX + queryId);
containerProps.setClientId(listenerId);

// use the topicId (i.e. queryId) as the groupId. this makes it possible
// to get the size of the queue later on using just the query id
-containerProps.setGroupId(queryId);
+containerProps.setGroupId(TOPIC_PREFIX + queryId);

containerProps.setMessageListener(this);
containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
@@ -83,7 +84,7 @@ public String getListenerId() {
}

public String getQueryId() {
-return container.getContainerProperties().getGroupId();
+return queryId;
}

@Override
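For context, the container wiring above (topic, client id, group id, MANUAL_IMMEDIATE acks) corresponds to plain spring-kafka along these lines; this is an illustrative sketch, with the bootstrap address and the class name invented for the example:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

public class ListenerSketch {
    public static void main(String[] args) {
        String topic = "queryResults." + "query-123"; // TOPIC_PREFIX + queryId

        ContainerProperties props = new ContainerProperties(topic);
        props.setClientId("listener-1");  // the listenerId in the real code
        props.setGroupId(topic);          // group named after the topic, as in this commit
        props.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        props.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {
            System.out.println("result: " + record.value());
            ack.acknowledge(); // commit immediately, mirroring MANUAL_IMMEDIATE
        });

        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(Map.<String, Object>of(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class));

        new KafkaMessageListenerContainer<>(consumerFactory, props).start();
    }
}

Returning queryId directly from getQueryId() is the right companion fix: the group id now carries the prefix, so it no longer equals the query id.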
File 4 of 4: KafkaQueryResultsManager.java
@@ -36,6 +36,8 @@ public class KafkaQueryResultsManager implements QueryResultsManager {

public static final String KAFKA = "kafka";

+static final String TOPIC_PREFIX = "queryResults.";

private final MessagingProperties messagingProperties;
private final AdminClient adminClient;
private final ProducerFactory<String,String> kafkaProducerFactory;
@@ -58,7 +60,7 @@ public KafkaQueryResultsManager(MessagingProperties messagingProperties, KafkaAdmin
*/
@Override
public QueryResultsListener createListener(String listenerId, String queryId) {
-createTopic(queryId);
+createTopic(TOPIC_PREFIX + queryId);
return new KafkaQueryResultsListener(messagingProperties, kafkaConsumerFactory, listenerId, queryId);
}

@@ -71,9 +73,9 @@ public QueryResultsListener createListener(String listenerId, String queryId) {
*/
@Override
public QueryResultsPublisher createPublisher(String queryId) {
-createTopic(queryId);
+createTopic(TOPIC_PREFIX + queryId);
KafkaTemplate<String,String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
-kafkaTemplate.setDefaultTopic(queryId);
+kafkaTemplate.setDefaultTopic(TOPIC_PREFIX + queryId);
return new KafkaQueryResultsPublisher(kafkaTemplate);
}

@@ -85,7 +87,7 @@ public QueryResultsPublisher createPublisher(String queryId) {
*/
@Override
public void deleteQuery(String queryId) {
-deleteTopic(queryId);
+deleteTopic(TOPIC_PREFIX + queryId);
}

private void deleteTopic(String topic) {
@@ -126,25 +128,34 @@ private void createTopic(String topic) {

@Override
public void emptyQuery(String name) {
-TopicDescription topic = describeTopic(name);
-if (topic != null) {
+emptyTopic(TOPIC_PREFIX + name);
+}
+
+private void emptyTopic(String topic) {
+TopicDescription topicDesc = describeTopic(topic);
+if (topicDesc != null) {
Map<TopicPartition,RecordsToDelete> partitions = new HashMap<>();
RecordsToDelete records = RecordsToDelete.beforeOffset(Long.MAX_VALUE);
-for (TopicPartitionInfo info : topic.partitions()) {
-TopicPartition partition = new TopicPartition(name, info.partition());
+for (TopicPartitionInfo info : topicDesc.partitions()) {
+TopicPartition partition = new TopicPartition(topic, info.partition());
partitions.put(partition, records);
}
DeleteRecordsResult result = adminClient.deleteRecords(partitions);
try {
result.all();
} catch (Exception e) {
log.debug("Unable to empty queue " + name, e);
log.debug("Unable to empty topic " + topic, e);
}
}
}

@Override
-public int getNumResultsRemaining(final String topic) {
+public int getNumResultsRemaining(final String queryId) {
+return getNumResultsRemainingFromTopic(TOPIC_PREFIX + queryId);
+}
+
+private int getNumResultsRemainingFromTopic(final String topic) {
Map<TopicPartition,Long> consumerOffsetMap = new HashMap<>();
try {
// @formatter:off
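getNumResultsRemainingFromTopic is truncated above, but because the listener sets groupId equal to the topic name, the remaining count can be derived from offsets alone. A sketch using standard AdminClient calls (the class and method names here are illustrative; error handling and null committed-offset checks elided):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RemainingResults {
    // Remaining = sum over partitions of (log-end offset - committed offset).
    static long remaining(AdminClient admin, String topic) throws Exception {
        // groupId == topic, so the topic name doubles as the consumer group id.
        Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(topic).partitionsToOffsetAndMetadata().get();

        Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
        committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                admin.listOffsets(latest).all().get();

        long remaining = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
            remaining += ends.get(e.getKey()).offset() - e.getValue().offset();
        }
        return remaining;
    }
}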
