From a7a0e1a596134c749c1ac3b7c730ff52f8391f4b Mon Sep 17 00:00:00 2001 From: chenglei Date: Sat, 4 Jul 2020 21:00:35 +0800 Subject: [PATCH] HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (#1970) Signed-off-by: Duo Zhang --- .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java | 5 +++++ .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 5 +++++ .../hbase/io/asyncfs/WrapperAsyncFSOutput.java | 13 ++++++++++++- .../hbase/regionserver/wal/AbstractFSWAL.java | 2 +- .../wal/AsyncProtobufLogWriter.java | 5 +++++ .../regionserver/wal/CombinedAsyncWriter.java | 5 +++++ .../regionserver/wal/ProtobufLogWriter.java | 11 +++++++++++ .../apache/hadoop/hbase/wal/WALProvider.java | 17 +++++++++++++++++ .../regionserver/TestFailedAppendAndSync.java | 5 +++++ .../hadoop/hbase/regionserver/TestHRegion.java | 5 +++++ .../hbase/regionserver/TestWALLockup.java | 10 ++++++++++ .../hbase/regionserver/wal/TestAsyncFSWAL.java | 5 +++++ .../wal/TestAsyncFSWALDurability.java | 5 +++++ .../regionserver/wal/TestFSHLogDurability.java | 5 +++++ .../hbase/regionserver/wal/TestLogRolling.java | 10 ++++++++++ .../regionserver/wal/WriterOverAsyncWriter.java | 5 +++++ .../replication/DualAsyncFSWALForTest.java | 5 +++++ 17 files changed, 116 insertions(+), 2 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java index 3c520b80e3a1..059ca00b02cc 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java @@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable { */ @Override void close() throws IOException; + + /** + * @return byteSize success synced to underlying filesystem. + */ + long getSyncedLength(); } diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 154ae296cc7b..74e7f68b0ada 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -575,4 +575,9 @@ public void close() throws IOException { public boolean isBroken() { return state == State.BROKEN; } + + @Override + public long getSyncedLength() { + return this.ackedBlockLength; + } } diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java index bbb4e5468936..39f1f71e2473 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java @@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput { private final ExecutorService executor; + private volatile long syncedLength = 0; + public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) { this.out = out; this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) @@ -91,7 +93,11 @@ private void flush0(CompletableFuture future, ByteArrayOutputStream buffer out.hflush(); } } - future.complete(out.getPos()); + long pos = out.getPos(); + if(pos > this.syncedLength) { + this.syncedLength = pos; + } + future.complete(pos); } catch (IOException e) { future.completeExceptionally(e); return; @@ -124,4 +130,9 @@ public void close() throws IOException { public boolean isBroken() { return false; } + + @Override + public long getSyncedLength() { + return this.syncedLength; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 919e3b06ce22..a1e4facde712 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -1109,7 +1109,7 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) { Path currentPath = getOldPath(); if (path.equals(currentPath)) { W writer = this.writer; - return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); + return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty(); } else { return OptionalLong.empty(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e731611b5c7b..8c944b1bdf57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -231,4 +231,9 @@ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws protected OutputStream getOutputStreamForCellEncoder() { return asyncOutputWrapper; } + + @Override + public long getSyncedLength() { + return this.output.getSyncedLength(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java index 2844f80d1d04..850359187ae5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java @@ -49,6 +49,11 @@ public long getLength() { return writers.get(0).getLength(); } + @Override + public long getSyncedLength() { + return writers.get(0).getSyncedLength(); + } + @Override public void close() throws IOException { Exception error = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index ff08da8f4434..4bbc13d3ab88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -19,11 +19,14 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.FSHLogProvider; @@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter protected FSDataOutputStream output; + private final AtomicLong syncedLength = new AtomicLong(0); + @Override public void append(Entry entry) throws IOException { entry.getKey().getBuilder(compressor). @@ -85,6 +90,12 @@ public void sync(boolean forceSync) throws IOException { } else { fsdos.hflush(); } + AtomicUtils.updateMax(this.syncedLength, fsdos.getPos()); + } + + @Override + public long getSyncedLength() { + return this.syncedLength.get(); } public FSDataOutputStream getStream() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 6f0b983444ce..c3bd14995077 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -74,6 +75,22 @@ public interface WALProvider { interface WriterBase extends Closeable { long getLength(); + /** + * NOTE: We add this method for {@link WALFileLengthProvider} used for replication, + * considering the case if we use {@link AsyncFSWAL},we write to 3 DNs concurrently, + * according to the visibility guarantee of HDFS, the data will be available immediately + * when arriving at DN since all the DNs will be considered as the last one in pipeline. + * This means replication may read uncommitted data and replicate it to the remote cluster + * and cause data inconsistency. + * The method {@link WriterBase#getLength} may return length which just in hdfs client + * buffer and not successfully synced to HDFS, so we use this method to return the length + * successfully synced to HDFS and replication thread could only read writing WAL file + * limited by this length. + * see also HBASE-14004 and this document for more details: + * https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit# + * @return byteSize successfully synced to underlying filesystem. + */ + long getSyncedLength(); } // Writers are used internally. Users outside of the WAL should be relying on the diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 464afad37147..1313886e38a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -151,6 +151,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } }; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 138c8adac600..3a5dc6a43f94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1246,6 +1246,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } }; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index eb1b92e572eb..21d6b7ca428d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -190,6 +190,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } }; } } @@ -374,6 +379,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } }; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 8360f1922290..717352708460 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -155,6 +155,11 @@ public long getLength() { return writer.getLength(); } + @Override + public long getSyncedLength() { + return writer.getSyncedLength(); + } + @Override public CompletableFuture sync(boolean forceSync) { CompletableFuture result = writer.sync(forceSync); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java index 353f54935437..a482d934e948 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java @@ -109,6 +109,11 @@ public long getLength() { return writer.getLength(); } + @Override + public long getSyncedLength() { + return writer.getSyncedLength(); + } + @Override public CompletableFuture sync(boolean forceSync) { writerSyncFlag = forceSync; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java index 9c460588fdbc..3c250446bec9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java @@ -84,6 +84,11 @@ public long getLength() { return writer.getLength(); } + @Override + public long getSyncedLength() { + return writer.getSyncedLength(); + } + @Override public void sync(boolean forceSync) throws IOException { writerSyncFlag = forceSync; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 691250a56092..0712b594c34d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -174,6 +174,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return oldWriter1.getLength(); } + + @Override + public long getSyncedLength() { + return oldWriter1.getSyncedLength(); + } }; log.setWriter(newWriter1); @@ -231,6 +236,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return oldWriter2.getLength(); } + + @Override + public long getSyncedLength() { + return oldWriter2.getSyncedLength(); + } }; log.setWriter(newWriter2); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java index c9e8ce22e26d..8ae74ad3f38d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java @@ -44,6 +44,11 @@ public long getLength() { return asyncWriter.getLength(); } + @Override + public long getSyncedLength() { + return asyncWriter.getSyncedLength(); + } + @Override public void append(Entry entry) throws IOException { asyncWriter.append(entry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java index d4313a84963e..19e51126e367 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java @@ -59,6 +59,11 @@ public long getLength() { return localWriter.getLength(); } + @Override + public long getSyncedLength() { + return this.localWriter.getSyncedLength(); + } + @Override public void close() throws IOException { Closeables.close(localWriter, true);