Skip to content

Commit

Permalink
chore(broker): update processing duration metrics to include writeEve…
Browse files Browse the repository at this point in the history
…nt and updateState
  • Loading branch information
deepthidevaki committed Dec 9, 2020
1 parent f5f958d commit 81740e0
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public final class ProcessingStateMachine {
private long errorRecordPosition = StreamProcessor.UNSET_POSITION;
private volatile boolean onErrorHandlingLoop;
private int onErrorRetries;
// Used for processing duration metrics
private long processingStartTime;

public ProcessingStateMachine(
final ProcessingContext context, final BooleanSupplier shouldProcessNext) {
Expand Down Expand Up @@ -217,7 +219,7 @@ private void processEvent(final LoggedEvent event) {
return;
}

final long processingStartTime = ActorClock.currentTimeMillis();
processingStartTime = ActorClock.currentTimeMillis();
metrics.processingLatency(metadata.getRecordType(), event.getTimestamp(), processingStartTime);

try {
Expand All @@ -227,8 +229,6 @@ private void processEvent(final LoggedEvent event) {
processInTransaction(typedEvent);

metrics.eventProcessed();
metrics.processingDuration(
metadata.getRecordType(), processingStartTime, ActorClock.currentTimeMillis());

writeEvent();
} catch (final RecoverableException recoverableException) {
Expand Down Expand Up @@ -428,6 +428,8 @@ private void executeSideEffects() {

notifyListener();

metrics.processingDuration(
metadata.getRecordType(), processingStartTime, ActorClock.currentTimeMillis());
// continue with next event
currentProcessor = null;
actor.submit(this::readNextEvent);
Expand Down

0 comments on commit 81740e0

Please sign in to comment.