Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-10126. Remove maxFlushedTransactionsInOneIteration from OzoneManagerDoubleBuffer #6007

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,7 @@ OMClientResponse getResponse() {

private final Daemon daemon;
private final OMMetadataManager omMetadataManager;
private final AtomicLong flushedTransactionCount = new AtomicLong(0);
private final AtomicLong flushIterations = new AtomicLong(0);
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics;
private long maxFlushedTransactionsInOneIteration;

private final Consumer<TermIndex> updateLastAppliedIndex;
private final boolean isRatisEnabled;
Expand Down Expand Up @@ -189,6 +185,13 @@ public OzoneManagerDoubleBuffer build() {
}
}

private final OzoneManagerDoubleBufferMetrics metrics = OzoneManagerDoubleBufferMetrics.create();

/** Accumulative count (for testing and debug only). */
private final AtomicLong flushedTransactionCount = new AtomicLong();
/** The number of flush iterations (for testing and debug only). */
private final AtomicLong flushIterations = new AtomicLong();

@SuppressWarnings("checkstyle:parameternumber")
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
Consumer<TermIndex> updateLastAppliedIndex,
Expand All @@ -203,8 +206,6 @@ private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
this.omMetadataManager = omMetadataManager;
this.updateLastAppliedIndex = updateLastAppliedIndex;
this.ozoneManagerDoubleBufferMetrics =
OzoneManagerDoubleBufferMetrics.create();
this.flushNotifier = flushNotifier;
isRunning.set(true);
// Daemon thread which runs in background and flushes transactions to DB.
Expand Down Expand Up @@ -354,8 +355,7 @@ private void flushBatch(Queue<Entry> buffer) throws IOException {
() -> omMetadataManager.getStore()
.commitBatchOperation(batchOperation));

ozoneManagerDoubleBufferMetrics.updateFlushTime(
Time.monotonicNow() - startTime);
metrics.updateFlushTime(Time.monotonicNow() - startTime);
}

// Complete futures first and then do other things.
Expand All @@ -367,14 +367,10 @@ private void flushBatch(Queue<Entry> buffer) throws IOException {
.forEach(f -> f.complete(null));
}

flushedTransactionCount.addAndGet(flushedTransactionsSize);
flushIterations.incrementAndGet();

if (LOG.isDebugEnabled()) {
LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
flushIterations.get(),
flushedTransactionsSize);
}
final long accumulativeCount = flushedTransactionCount.addAndGet(flushedTransactionsSize);
final long flushedIterations = flushIterations.incrementAndGet();
LOG.debug("Sync iteration: {}, size in this iteration: {}, accumulative count: {}",
flushedIterations, flushedTransactionsSize, accumulativeCount);

// Clean up committed transactions.
cleanupCache(cleanupEpochs);
Expand All @@ -386,7 +382,7 @@ private void flushBatch(Queue<Entry> buffer) throws IOException {
updateLastAppliedIndex.accept(lastTransaction);

// set metrics.
updateMetrics(flushedTransactionsSize);
metrics.updateFlush(flushedTransactionsSize);
}

