Skip to content

Commit

Permalink
Adjust the CamelSourceTask code to reduce CPU usage and heap allocations
Browse files Browse the repository at this point in the history
This modifies the code so that it blocks while waiting for the messages
to arrive while also respecting the maxPollDuration.
  • Loading branch information
orpiske committed Sep 7, 2020
1 parent 64ab3f1 commit 4b7b0f6
Showing 1 changed file with 43 additions and 33 deletions.
Expand Up @@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
}
}

private long remaining(long startPollEpochMilli, long maxPollDuration) {
return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
}


@Override
public synchronized List<SourceRecord> poll() {
long startPollEpochMilli = Instant.now().toEpochMilli();
final long startPollEpochMilli = Instant.now().toEpochMilli();

long remaining = remaining(startPollEpochMilli, maxPollDuration);
long collectedRecords = 0L;

List<SourceRecord> records = new ArrayList<>();
while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
Exchange exchange = consumer.receiveNoWait();
List<SourceRecord> records = null;
while (collectedRecords < maxBatchPollSize && remaining > 0) {
Exchange exchange = consumer.receive(remaining);
if (exchange == null) {
remaining = remaining(startPollEpochMilli, maxPollDuration);
continue;
}

if (records == null) {
records = new ArrayList<>();
}

if (exchange != null) {
LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
exchange.getMessage().getMessageId(), exchange.getFromEndpoint());

// TODO: see if there is a better way to use sourcePartition
// an sourceOffset
Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
// TODO: see if there is a better way to use sourcePartition
// an sourceOffset
Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());

final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
final Object messageBodyValue = exchange.getMessage().getBody();
final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
final Object messageBodyValue = exchange.getMessage().getBody();

final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;

for (String singleTopic : topics) {
SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
if (exchange.hasProperties()) {
setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
}
for (String singleTopic : topics) {
SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema,
messageHeaderKey, messageBodySchema, messageBodyValue);

TaskHelper.logRecordContent(LOG, record, config);
records.add(record);
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
if (exchange.hasProperties()) {
setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
}
collectedRecords++;
} else {
break;
}
}

if (records.isEmpty()) {
return Collections.EMPTY_LIST;
} else {
return records;
TaskHelper.logRecordContent(LOG, record, config);
records.add(record);
}
collectedRecords++;
remaining = remaining(startPollEpochMilli, maxPollDuration);
}

return records;
}

@Override
Expand Down

0 comments on commit 4b7b0f6

Please sign in to comment.