Skip to content

Commit

Permalink
Fix file fingerprint to use atomic get content hash of uploaded file
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Currently, when complete is called on a file in Alluxio, a fingerprint of
the file is created by performing a GetStatus on the file in the
UFS. If, due to a concurrent write, the state of the file is different
from what was written through Alluxio, the fingerprint will not actually
match the content of the file in Alluxio. If this happens, the state of
the file in Alluxio will always be out of sync with the UFS, and the
file will never be updated to the most recent version.
This is because metadata sync uses the fingerprint to see if the file
needs synchronization, and if the fingerprint does not match the file in
Alluxio there will be inconsistencies.

This PR fixes this by having the contentHash field of the fingerprint be
computed while the file is actually written on the UFS. For object
stores, this means the hash is taken from the result of the call to
PutObject. Unfortunately, HDFS does not have a similar interface, so the
content hash is taken just after the output stream is closed to complete
the write. There is a small chance that someone changes the file
in the window between the two operations.

pr-link: #16597
change-id: cid-64723be309bdb14b05613864af3b6a1bb30cba6d
  • Loading branch information
tcrain committed Feb 16, 2023
1 parent aea58cd commit 00da77c
Show file tree
Hide file tree
Showing 51 changed files with 657 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -86,6 +87,11 @@ public int chunkSize() {
return mChunkSize;
}

@Override
public Optional<String> getUfsContentHash() {
// this implementation does not report a content hash, so the result is always empty
return Optional.empty();
}

@Override
public void writeChunk(final ByteBuf buf) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand Down Expand Up @@ -97,6 +98,12 @@ public static DataWriter create(FileSystemContext context, long blockId, long bl
}
}

/**
* Gets the content hash of the file written by this data writer, if one is available.
*
* @return the content hash of the file if it is written to the UFS. Will only
* return a non-empty value after the data writer has been closed.
*/
Optional<String> getUfsContentHash();

