From 2d91e74f8e80a7e7b943f8bbc1e9eb879e7c7dcd Mon Sep 17 00:00:00 2001 From: Malcolm Taylor Date: Sun, 8 Oct 2017 08:06:17 +0100 Subject: [PATCH 1/4] flink-7736: fix some lgtm.com alerts --- .../flink/optimizer/dag/GroupReduceNode.java | 2 +- .../webmonitor/handlers/JarListHandler.java | 3 +- .../flink/runtime/memory/MemoryManager.java | 2 +- .../hash/InPlaceMutableHashTable.java | 4 +-- .../runtime/taskexecutor/TaskExecutor.java | 2 +- .../taskexecutor/slot/TaskSlotTable.java | 2 +- .../flink/runtime/util/JarFileCreator.java | 2 +- .../webmonitor/history/ArchivedJson.java | 9 ++++++ .../webmonitor/history/ArchivedJsonTest.java | 10 +++++++ .../source/SocketTextStreamFunction.java | 29 ++++++++++--------- .../util/typeutils/FieldAccessor.java | 4 --- .../yarn/YarnApplicationMasterRunner.java | 2 +- 12 files changed, 43 insertions(+), 28 deletions(-) diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java index bd118ec830111..bef2c0b2f44e2 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java @@ -97,7 +97,7 @@ else if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) { // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order Ordering groupOrder = null; - if (getOperator() instanceof GroupReduceOperatorBase) { + if (getOperator() != null) { groupOrder = getOperator().getGroupOrder(); if (groupOrder != null && groupOrder.getNumberOfFields() == 0) { groupOrder = null; diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index 2b56ecd81a897..66cc7f57b2d80 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -111,8 +111,7 @@ public boolean accept(File dir, String name) { gen.writeArrayFieldStart("entry"); String[] classes = new String[0]; - try { - JarFile jar = new JarFile(f); + try (JarFile jar = new JarFile(f)) { Manifest manifest = jar.getManifest(); String assemblerClass = null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java index c1a98cf7a641e..f3bea87a03931 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java @@ -568,7 +568,7 @@ public int computeNumberOfPages(double fraction) { * @return The number of pages corresponding to the memory fraction. */ public long computeMemorySize(double fraction) { - return pageSize * computeNumberOfPages(fraction); + return pageSize * (long) computeNumberOfPages(fraction); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java index bfc9aec426119..22a5d2a3989c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTable.java @@ -199,7 +199,7 @@ public InPlaceMutableHashTable(TypeSerializer serializer, TypeComparator c * @return The hash table's total capacity. */ public long getCapacity() { - return numAllMemorySegments * segmentSize; + return numAllMemorySegments * (long)segmentSize; } /** @@ -562,7 +562,7 @@ public void giveBackSegments() { } public long getTotalSize() { - return segments.size() * segmentSize; + return segments.size() * (long)segmentSize; } // ----------------------- Output ----------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 0f98c49947c41..30cf3770366ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -981,7 +981,7 @@ private JobManagerConnection associateWithJobManager( Preconditions.checkNotNull(jobID); Preconditions.checkNotNull(resourceID); Preconditions.checkNotNull(jobMasterGateway); - Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range."); + Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port is out of range."); TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index ab62a86f89c96..fcb27611e0715 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -452,7 +452,7 @@ public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveExc throw new SlotNotActiveException(task.getJobID(), task.getAllocationId()); } } else { - throw new SlotNotFoundException(taskSlot.getAllocationId()); + throw new SlotNotFoundException(task.getAllocationId()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java index ad7906ae8c108..d77c9f863501e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java @@ -190,7 +190,7 @@ public synchronized void createJarFile() throws IOException { this.outputFile.delete(); } - try ( JarOutputStream jos = new JarOutputStream(new FileOutputStream(this.outputFile), new Manifest())) { + try ( FileOutputStream fos = new FileOutputStream(this.outputFile); JarOutputStream jos = new JarOutputStream(fos, new Manifest())) { final Iterator> it = this.classSet.iterator(); while (it.hasNext()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java index 23e7676fadbd2..9b455bcaf085a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java @@ -21,6 +21,8 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.util.Preconditions; +import org.apache.commons.lang3.builder.HashCodeBuilder; + /** * A simple container for a handler's JSON response and the REST URLs for which the response would've been returned. * @@ -55,6 +57,13 @@ public boolean equals(Object obj) { } } + public int hashCode() { + HashCodeBuilder bldr = new HashCodeBuilder(); + bldr.append(path); + bldr.append(json); + return bldr.toHashCode(); + } + @Override public String toString() { return path + ":" + json; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java index 57ebbc971e0d2..05fe2eb88bede 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchivedJsonTest.java @@ -38,4 +38,14 @@ public void testEquals() { Assert.assertNotEquals(original, identicalPath); Assert.assertNotEquals(original, identicalJson); } + + @Test + public void testHashCode() { + ArchivedJson original = new ArchivedJson("path", "json"); + ArchivedJson twin = new ArchivedJson("path", "json"); + + Assert.assertEquals(original, original); + Assert.assertEquals(original, twin); + Assert.assertEquals(original.hashCode(), twin.hashCode()); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 1552ee298c671..8d04257ec50e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -94,21 +94,22 @@ public void run(SourceContext ctx) throws Exception { LOG.info("Connecting to server socket " + hostname + ':' + port); socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); - BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); - - char[] cbuf = new char[8192]; - int bytesRead; - while (isRunning && (bytesRead = reader.read(cbuf)) != -1) { - buffer.append(cbuf, 0, bytesRead); - int delimPos; - while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) { - String record = buffer.substring(0, delimPos); - // truncate trailing carriage return - if (delimiter.equals("\n") && record.endsWith("\r")) { - record = record.substring(0, record.length() - 1); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { + + char[] cbuf = new char[8192]; + int bytesRead; + while (isRunning && (bytesRead = reader.read(cbuf)) != -1) { + buffer.append(cbuf, 0, bytesRead); + int delimPos; + while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) { + String record = buffer.substring(0, delimPos); + // truncate trailing carriage return + if (delimiter.equals("\n") && record.endsWith("\r")) { + record = record.substring(0, record.length() - 1); + } + ctx.collect(record); + buffer.delete(0, delimPos + delimiter.length()); } - ctx.collect(record); - buffer.delete(0, delimPos + delimiter.length()); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java index 3751670eb54a2..411e4adaefcfe 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java @@ -204,10 +204,6 @@ static final class RecursiveTupleFieldAccessor extends Fi typeInfo.toString() + "\", which is an invalid index."); } - if (pos < 0) { - throw new CompositeType.InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". field."); - } - this.pos = pos; this.innerAccessor = innerAccessor; this.fieldType = innerAccessor.fieldType; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index e97fac304708e..68c0aec828412 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -139,7 +139,7 @@ protected int run(String[] args) { LOG.debug("All environment variables: {}", ENV); final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - require(yarnClientUsername != null, "YARN client user name environment variable {} not set", + require(yarnClientUsername != null, "YARN client user name environment variable (%s) not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); final String currDir = ENV.get(Environment.PWD.key()); From c08e7c36993a9f47f973d4ad5984d87809750861 Mon Sep 17 00:00:00 2001 From: Malcolm Taylor Date: Mon, 9 Oct 2017 10:43:18 +0100 Subject: [PATCH 2/4] flink-7736: address review comments --- .../flink/runtime/webmonitor/history/ArchivedJson.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java index 9b455bcaf085a..99c362fb9e2c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.webmonitor.history; +import java.util.Objects; + import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.util.Preconditions; - import org.apache.commons.lang3.builder.HashCodeBuilder; /** @@ -57,11 +58,9 @@ public boolean equals(Object obj) { } } + @Override public int hashCode() { - HashCodeBuilder bldr = new HashCodeBuilder(); - bldr.append(path); - bldr.append(json); - return bldr.toHashCode(); + return Objects.hash(path, json); } @Override From e6f3aa5f3f549d3c6d00016301c7f3b0f5752f5e Mon Sep 17 00:00:00 2001 From: Malcolm Taylor Date: Mon, 9 Oct 2017 14:38:46 +0100 Subject: [PATCH 3/4] flink-7736: fix style violations --- .../flink/runtime/webmonitor/history/ArchivedJson.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java index 99c362fb9e2c8..a15dd529e121a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java @@ -18,11 +18,9 @@ package org.apache.flink.runtime.webmonitor.history; -import java.util.Objects; - -import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.util.Preconditions; -import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.Objects; /** * A simple container for a handler's JSON response and the REST URLs for which the response would've been returned. From 78ca168c178a5836f4fd3254e66a0f7e00f6fb78 Mon Sep 17 00:00:00 2001 From: Malcolm Taylor Date: Fri, 15 Dec 2017 19:50:10 +0000 Subject: [PATCH 4/4] flink-7736: fix a null pointer exception --- .../main/java/org/apache/flink/yarn/YarnResourceManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 910172d97d867..4e5cfce87d2a1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -292,7 +292,7 @@ public boolean stopWorker(YarnWorkerNode workerNode) { resourceManagerClient.releaseAssignedContainer(container.getId()); workerNodeMap.remove(workerNode.getResourceID()); } else { - log.error("Can not find container with resource ID {}.", workerNode.getResourceID()); + log.error("Can not find container for null workerNode."); } return true; }