Skip to content

Commit

Permalink
[FLINK-7140][blob] add an additional random component into the BlobKey
Browse files Browse the repository at this point in the history
This should guard us from uploading (and deleting) the same file more than
once and also from hash collisions.

This closes #4359.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Oct 16, 2017
1 parent b2b9463 commit f853f33
Show file tree
Hide file tree
Showing 19 changed files with 405 additions and 171 deletions.
Expand Up @@ -159,8 +159,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
try { try {
if (blobView.get(jobId, blobKey, incomingFile)) { if (blobView.get(jobId, blobKey, incomingFile)) {
// now move the temp file to our local cache atomically // now move the temp file to our local cache atomically
BlobUtils.moveTempFileToStore( readWriteLock.writeLock().lock();
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null); try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}


return localFile; return localFile;
} }
Expand All @@ -172,8 +177,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
BlobClient.downloadFromBlobServer( BlobClient.downloadFromBlobServer(
jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries); jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);


BlobUtils.moveTempFileToStore( readWriteLock.writeLock().lock();
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null); try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}


return localFile; return localFile;
} finally { } finally {
Expand Down
Expand Up @@ -47,17 +47,18 @@
import java.net.Socket; import java.net.Socket;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;


import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE; import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION; import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT; import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT; import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION; import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR; import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY; import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobUtils.readFully; import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength; import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength; import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
Expand Down Expand Up @@ -496,13 +497,16 @@ private static BlobKey receiveAndCheckPutResponse(
else if (response == RETURN_OKAY) { else if (response == RETURN_OKAY) {


BlobKey remoteKey = BlobKey.readFromInputStream(is); BlobKey remoteKey = BlobKey.readFromInputStream(is);
BlobKey localKey = BlobKey.createKey(blobType, md.digest()); byte[] localHash = md.digest();


if (!localKey.equals(remoteKey)) { if (blobType != remoteKey.getType()) {
throw new IOException("Detected data corruption during transfer");
}
if (!Arrays.equals(localHash, remoteKey.getHash())) {
throw new IOException("Detected data corruption during transfer"); throw new IOException("Detected data corruption during transfer");
} }


return localKey; return remoteKey;
} }
else if (response == RETURN_ERROR) { else if (response == RETURN_ERROR) {
Throwable cause = readExceptionFromStream(is); Throwable cause = readExceptionFromStream(is);
Expand Down
100 changes: 89 additions & 11 deletions flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob; package org.apache.flink.runtime.blob;


import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils; import org.apache.flink.util.StringUtils;


import java.io.EOFException; import java.io.EOFException;
Expand All @@ -41,7 +42,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
private static final long serialVersionUID = 3847117712521785209L; private static final long serialVersionUID = 3847117712521785209L;


/** Size of the internal BLOB key in bytes. */ /** Size of the internal BLOB key in bytes. */
private static final int SIZE = 20; public static final int SIZE = 20;


/** The byte buffer storing the actual key data. */ /** The byte buffer storing the actual key data. */
private final byte[] key; private final byte[] key;
Expand All @@ -67,6 +68,11 @@ enum BlobType {
TRANSIENT_BLOB TRANSIENT_BLOB
} }


/**
* Random component of the key.
*/
private final AbstractID random;

/** /**
* Constructs a new BLOB key. * Constructs a new BLOB key.
* *
Expand All @@ -76,6 +82,7 @@ enum BlobType {
protected BlobKey(BlobType type) { protected BlobKey(BlobType type) {
this.type = checkNotNull(type); this.type = checkNotNull(type);
this.key = new byte[SIZE]; this.key = new byte[SIZE];
this.random = new AbstractID();
} }


/** /**
Expand All @@ -87,13 +94,33 @@ protected BlobKey(BlobType type) {
* the actual key data * the actual key data
*/ */
protected BlobKey(BlobType type, byte[] key) { protected BlobKey(BlobType type, byte[] key) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}

this.type = checkNotNull(type); this.type = checkNotNull(type);
this.key = key;
this.random = new AbstractID();
}


/**
* Constructs a new BLOB key from the given byte array.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
* @param random
* the random component of the key
*/
protected BlobKey(BlobType type, byte[] key, byte[] random) {
if (key == null || key.length != SIZE) { if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes"); throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
} }


this.type = checkNotNull(type);
this.key = key; this.key = key;
this.random = new AbstractID(random);
} }


