Skip to content

Commit

Permalink
HBASE-19486: Ensure threadsafe WriteBufferPeriodicFlush operations
Browse files Browse the repository at this point in the history
Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
nielsbasjes authored and chia7712 committed Jan 2, 2018
1 parent 6a0e6fe commit a6081d3
Showing 1 changed file with 22 additions and 20 deletions.
Expand Up @@ -78,8 +78,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
private final long writeBufferSize;

private long writeBufferPeriodicFlushTimeoutMs;
private long writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
private Timer writeBufferPeriodicFlushTimer = null;

private final int maxKeyValueSize;
Expand Down Expand Up @@ -188,7 +189,7 @@ public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
}

if (currentWriteBufferSize.get() == 0) {
firstRecordInBufferTimestamp = System.currentTimeMillis();
firstRecordInBufferTimestamp.set(System.currentTimeMillis());
}

// This behavior is highly non-intuitive... it does not protect us against
Expand All @@ -214,23 +215,23 @@ public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,

@VisibleForTesting
protected long getExecutedWriteBufferPeriodicFlushes() {
return executedWriteBufferPeriodicFlushes;
return executedWriteBufferPeriodicFlushes.get();
}

private long firstRecordInBufferTimestamp = 0;
private long executedWriteBufferPeriodicFlushes = 0;
private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);

private void timerCallbackForWriteBufferPeriodicFlush() {
if (currentWriteBufferSize.get() == 0) {
return; // Nothing to flush
}
long now = System.currentTimeMillis();
if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) {
if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
return; // No need to flush yet
}
// The first record in the writebuffer has been in there too long --> flush
try {
executedWriteBufferPeriodicFlushes++;
executedWriteBufferPeriodicFlushes.incrementAndGet();
flush();
} catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
Expand Down Expand Up @@ -370,18 +371,18 @@ public long getWriteBufferSize() {
}

@Override
public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get();
long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();

// Both parameters have minimal values.
this.writeBufferPeriodicFlushTimeoutMs = Math.max(0, timeoutMs);
this.writeBufferPeriodicFlushTimerTickMs =
Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs);
writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
writeBufferPeriodicFlushTimerTickMs.set(
Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));

// If something changed we stop the old Timer.
if (this.writeBufferPeriodicFlushTimeoutMs != originalTimeoutMs ||
this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
if (writeBufferPeriodicFlushTimer != null) {
writeBufferPeriodicFlushTimer.cancel();
writeBufferPeriodicFlushTimer = null;
Expand All @@ -390,25 +391,26 @@ public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {

// If we have the need for a timer and there is none we start it
if (writeBufferPeriodicFlushTimer == null &&
writeBufferPeriodicFlushTimeoutMs > 0) {
writeBufferPeriodicFlushTimeoutMs.get() > 0) {
writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
@Override
public void run() {
BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
}
}, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs);
}, writeBufferPeriodicFlushTimerTickMs.get(),
writeBufferPeriodicFlushTimerTickMs.get());
}
}

@Override
public long getWriteBufferPeriodicFlushTimeoutMs() {
return this.writeBufferPeriodicFlushTimeoutMs;
return writeBufferPeriodicFlushTimeoutMs.get();
}

@Override
public long getWriteBufferPeriodicFlushTimerTickMs() {
return this.writeBufferPeriodicFlushTimerTickMs;
return writeBufferPeriodicFlushTimerTickMs.get();
}

@Override
Expand Down

0 comments on commit a6081d3

Please sign in to comment.