Skip to content

Commit

Permalink
[FLINK-1578] [BLOB manager] Improve failure handling and add more fai…
Browse files Browse the repository at this point in the history
…lure tests.
  • Loading branch information
StephanEwen committed Feb 18, 2015
1 parent 47fed3d commit cfce493
Show file tree
Hide file tree
Showing 21 changed files with 2,642 additions and 974 deletions.
Expand Up @@ -42,17 +42,6 @@ public final class ConfigConstants {
public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default"; public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";


// -------------------------------- Runtime ------------------------------- // -------------------------------- Runtime -------------------------------

/**
* The config parameter defining the storage directory to be used by the blob server.
*/
public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory";

/**
* The config parameter defining the cleanup interval of the library cache manager.
*/
public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager" +
".cleanup.interval";


/** /**
* The config parameter defining the network address to connect to * The config parameter defining the network address to connect to
Expand All @@ -71,7 +60,32 @@ public final class ConfigConstants {
* marked as failed. * marked as failed.
*/ */
public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = "jobmanager.max-heartbeat-delay-before-failure.msecs"; public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = "jobmanager.max-heartbeat-delay-before-failure.msecs";


/**
* The config parameter defining the storage directory to be used by the blob server.
*/
public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory";

/**
* The config parameter defining number of retires for failed BLOB fetches.
*/
public static final String BLOB_FETCH_RETRIES_KEY = "blob.fetch.retries";

/**
* The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
*/
public static final String BLOB_FETCH_CONCURRENT_KEY = "blob.fetch.num-concurrent";

/**
* The config parameter defining the backlog of BLOB fetches on the JobManager
*/
public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog";

/**
* The config parameter defining the cleanup interval of the library cache manager.
*/
public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";

/** /**
* The config parameter defining the task manager's IPC port from the configuration. * The config parameter defining the task manager's IPC port from the configuration.
*/ */
Expand Down Expand Up @@ -405,7 +419,22 @@ public final class ConfigConstants {
*/ */
// 30 seconds (its enough to get to mars, should be enough to detect failure) // 30 seconds (its enough to get to mars, should be enough to detect failure)
public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000; public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000;


/**
* Default number of retries for failed BLOB fetches.
*/
public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;

/**
* Default number of concurrent BLOB fetch operations.
*/
public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50;

/**
* Default BLOB fetch connection backlog.
*/
public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;

/** /**
* The default network port the task manager expects incoming IPC connections. The {@code 0} means that * The default network port the task manager expects incoming IPC connections. The {@code 0} means that
* the TaskManager searches for a free port. * the TaskManager searches for a free port.
Expand Down
Expand Up @@ -16,11 +16,9 @@
* limitations under the License. * limitations under the License.
*/ */



package org.apache.flink.runtime; package org.apache.flink.runtime;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random; import java.util.Random;


import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.io.IOReadableWritable;
Expand Down Expand Up @@ -103,48 +101,39 @@ public AbstractID() {
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


/**
* Gets the lower 64 bits of the ID.
*
* @return The lower 64 bits of the ID.
*/
public long getLowerPart() { public long getLowerPart() {
return lowerPart; return lowerPart;
} }

public long getUpperPart() {
return upperPart;
}

// --------------------------------------------------------------------------------------------


/** /**
* Converts the given byte array to a long. * Gets the upper 64 bits of the ID.
* *
* @param ba the byte array to be converted * @return The upper 64 bits of the ID.
* @param offset the offset indicating at which byte inside the array the conversion shall begin
* @return the long variable
*/ */
private static long byteArrayToLong(byte[] ba, int offset) { public long getUpperPart() {
long l = 0; return upperPart;

for (int i = 0; i < SIZE_OF_LONG; ++i) {
l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
}

return l;
} }


/** /**
* Converts a long to a byte array. * Gets the bytes underlying this ID.
* *
* @param l the long variable to be converted * @return The bytes underlying this ID.
* @param ba the byte array to store the result the of the conversion
* @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
*/ */
private static void longToByteArray(final long l, final byte[] ba, final int offset) { public byte[] getBytes() {
for (int i = 0; i < SIZE_OF_LONG; ++i) { byte[] bytes = new byte[SIZE];
final int shift = i << 3; // i * 8 longToByteArray(lowerPart, bytes, 0);
ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift); longToByteArray(upperPart, bytes, SIZE_OF_LONG);
} return bytes;
} }


// --------------------------------------------------------------------------------------------
// Serialization
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


@Override @Override
Expand All @@ -159,16 +148,13 @@ public void write(DataOutputView out) throws IOException {
out.writeLong(this.upperPart); out.writeLong(this.upperPart);
} }


public void write(ByteBuffer buffer) {
buffer.putLong(this.lowerPart);
buffer.putLong(this.upperPart);
}

public void writeTo(ByteBuf buf) { public void writeTo(ByteBuf buf) {
buf.writeLong(this.lowerPart); buf.writeLong(this.lowerPart);
buf.writeLong(this.upperPart); buf.writeLong(this.upperPart);
} }


// --------------------------------------------------------------------------------------------
// Standard Utilities
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


@Override @Override
Expand Down Expand Up @@ -203,4 +189,39 @@ public int compareTo(AbstractID o) {
int diff2 = (this.lowerPart < o.lowerPart) ? -1 : ((this.lowerPart == o.lowerPart) ? 0 : 1); int diff2 = (this.lowerPart < o.lowerPart) ? -1 : ((this.lowerPart == o.lowerPart) ? 0 : 1);
return diff1 == 0 ? diff2 : diff1; return diff1 == 0 ? diff2 : diff1;
} }

// --------------------------------------------------------------------------------------------
// Conversion Utilities
// --------------------------------------------------------------------------------------------

/**
* Converts the given byte array to a long.
*
* @param ba the byte array to be converted
* @param offset the offset indicating at which byte inside the array the conversion shall begin
* @return the long variable
*/
private static long byteArrayToLong(byte[] ba, int offset) {
long l = 0;

for (int i = 0; i < SIZE_OF_LONG; ++i) {
l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
}

return l;
}

/**
* Converts a long to a byte array.
*
* @param l the long variable to be converted
* @param ba the byte array to store the result the of the conversion
* @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
*/
private static void longToByteArray(long l, byte[] ba, int offset) {
for (int i = 0; i < SIZE_OF_LONG; ++i) {
final int shift = i << 3; // i * 8
ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
}
}
} }

0 comments on commit cfce493

Please sign in to comment.