From 5f4b1864a2ec8dd257d804c3887be78662cb2c7e Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Mon, 18 Dec 2017 13:22:13 +0100 Subject: [PATCH 1/2] [FLINK-8280][checkstyle] enable and fix checkstyle in BlobServer and BlobUtils --- .../apache/flink/runtime/blob/BlobClient.java | 2 +- .../flink/runtime/blob/BlobInputStream.java | 6 +-- .../apache/flink/runtime/blob/BlobKey.java | 12 ++--- .../apache/flink/runtime/blob/BlobServer.java | 46 +++++++++---------- .../runtime/blob/BlobServerProtocol.java | 5 ++ .../apache/flink/runtime/blob/BlobUtils.java | 11 ++--- .../runtime/blob/FileSystemBlobStore.java | 8 ++-- .../flink/runtime/blob/BlobCacheGetTest.java | 2 +- .../flink/runtime/blob/BlobClientSslTest.java | 40 ++++++++-------- .../flink/runtime/blob/BlobClientTest.java | 12 ++--- .../blob/TestingFailingBlobServer.java | 2 +- tools/maven/suppressions-runtime.xml | 7 --- 12 files changed, 75 insertions(+), 78 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index fbcce58d246ce..8e6b32811c2e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -119,7 +119,7 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t } } - catch(Exception e) { + catch (Exception e) { BlobUtils.closeSilently(socket, LOG); throw new IOException("Could not connect to BlobServer at address " + serverAddress, e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java index 7a73917ce1a4a..ad318bb85d68f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java @@ -50,7 +50,7 @@ final class BlobInputStream extends InputStream { private final OutputStream wrappedOutputStream; /** - * The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise null. + * The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise null. */ private final BlobKey blobKey; @@ -72,7 +72,7 @@ final class BlobInputStream extends InputStream { /** * Constructs a new BLOB input stream. - * + * * @param wrappedInputStream * the underlying input stream to read from * @param blobKey @@ -98,7 +98,7 @@ final class BlobInputStream extends InputStream { /** * Convenience method to throw an {@link EOFException}. - * + * * @throws EOFException * thrown to indicate the underlying input stream did not provide as much data as expected */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java index 4b1d498586971..988af8be0d969 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java @@ -199,14 +199,14 @@ BlobType getType() { /** * Adds the BLOB key to the given {@link MessageDigest}. - * + * * @param md * the message digest to add the BLOB key to */ public void addToMessageDigest(MessageDigest md) { md.update(this.key); } - + @Override public boolean equals(final Object obj) { @@ -252,7 +252,7 @@ public int compareTo(BlobKey o) { final byte[] aarr = this.key; final byte[] barr = o.key; final int len = Math.min(aarr.length, barr.length); - + for (int i = 0; i < len; ++i) { final int a = (aarr[i] & 0xff); final int b = (barr[i] & 0xff); @@ -274,12 +274,12 @@ public int compareTo(BlobKey o) { return aarr.length - barr.length; } } - + // -------------------------------------------------------------------------------------------- /** * Auxiliary method to read a BLOB key from an input stream. - * + * * @param inputStream * the input stream to read the BLOB key from * @return the read BLOB key @@ -331,7 +331,7 @@ static BlobKey readFromInputStream(InputStream inputStream) throws IOException { /** * Auxiliary method to write this BLOB key to an output stream. - * + * * @param outputStream * the output stream to write the BLOB key to * @throws IOException diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 01fb80870de44..a3522bf3b8181 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -77,28 +77,28 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma /** The server socket listening for incoming connections. */ private final ServerSocket serverSocket; - /** The SSL server context if ssl is enabled for the connections */ + /** The SSL server context if ssl is enabled for the connections. */ private final SSLContext serverSSLContext; - /** Blob Server configuration */ + /** Blob Server configuration. */ private final Configuration blobServiceConfiguration; /** Indicates whether a shutdown of server component has been requested. */ private final AtomicBoolean shutdownRequested = new AtomicBoolean(); - /** Root directory for local file storage */ + /** Root directory for local file storage. */ private final File storageDir; - /** Blob store for distributed file storage, e.g. in HA */ + /** Blob store for distributed file storage, e.g. in HA. */ private final BlobStore blobStore; - /** Set of currently running threads */ + /** Set of currently running threads. */ private final Set activeConnections = new HashSet<>(); - /** The maximum number of concurrent connections */ + /** The maximum number of concurrent connections. */ private final int maxConnections; - /** Lock guarding concurrent file accesses */ + /** Lock guarding concurrent file accesses. */ private final ReadWriteLock readWriteLock; /** @@ -201,8 +201,8 @@ public ServerSocket createSocket(int port) throws IOException { } }); - if(socketAttempt == null) { - throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange); + if (socketAttempt == null) { + throw new IOException("Unable to allocate socket for blob server in specified port range: " + serverPortRange); } else { SSLUtils.setSSLVerAndCipherSuites(socketAttempt, config); this.serverSocket = socketAttempt; @@ -254,7 +254,7 @@ File createTemporaryFilename() throws IOException { } /** - * Returns the lock used to guard file accesses + * Returns the lock used to guard file accesses. */ ReadWriteLock getReadWriteLock() { return readWriteLock; @@ -360,7 +360,7 @@ public void close() throws IOException { } } - if(LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled()) { LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort()); } @@ -375,8 +375,8 @@ protected BlobClient createClient() throws IOException { /** * Retrieves the local path of a (job-unrelated) file associated with a job and a blob key. - *

- * The blob server looks the blob key up in its local storage. If the file exists, it is + * + *

The blob server looks the blob key up in its local storage. If the file exists, it is * returned. If the file does not exist, it is retrieved from the HA blob store (if available) * or a {@link FileNotFoundException} is thrown. * @@ -395,8 +395,8 @@ public File getFile(TransientBlobKey key) throws IOException { /** * Retrieves the local path of a file associated with a job and a blob key. - *

- * The blob server looks the blob key up in its local storage. If the file exists, it is + * + *

The blob server looks the blob key up in its local storage. If the file exists, it is * returned. If the file does not exist, it is retrieved from the HA blob store (if available) * or a {@link FileNotFoundException} is thrown. * @@ -419,8 +419,8 @@ public File getFile(JobID jobId, TransientBlobKey key) throws IOException { /** * Returns the path to a local copy of the file associated with the provided job ID and blob * key. - *

- * We will first attempt to serve the BLOB from the local storage. If the BLOB is not in + * + *

We will first attempt to serve the BLOB from the local storage. If the BLOB is not in * there, we will try to download it from the HA store. * * @param jobId @@ -443,8 +443,8 @@ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { /** * Retrieves the local path of a file associated with a job and a blob key. - *

- * The blob server looks the blob key up in its local storage. If the file exists, it is + * + *

The blob server looks the blob key up in its local storage. If the file exists, it is * returned. If the file does not exist, it is retrieved from the HA blob store (if available) * or a {@link FileNotFoundException} is thrown. * @@ -474,12 +474,12 @@ private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOEx /** * Helper to retrieve the local path of a file associated with a job and a blob key. - *

- * The blob server looks the blob key up in its local storage. If the file exists, it is + * + *

The blob server looks the blob key up in its local storage. If the file exists, it is * returned. If the file does not exist, it is retrieved from the HA blob store (if available) * or a {@link FileNotFoundException} is thrown. - *

- * Assumes the read lock has already been acquired. + * + *

Assumes the read lock has already been acquired. * * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java index 5c9c7b0b52aa5..d158428d143d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java @@ -15,8 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.blob; +/** + * Defines constants for the protocol between the BLOB {@link BlobServer server} and the + * {@link AbstractBlobCache caches}. + */ public class BlobServerProtocol { // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 04f2cdb7a3e7b..ac0335095794a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -150,8 +150,7 @@ static File initLocalStorageDirectory(String basePath) throws IOException { File storageDir; // NOTE: although we will be using UUIDs, there may be collisions - final int MAX_ATTEMPTS = 10; - for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + for (int attempt = 0; attempt < 10; attempt++) { storageDir = new File(baseDir, String.format( "blobStore-%s", UUID.randomUUID().toString())); @@ -251,8 +250,8 @@ static String getStorageLocationPath(String storageDir, @Nullable JobID jobId) { /** * Returns the path for the given blob key. - *

- * The returned path can be used with the (local or HA) BLOB store file system back-end for + * + *

The returned path can be used with the (local or HA) BLOB store file system back-end for * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID, * BlobKey)}. * @@ -403,12 +402,12 @@ static void readFully(InputStream inputStream, byte[] buf, int off, int len, Str } } - static void closeSilently(Socket socket, Logger LOG) { + static void closeSilently(Socket socket, Logger log) { if (socket != null) { try { socket.close(); } catch (Throwable t) { - LOG.debug("Exception while closing BLOB server connection socket.", t); + log.debug("Exception while closing BLOB server connection socket.", t); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index 4fed4cd43c9d6..4a07a6097a97f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -46,10 +46,10 @@ public class FileSystemBlobStore implements BlobStoreService { private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class); - /** The file system in which blobs are stored */ + /** The file system in which blobs are stored. */ private final FileSystem fileSystem; - - /** The base path of the blob store */ + + /** The base path of the blob store. */ private final String basePath; public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException { @@ -148,7 +148,7 @@ public boolean deleteAll(JobID jobId) { private boolean delete(String blobPath) { try { LOG.debug("Deleting {}.", blobPath); - + Path path = new Path(blobPath); boolean result = fileSystem.delete(path, true); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java index c760d04cbd188..6e05aa09f9946 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java @@ -78,7 +78,7 @@ /** * Tests for GET-specific parts of the {@link BlobCacheService}. * - * This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBS from + *

This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBS from * the {@link TransientBlobCache}, and how failing GET requests behave in the presence of failures * when used with a {@link BlobCacheService}. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 664dc28cba4fe..b654ceee6ae92 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -36,10 +36,10 @@ public class BlobClientSslTest extends BlobClientTest { /** The instance of the SSL BLOB server used during the tests. */ - private static BlobServer BLOB_SSL_SERVER; + private static BlobServer blobSslServer; /** Instance of a non-SSL BLOB server with SSL-enabled security options. */ - private static BlobServer BLOB_NON_SSL_SERVER; + private static BlobServer blobNonSslServer; /** The SSL blob service client configuration. */ private static Configuration sslClientConfig; @@ -62,8 +62,8 @@ public static void startSSLServer() throws IOException { config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); - BLOB_SSL_SERVER.start(); + blobSslServer = new BlobServer(config, new VoidBlobStore()); + blobSslServer.start(); sslClientConfig = new Configuration(); sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); @@ -81,8 +81,8 @@ public static void startNonSSLServer() throws IOException { config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - BLOB_NON_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); - BLOB_NON_SSL_SERVER.start(); + blobNonSslServer = new BlobServer(config, new VoidBlobStore()); + blobNonSslServer.start(); nonSslClientConfig = new Configuration(); nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); @@ -96,11 +96,11 @@ public static void startNonSSLServer() throws IOException { */ @AfterClass public static void stopServers() throws IOException { - if (BLOB_SSL_SERVER != null) { - BLOB_SSL_SERVER.close(); + if (blobSslServer != null) { + blobSslServer.close(); } - if (BLOB_NON_SSL_SERVER != null) { - BLOB_NON_SSL_SERVER.close(); + if (blobNonSslServer != null) { + blobNonSslServer.close(); } } @@ -109,7 +109,7 @@ protected Configuration getBlobClientConfig() { } protected BlobServer getBlobServer() { - return BLOB_SSL_SERVER; + return blobSslServer; } /** @@ -117,7 +117,7 @@ protected BlobServer getBlobServer() { */ @Test public void testUploadJarFilesHelper() throws Exception { - uploadJarFile(BLOB_SSL_SERVER, sslClientConfig); + uploadJarFile(blobSslServer, sslClientConfig); } /** @@ -126,7 +126,7 @@ public void testUploadJarFilesHelper() throws Exception { @Test(expected = IOException.class) public void testSSLClientFailure() throws Exception { // SSL client connected to non-ssl server - uploadJarFile(BLOB_SERVER, sslClientConfig); + uploadJarFile(blobServer, sslClientConfig); } /** @@ -135,7 +135,7 @@ public void testSSLClientFailure() throws Exception { @Test(expected = IOException.class) public void testSSLClientFailure2() throws Exception { // SSL client connected to non-ssl server - uploadJarFile(BLOB_NON_SSL_SERVER, sslClientConfig); + uploadJarFile(blobNonSslServer, sslClientConfig); } /** @@ -144,7 +144,7 @@ public void testSSLClientFailure2() throws Exception { @Test(expected = IOException.class) public void testSSLServerFailure() throws Exception { // Non-SSL client connected to ssl server - uploadJarFile(BLOB_SSL_SERVER, clientConfig); + uploadJarFile(blobSslServer, clientConfig); } /** @@ -153,7 +153,7 @@ public void testSSLServerFailure() throws Exception { @Test(expected = IOException.class) public void testSSLServerFailure2() throws Exception { // Non-SSL client connected to ssl server - uploadJarFile(BLOB_SSL_SERVER, nonSslClientConfig); + uploadJarFile(blobSslServer, nonSslClientConfig); } /** @@ -161,7 +161,7 @@ public void testSSLServerFailure2() throws Exception { */ @Test public void testNonSSLConnection() throws Exception { - uploadJarFile(BLOB_SERVER, clientConfig); + uploadJarFile(blobServer, clientConfig); } /** @@ -169,7 +169,7 @@ public void testNonSSLConnection() throws Exception { */ @Test public void testNonSSLConnection2() throws Exception { - uploadJarFile(BLOB_SERVER, nonSslClientConfig); + uploadJarFile(blobServer, nonSslClientConfig); } /** @@ -177,7 +177,7 @@ public void testNonSSLConnection2() throws Exception { */ @Test public void testNonSSLConnection3() throws Exception { - uploadJarFile(BLOB_NON_SSL_SERVER, clientConfig); + uploadJarFile(blobNonSslServer, clientConfig); } /** @@ -185,6 +185,6 @@ public void testNonSSLConnection3() throws Exception { */ @Test public void testNonSSLConnection4() throws Exception { - uploadJarFile(BLOB_NON_SSL_SERVER, nonSslClientConfig); + uploadJarFile(blobNonSslServer, nonSslClientConfig); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 9e4f4b71f8069..8c2b7c0220c4f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -62,7 +62,7 @@ public class BlobClientTest extends TestLogger { private static final int TEST_BUFFER_SIZE = 17 * 1000; /** The instance of the (non-ssl) BLOB server used during the tests. */ - static BlobServer BLOB_SERVER; + static BlobServer blobServer; /** The blob service (non-ssl) client configuration. */ static Configuration clientConfig; @@ -79,8 +79,8 @@ public static void startServer() throws IOException { config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); - BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); - BLOB_SERVER.start(); + blobServer = new BlobServer(config, new VoidBlobStore()); + blobServer.start(); clientConfig = new Configuration(); } @@ -90,8 +90,8 @@ public static void startServer() throws IOException { */ @AfterClass public static void stopServer() throws IOException { - if (BLOB_SERVER != null) { - BLOB_SERVER.close(); + if (blobServer != null) { + blobServer.close(); } } @@ -319,7 +319,7 @@ protected Configuration getBlobClientConfig() { } protected BlobServer getBlobServer() { - return BLOB_SERVER; + return blobServer; } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java index 003eaf65df928..57c6366c2bddb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java @@ -78,7 +78,7 @@ public void run() { if (socket != null) { try { socket.close(); - } catch(Throwable ignored) {} + } catch (Throwable ignored) {} } } } diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml index 59f3019b8ba33..8f0162d8c9ee0 100644 --- a/tools/maven/suppressions-runtime.xml +++ b/tools/maven/suppressions-runtime.xml @@ -23,13 +23,6 @@ under the License. "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd"> - - - From cc3b37235d8d3e17f1e3b0ceadce5dd54f0c30e9 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 2 Jan 2018 12:28:44 +0100 Subject: [PATCH 2/2] fixup! [FLINK-8280][checkstyle] enable and fix checkstyle in BlobServer and BlobUtils --- .../src/main/java/org/apache/flink/runtime/blob/BlobUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index ac0335095794a..28430adf23811 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -150,7 +150,8 @@ static File initLocalStorageDirectory(String basePath) throws IOException { File storageDir; // NOTE: although we will be using UUIDs, there may be collisions - for (int attempt = 0; attempt < 10; attempt++) { + int maxAttempts = 10; + for (int attempt = 0; attempt < maxAttempts; attempt++) { storageDir = new File(baseDir, String.format( "blobStore-%s", UUID.randomUUID().toString()));