Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,14 @@ will be used under the directory specified by jobmanager.web.tmpdir.

- `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.

- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of the blob caches (DEFAULT: 1 hour).
Whenever a job is not referenced at the cache anymore, we set a TTL and let the periodic cleanup task
(executed every `blob.service.cleanup.interval` seconds) remove its blob files after this TTL has passed.
- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of transient blobs at server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour).
Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and
let the periodic cleanup task (executed every `blob.service.cleanup.interval` seconds) remove them
after this TTL has passed. We do the same for transient blob files at both server and caches but
immediately after accessing them, i.e. a put or get operation.
This means that a blob will be retained at most <tt>2 * `blob.service.cleanup.interval`</tt> seconds after
not being referenced anymore. Therefore, a recovery still has the chance to use existing files rather
than to download them again.
not being referenced anymore (permanent blobs) or after its last access (transient blobs). For permanent blobs,
this means that a recovery still has the chance to use existing files rather than downloading them again.

- `blob.server.port`: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
try {
if (blobView.get(jobId, blobKey, incomingFile)) {
// now move the temp file to our local cache atomically
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}

return localFile;
}
Expand All @@ -172,8 +177,13 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
BlobClient.downloadFromBlobServer(
jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);

BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}

return localFile;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,18 @@
import java.net.Socket;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
Expand Down Expand Up @@ -496,13 +497,16 @@ private static BlobKey receiveAndCheckPutResponse(
else if (response == RETURN_OKAY) {

BlobKey remoteKey = BlobKey.readFromInputStream(is);
BlobKey localKey = BlobKey.createKey(blobType, md.digest());
byte[] localHash = md.digest();

if (!localKey.equals(remoteKey)) {
if (blobType != remoteKey.getType()) {
throw new IOException("Detected data corruption during transfer");
}
if (!Arrays.equals(localHash, remoteKey.getHash())) {
throw new IOException("Detected data corruption during transfer");
}

return localKey;
return remoteKey;
}
else if (response == RETURN_ERROR) {
Throwable cause = readExceptionFromStream(is);
Expand Down
100 changes: 89 additions & 11 deletions flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;

import java.io.EOFException;
Expand All @@ -41,7 +42,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
private static final long serialVersionUID = 3847117712521785209L;

/** Size of the internal BLOB key in bytes. */
private static final int SIZE = 20;
public static final int SIZE = 20;

/** The byte buffer storing the actual key data. */
private final byte[] key;
Expand All @@ -67,6 +68,11 @@ enum BlobType {
TRANSIENT_BLOB
}

/**
* Random component of the key.
*/
private final AbstractID random;

/**
* Constructs a new BLOB key.
*
Expand All @@ -76,6 +82,7 @@ enum BlobType {
protected BlobKey(BlobType type) {
    // Hand over to the (type, key) constructor with a zero-filled array of exactly SIZE bytes:
    // it null-checks the type, stores the array and creates a new AbstractID as the random
    // component, which is precisely what this constructor has to do.
    this(type, new byte[SIZE]);
}

/**
Expand All @@ -87,13 +94,33 @@ protected BlobKey(BlobType type) {
* the actual key data
*/
protected BlobKey(BlobType type, byte[] key) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}

this.type = checkNotNull(type);
this.key = key;
this.random = new AbstractID();
}

/**
* Constructs a new BLOB key from the given byte array.
*
* @param type
* whether the referenced BLOB is permanent or transient
* @param key
* the actual key data
* @param random
* the random component of the key
*/
protected BlobKey(BlobType type, byte[] key, byte[] random) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}

this.type = checkNotNull(type);
this.key = key;
this.random = new AbstractID(random);
}

/**
Expand All @@ -107,10 +134,10 @@ protected BlobKey(BlobType type, byte[] key) {
@VisibleForTesting
static BlobKey createKey(BlobType type) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey();
} else {
return new PermanentBlobKey();
} else {
return new TransientBlobKey();
}
}
}

/**
Expand All @@ -125,10 +152,30 @@ static BlobKey createKey(BlobType type) {
*/
static BlobKey createKey(BlobType type, byte[] key) {
if (type == PERMANENT_BLOB) {
return new PermanentBlobKey(key);
} else {
return new PermanentBlobKey(key);
} else {
return new TransientBlobKey(key);
}
}
}

/**
 * Creates the {@link BlobKey} subclass that corresponds to the given BLOB type, using an
 * already known hash and random component.
 *
 * @param type
 *        whether the referenced BLOB is permanent or transient
 * @param key
 *        the actual key data
 * @param random
 *        the random component of the key
 *
 * @return a permanent key for {@link BlobType#PERMANENT_BLOB}, a transient key otherwise
 */
static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
    return (type == PERMANENT_BLOB)
        ? new PermanentBlobKey(key, random)
        : new TransientBlobKey(key, random);
}

/**
Expand All @@ -140,6 +187,15 @@ byte[] getHash() {
return key;
}

/**
 * Gives access to the BLOB type this key was created with.
 *
 * @return the (internal) BLOB type, i.e. permanent or transient
 */
BlobType getType() {
    return this.type;
}

/**
* Adds the BLOB key to the given {@link MessageDigest}.
*
Expand All @@ -159,13 +215,16 @@ public boolean equals(final Object obj) {

final BlobKey bk = (BlobKey) obj;

return Arrays.equals(this.key, bk.key) && this.type == bk.type;
return Arrays.equals(this.key, bk.key) &&
this.type == bk.type &&
this.random.equals(bk.random);
}

@Override
public int hashCode() {
    // Combines the same three components that equals() compares (hash bytes, type, random
    // part) with the multiplier 37, folding them in that order.
    return 37 * (37 * Arrays.hashCode(this.key) + this.type.hashCode()) + this.random.hashCode();
}

Expand All @@ -183,7 +242,7 @@ public String toString() {
// this actually never happens!
throw new IllegalStateException("Invalid BLOB type");
}
return typeString + StringUtils.byteToHexString(this.key);
return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
}

@Override
Expand All @@ -203,7 +262,13 @@ public int compareTo(BlobKey o) {

if (aarr.length == barr.length) {
// same hash contents - compare the BLOB types
return this.type.compareTo(o.type);
int typeCompare = this.type.compareTo(o.type);
if (typeCompare == 0) {
// same type - compare random components
return this.random.compareTo(o.random);
} else {
return typeCompare;
}
} else {
return aarr.length - barr.length;
}
Expand All @@ -223,6 +288,7 @@ public int compareTo(BlobKey o) {
static BlobKey readFromInputStream(InputStream inputStream) throws IOException {

final byte[] key = new byte[BlobKey.SIZE];
final byte[] random = new byte[AbstractID.SIZE];

int bytesRead = 0;
// read key
Expand All @@ -233,6 +299,7 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
}
bytesRead += read;
}

// read BLOB type
final BlobType blobType;
{
Expand All @@ -248,7 +315,17 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
}
}

return createKey(blobType, key);
// read random component
bytesRead = 0;
while (bytesRead < AbstractID.SIZE) {
final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
if (read < 0) {
throw new EOFException("Read an incomplete BLOB key");
}
bytesRead += read;
}

return createKey(blobType, key, random);
}

/**
Expand All @@ -262,5 +339,6 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
void writeToOutputStream(final OutputStream outputStream) throws IOException {
    // 1) the hash bytes of the key
    outputStream.write(this.key);

    // 2) the BLOB type, encoded as the ordinal of the enum constant in a single byte
    final int typeOrdinal = this.type.ordinal();
    outputStream.write(typeOrdinal);

    // 3) the bytes of the random component
    final byte[] randomBytes = this.random.getBytes();
    outputStream.write(randomBytes);
}
}
Loading