Skip to content

Commit

Permalink
[SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Remove all usages of Scala Tuple2 from the common/network-* projects, replacing it with Apache Commons Lang `Pair`. These modules are meant to be Scala-free; referencing `Tuple2` pulls the Scala library onto their classpath, so environments that run them without Scala (e.g. the YARN shuffle service) fail, and Yarn users cannot use `spark.reducer.maxReqSizeShuffleToMem`.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#18593 from zsxwing/SPARK-21369.
  • Loading branch information
zsxwing authored and cloud-fan committed Jul 11, 2017
1 parent 1471ee7 commit 833eab2
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 23 deletions.
3 changes: 2 additions & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<scope>test</scope>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,7 +58,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {

private final Map<Long, RpcResponseCallback> outstandingRpcs;

private final Queue<Tuple2<String, StreamCallback>> streamCallbacks;
private final Queue<Pair<String, StreamCallback>> streamCallbacks;
private volatile boolean streamActive;

/** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
Expand Down Expand Up @@ -92,7 +92,7 @@ public void removeRpcRequest(long requestId) {

public void addStreamCallback(String streamId, StreamCallback callback) {
timeOfLastRequestNs.set(System.nanoTime());
streamCallbacks.offer(new Tuple2<>(streamId, callback));
streamCallbacks.offer(ImmutablePair.of(streamId, callback));
}

@VisibleForTesting
Expand All @@ -119,9 +119,9 @@ private void failOutstandingRequests(Throwable cause) {
logger.warn("RpcResponseCallback.onFailure throws exception", e);
}
}
for (Tuple2<String, StreamCallback> entry : streamCallbacks) {
for (Pair<String, StreamCallback> entry : streamCallbacks) {
try {
entry._2().onFailure(entry._1(), cause);
entry.getValue().onFailure(entry.getKey(), cause);
} catch (Exception e) {
logger.warn("StreamCallback.onFailure throws exception", e);
}
Expand Down Expand Up @@ -208,9 +208,9 @@ public void handle(ResponseMessage message) throws Exception {
}
} else if (message instanceof StreamResponse) {
StreamResponse resp = (StreamResponse) message;
Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
StreamCallback callback = entry._2();
StreamCallback callback = entry.getValue();
if (resp.byteCount > 0) {
StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
callback);
Expand All @@ -235,9 +235,9 @@ public void handle(ResponseMessage message) throws Exception {
}
} else if (message instanceof StreamFailure) {
StreamFailure resp = (StreamFailure) message;
Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
StreamCallback callback = entry._2();
StreamCallback callback = entry.getValue();
try {
callback.onFailure(resp.streamId, new RuntimeException(resp.error));
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import scala.Tuple2;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,21 +96,16 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {

@Override
public ManagedBuffer openStream(String streamChunkId) {
Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId);
return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2);
}

public static String genStreamChunkId(long streamId, int chunkId) {
return String.format("%d_%d", streamId, chunkId);
}

public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) {
String[] array = streamChunkId.split("_");
assert array.length == 2:
"Stream id and chunk index should be specified when open stream for fetching block.";
long streamId = Long.valueOf(array[0]);
int chunkIndex = Integer.valueOf(array[1]);
return new Tuple2<>(streamId, chunkIndex);
return getChunk(streamId, chunkIndex);
}

public static String genStreamChunkId(long streamId, int chunkId) {
return String.format("%d_%d", streamId, chunkId);
}

@Override
Expand Down
1 change: 1 addition & 0 deletions common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<!--
Expand Down
1 change: 1 addition & 0 deletions common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<!--
Expand Down

0 comments on commit 833eab2

Please sign in to comment.