/** /**
Expand All @@ -107,10 +134,10 @@ protected BlobKey(BlobType type, byte[] key) {
@VisibleForTesting @VisibleForTesting
static BlobKey createKey(BlobType type) { static BlobKey createKey(BlobType type) {
if (type == PERMANENT_BLOB) { if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(); return new PermanentBlobKey();
} else { } else {
return new TransientBlobKey(); return new TransientBlobKey();
} }
} }


/** /**
Expand All @@ -125,10 +152,30 @@ static BlobKey createKey(BlobType type) {
*/ */
static BlobKey createKey(BlobType type, byte[] key) { static BlobKey createKey(BlobType type, byte[] key) {
if (type == PERMANENT_BLOB) { if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(key); return new PermanentBlobKey(key);
} else { } else {
return new TransientBlobKey(key); return new TransientBlobKey(key);
} }
}

/**
* Returns the right {@link BlobKey} subclass for the given parameters.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
* @param random
* the random component of the key
*
* @return BlobKey subclass
*/
static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(key, random);
} else {
return new TransientBlobKey(key, random);
}
} }


/** /**
Expand All @@ -140,6 +187,15 @@ byte[] getHash() {
return key; return key;
} }


/**
* Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
*
* @return BLOB type, i.e. permanent or transient
*/
BlobType getType() {
return type;
}

