HDDS-7111. SCMHADBTransactionBuffer not flushed in time #3670

Closed · wants to merge 1 commit
@@ -575,6 +575,24 @@ public final class ScmConfigKeys {
public static final boolean
OZONE_SCM_HA_RATIS_SERVER_ELECTION_PRE_VOTE_DEFAULT = false;

// SCMHADBTransactionBufferMonitorService related
public static final String OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL =
"ozone.scm.ha.dbtransactionbuffer.flush.interval";
public static final long
OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT = 60 * 1000L;
public static final String
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT =
"ozone.scm.ha.dbtransactionbuffer.monitor.service.timeout";
public static final long
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT_DEFAULT =
300 * 1000L;
public static final String
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL =
"ozone.scm.ha.dbtransactionbuffer.monitor.service.interval";
public static final long
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL_DEFAULT =
30 * 1000L;

public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_SCMAUDIT =
"ozone.audit.log.debug.cmd.list.scmaudit";
/**
33 changes: 33 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3145,6 +3145,39 @@
<description>Deadline for SCM DB checkpoint interval.</description>
</property>

<property>
<name>ozone.scm.ha.dbtransactionbuffer.flush.interval</name>
<value>60s</value>
<tag>SCM, OZONE, HA</tag>
<description>
SCMHADBTransactionBuffer flush interval. If more than this time has
elapsed since the last flush, the background monitor flushes the
non-empty buffer into the DB.
Unit could be defined with postfix (ns,ms,s,m,h,d).
</description>
</property>
<property>
<name>ozone.scm.ha.dbtransactionbuffer.monitor.service.timeout</name>
<value>300s</value>
<tag>SCM, OZONE, HA</tag>
<description>
Timeout for SCMHADBTransactionBufferMonitorService. If set to a value
greater than 0, the service stops waiting for the flush task after
this time.
Unit could be defined with postfix (ns,ms,s,m,h,d).
</description>
</property>
<property>
<name>ozone.scm.ha.dbtransactionbuffer.monitor.service.interval</name>
<value>30s</value>
<tag>SCM, OZONE, HA</tag>
<description>
Run interval of SCMHADBTransactionBufferMonitorService. The service
runs periodically on each SCM (if Ratis is enabled), checks the
pending transaction buffer, and flushes it when due.
Unit could be defined with postfix (ns,ms,s,m,h,d).
</description>
</property>
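
All three values are parsed as time durations, so any supported suffix works. Below is a minimal sketch of how such a property resolves through OzoneConfiguration, mirroring the getTimeDuration calls in the init() hunk later in this diff; the demo class itself is hypothetical and not part of this PR.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT;

// Hypothetical demo class, not part of this PR.
public final class FlushIntervalResolutionDemo {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // "90s", "2m" and a bare "60000" (interpreted in the unit passed
    // to getTimeDuration) are all valid overrides.
    conf.set(OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL, "90s");
    long flushIntervalMs = conf.getTimeDuration(
        OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
        OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println(flushIntervalMs); // prints 90000
  }
}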

<property>
<name>ozone.s3g.kerberos.keytab.file</name>
@@ -17,17 +17,27 @@
package org.apache.hadoop.hdds.scm.ha;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.SnapshotInfo;

import java.io.IOException;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;

/**
@@ -42,6 +52,9 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
private BatchOperation currentBatchOperation;
private TransactionInfo latestTrxInfo;
private SnapshotInfo latestSnapshot;
private long lastFlushTime;
private AtomicLong bufferSize;
private SCMHADBTransactionBufferMonitorService monitorService;

public SCMHADBTransactionBufferImpl(StorageContainerManager scm)
throws IOException {
@@ -57,12 +70,14 @@ private BatchOperation getCurrentBatchOperation() {
public <KEY, VALUE> void addToBuffer(
Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
table.putWithBatch(getCurrentBatchOperation(), key, value);
bufferSize.incrementAndGet();
}

@Override
public <KEY, VALUE> void removeFromBuffer(Table<KEY, VALUE> table, KEY key)
throws IOException {
table.deleteWithBatch(getCurrentBatchOperation(), key);
bufferSize.incrementAndGet();
}

@Override
@@ -91,7 +106,7 @@ public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
}

@Override
public void flush() throws IOException {
public synchronized void flush() throws IOException {
// write latest trx info into trx table in the same batch
Table<String, TransactionInfo> transactionInfoTable
= metadataStore.getTransactionInfoTable();
@@ -109,6 +124,8 @@ public void flush() throws IOException {
Preconditions.checkArgument(
deletedBlockLog instanceof DeletedBlockLogImpl);
((DeletedBlockLogImpl) deletedBlockLog).onFlush();
bufferSize.set(0);
lastFlushTime = Time.monotonicNow();
}

@Override
@@ -129,6 +146,28 @@ public void init() throws IOException {
.build();
}
latestSnapshot = latestTrxInfo.toSnapshotInfo();

if (monitorService == null) {
OzoneConfiguration conf = scm.getConfiguration();
long serviceInterval = conf.getTimeDuration(
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL,
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
long serviceTimeout = conf.getTimeDuration(
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT,
OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
long flushInterval = conf.getTimeDuration(
OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
monitorService = new SCMHADBTransactionBufferMonitorService(
this, serviceInterval, serviceTimeout, TimeUnit.MILLISECONDS,
flushInterval);
monitorService.start();
}
lastFlushTime = Time.monotonicNow();
bufferSize = new AtomicLong(0);
}

@Override
@@ -138,5 +177,17 @@ public String toString() {

@Override
public void close() throws IOException {
if (monitorService != null) {
monitorService.shutdown();
}
}

public long getBufferSize() {
return bufferSize.get();
}

public long getLastFlushTime() {
return lastFlushTime;
}

}
@@ -0,0 +1,69 @@
package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* This service monitors the SCMHADBTransactionBuffer and its last flush
* time, and triggers a flush when too much time has elapsed since the
* last flush and the buffer is non-empty. This bounds how long buffered
* transactions stay invisible, since regular flushes are triggered by
* volume rather than by time.
*/
class SCMHADBTransactionBufferMonitorService extends BackgroundService {
private static final Logger LOG =
LoggerFactory.getLogger(SCMHADBTransactionBufferMonitorService.class);

private final SCMHADBTransactionBufferImpl dbBuffer;
private final long flushInterval;
private static final int BUFFER_FLUSH_WORKER_POOL_SIZE = 1;


SCMHADBTransactionBufferMonitorService(
SCMHADBTransactionBufferImpl buffer,
long serviceInterval,
long serviceTimeout,
TimeUnit unit,
long flushInterval) {
super("SCMHADBTransactionBufferMonitorService",
serviceInterval, unit, BUFFER_FLUSH_WORKER_POOL_SIZE, serviceTimeout);
this.dbBuffer = buffer;
this.flushInterval = flushInterval;
}

@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
queue.add(new SCMHADBTransactionBufferFlushTask());
return queue;
}

private class SCMHADBTransactionBufferFlushTask implements BackgroundTask {

@Override
public BackgroundTaskResult.EmptyTaskResult call() throws Exception {
long elapsedTime = Time.monotonicNow() - dbBuffer.getLastFlushTime();
long size = dbBuffer.getBufferSize();
if (elapsedTime > flushInterval && size > 0) {
LOG.info("Last flush was {} ms ago, exceeding the flush interval of" +
" {} ms, and buffer size is {}; triggering SCMHADBTransactionBuffer" +
" flush", elapsedTime, flushInterval, size);
dbBuffer.flush();
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
}

@Override
public int getPriority() {
return 0;
}
}
}
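
For readers outside the Ozone codebase, the check-then-flush loop above can be restated with a plain ScheduledExecutorService. This is only a sketch of the same timing contract; FlushMonitorSketch and its fields are hypothetical stand-ins for the buffer and the BackgroundService machinery, not code from this PR.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone class, not part of this PR.
public final class FlushMonitorSketch {
  private final AtomicLong bufferSize = new AtomicLong();
  // Hadoop's Time.monotonicNow() is System.nanoTime() / 1_000_000.
  private volatile long lastFlushTime = System.nanoTime() / 1_000_000;

  private void flush() {
    // Stand-in for SCMHADBTransactionBuffer#flush: commit the batch,
    // then reset the staleness counters.
    bufferSize.set(0);
    lastFlushTime = System.nanoTime() / 1_000_000;
    System.out.println("flushed");
  }

  public static void main(String[] args) throws InterruptedException {
    FlushMonitorSketch buffer = new FlushMonitorSketch();
    long serviceIntervalMs = 2_000; // monitor wake-up cadence
    long flushIntervalMs = 5_000;   // max staleness before a forced flush

    ScheduledExecutorService monitor =
        Executors.newSingleThreadScheduledExecutor();
    monitor.scheduleWithFixedDelay(() -> {
      long elapsed = System.nanoTime() / 1_000_000 - buffer.lastFlushTime;
      // Same condition as SCMHADBTransactionBufferFlushTask#call above.
      if (elapsed > flushIntervalMs && buffer.bufferSize.get() > 0) {
        buffer.flush();
      }
    }, serviceIntervalMs, serviceIntervalMs, TimeUnit.MILLISECONDS);

    buffer.bufferSize.incrementAndGet(); // simulate one buffered write
    Thread.sleep(8_000);                 // flush fires after ~6 seconds
    monitor.shutdownNow();
  }
}

Here scheduleWithFixedDelay plays the role of BackgroundService's periodic task scheduling; the wake-up cadence and the flush threshold stay independent knobs, exactly as the two new config keys keep them.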


@@ -31,7 +31,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -72,6 +72,9 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -102,11 +105,16 @@ public void setup() throws Exception {
conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
conf.setLong(OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL, 5000);
conf.setLong(OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_INTERVAL,
2000);
conf.setLong(OZONE_SCM_HA_DBTRANSACTIONBUFFER_MONITOR_SERVICE_TIMEOUT,
30 * 1000L);

scm = HddsTestUtils.getScm(conf);
containerManager = Mockito.mock(ContainerManager.class);
containerTable = scm.getScmMetadataStore().getContainerTable();
scmHADBTransactionBuffer =
new SCMHADBTransactionBufferStub(scm.getScmMetadataStore().getStore());
scmHADBTransactionBuffer = new SCMHADBTransactionBufferImpl(scm);
metrics = Mockito.mock(ScmBlockDeletingServiceMetrics.class);
deletedBlockLog = new DeletedBlockLogImpl(conf,
containerManager,
@@ -467,7 +475,7 @@ public void testPersistence() throws Exception {
// close db and reopen it again to make sure
// currentTxnID = 50
deletedBlockLog.close();
new DeletedBlockLogImpl(conf,
deletedBlockLog = new DeletedBlockLogImpl(conf,
containerManager,
scm.getScmHAManager().getRatisServer(),
scm.getScmMetadataStore().getDeletedBlocksTXTable(),
Expand All @@ -480,6 +488,22 @@ public void testPersistence() throws Exception {
//Assertions.assertEquals((long)deletedBlockLog.getCurrentTXID(), 50L);
}

@Test
public void testTimeTriggeredFlush() throws Exception {
addTransactions(generateData(50), false);
// Transactions are invisible until the transaction buffer is flushed.
List<DeletedBlocksTransaction> blocks =
getTransactions(BLOCKS_PER_TXN * 50);
Assertions.assertEquals(0, blocks.size());

// Sleep longer than ozone.scm.ha.dbtransactionbuffer.flush.interval
// (5 s here, with a 2 s monitor cadence); the monitor should flush
// the buffer, making the transactions visible.
Thread.sleep(6000);

blocks = getTransactions(BLOCKS_PER_TXN * 50);
Assertions.assertEquals(50, blocks.size());
}
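
The fixed 6-second sleep encodes the 5 s flush interval plus the monitor cadence configured in setup(). A sleep-free sketch of the same wait, assuming GenericTestUtils (used elsewhere in Ozone tests) is available on the test classpath:

// Poll until the monitor's flush makes the transactions visible,
// instead of hard-coding the interval arithmetic into a sleep.
GenericTestUtils.waitFor(() -> {
  try {
    return getTransactions(BLOCKS_PER_TXN * 50).size() == 50;
  } catch (Exception e) {
    return false;
  }
}, 500, 30_000); // check every 500 ms, wait up to 30 s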

@Test
public void testDeletedBlockTransactions()
throws IOException, TimeoutException {