Skip to content

Commit

Permalink
[SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, Standard…
Browse files Browse the repository at this point in the history
…Charset follow up

## What changes were proposed in this pull request?

Follow up to #11657

- Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8`
- And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests)
- And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #11725 from srowen/SPARK-13823.2.
  • Loading branch information
srowen committed Mar 16, 2016
1 parent 05ab294 commit 3b461d9
Show file tree
Hide file tree
Showing 41 changed files with 178 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -93,7 +94,7 @@ public void receive(
ByteBuffer message,
RpcResponseCallback callback) {
try {
semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
Expand All @@ -113,20 +114,17 @@ public StreamManager getStreamManager() {

// First completes quickly (semaphore starts at 1).
TestCallback callback0 = new TestCallback();
synchronized (callback0) {
client.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.wait(FOREVER);
assertEquals(responseSize, callback0.successLength);
}
client.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.latch.await();
assertEquals(responseSize, callback0.successLength);

// Second times out after 2 seconds, with slack. Must be IOException.
TestCallback callback1 = new TestCallback();
synchronized (callback1) {
client.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.wait(4 * 1000);
assertNotNull(callback1.failure);
assertTrue(callback1.failure instanceof IOException);
}
client.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.latch.await(4, TimeUnit.SECONDS);
assertNotNull(callback1.failure);
assertTrue(callback1.failure instanceof IOException);

semaphore.release();
}

Expand All @@ -143,7 +141,7 @@ public void receive(
ByteBuffer message,
RpcResponseCallback callback) {
try {
semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
Expand All @@ -164,24 +162,20 @@ public StreamManager getStreamManager() {
TransportClient client0 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback0 = new TestCallback();
synchronized (callback0) {
client0.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.wait(FOREVER);
assertTrue(callback0.failure instanceof IOException);
assertFalse(client0.isActive());
}
client0.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.latch.await();
assertTrue(callback0.failure instanceof IOException);
assertFalse(client0.isActive());

// Increment the semaphore and the second request should succeed quickly.
semaphore.release(2);
TransportClient client1 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback1 = new TestCallback();
synchronized (callback1) {
client1.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.wait(FOREVER);
assertEquals(responseSize, callback1.successLength);
assertNull(callback1.failure);
}
client1.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.latch.await();
assertEquals(responseSize, callback1.successLength);
assertNull(callback1.failure);
}

// The timeout is relative to the LAST request sent, which is kinda weird, but still.
Expand Down Expand Up @@ -226,18 +220,14 @@ public StreamManager getStreamManager() {
client.fetchChunk(0, 1, callback1);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);

synchronized (callback0) {
// not complete yet, but should complete soon
assertEquals(-1, callback0.successLength);
assertNull(callback0.failure);
callback0.wait(2 * 1000);
assertTrue(callback0.failure instanceof IOException);
}
// not complete yet, but should complete soon
assertEquals(-1, callback0.successLength);
assertNull(callback0.failure);
callback0.latch.await(2, TimeUnit.SECONDS);
assertTrue(callback0.failure instanceof IOException);

synchronized (callback1) {
// failed at same time as previous
assertTrue(callback0.failure instanceof IOException);
}
// failed at same time as previous
assertTrue(callback1.failure instanceof IOException);
}

/**
Expand All @@ -248,41 +238,35 @@ static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback

int successLength = -1;
Throwable failure;
final CountDownLatch latch = new CountDownLatch(1);

@Override
public void onSuccess(ByteBuffer response) {
synchronized(this) {
successLength = response.remaining();
this.notifyAll();
}
successLength = response.remaining();
latch.countDown();
}

@Override
public void onFailure(Throwable e) {
synchronized(this) {
failure = e;
this.notifyAll();
}
failure = e;
latch.countDown();
}

@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
synchronized(this) {
try {
successLength = buffer.nioByteBuffer().remaining();
this.notifyAll();
} catch (IOException e) {
// weird
}
try {
successLength = buffer.nioByteBuffer().remaining();
} catch (IOException e) {
// weird
} finally {
latch.countDown();
}
}

@Override
public void onFailure(int chunkIndex, Throwable e) {
synchronized(this) {
failure = e;
this.notifyAll();
}
failure = e;
latch.countDown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -276,25 +277,21 @@ public ManagedBuffer answer(InvocationOnMock invocation) {

ctx = new SaslTestCtx(rpcHandler, true, false);

final Object lock = new Object();
final CountDownLatch lock = new CountDownLatch(1);

ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
response.set((ManagedBuffer) invocation.getArguments()[1]);
response.get().retain();
synchronized (lock) {
lock.notifyAll();
}
lock.countDown();
return null;
}
}).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class));

synchronized (lock) {
ctx.client.fetchChunk(0, 0, callback);
lock.wait(10 * 1000);
}
ctx.client.fetchChunk(0, 0, callback);
lock.await(10, TimeUnit.SECONDS);

verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
verify(callback, never()).onFailure(anyInt(), any(Throwable.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -197,26 +198,23 @@ public void testAppIsolation() throws Exception {

final AtomicReference<Throwable> exception = new AtomicReference<>();

final CountDownLatch blockFetchLatch = new CountDownLatch(1);
BlockFetchingListener listener = new BlockFetchingListener() {
@Override
public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
notifyAll();
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
blockFetchLatch.countDown();
}

@Override
public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
public void onBlockFetchFailure(String blockId, Throwable t) {
exception.set(t);
notifyAll();
blockFetchLatch.countDown();
}
};

String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
blockIds, listener);
synchronized (listener) {
fetcher.start();
listener.wait();
}
String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
fetcher.start();
blockFetchLatch.await();
checkSecurityException(exception.get());

// Register an executor so that the next steps work.
Expand All @@ -240,24 +238,22 @@ public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
blockServer.getPort());

final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
notifyAll();
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
chunkReceivedLatch.countDown();
}

@Override
public synchronized void onFailure(int chunkIndex, Throwable t) {
public void onFailure(int chunkIndex, Throwable t) {
exception.set(t);
notifyAll();
chunkReceivedLatch.countDown();
}
};

exception.set(null);
synchronized (callback) {
client2.fetchChunk(streamId, 0, callback);
callback.wait();
}
client2.fetchChunk(streamId, 0, callback);
chunkReceivedLatch.await();
checkSecurityException(exception.get());
} finally {
if (client1 != null) {
Expand Down
24 changes: 24 additions & 0 deletions common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,29 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<javacArg>-XDignore.symbol.file</javacArg>
</javacArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<arg>-XDignore.symbol.file</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@

package org.apache.spark.util.sketch;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

class Utils {
public static byte[] getBytesFromUTF8String(String str) {
try {
return str.getBytes("utf-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalArgumentException("Only support utf-8 string", e);
}
return str.getBytes(StandardCharsets.UTF_8);
}

public static long integralToLong(Object i) {
Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<arg>-XDignore.symbol.file</arg>
</compilerArgs>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,7 @@ public static UTF8String fromAddress(Object base, long offset, int numBytes) {
* Creates an UTF8String from String.
*/
public static UTF8String fromString(String str) {
if (str == null) return null;
try {
return fromBytes(str.getBytes("utf-8"));
} catch (UnsupportedEncodingException e) {
// Turn the exception into unchecked so we can find out about it at runtime, but
// don't need to add lots of boilerplate code everywhere.
throwException(e);
return null;
}
return str == null ? null : fromBytes(str.getBytes(StandardCharsets.UTF_8));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.unsafe.types;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;

Expand All @@ -30,9 +30,9 @@

public class UTF8StringSuite {

private static void checkBasic(String str, int len) throws UnsupportedEncodingException {
private static void checkBasic(String str, int len) {
UTF8String s1 = fromString(str);
UTF8String s2 = fromBytes(str.getBytes("utf8"));
UTF8String s2 = fromBytes(str.getBytes(StandardCharsets.UTF_8));
assertEquals(s1.numChars(), len);
assertEquals(s2.numChars(), len);

Expand All @@ -51,7 +51,7 @@ private static void checkBasic(String str, int len) throws UnsupportedEncodingEx
}

@Test
public void basicTest() throws UnsupportedEncodingException {
public void basicTest() {
checkBasic("", 0);
checkBasic("hello", 5);
checkBasic("大 千 世 界", 7);
Expand Down
Loading

0 comments on commit 3b461d9

Please sign in to comment.