Skip to content
Permalink
Browse files
Debug logging added
  • Loading branch information
nabarunnag committed Mar 4, 2020
1 parent a713123 commit 3c3804b85d18a0c485a798bf237997469bf1fa1c
Showing 2 changed files with 6 additions and 0 deletions.
@@ -85,12 +85,17 @@ void setRegionNameToRegion(Map<String, Region<Object, Object>> regionNameToRegio

@Override
public void put(Collection<SinkRecord> records) {
logger.debug("Received " + records.size() + " records.");
put(records, new HashMap<>());
}

void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) {
// spin off a new thread to handle this operation? Downside is ordering and retries...
for (SinkRecord record : records) {
logger.debug("kafka coordinates:(Topic:"
+ record.topic() +
" Partition:" + record.kafkaPartition() + " Offset:" + record.kafkaOffset()
+ ")");
updateBatchForRegionByTopic(record, batchRecordsMap);
}
batchRecordsMap.forEach(
@@ -104,6 +104,7 @@ public List<SourceRecord> poll() {
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
logger.debug("Geode events polled :" + events.size());
for (GeodeEvent event : events) {
String regionName = event.getRegionName();
List<String> topics = regionToTopics.get(regionName);

0 comments on commit 3c3804b

Please sign in to comment.