Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13823][SPARK-13397][SPARK-13395] [CORE] More warnings, StandardCharset follow up #11725

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -93,7 +94,7 @@ public void receive(
ByteBuffer message,
RpcResponseCallback callback) {
try {
semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
Expand All @@ -113,20 +114,17 @@ public StreamManager getStreamManager() {

// First completes quickly (semaphore starts at 1).
TestCallback callback0 = new TestCallback();
synchronized (callback0) {
client.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.wait(FOREVER);
assertEquals(responseSize, callback0.successLength);
}
client.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.latch.await();
assertEquals(responseSize, callback0.successLength);

// Second times out after 2 seconds, with slack. Must be IOException.
TestCallback callback1 = new TestCallback();
synchronized (callback1) {
client.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.wait(4 * 1000);
assertNotNull(callback1.failure);
assertTrue(callback1.failure instanceof IOException);
}
client.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.latch.await(4, TimeUnit.SECONDS);
assertNotNull(callback1.failure);
assertTrue(callback1.failure instanceof IOException);

semaphore.release();
}

Expand All @@ -143,7 +141,7 @@ public void receive(
ByteBuffer message,
RpcResponseCallback callback) {
try {
semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
Expand All @@ -164,24 +162,20 @@ public StreamManager getStreamManager() {
TransportClient client0 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback0 = new TestCallback();
synchronized (callback0) {
client0.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.wait(FOREVER);
assertTrue(callback0.failure instanceof IOException);
assertFalse(client0.isActive());
}
client0.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.latch.await();
assertTrue(callback0.failure instanceof IOException);
assertFalse(client0.isActive());

// Increment the semaphore and the second request should succeed quickly.
semaphore.release(2);
TransportClient client1 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback1 = new TestCallback();
synchronized (callback1) {
client1.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.wait(FOREVER);
assertEquals(responseSize, callback1.successLength);
assertNull(callback1.failure);
}
client1.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.latch.await();
assertEquals(responseSize, callback1.successLength);
assertNull(callback1.failure);
}

// The timeout is relative to the LAST request sent, which is kinda weird, but still.
Expand Down Expand Up @@ -226,18 +220,14 @@ public StreamManager getStreamManager() {
client.fetchChunk(0, 1, callback1);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);

synchronized (callback0) {
// not complete yet, but should complete soon
assertEquals(-1, callback0.successLength);
assertNull(callback0.failure);
callback0.wait(2 * 1000);
assertTrue(callback0.failure instanceof IOException);
}
// not complete yet, but should complete soon
assertEquals(-1, callback0.successLength);
assertNull(callback0.failure);
callback0.latch.await(2, TimeUnit.SECONDS);
assertTrue(callback0.failure instanceof IOException);

synchronized (callback1) {
// failed at same time as previous
assertTrue(callback0.failure instanceof IOException);
}
// failed at same time as previous
assertTrue(callback1.failure instanceof IOException);
}

/**
Expand All @@ -248,41 +238,35 @@ static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback

int successLength = -1;
Throwable failure;
final CountDownLatch latch = new CountDownLatch(1);

@Override
public void onSuccess(ByteBuffer response) {
synchronized(this) {
successLength = response.remaining();
this.notifyAll();
}
successLength = response.remaining();
latch.countDown();
}

@Override
public void onFailure(Throwable e) {
synchronized(this) {
failure = e;
this.notifyAll();
}
failure = e;
latch.countDown();
}

@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
synchronized(this) {
try {
successLength = buffer.nioByteBuffer().remaining();
this.notifyAll();
} catch (IOException e) {
// weird
}
try {
successLength = buffer.nioByteBuffer().remaining();
} catch (IOException e) {
// weird
} finally {
latch.countDown();
}
}

@Override
public void onFailure(int chunkIndex, Throwable e) {
synchronized(this) {
failure = e;
this.notifyAll();
}
failure = e;
latch.countDown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -276,25 +277,21 @@ public ManagedBuffer answer(InvocationOnMock invocation) {

ctx = new SaslTestCtx(rpcHandler, true, false);

final Object lock = new Object();
final CountDownLatch lock = new CountDownLatch(1);

ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
response.set((ManagedBuffer) invocation.getArguments()[1]);
response.get().retain();
synchronized (lock) {
lock.notifyAll();
}
lock.countDown();
return null;
}
}).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class));

synchronized (lock) {
ctx.client.fetchChunk(0, 0, callback);
lock.wait(10 * 1000);
}
ctx.client.fetchChunk(0, 0, callback);
lock.await(10, TimeUnit.SECONDS);

verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
verify(callback, never()).onFailure(anyInt(), any(Throwable.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -197,26 +198,23 @@ public void testAppIsolation() throws Exception {

final AtomicReference<Throwable> exception = new AtomicReference<>();

final CountDownLatch blockFetchLatch = new CountDownLatch(1);
BlockFetchingListener listener = new BlockFetchingListener() {
@Override
public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
notifyAll();
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
blockFetchLatch.countDown();
}

@Override
public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
public void onBlockFetchFailure(String blockId, Throwable t) {
exception.set(t);
notifyAll();
blockFetchLatch.countDown();
}
};

String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
blockIds, listener);
synchronized (listener) {
fetcher.start();
listener.wait();
}
String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
fetcher.start();
blockFetchLatch.await();
checkSecurityException(exception.get());

// Register an executor so that the next steps work.
Expand All @@ -240,24 +238,22 @@ public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
blockServer.getPort());

final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
notifyAll();
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
chunkReceivedLatch.countDown();
}

@Override
public synchronized void onFailure(int chunkIndex, Throwable t) {
public void onFailure(int chunkIndex, Throwable t) {
exception.set(t);
notifyAll();
chunkReceivedLatch.countDown();
}
};

exception.set(null);
synchronized (callback) {
client2.fetchChunk(streamId, 0, callback);
callback.wait();
}
client2.fetchChunk(streamId, 0, callback);
chunkReceivedLatch.await();
checkSecurityException(exception.get());
} finally {
if (client1 != null) {
Expand Down
24 changes: 24 additions & 0 deletions common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,29 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<javacArg>-XDignore.symbol.file</javacArg>
</javacArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<arg>-XDignore.symbol.file</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@

package org.apache.spark.util.sketch;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

class Utils {
public static byte[] getBytesFromUTF8String(String str) {
try {
return str.getBytes("utf-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalArgumentException("Only support utf-8 string", e);
}
return str.getBytes(StandardCharsets.UTF_8);
}

public static long integralToLong(Object i) {
Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<arg>-XDignore.symbol.file</arg>
</compilerArgs>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,7 @@ public static UTF8String fromAddress(Object base, long offset, int numBytes) {
* Creates an UTF8String from String.
*/
public static UTF8String fromString(String str) {
if (str == null) return null;
try {
return fromBytes(str.getBytes("utf-8"));
} catch (UnsupportedEncodingException e) {
// Turn the exception into unchecked so we can find out about it at runtime, but
// don't need to add lots of boilerplate code everywhere.
throwException(e);
return null;
}
return str == null ? null : fromBytes(str.getBytes(StandardCharsets.UTF_8));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.unsafe.types;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;

Expand All @@ -30,9 +30,9 @@

public class UTF8StringSuite {

private static void checkBasic(String str, int len) throws UnsupportedEncodingException {
private static void checkBasic(String str, int len) {
UTF8String s1 = fromString(str);
UTF8String s2 = fromBytes(str.getBytes("utf8"));
UTF8String s2 = fromBytes(str.getBytes(StandardCharsets.UTF_8));
assertEquals(s1.numChars(), len);
assertEquals(s2.numChars(), len);

Expand All @@ -51,7 +51,7 @@ private static void checkBasic(String str, int len) throws UnsupportedEncodingEx
}

@Test
public void basicTest() throws UnsupportedEncodingException {
public void basicTest() {
checkBasic("", 0);
checkBasic("hello", 5);
checkBasic("大 千 世 界", 7);
Expand Down
Loading