From 0649255ac69ca368e58d42d14de1602c2c13e674 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Sun, 28 Feb 2016 09:07:29 -0800 Subject: [PATCH] APEXCORE-362 - NPE in StreamingContainerManager. Fixed race condition between the thread that insert into endWindowStatsOperatorMap and the thread that removes entries when endWindowStatsOperatorMap exceeds 1000 entries. --- .../com/datatorrent/stram/StreamingContainerManager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 29c6a2c951..1c179874b1 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1684,8 +1684,12 @@ public void run() if (stats.windowId > currentEndWindowStatsWindowId) { Map endWindowStatsMap = endWindowStatsOperatorMap.get(stats.windowId); if (endWindowStatsMap == null) { - endWindowStatsOperatorMap.putIfAbsent(stats.windowId, new ConcurrentSkipListMap()); - endWindowStatsMap = endWindowStatsOperatorMap.get(stats.windowId); + endWindowStatsMap = new ConcurrentSkipListMap(); + Map endWindowStatsMapPrevious = + endWindowStatsOperatorMap.putIfAbsent(stats.windowId, endWindowStatsMap); + if (endWindowStatsMapPrevious != null) { + endWindowStatsMap = endWindowStatsMapPrevious; + } } endWindowStatsMap.put(shb.getNodeId(), endWindowStats);