@@ -159,8 +159,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
try {
if (blobView.get(jobId, blobKey, incomingFile)) {
// now move the temp file to our local cache atomically
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}

return localFile;
}
@@ -172,8 +177,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
BlobClient.downloadFromBlobServer(
jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);

BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}

return localFile;
} finally {
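For reference, the two hunks above replace the old convention of passing readWriteLock.writeLock() into BlobUtils.moveTempFileToStore with an explicit lock/try/finally block around the call. Below is a minimal, hypothetical sketch of that pattern (the class and method names are made up, and Files.move stands in for the BlobUtils helper, which no longer takes a lock parameter):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteLockedMoveSketch {

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /** Moves a downloaded temp file into the local cache while holding the write lock. */
    File moveUnderWriteLock(File incomingFile, File localFile) throws IOException {
        readWriteLock.writeLock().lock();
        try {
            // stands in for BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, log, null)
            Files.move(incomingFile.toPath(), localFile.toPath());
            return localFile;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}

Keeping the critical section in the caller also allows the server-side put path further down in this diff to place its file-existence check inside the same lock as the move.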
@@ -47,17 +47,18 @@
import java.net.Socket;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
@@ -496,13 +497,16 @@ private static BlobKey receiveAndCheckPutResponse(
else if (response == RETURN_OKAY) {

BlobKey remoteKey = BlobKey.readFromInputStream(is);
BlobKey localKey = BlobKey.createKey(blobType, md.digest());
byte[] localHash = md.digest();

if (!localKey.equals(remoteKey)) {
if (blobType != remoteKey.getType()) {
throw new IOException("Detected data corruption during transfer");
}
if (!Arrays.equals(localHash, remoteKey.getHash())) {
throw new IOException("Detected data corruption during transfer");
}

return localKey;
return remoteKey;
}
else if (response == RETURN_ERROR) {
Throwable cause = readExceptionFromStream(is);
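Because the server now appends a random component to every key, the client can no longer recompute the complete key from its local digest and compare it with equals(). The hunk above therefore validates the blob type and the content hash separately and keeps the server-generated key. A condensed, hypothetical sketch of that check (it assumes the getType() and getHash() accessors added to BlobKey in this PR and would have to live in the same package):

import java.io.IOException;
import java.util.Arrays;

final class PutResponseCheckSketch {

    /** Validates the server's key against the locally computed digest and the expected blob type. */
    static BlobKey checkRemoteKey(BlobKey remoteKey, BlobKey.BlobType expectedType, byte[] localHash)
            throws IOException {
        if (expectedType != remoteKey.getType()) {
            throw new IOException("Detected data corruption during transfer");
        }
        if (!Arrays.equals(localHash, remoteKey.getHash())) {
            throw new IOException("Detected data corruption during transfer");
        }
        // only the server knows the random component, so its key is the one to return
        return remoteKey;
    }
}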
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java (100 changes: 89 additions & 11 deletions)
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;

import java.io.EOFException;
@@ -41,7 +42,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
private static final long serialVersionUID = 3847117712521785209L;

/** Size of the internal BLOB key in bytes. */
private static final int SIZE = 20;
public static final int SIZE = 20;

/** The byte buffer storing the actual key data. */
private final byte[] key;
@@ -67,6 +68,11 @@ enum BlobType {
TRANSIENT_BLOB
}

/**
* Random component of the key.
*/
private final AbstractID random;

/**
* Constructs a new BLOB key.
*
@@ -76,6 +82,7 @@ enum BlobType {
protected BlobKey(BlobType type) {
this.type = checkNotNull(type);
this.key = new byte[SIZE];
this.random = new AbstractID();
}

/**
@@ -87,13 +94,33 @@ protected BlobKey(BlobType type) {
* the actual key data
*/
protected BlobKey(BlobType type, byte[] key) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}

this.type = checkNotNull(type);
this.key = key;
this.random = new AbstractID();
}

/**
* Constructs a new BLOB key from the given byte array.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
* @param random
* the random component of the key
*/
protected BlobKey(BlobType type, byte[] key, byte[] random) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}

this.type = checkNotNull(type);
this.key = key;
this.random = new AbstractID(random);
}

/**
@@ -107,10 +134,10 @@ protected BlobKey(BlobType type, byte[] key) {
@VisibleForTesting
static BlobKey createKey(BlobType type) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey();
} else {
return new PermanentBlobKey();
} else {
return new TransientBlobKey();
}
}
}

/**
@@ -125,10 +152,30 @@ static BlobKey createKey(BlobType type) {
*/
static BlobKey createKey(BlobType type, byte[] key) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(key);
} else {
return new PermanentBlobKey(key);
} else {
return new TransientBlobKey(key);
}
}
}

/**
* Returns the right {@link BlobKey} subclass for the given parameters.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
* @param random
* the random component of the key
*
* @return BlobKey subclass
*/
static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(key, random);
} else {
return new TransientBlobKey(key, random);
}
}

/**
@@ -140,6 +187,15 @@ byte[] getHash() {
return key;
}

/**
* Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
*
* @return BLOB type, i.e. permanent or transient
*/
BlobType getType() {
return type;
}

/**
* Adds the BLOB key to the given {@link MessageDigest}.
*
@@ -159,13 +215,16 @@ public boolean equals(final Object obj) {

final BlobKey bk = (BlobKey) obj;

return Arrays.equals(this.key, bk.key) && this.type == bk.type;
return Arrays.equals(this.key, bk.key) &&
this.type == bk.type &&
this.random.equals(bk.random);
}

@Override
public int hashCode() {
int result = Arrays.hashCode(this.key);
result = 37 * result + this.type.hashCode();
result = 37 * result + this.random.hashCode();
return result;
}

@@ -183,7 +242,7 @@ public String toString() {
// this actually never happens!
throw new IllegalStateException("Invalid BLOB type");
}
return typeString + StringUtils.byteToHexString(this.key);
return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
}

@Override
@@ -203,7 +262,13 @@ public int compareTo(BlobKey o) {

if (aarr.length == barr.length) {
// same hash contents - compare the BLOB types
return this.type.compareTo(o.type);
int typeCompare = this.type.compareTo(o.type);
if (typeCompare == 0) {
// same type - compare random components
return this.random.compareTo(o.random);
} else {
return typeCompare;
}
} else {
return aarr.length - barr.length;
}
@@ -223,6 +288,7 @@ public int compareTo(BlobKey o) {
static BlobKey readFromInputStream(InputStream inputStream) throws IOException {

final byte[] key = new byte[BlobKey.SIZE];
final byte[] random = new byte[AbstractID.SIZE];

int bytesRead = 0;
// read key
@@ -233,6 +299,7 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
}
bytesRead += read;
}

// read BLOB type
final BlobType blobType;
{
@@ -248,7 +315,17 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
}
}

return createKey(blobType, key);
// read random component
bytesRead = 0;
while (bytesRead < AbstractID.SIZE) {
final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
if (read < 0) {
throw new EOFException("Read an incomplete BLOB key");
}
bytesRead += read;
}

return createKey(blobType, key, random);
}

/**
@@ -262,5 +339,6 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
void writeToOutputStream(final OutputStream outputStream) throws IOException {
outputStream.write(this.key);
outputStream.write(this.type.ordinal());
outputStream.write(this.random.getBytes());
}
}
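With these changes, a serialized BlobKey consists of the 20-byte content hash, one byte for the BlobType ordinal, and the random AbstractID bytes (16 more bytes, assuming AbstractID.SIZE == 16). A small, hypothetical round-trip sketch that illustrates the layout (it would need to sit in the org.apache.flink.runtime.blob package because the serialization methods are package-private):

import org.apache.flink.util.AbstractID;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

final class BlobKeyRoundTripSketch {

    static void roundTrip(BlobKey key) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        key.writeToOutputStream(out);

        // hash + type ordinal + random component
        byte[] bytes = out.toByteArray();
        if (bytes.length != BlobKey.SIZE + 1 + AbstractID.SIZE) {
            throw new IllegalStateException("Unexpected serialized key length: " + bytes.length);
        }

        // the random component survives the round trip, so equals() and compareTo() still match
        BlobKey copy = BlobKey.readFromInputStream(new ByteArrayInputStream(bytes));
        if (!key.equals(copy) || key.compareTo(copy) != 0) {
            throw new IllegalStateException("Round trip changed the key");
        }
    }
}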
@@ -474,8 +474,13 @@ void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile) thr
incomingFile = createTemporaryFilename();
blobStore.get(jobId, blobKey, incomingFile);

BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, LOG, null);
} finally {
readWriteLock.writeLock().unlock();
}

return;
} finally {
@@ -586,10 +591,8 @@ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType
md.update(value);
fos.write(value);

blobKey = BlobKey.createKey(blobType, md.digest());

// persist file
moveTempFileToStore(incomingFile, jobId, blobKey);
blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

return blobKey;
} finally {
@@ -642,10 +645,8 @@ private BlobKey putInputStream(
md.update(buf, 0, bytesRead);
}

blobKey = BlobKey.createKey(blobType, md.digest());

// persist file
moveTempFileToStore(incomingFile, jobId, blobKey);
blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

return blobKey;
} finally {
@@ -665,20 +666,53 @@ private BlobKey putInputStream(
* temporary file created during transfer
* @param jobId
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
* @param blobKey
* BLOB key identifying the file
* @param digest
* BLOB content digest, i.e. hash
* @param blobType
* whether this file is a permanent or transient BLOB
*
* @return unique BLOB key that identifies the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/
void moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
BlobKey moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
throws IOException {

int retries = 10;

File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
int attempt = 0;
while (true) {
// add unique component independent of the BLOB content
BlobKey blobKey = BlobKey.createKey(blobType, digest);
File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);

BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, storageFile, readWriteLock.writeLock(), LOG,
blobKey instanceof PermanentBlobKey ? blobStore : null);
// try again until the key is unique (put the existence check into the lock!)
readWriteLock.writeLock().lock();
try {
if (!storageFile.exists()) {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, storageFile, LOG,
blobKey instanceof PermanentBlobKey ? blobStore : null);
return blobKey;
}
} finally {
readWriteLock.writeLock().unlock();
}

++attempt;
if (attempt >= retries) {
String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ").";
LOG.error(message + " No retries left.");
throw new IOException(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
jobId, attempt, storageFile.getAbsolutePath());
}
}
}
}

/**
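The new moveTempFileToStore keeps generating fresh keys, and therefore fresh storage paths, until it finds one that does not exist yet, performing both the existence check and the move under the write lock so that concurrent puts cannot race on the same path. A generic, hypothetical sketch of that retry pattern (the Supplier stands in for the combination of BlobKey.createKey and BlobUtils.getStorageLocation):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

final class UniqueStoreMoveSketch {

    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /** Moves the temp file to a candidate location that is proven unused while the write lock is held. */
    File storeUnderUniqueName(File incomingFile, Supplier<File> candidateLocation, int retries)
            throws IOException {
        for (int attempt = 1; attempt <= retries; attempt++) {
            File storageFile = candidateLocation.get(); // new random key -> new candidate path

            readWriteLock.writeLock().lock();
            try {
                if (!storageFile.exists()) {
                    Files.move(incomingFile.toPath(), storageFile.toPath());
                    return storageFile;
                }
            } finally {
                readWriteLock.writeLock().unlock();
            }
            // collision: another BLOB already occupies this path, so try a different key
        }
        throw new IOException("Failed to find a unique storage location after " + retries + " attempts");
    }
}

Since every key carries an AbstractID-sized random component, collisions on the storage path are practically impossible; the bounded retry mainly guards against pathological cases.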