Skip to content

Commit

Permalink
HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return t…
Browse files Browse the repository at this point in the history
…he expected synced file length. (#1970)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
comnetwork committed Jul 4, 2020
1 parent e614b89 commit a7a0e1a
Show file tree
Hide file tree
Showing 17 changed files with 116 additions and 2 deletions.
Expand Up @@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
*/
@Override
void close() throws IOException;

/**
* @return byteSize success synced to underlying filesystem.
*/
long getSyncedLength();
}
Expand Up @@ -575,4 +575,9 @@ public void close() throws IOException {
public boolean isBroken() {
return state == State.BROKEN;
}

@Override
public long getSyncedLength() {
return this.ackedBlockLength;
}
}
Expand Up @@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {

private final ExecutorService executor;

private volatile long syncedLength = 0;

public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
this.out = out;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
Expand Down Expand Up @@ -91,7 +93,11 @@ private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer
out.hflush();
}
}
future.complete(out.getPos());
long pos = out.getPos();
if(pos > this.syncedLength) {
this.syncedLength = pos;
}
future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
return;
Expand Down Expand Up @@ -124,4 +130,9 @@ public void close() throws IOException {
public boolean isBroken() {
return false;
}

@Override
public long getSyncedLength() {
return this.syncedLength;
}
}
Expand Up @@ -1109,7 +1109,7 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
return OptionalLong.empty();
}
Expand Down
Expand Up @@ -231,4 +231,9 @@ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws
protected OutputStream getOutputStreamForCellEncoder() {
return asyncOutputWrapper;
}

@Override
public long getSyncedLength() {
return this.output.getSyncedLength();
}
}
Expand Up @@ -49,6 +49,11 @@ public long getLength() {
return writers.get(0).getLength();
}

@Override
public long getSyncedLength() {
return writers.get(0).getSyncedLength();
}

@Override
public void close() throws IOException {
Exception error = null;
Expand Down
Expand Up @@ -19,11 +19,14 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
Expand All @@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter

protected FSDataOutputStream output;

private final AtomicLong syncedLength = new AtomicLong(0);

@Override
public void append(Entry entry) throws IOException {
entry.getKey().getBuilder(compressor).
Expand Down Expand Up @@ -85,6 +90,12 @@ public void sync(boolean forceSync) throws IOException {
} else {
fsdos.hflush();
}
AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
}

@Override
public long getSyncedLength() {
return this.syncedLength.get();
}

public FSDataOutputStream getStream() {
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -74,6 +75,22 @@ public interface WALProvider {

interface WriterBase extends Closeable {
long getLength();
/**
* NOTE: We add this method for {@link WALFileLengthProvider} used for replication,
* considering the case if we use {@link AsyncFSWAL},we write to 3 DNs concurrently,
* according to the visibility guarantee of HDFS, the data will be available immediately
* when arriving at DN since all the DNs will be considered as the last one in pipeline.
* This means replication may read uncommitted data and replicate it to the remote cluster
* and cause data inconsistency.
* The method {@link WriterBase#getLength} may return length which just in hdfs client
* buffer and not successfully synced to HDFS, so we use this method to return the length
* successfully synced to HDFS and replication thread could only read writing WAL file
* limited by this length.
* see also HBASE-14004 and this document for more details:
* https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
* @return byteSize successfully synced to underlying filesystem.
*/
long getSyncedLength();
}

// Writers are used internally. Users outside of the WAL should be relying on the
Expand Down
Expand Up @@ -151,6 +151,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Expand Up @@ -1246,6 +1246,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Expand Up @@ -190,6 +190,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down Expand Up @@ -374,6 +379,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Expand Up @@ -155,6 +155,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);
Expand Down
Expand Up @@ -109,6 +109,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
writerSyncFlag = forceSync;
Expand Down
Expand Up @@ -84,6 +84,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public void sync(boolean forceSync) throws IOException {
writerSyncFlag = forceSync;
Expand Down
Expand Up @@ -174,6 +174,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return oldWriter1.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter1.getSyncedLength();
}
};
log.setWriter(newWriter1);

Expand Down Expand Up @@ -231,6 +236,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return oldWriter2.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter2.getSyncedLength();
}
};
log.setWriter(newWriter2);

Expand Down
Expand Up @@ -44,6 +44,11 @@ public long getLength() {
return asyncWriter.getLength();
}

@Override
public long getSyncedLength() {
return asyncWriter.getSyncedLength();
}

@Override
public void append(Entry entry) throws IOException {
asyncWriter.append(entry);
Expand Down
Expand Up @@ -59,6 +59,11 @@ public long getLength() {
return localWriter.getLength();
}

@Override
public long getSyncedLength() {
return this.localWriter.getSyncedLength();
}

@Override
public void close() throws IOException {
Closeables.close(localWriter, true);
Expand Down

0 comments on commit a7a0e1a

Please sign in to comment.