Skip to content

Commit

Permalink
[FLINK-1349] [runtime] Various fixes concerning Akka
Browse files Browse the repository at this point in the history
 - Remove obsolete code from old IPC net utils
 - Smaller Writable/IOReadableWritable serialzation buffer start size (most messages are rather small)
 - For message logging, make system calls (timestamps) only in debug mode
 - Clean up warnings / code simplifications
  • Loading branch information
StephanEwen committed Jan 6, 2015
1 parent 24c4736 commit 972a7b0
Show file tree
Hide file tree
Showing 33 changed files with 372 additions and 1,476 deletions.
Expand Up @@ -121,7 +121,7 @@ private static void addPathToConfig(Configuration conf, File path) {
// chain-in a new classloader
URL fileUrl = null;
try {
fileUrl = path.toURL();
fileUrl = path.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException("Erroneous config file path", e);
}
Expand Down
Expand Up @@ -295,7 +295,7 @@ public final class ConfigConstants {
public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";

/**
* Hearbeat interval of the transport failure detector
* Heartbeat interval of the transport failure detector
*/
public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval";

Expand Down Expand Up @@ -465,11 +465,6 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;

/**
* The default interval for TaskManager heart beats (5000 msecs).
*/
public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 5000;

/**
* Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
*/
Expand Down Expand Up @@ -559,8 +554,8 @@ public final class ConfigConstants {
/**
* The default directory to store temporary objects (e.g. during file uploads).
*/
public static final String DEFAULT_WEB_TMP_DIR = System.getProperty("java.io.tmpdir") == null ? "/tmp" : System
.getProperty("java.io.tmpdir");
public static final String DEFAULT_WEB_TMP_DIR =
System.getProperty("java.io.tmpdir") == null ? "/tmp" : System.getProperty("java.io.tmpdir");

/**
* The default directory for temporary plan dumps from the web frontend.
Expand Down
23 changes: 2 additions & 21 deletions flink-core/src/main/java/org/apache/flink/core/fs/Path.java
Expand Up @@ -16,16 +16,12 @@
* limitations under the License.
*/


/**
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
/* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
* additional information regarding copyright ownership. */

package org.apache.flink.core.fs;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
Expand Down Expand Up @@ -501,19 +497,4 @@ public void write(DataOutputView out) throws IOException {
}

}

public static String constructTestPath(Class<?> forClass, String folder) {
// we create test path that depends on class to prevent name clashes when two tests
// create temp files with the same name
String path = System.getProperty("java.io.tmpdir");
if (!(path.endsWith("/") || path.endsWith("\\")) ) {
path += System.getProperty("file.separator");
}
path += (forClass.getName() + "-" + folder);
return path;
}

public static String constructTestURI(Class<?> forClass, String folder) {
return new File(constructTestPath(forClass, folder)).toURI().toString();
}
}
2 changes: 1 addition & 1 deletion flink-dist/pom.xml
Expand Up @@ -210,7 +210,7 @@ under the License.
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
<executions>
<!-- yarn bin directory -->
<execution>
Expand Down
1 change: 0 additions & 1 deletion flink-runtime/pom.xml
Expand Up @@ -130,7 +130,6 @@ under the License.
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.10</artifactId>
<version>3.2.0</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
Expand Down
Expand Up @@ -39,6 +39,8 @@
*/
public class AccumulatorEvent implements Serializable {

private static final long serialVersionUID = 8965894516006882735L;

private JobID jobID;

private Map<String, Accumulator<?, ?>> accumulators;
Expand Down
Expand Up @@ -32,19 +32,16 @@
*/
public final class BlobKey implements Serializable, Comparable<BlobKey> {

/**
* Array of hex characters to facilitate fast toString() method.
*/
private static final long serialVersionUID = 3847117712521785209L;

/** Array of hex characters to facilitate fast toString() method. */
private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();

/**
* Size of the internal BLOB key in bytes.
*/
/** Size of the internal BLOB key in bytes. */
private static final int SIZE = 20;

/**
* The byte buffer storing the actual key data.
*/

/** The byte buffer storing the actual key data. */
private final byte[] key;

/**
Expand All @@ -60,8 +57,7 @@ public BlobKey() {
* @param key
* the actual key data
*/
BlobKey(final byte[] key) {

BlobKey(byte[] key) {
if (key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}
Expand All @@ -70,8 +66,15 @@ public BlobKey() {
}

/**
* {@inheritDoc}
* Adds the BLOB key to the given {@link MessageDigest}.
*
* @param md
* the message digest to add the BLOB key to
*/
public void addToMessageDigest(MessageDigest md) {
md.update(this.key);
}

@Override
public boolean equals(final Object obj) {

Expand All @@ -84,17 +87,11 @@ public boolean equals(final Object obj) {
return Arrays.equals(this.key, bk.key);
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return Arrays.hashCode(this.key);
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
// from http://stackoverflow.com/questions/9655181/convert-from-byte-array-to-hex-string-in-java
Expand All @@ -108,6 +105,26 @@ public String toString() {
return new String(hexChars);
}

@Override
public int compareTo(BlobKey o) {

final byte[] aarr = this.key;
final byte[] barr = o.key;
final int len = Math.min(aarr.length, barr.length);

for (int i = 0; i < len; ++i) {
final int a = (aarr[i] & 0xff);
final int b = (barr[i] & 0xff);
if (a != b) {
return a - b;
}
}

return aarr.length - barr.length;
}

// --------------------------------------------------------------------------------------------

/**
* Auxiliary method to read a BLOB key from an input stream.
*
Expand All @@ -117,7 +134,7 @@ public String toString() {
* @throws IOException
* throw if an I/O error occurs while reading from the input stream
*/
static BlobKey readFromInputStream(final InputStream inputStream) throws IOException {
static BlobKey readFromInputStream(InputStream inputStream) throws IOException {

final byte[] key = new byte[BlobKey.SIZE];

Expand All @@ -142,39 +159,6 @@ static BlobKey readFromInputStream(final InputStream inputStream) throws IOExcep
* thrown if an I/O error occurs while writing the BLOB key
*/
void writeToOutputStream(final OutputStream outputStream) throws IOException {

outputStream.write(this.key);
}

/**
* {@inheritDoc}
*/
@Override
public int compareTo(final BlobKey o) {

final byte[] aarr = this.key;
final byte[] barr = o.key;
final int len = Math.min(aarr.length, barr.length);

for (int i = 0; i < len; ++i) {
final int a = (aarr[i] & 0xff);
final int b = (barr[i] & 0xff);
if (a != b) {
return a - b;
}
}

return aarr.length - barr.length;
}

/**
* Adds the BLOB key to the given {@link MessageDigest}.
*
* @param md
* the message digest to add the BLOB key to
*/
public void addToMessageDigest(final MessageDigest md) {

md.update(this.key);
}
}
Expand Up @@ -29,6 +29,8 @@
*/
public final class ChannelDeploymentDescriptor implements Serializable {

private static final long serialVersionUID = -4079084629425460213L;

/** The ID of the output channel. */
private final ChannelID outputChannelID;

Expand Down
Expand Up @@ -29,6 +29,8 @@
*/
public final class GateDeploymentDescriptor implements Serializable {

private static final long serialVersionUID = -8433936680266802364L;

/** The list of channel deployment descriptors attached to this gate. */
private final List<ChannelDeploymentDescriptor> channels;

Expand Down
Expand Up @@ -34,6 +34,8 @@
*/
public final class TaskDeploymentDescriptor implements Serializable {

private static final long serialVersionUID = -3233562176034358530L;

/** The ID of the job the tasks belongs to. */
private final JobID jobID;

Expand Down Expand Up @@ -192,6 +194,15 @@ public int getIndexInSubtaskGroup() {
public int getCurrentNumberOfSubtasks() {
return this.currentNumberOfSubtasks;
}

/**
* Gets the number of the slot into which the task is to be deployed.
*
* @return The number of the target slot.
*/
public int getTargetSlotNumber() {
return targetSlotNumber;
}

/**
* Returns the configuration of the job the task belongs to.
Expand Down
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.runtime.fs.hdfs;

import org.apache.flink.core.fs.FileStatus;
Expand All @@ -25,7 +24,6 @@
/**
* Concrete implementation of the {@link FileStatus} interface for the
* Hadoop Distribution File System.
*
*/
public final class DistributedFileStatus implements FileStatus {

Expand All @@ -43,13 +41,11 @@ public DistributedFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {

@Override
public long getLen() {

return fileStatus.getLen();
}

@Override
public long getBlockSize() {

long blocksize = fileStatus.getBlockSize();
if (blocksize > fileStatus.getLen()) {
return fileStatus.getLen();
Expand All @@ -60,19 +56,16 @@ public long getBlockSize() {

@Override
public long getAccessTime() {

return fileStatus.getAccessTime();
}

@Override
public long getModificationTime() {

return fileStatus.getModificationTime();
}

@Override
public short getReplication() {

return fileStatus.getReplication();
}

Expand All @@ -82,13 +75,12 @@ public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {

@Override
public Path getPath() {

return new Path(fileStatus.getPath().toString());
}

@SuppressWarnings("deprecation")
@Override
public boolean isDir() {

return fileStatus.isDir();
}
}
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.runtime.io.network;

import java.io.Serializable;
Expand All @@ -25,6 +24,9 @@

public class ConnectionInfoLookupResponse implements Serializable {

private static final long serialVersionUID = 3961171754642077522L;


private enum ReturnCode {
NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
};
Expand Down
Expand Up @@ -32,6 +32,8 @@
*/
public final class RemoteReceiver implements IOReadableWritable, Serializable {

private static final long serialVersionUID = 4304924747853162443L;

/**
* The address of the connection to the remote TaskManager.
*/
Expand Down

0 comments on commit 972a7b0

Please sign in to comment.