/**
* Writes a chunk. This method takes the ownership of this chunk even if it fails to write
* the chunk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -243,14 +244,20 @@ public void cancel() {
* Wait for server to complete the inbound stream.
*
* @param timeoutMs maximum time to wait for server response
* @return the last response of the stream
*/
public void waitForComplete(long timeoutMs) throws IOException {
public Optional<ResT> waitForComplete(long timeoutMs) throws IOException {
if (mCompleted || mCanceled) {
return;
return Optional.empty();
}
while (receive(timeoutMs) != null) {
ResT prevResponse;
ResT response = null;
do {
// wait until inbound stream is closed from server.
}
prevResponse = response;
response = receive(timeoutMs);
} while (response != null);
return Optional.ofNullable(prevResponse);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;

Expand Down Expand Up @@ -104,17 +105,21 @@ public void sendDataMessage(DataMessage<ReqT, DataBuffer> message, long timeoutM
}

@Override
public void waitForComplete(long timeoutMs) throws IOException {
public Optional<ResT> waitForComplete(long timeoutMs) throws IOException {
if (mResponseMarshaller == null) {
super.waitForComplete(timeoutMs);
return;
return super.waitForComplete(timeoutMs);
}
// loop until the last response is received, whose result will be returned
DataMessage<ResT, DataBuffer> message;
DataMessage<ResT, DataBuffer> prevMessage = null;
while (!isCanceled() && (message = receiveDataMessage(timeoutMs)) != null) {
if (message.getBuffer() != null) {
message.getBuffer().release();
if (prevMessage != null && prevMessage.getBuffer() != null) {
prevMessage.getBuffer().release();
}
prevMessage = message;
}
super.waitForComplete(timeoutMs);
// note that the combineData call is responsible for releasing the buffer of prevMessage
ResT result = mResponseMarshaller.combineData(prevMessage);
return Optional.ofNullable(super.waitForComplete(timeoutMs).orElse(result));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -68,6 +69,9 @@ public final class GrpcDataWriter implements DataWriter {
private final long mChunkSize;
private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;

/** The content hash resulting from the write operation if one is available. */
private String mContentHash = null;

/**
* The next pos to queue to the buffer.
*/
Expand Down Expand Up @@ -177,6 +181,11 @@ public long pos() {
return mPosToQueue;
}

@Override
public Optional<String> getUfsContentHash() {
// mContentHash is null when no content hash is available, which maps to an empty Optional
return Optional.ofNullable(mContentHash);
}

@Override
public void writeChunk(final ByteBuf buf) throws IOException {
mPosToQueue += buf.readableBytes();
Expand Down Expand Up @@ -239,6 +248,9 @@ public void flush() throws IOException {
writeRequest, mAddress));
}
posWritten = response.getOffset();
if (response.hasContentHash()) {
mContentHash = response.getContentHash();
}
} while (mPosToQueue != posWritten);
}

Expand All @@ -249,7 +261,9 @@ public void close() throws IOException {
return;
}
mStream.close();
mStream.waitForComplete(mWriterCloseTimeoutMs);
mStream.waitForComplete(mWriterCloseTimeoutMs)
.ifPresent(writeResponse -> mContentHash = writeResponse.hasContentHash()
? writeResponse.getContentHash() : null);
} finally {
mClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -119,6 +120,11 @@ public int chunkSize() {
return (int) mChunkSize;
}

@Override
public Optional<String> getUfsContentHash() {
// this implementation does not report a content hash, so the result is always empty
return Optional.empty();
}

@Override
public void writeChunk(final ByteBuf buf) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -82,6 +83,11 @@ public static UfsFallbackLocalFileDataWriter create(FileSystemContext context,
mIsWritingToLocal = mLocalFileDataWriter != null;
}

@Override
public Optional<String> getUfsContentHash() {
// no hash is kept here; the answer comes entirely from mGrpcDataWriter
return mGrpcDataWriter.getUfsContentHash();
}

@Override
public void writeChunk(ByteBuf chunk) throws IOException {
if (mIsWritingToLocal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@NotThreadSafe
public class UnderFileSystemFileOutStream extends BlockOutStream {
private static final int ID_UNUSED = -1;
private final DataWriter mDataWriter;

/**
* Creates an instance of {@link UnderFileSystemFileOutStream} that writes to a UFS file.
Expand All @@ -52,6 +53,14 @@ public static UnderFileSystemFileOutStream create(FileSystemContext context,
*/
protected UnderFileSystemFileOutStream(DataWriter dataWriter, WorkerNetAddress address) {
super(dataWriter, Long.MAX_VALUE, address);
// retain the writer so that it can be exposed through getDataWriter()
mDataWriter = dataWriter;
}

/**
* Exposes the data writer that was passed to the constructor of this stream.
*
* @return the data writer for the stream
*/
public DataWriter getDataWriter() {
return mDataWriter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ public void close() throws IOException {
} else {
mUnderStorageOutputStream.close();
optionsBuilder.setUfsLength(mBytesWritten);
mUnderStorageOutputStream.getDataWriter().getUfsContentHash().ifPresent(
optionsBuilder::setContentHash);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;

/**
* A {@link DataWriter} which writes data to a bytebuffer.
Expand All @@ -26,6 +27,11 @@ public TestDataWriter(ByteBuffer buffer) {
mBuffer = buffer;
}

@Override
public Optional<String> getUfsContentHash() {
// this test writer does not report a content hash, so the result is always empty
return Optional.empty();
}

@Override
public void writeChunk(ByteBuf chunk) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
* Forwarder for {@link UnderFileSystem} objects that works through with ForkJoinPool's
Expand Down Expand Up @@ -288,6 +289,11 @@ public Fingerprint getParsedFingerprint(String path) {
return mUfs.getParsedFingerprint(path);
}

@Override
public Fingerprint getParsedFingerprint(String path, @Nullable String contentHash) {
// pure pass-through to mUfs; contentHash is forwarded unchanged and may be null
return mUfs.getParsedFingerprint(path, contentHash);
}

@Override
public UfsMode getOperationMode(Map<String, UfsMode> physicalUfsState) {
return mUfs.getOperationMode(physicalUfsState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import javax.annotation.Nullable;

/**
* Marshaller for data messages.
Expand Down Expand Up @@ -104,7 +105,7 @@ public DataBuffer pollBuffer(T message) {
* @param message the message to be combined
* @return the message with the combined buffer
*/
public abstract T combineData(DataMessage<T, DataBuffer> message);
public abstract T combineData(@Nullable DataMessage<T, DataBuffer> message);

/**
* Serialize the message to buffers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -83,7 +84,7 @@ protected ReadResponse deserialize(ReadableBuffer buffer) throws IOException {
}

@Override
public ReadResponse combineData(DataMessage<ReadResponse, DataBuffer> message) {
public ReadResponse combineData(@Nullable DataMessage<ReadResponse, DataBuffer> message) {
if (message == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -99,7 +100,7 @@ protected WriteRequest deserialize(ReadableBuffer buffer) throws IOException {
}

@Override
public WriteRequest combineData(DataMessage<WriteRequest, DataBuffer> message) {
public WriteRequest combineData(@Nullable DataMessage<WriteRequest, DataBuffer> message) {
if (message == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
* A {@link AtomicFileOutputStream} writes to a temporary file and renames on close. This ensures
* that writing to the stream is atomic, i.e., all writes become readable only after a close.
*/
@NotThreadSafe
public class AtomicFileOutputStream extends OutputStream {
public class AtomicFileOutputStream extends OutputStream implements ContentHashable {
private static final Logger LOG = LoggerFactory.getLogger(AtomicFileOutputStream.class);

private AtomicFileOutputStreamCallback mUfs;
Expand Down Expand Up @@ -95,5 +96,15 @@ public void close() throws IOException {
// TODO(chaomin): consider setMode of the ufs file.
mClosed = true;
}

/**
 * Looks up the status of the permanent file on the UFS and returns its content hash.
 *
 * NOTE(review): presumably only meaningful once close() has completed, since this stream
 * writes to a temporary file and renames it on close - confirm against callers.
 *
 * @return the content hash from the UFS file status, or empty if the status has none
 * @throws IOException propagated from the UFS file status lookup
 */
@Override
public Optional<String> getContentHash() throws IOException {
  // get the content hash immediately after the file has completed writing
  // which will be used for generating the fingerprint of the file in Alluxio
  // ideally this value would be received as a result from the close call
  // so that we would be sure to have the hash relating to the file uploaded
  // (but such an API is not available for the UFSs that use this stream type)
  //
  // ofNullable instead of of: a file status without a content hash must yield an
  // empty Optional rather than a NullPointerException
  return Optional.ofNullable(mUfs.getFileStatus(mPermanentPath).getContentHash());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public String getFingerprint(String path) {
if (aclPair == null || aclPair.getFirst() == null || !aclPair.getFirst().hasExtended()) {
return Fingerprint.create(getUnderFSType(), status).serialize();
} else {
return Fingerprint.create(getUnderFSType(), status, aclPair.getFirst()).serialize();
return Fingerprint.create(getUnderFSType(), status, null, aclPair.getFirst()).serialize();
}
} catch (Exception e) {
// In certain scenarios, it is expected that the UFS path does not exist.
Expand All @@ -120,14 +120,19 @@ public String getFingerprint(String path) {

@Override
public Fingerprint getParsedFingerprint(String path) {
// delegate to the two-argument overload with no content hash supplied
return getParsedFingerprint(path, null);
}

@Override
public Fingerprint getParsedFingerprint(String path, @Nullable String contentHash) {
try {
UfsStatus status = getStatus(path);
Pair<AccessControlList, DefaultAccessControlList> aclPair = getAclPair(path);

if (aclPair == null || aclPair.getFirst() == null || !aclPair.getFirst().hasExtended()) {
return Fingerprint.create(getUnderFSType(), status);
return Fingerprint.create(getUnderFSType(), status, contentHash);
} else {
return Fingerprint.create(getUnderFSType(), status, aclPair.getFirst());
return Fingerprint.create(getUnderFSType(), status, contentHash, aclPair.getFirst());
}
} catch (IOException e) {
return Fingerprint.INVALID_FINGERPRINT;
Expand Down
Loading

0 comments on commit 00da77c

Please sign in to comment.