/** /**
* Adds the BLOB key to the given {@link MessageDigest}. * Adds the BLOB key to the given {@link MessageDigest}.
* *
Expand All @@ -159,13 +215,16 @@ public boolean equals(final Object obj) {


final BlobKey bk = (BlobKey) obj; final BlobKey bk = (BlobKey) obj;


return Arrays.equals(this.key, bk.key) && this.type == bk.type; return Arrays.equals(this.key, bk.key) &&
this.type == bk.type &&
this.random.equals(bk.random);
} }


@Override @Override
public int hashCode() { public int hashCode() {
int result = Arrays.hashCode(this.key); int result = Arrays.hashCode(this.key);
result = 37 * result + this.type.hashCode(); result = 37 * result + this.type.hashCode();
result = 37 * result + this.random.hashCode();
return result; return result;
} }


Expand All @@ -183,7 +242,7 @@ public String toString() {
// this actually never happens! // this actually never happens!
throw new IllegalStateException("Invalid BLOB type"); throw new IllegalStateException("Invalid BLOB type");
} }
return typeString + StringUtils.byteToHexString(this.key); return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
} }


@Override @Override
Expand All @@ -203,7 +262,13 @@ public int compareTo(BlobKey o) {


if (aarr.length == barr.length) { if (aarr.length == barr.length) {
// same hash contents - compare the BLOB types // same hash contents - compare the BLOB types
return this.type.compareTo(o.type); int typeCompare = this.type.compareTo(o.type);
if (typeCompare == 0) {
// same type - compare random components
return this.random.compareTo(o.random);
} else {
return typeCompare;
}
} else { } else {
return aarr.length - barr.length; return aarr.length - barr.length;
} }
Expand All @@ -223,6 +288,7 @@ public int compareTo(BlobKey o) {
static BlobKey readFromInputStream(InputStream inputStream) throws IOException { static BlobKey readFromInputStream(InputStream inputStream) throws IOException {


final byte[] key = new byte[BlobKey.SIZE]; final byte[] key = new byte[BlobKey.SIZE];
final byte[] random = new byte[AbstractID.SIZE];


int bytesRead = 0; int bytesRead = 0;
// read key // read key
Expand All @@ -233,6 +299,7 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
} }
bytesRead += read; bytesRead += read;
} }

// read BLOB type // read BLOB type
final BlobType blobType; final BlobType blobType;
{ {
Expand All @@ -248,7 +315,17 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
} }
} }


return createKey(blobType, key); // read random component
bytesRead = 0;
while (bytesRead < AbstractID.SIZE) {
final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
if (read < 0) {
throw new EOFException("Read an incomplete BLOB key");
}
bytesRead += read;
}

return createKey(blobType, key, random);
} }


/** /**
Expand All @@ -262,5 +339,6 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
void writeToOutputStream(final OutputStream outputStream) throws IOException { void writeToOutputStream(final OutputStream outputStream) throws IOException {
outputStream.write(this.key); outputStream.write(this.key);
outputStream.write(this.type.ordinal()); outputStream.write(this.type.ordinal());
outputStream.write(this.random.getBytes());
} }
} }
Expand Up @@ -474,8 +474,13 @@ void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile) thr
incomingFile = createTemporaryFilename(); incomingFile = createTemporaryFilename();
blobStore.get(jobId, blobKey, incomingFile); blobStore.get(jobId, blobKey, incomingFile);


BlobUtils.moveTempFileToStore( readWriteLock.writeLock().lock();
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null); try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, LOG, null);
} finally {
readWriteLock.writeLock().unlock();
}


return; return;
} finally { } finally {
Expand Down Expand Up @@ -586,10 +591,8 @@ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType
md.update(value); md.update(value);
fos.write(value); fos.write(value);


blobKey = BlobKey.createKey(blobType, md.digest());

// persist file // persist file
moveTempFileToStore(incomingFile, jobId, blobKey); blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);


return blobKey; return blobKey;
} finally { } finally {
Expand Down Expand Up @@ -642,10 +645,8 @@ private BlobKey putInputStream(
md.update(buf, 0, bytesRead); md.update(buf, 0, bytesRead);
} }


blobKey = BlobKey.createKey(blobType, md.digest());

// persist file // persist file
moveTempFileToStore(incomingFile, jobId, blobKey); blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);


return blobKey; return blobKey;
} finally { } finally {
Expand All @@ -665,20 +666,53 @@ private BlobKey putInputStream(
* temporary file created during transfer * temporary file created during transfer
* @param jobId * @param jobId
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated * ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
* @param blobKey * @param digest
* BLOB key identifying the file * BLOB content digest, i.e. hash
* @param blobType
* whether this file is a permanent or transient BLOB
*
* @return unique BLOB key that identifies the BLOB on the server
* *
* @throws IOException * @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store * thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/ */
void moveTempFileToStore( BlobKey moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, BlobKey blobKey) throws IOException { File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
throws IOException {

int retries = 10;


File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); int attempt = 0;
while (true) {
// add unique component independent of the BLOB content
BlobKey blobKey = BlobKey.createKey(blobType, digest);
File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);


BlobUtils.moveTempFileToStore( // try again until the key is unique (put the existence check into the lock!)
incomingFile, jobId, blobKey, storageFile, readWriteLock.writeLock(), LOG, readWriteLock.writeLock().lock();
blobKey instanceof PermanentBlobKey ? blobStore : null); try {
if (!storageFile.exists()) {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, storageFile, LOG,
blobKey instanceof PermanentBlobKey ? blobStore : null);
return blobKey;
}
} finally {
readWriteLock.writeLock().unlock();
}

++attempt;
if (attempt >= retries) {
String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
LOG.error(message + " No retries left.");
throw new IOException(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
jobId, attempt, storageFile.getAbsolutePath());
}
}
}
} }


/** /**
Expand Down

0 comments on commit f853f33

Please sign in to comment.