Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ public void run() {
return;
}
boolean staged = false;
long startNs = System.nanoTime();
try {
LogFileWriter newWriter = createNewWriter();
LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
Expand All @@ -458,6 +459,8 @@ public void run() {
numFailures, maxRotationRetries, t);
}
} finally {
// Time both success and failure paths so slow rotations are visible even when they fail.
logGroup.getMetrics().updateRotationTime(System.nanoTime() - startNs);
if (onDemand) {
// Clear the flag last so requestRotation()'s CAS rejects duplicate on-demand
// submissions while this task is still creating/staging a writer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,27 @@ protected void abort(String reason, Throwable cause) throws IOException {
}

/**
* Holder for a pending SYNC event captured at consumer pickup time. Stored separately from the
* LogEvent because Disruptor reuses ring buffer slots — retaining a LogEvent reference past the
* end of its onEvent invocation is unsafe.
*/
private static class PendingSync {
// Future taken from the SYNC event; the handler completes it normally after a successful sync
// or exceptionally when pending syncs are failed.
final CompletableFuture<Void> future;
// System.nanoTime() reading taken when the handler picked the SYNC event up in onEvent.
final long pickupTimeNs;

PendingSync(CompletableFuture<Void> future, long pickupTimeNs) {
this.future = future;
this.pickupTimeNs = pickupTimeNs;
}
}

/** Handles events from the Disruptor. */
protected class LogEventHandler implements EventHandler<LogEvent>, LifecycleAware {
private final List<CompletableFuture<Void>> pendingSyncFutures = new ArrayList<>();
private final List<PendingSync> pendingSyncs = new ArrayList<>();
private ReplicationModeImpl currentModeImpl;
private volatile IOException fatalException;
// Counts events drained per Disruptor batch. Single-threaded access from onEvent.
private int batchEventCount;

public LogEventHandler() {
}
Expand Down Expand Up @@ -1028,16 +1043,27 @@ private void updateModeOnFailure(IOException e) throws IOException {
* @throws IOException if the sync operation fails
*/
private void processPendingSyncs(long sequence) throws IOException {
if (pendingSyncFutures.isEmpty()) {
int pendingSyncCount = pendingSyncs.size();
if (pendingSyncCount == 0) {
return;
}
metrics.updatePendingSyncCount(pendingSyncCount);
// Record per-event wait between SYNC pickup and fsync start.
long syncStartNs = System.nanoTime();
Copy link
Copy Markdown
Contributor

@apurtell apurtell Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move store to syncStartNs below the histogram updates to only time the actual sync. Just a nit.

for (PendingSync ps : pendingSyncs) {
metrics.updatePendingSyncWaitTime(syncStartNs - ps.pickupTimeNs);
}
// call sync on the current mode
currentModeImpl.sync();
try {
currentModeImpl.sync();
} finally {
metrics.updateFsSyncTime(System.nanoTime() - syncStartNs);
}
// Complete all pending sync futures
for (CompletableFuture<Void> future : pendingSyncFutures) {
future.complete(null);
for (PendingSync ps : pendingSyncs) {
ps.future.complete(null);
}
pendingSyncFutures.clear();
pendingSyncs.clear();
LOG.debug("Sync operation completed successfully up to sequence {}", sequence);
// after a successful sync check the mode set on the replication group
// Doing the mode check on sync points makes the implementation more robust
Expand Down Expand Up @@ -1068,13 +1094,13 @@ private void processPendingSyncs(long sequence) throws IOException {
* @param e The IOException that caused the failure
*/
private void failPendingSyncs(long sequence, IOException e) {
if (pendingSyncFutures.isEmpty()) {
if (pendingSyncs.isEmpty()) {
return;
}
for (CompletableFuture<Void> future : pendingSyncFutures) {
future.completeExceptionally(e);
for (PendingSync ps : pendingSyncs) {
ps.future.completeExceptionally(e);
}
pendingSyncFutures.clear();
pendingSyncs.clear();
LOG.warn("Failed to process syncs at sequence {}", sequence, e);
}

Expand Down Expand Up @@ -1145,6 +1171,7 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Ex
long currentTimeNs = System.nanoTime();
long ringBufferTimeNs = currentTimeNs - event.timestampNs;
metrics.updateRingBufferTime(ringBufferTimeNs);
batchEventCount++;
if (fatalException != null) {
// Append events are ignored; sync futures are failed immediately
// so producer threads unblock without waiting for the sync timeout.
Expand All @@ -1159,7 +1186,7 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Ex
currentModeImpl.append(event.record);
break;
case EVENT_TYPE_SYNC:
pendingSyncFutures.add(event.syncFuture);
pendingSyncs.add(new PendingSync(event.syncFuture, currentTimeNs));
break;
case EVENT_TYPE_SWAP:
// Wake-up marker from LogRotationTask. Drain the staged writer so the old writer is
Expand Down Expand Up @@ -1188,6 +1215,13 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Ex
new IOException("Unexpected error in event handler at sequence " + sequence, t);
setFatalException(wrapped);
failPendingSyncs(sequence, wrapped);
} finally {
// Reset on endOfBatch regardless of success/failure so the counter never leaks
// across batches when an exception path bypasses processPendingSyncs.
if (endOfBatch) {
metrics.updateBatchSize(batchEventCount);
batchEventCount = 0;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,33 @@ public interface MetricsReplicationLogGroupSource extends BaseSource {
String ROTATION_FAILURES = "rotationFailures";

String ROTATION_FAILURES_DESC = "Number of times log rotation has failed";
String APPEND_TIME = "appendTimeMs";

// All time histograms in this source are nanoseconds.
String APPEND_TIME = "appendTime";
String APPEND_TIME_DESC = "Histogram of time taken for append operations in nanoseconds";

String SYNC_TIME = "syncTimeMs";
String SYNC_TIME = "syncTime";
Comment on lines +37 to +41
String SYNC_TIME_DESC = "Histogram of time taken for sync operations in nanoseconds";

String ROTATION_TIME = "rotationTimeMs";
String ROTATION_TIME = "rotationTime";
String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in nanoseconds";

String RING_BUFFER_TIME = "ringBufferTime";
String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer in nanoseconds";

String FS_SYNC_TIME = "fsSyncTime";
String FS_SYNC_TIME_DESC =
"Histogram of time taken for the underlying filesystem sync (fsync) in nanoseconds";

String BATCH_SIZE = "batchSize";
String BATCH_SIZE_DESC = "Histogram of number of events drained per Disruptor batch";

String PENDING_SYNC_COUNT = "pendingSyncCount";
String PENDING_SYNC_COUNT_DESC = "Histogram of pending sync futures coalesced into one fsync";

String PENDING_SYNC_WAIT_TIME = "pendingSyncWaitTime";
String PENDING_SYNC_WAIT_TIME_DESC =
"Time a SYNC event waits between consumer pickup and fsync start, in nanoseconds";

String SYNC_TO_SAF_TRANSITIONS = "syncToSafTransitions";
String SYNC_TO_SAF_TRANSITIONS_DESC = "Number of SYNC to STORE_AND_FORWARD mode transitions";
Expand Down Expand Up @@ -78,6 +94,30 @@ public interface MetricsReplicationLogGroupSource extends BaseSource {
*/
void updateRingBufferTime(long timeNs);

/**
* Update the time taken for the underlying filesystem sync (fsync) in nanoseconds. Intended to be
* reported under the {@link #FS_SYNC_TIME} histogram.
* @param timeNs Time taken in nanoseconds
*/
void updateFsSyncTime(long timeNs);

/**
* Update the number of events drained in a single Disruptor batch. Intended to be reported under
* the {@link #BATCH_SIZE} histogram.
* @param size Number of events in the batch
*/
void updateBatchSize(long size);

/**
* Update the number of pending sync futures coalesced into one fsync. Intended to be reported
* under the {@link #PENDING_SYNC_COUNT} histogram.
* @param count Number of sync futures
*/
void updatePendingSyncCount(long count);

/**
* Update the time a SYNC event waited between consumer pickup and fsync start. One sample is
* expected per pending SYNC event; intended to be reported under the
* {@link #PENDING_SYNC_WAIT_TIME} histogram.
* @param timeNs Time in nanoseconds
*/
void updatePendingSyncWaitTime(long timeNs);

/**
* Increments the counter for log rotation failures. This counter tracks the number of times log
* rotation has failed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
private final MutableHistogram syncTime;
private final MutableHistogram rotationTime;
private final MutableHistogram ringBufferTime;
private final MutableHistogram fsSyncTime;
private final MutableHistogram batchSize;
private final MutableHistogram pendingSyncCount;
private final MutableHistogram pendingSyncWaitTime;

public MetricsReplicationLogGroupSourceImpl(String haGroupName) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, haGroupName);
Expand All @@ -51,6 +55,12 @@ public MetricsReplicationLogGroupSourceImpl(String metricsName, String metricsDe
syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME, ROTATION_TIME_DESC);
ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, RING_BUFFER_TIME_DESC);
fsSyncTime = getMetricsRegistry().newHistogram(FS_SYNC_TIME, FS_SYNC_TIME_DESC);
batchSize = getMetricsRegistry().newHistogram(BATCH_SIZE, BATCH_SIZE_DESC);
pendingSyncCount =
getMetricsRegistry().newHistogram(PENDING_SYNC_COUNT, PENDING_SYNC_COUNT_DESC);
pendingSyncWaitTime =
getMetricsRegistry().newHistogram(PENDING_SYNC_WAIT_TIME, PENDING_SYNC_WAIT_TIME_DESC);
}

@Override
Expand Down Expand Up @@ -93,11 +103,35 @@ public void updateRingBufferTime(long timeNs) {
ringBufferTime.add(timeNs);
}

/** Adds one sample to the histogram registered under FS_SYNC_TIME. */
@Override
public void updateFsSyncTime(long timeNs) {
fsSyncTime.add(timeNs);
}

/** Adds one sample to the histogram registered under BATCH_SIZE. */
@Override
public void updateBatchSize(long size) {
batchSize.add(size);
}

/** Adds one sample to the histogram registered under PENDING_SYNC_COUNT. */
@Override
public void updatePendingSyncCount(long count) {
pendingSyncCount.add(count);
}

/** Adds one sample to the histogram registered under PENDING_SYNC_WAIT_TIME. */
@Override
public void updatePendingSyncWaitTime(long timeNs) {
pendingSyncWaitTime.add(timeNs);
}

/**
 * Builds a snapshot of the metrics tracked by this source.
 * <p>
 * The rotation, rotation-failure and SYNC-to-SAF transition metrics are read with
 * {@code value()}. Every histogram contributes only its {@code getMax()} reading, so the
 * histogram-backed fields of the snapshot are maxima rather than means or percentiles.
 * @return a new {@link ReplicationLogMetricValues} populated from the current metric readings
 */
@Override
public ReplicationLogMetricValues getCurrentMetricValues() {
  return ReplicationLogMetricValues.builder()
    .rotationCount(rotationCount.value())
    .rotationFailuresCount(rotationFailuresCount.value())
    .syncToSafTransitions(syncToSafTransitions.value())
    .appendTime(appendTime.getMax())
    .syncTime(syncTime.getMax())
    .rotationTime(rotationTime.getMax())
    .ringBufferTime(ringBufferTime.getMax())
    .fsSyncTime(fsSyncTime.getMax())
    .batchSize(batchSize.getMax())
    .pendingSyncCount(pendingSyncCount.getMax())
    .pendingSyncWaitTime(pendingSyncWaitTime.getMax())
    .build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,27 @@ public class ReplicationLogMetricValues {
private final long syncTime;
private final long rotationTime;
private final long ringBufferTime;
private final long fsSyncTime;
private final long batchSize;
private final long pendingSyncCount;
private final long pendingSyncWaitTime;

/**
 * Creates a snapshot by copying every value out of the given builder. Instances are obtained
 * through {@link #builder()} followed by {@link Builder#build()}.
 * @param b builder holding the values to copy; values never set on it are left at 0
 */
private ReplicationLogMetricValues(Builder b) {
  this.rotationCount = b.rotationCount;
  this.rotationFailuresCount = b.rotationFailuresCount;
  this.syncToSafTransitions = b.syncToSafTransitions;
  this.appendTime = b.appendTime;
  this.syncTime = b.syncTime;
  this.rotationTime = b.rotationTime;
  this.ringBufferTime = b.ringBufferTime;
  this.fsSyncTime = b.fsSyncTime;
  this.batchSize = b.batchSize;
  this.pendingSyncCount = b.pendingSyncCount;
  this.pendingSyncWaitTime = b.pendingSyncWaitTime;
}

/**
* Creates a fresh builder for assembling a snapshot.
* @return a new {@link Builder} with no values set
*/
public static Builder builder() {
return new Builder();
}

public long getRotationCount() {
Expand Down Expand Up @@ -68,4 +78,92 @@ public long getRingBufferTime() {
return ringBufferTime;
}

/**
* @return the {@code fsSyncTime} value this snapshot was built with
*/
public long getFsSyncTime() {
return fsSyncTime;
}

/**
* @return the {@code batchSize} value this snapshot was built with
*/
public long getBatchSize() {
return batchSize;
}

/**
* @return the {@code pendingSyncCount} value this snapshot was built with
*/
public long getPendingSyncCount() {
return pendingSyncCount;
}

/**
* @return the {@code pendingSyncWaitTime} value this snapshot was built with
*/
public long getPendingSyncWaitTime() {
return pendingSyncWaitTime;
}

/**
* Fluent builder for {@link ReplicationLogMetricValues}. Each setter stores one value and returns
* this builder so calls can be chained; any value that is never set keeps the {@code long}
* default of 0. The builder does no validation and is not synchronized.
*/
public static class Builder {
private long rotationCount;
private long rotationFailuresCount;
private long syncToSafTransitions;
private long appendTime;
private long syncTime;
private long rotationTime;
private long ringBufferTime;
private long fsSyncTime;
private long batchSize;
private long pendingSyncCount;
private long pendingSyncWaitTime;

public Builder rotationCount(long v) {
this.rotationCount = v;
return this;
}

public Builder rotationFailuresCount(long v) {
this.rotationFailuresCount = v;
return this;
}

public Builder syncToSafTransitions(long v) {
this.syncToSafTransitions = v;
return this;
}

public Builder appendTime(long v) {
this.appendTime = v;
return this;
}

public Builder syncTime(long v) {
this.syncTime = v;
return this;
}

public Builder rotationTime(long v) {
this.rotationTime = v;
return this;
}

public Builder ringBufferTime(long v) {
this.ringBufferTime = v;
return this;
}

public Builder fsSyncTime(long v) {
this.fsSyncTime = v;
return this;
}

public Builder batchSize(long v) {
this.batchSize = v;
return this;
}

public Builder pendingSyncCount(long v) {
this.pendingSyncCount = v;
return this;
}

public Builder pendingSyncWaitTime(long v) {
this.pendingSyncWaitTime = v;
return this;
}

/**
* Copies the values collected so far into a new snapshot. The builder itself is left unchanged.
* @return a new {@link ReplicationLogMetricValues}
*/
public ReplicationLogMetricValues build() {
return new ReplicationLogMetricValues(this);
}
}
}
Loading