private String addToBatch(Queue<Entry> buffer, BatchOperation batchOperation) {
Expand Down Expand Up @@ -492,25 +488,6 @@ private void cleanupCache(Map<String, List<Long>> cleanupEpochs) {
/**
 * Clears {@code readyBuffer}.
 * The method is {@code synchronized} on this instance.
 */
private synchronized void clearReadyBuffer() {
readyBuffer.clear();
}
/**
 * Records one finished flush iteration in the double buffer metrics.
 *
 * @param flushedTransactionsSize number of transactions flushed in the
 *     iteration being recorded
 */
private void updateMetrics(int flushedTransactionsSize) {
  final OzoneManagerDoubleBufferMetrics m = ozoneManagerDoubleBufferMetrics;

  // Bump the totals first so that the average below includes this iteration.
  m.incrTotalNumOfFlushOperations();
  m.incrTotalSizeOfFlushedTransactions(flushedTransactionsSize);
  m.setAvgFlushTransactionsInOneIteration(
      (float) m.getTotalNumOfFlushedTransactions() / m.getTotalNumOfFlushOperations());

  // Track the largest iteration seen so far, both locally and in the metric.
  if (flushedTransactionsSize > maxFlushedTransactionsInOneIteration) {
    maxFlushedTransactionsInOneIteration = flushedTransactionsSize;
    m.setMaxNumberOfTransactionsFlushedInOneIteration(flushedTransactionsSize);
  }

  m.updateQueueSize(flushedTransactionsSize);
}

/**
* Stop OM DoubleBuffer flush thread.
Expand All @@ -520,7 +497,7 @@ private void updateMetrics(int flushedTransactionsSize) {
@SuppressWarnings("squid:S2142")
public void stop() {
stopDaemon();
ozoneManagerDoubleBufferMetrics.unRegister();
metrics.unRegister();
}

@VisibleForTesting
Expand Down Expand Up @@ -553,22 +530,6 @@ private void terminate(Throwable t, int status, OMResponse omResponse) {
ExitUtils.terminate(status, message.toString(), t, LOG);
}

/**
 * Returns the flushed transaction count to OM DB, i.e. the current value of
 * the {@code flushedTransactionCount} counter, which {@code flushBatch}
 * increases by the size of every flushed batch.
 *
 * @return flushedTransactionCount
 */
public long getFlushedTransactionCount() {
return flushedTransactionCount.get();
}

/**
 * Returns total number of flush iterations run by sync thread, i.e. the
 * current value of the {@code flushIterations} counter, which
 * {@code flushBatch} increments once per call.
 *
 * @return flushIterations
 */
public long getFlushIterations() {
return flushIterations.get();
}

/**
* Add OmResponseBufferEntry to buffer.
*/
Expand Down Expand Up @@ -623,8 +584,20 @@ private synchronized void swapCurrentAndReadyBuffer() {
}

@VisibleForTesting
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
OzoneManagerDoubleBufferMetrics getMetrics() {
return metrics;
}

/**
 * Reads the accumulative {@code flushedTransactionCount} counter.
 * The counter is documented as being for testing and debug only.
 *
 * @return the flushed transaction count to OM DB.
 */
@VisibleForTesting
long getFlushedTransactionCountForTesting() {
return flushedTransactionCount.get();
}

/**
 * Reads the {@code flushIterations} counter.
 * The counter is documented as being for testing and debug only.
 *
 * @return total number of flush iterations run by sync thread.
 */
@VisibleForTesting
long getFlushIterationsForTesting() {
return flushIterations.get();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -37,16 +35,13 @@
import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.hadoop.ozone.protocolPB.RequestHandler;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -90,7 +85,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
LoggerFactory.getLogger(OzoneManagerStateMachine.class);
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final OzoneManagerRatisServer omRatisServer;
private final OzoneManager ozoneManager;
private RequestHandler handler;
private RaftGroupId raftGroupId;
Expand All @@ -106,14 +100,10 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
/** The last index skipped by {@link #notifyTermIndexUpdated(long, long)}. */
private volatile long lastSkippedIndex = RaftLog.INVALID_LOG_INDEX;

private OzoneManagerStateMachineMetrics metrics;


public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer,
boolean isTracingEnabled) throws IOException {
this.omRatisServer = ratisServer;
this.isTracingEnabled = isTracingEnabled;
this.ozoneManager = omRatisServer.getOzoneManager();
this.ozoneManager = ratisServer.getOzoneManager();

loadSnapshotInfoFromDB();
this.threadPrefix = ozoneManager.getThreadNamePrefix();
Expand All @@ -132,7 +122,6 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer,
.setNameFormat(threadPrefix + "InstallSnapshotThread").build();
this.installSnapshotExecutor =
HadoopExecutors.newSingleThreadExecutor(installSnapshotThreadFactory);
this.metrics = OzoneManagerStateMachineMetrics.create();
}

/**
Expand Down Expand Up @@ -270,7 +259,14 @@ public TransactionContext startTransaction(
ctxt.setException(ioe);
return ctxt;
}
return handleStartTransactionRequests(raftClientRequest, omRequest);

return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(raftClientRequest.getMessage().getContent())
.setStateMachineContext(omRequest)
.build();
}

@Override
Expand Down Expand Up @@ -499,18 +495,9 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
LOG.info("Received install snapshot notification from OM leader: {} with " +
"term index: {}", leaderNodeId, firstTermIndexInLog);

CompletableFuture<TermIndex> future = CompletableFuture.supplyAsync(
return CompletableFuture.supplyAsync(
() -> ozoneManager.installSnapshotFromLeader(leaderNodeId),
installSnapshotExecutor);
return future;
}

/**
 * Notifies the state machine that the raft peer is no longer leader.
 * This override has an empty body, so the notification is ignored.
 *
 * @param pendingEntries the transaction contexts passed with the
 *     notification; not used by this implementation
 * @throws IOException declared by the signature; the empty body never
 *     throws it
 */
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
throws IOException {
}

@Override
Expand All @@ -530,29 +517,10 @@ public void close() {
}
}

/**
 * Handle the RaftClientRequest and return TransactionContext object.
 * The context is built with this state machine, the content of the client
 * request message as log data, and the OM request as state machine context.
 *
 * @param raftClientRequest the client request; set on the context, and its
 *     message content becomes the log data
 * @param omRequest the OM request stored as the state machine context
 * @return TransactionContext
 */
private TransactionContext handleStartTransactionRequests(
RaftClientRequest raftClientRequest, OMRequest omRequest) {

return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
// NOTE(review): the role is hard-coded to LEADER, presumably because
// transactions are only started on the leader; confirm.
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(raftClientRequest.getMessage().getContent())
.setStateMachineContext(omRequest)
.build();
}

/**
* Submits write request to OM and returns the response Message.
* @param request OMRequest
* @return response from OM
* @throws ServiceException
*/
private OMResponse runCommand(OMRequest request, TermIndex termIndex) {
try {
Expand Down Expand Up @@ -635,23 +603,10 @@ public OzoneManagerRequestHandler getHandler() {
return (OzoneManagerRequestHandler) this.handler;
}

/**
 * Overwrites the {@code raftGroupId} field of this state machine.
 * Marked {@code @VisibleForTesting}.
 *
 * @param raftGroupId the group id to store
 */
@VisibleForTesting
public void setRaftGroupId(RaftGroupId raftGroupId) {
this.raftGroupId = raftGroupId;
}

/**
 * Returns the {@code metrics} field of this state machine.
 * Marked {@code @VisibleForTesting}.
 *
 * @return the state machine metrics object held by this instance
 */
@VisibleForTesting
public OzoneManagerStateMachineMetrics getMetrics() {
return this.metrics;
}

/**
 * Stops this state machine's background resources, in order: the double
 * buffer, the two executors (each shut down with a 5 second timeout
 * argument), and finally the metrics, when a metrics object is present.
 */
public void stop() {
  ozoneManagerDoubleBuffer.stop();

  HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
  HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);

  // Nothing to unregister when no metrics object was created.
  if (metrics == null) {
    return;
  }
  metrics.unRegister();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ public void updateQueueSize(long size) {
queueSize.add(size);
}

/**
 * Records one flush iteration: bumps the flush-operation and
 * flushed-transaction totals, recomputes the average number of transactions
 * per iteration, raises the recorded maximum when this iteration exceeds it,
 * and feeds the size into the queue-size stat.
 *
 * @param flushedTransactionsInOneIteration number of transactions flushed
 *     in the iteration being recorded
 */
public void updateFlush(int flushedTransactionsInOneIteration) {
  incrTotalNumOfFlushOperations();
  incrTotalSizeOfFlushedTransactions(flushedTransactionsInOneIteration);
  setAvgFlushTransactionsInOneIteration(
      getTotalNumOfFlushedTransactions() / (float) getTotalNumOfFlushOperations());

  // The max metric is only ever incremented, so raise it by the difference
  // between this iteration's size and the previously recorded maximum.
  final long previousMax = getMaxNumberOfTransactionsFlushedInOneIteration();
  if (previousMax < flushedTransactionsInOneIteration) {
    maxNumberOfTransactionsFlushedInOneIteration.incr(
        flushedTransactionsInOneIteration - previousMax);
  }

  updateQueueSize(flushedTransactionsInOneIteration);
}

@VisibleForTesting
public MutableStat getQueueSize() {
return queueSize;
Expand Down