From b2ecd44a7ee13d6c98171c63d056bdef481d6e0f Mon Sep 17 00:00:00 2001 From: Wurstmeister Date: Sun, 5 Oct 2014 21:46:24 +0100 Subject: [PATCH 1/2] STORM-307: reset LocalState if files are corrupt * allow supervisor to start if LocalState was corrupted Change-Id: I3dbf957dd2e37c9c6be39dab29a9a71e99b7a2c3 --- .../jvm/backtype/storm/utils/LocalState.java | 28 +++++++++++++------ .../clj/backtype/storm/local_state_test.clj | 14 +++++++++- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java index 0d0ae0754de..f58e79c0848 100644 --- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java +++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java @@ -18,6 +18,8 @@ package backtype.storm.utils; import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.util.Map; @@ -30,25 +32,35 @@ * Every read/write hits disk. */ public class LocalState { + public static Logger LOG = LoggerFactory.getLogger(LocalState.class); + private VersionedStore _vs; public LocalState(String backingDir) throws IOException { _vs = new VersionedStore(backingDir); } - + public synchronized Map snapshot() throws IOException { int attempts = 0; + Map result = new HashMap(); while(true) { String latestPath = _vs.mostRecentVersionPath(); - if(latestPath==null) return new HashMap(); - try { - return (Map) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath))); - } catch(IOException e) { - attempts++; - if(attempts >= 10) { - throw e; + if(latestPath != null) { + try { + byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath)); + if (serialized.length == 0) { + LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath); + } else { + result = (Map) Utils.deserialize(serialized); + } + } catch (IOException e) { + attempts++; + if (attempts >= 10) { + throw e; + } } } + return result; } } diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj index ba2b969d64c..4bd58ec814b 100644 --- a/storm-core/test/clj/backtype/storm/local_state_test.clj +++ b/storm-core/test/clj/backtype/storm/local_state_test.clj @@ -16,7 +16,9 @@ (ns backtype.storm.local-state-test (:use [clojure test]) (:use [backtype.storm testing]) - (:import [backtype.storm.utils LocalState])) + (:import [backtype.storm.utils LocalState] + [org.apache.commons.io FileUtils] + [java.io File])) (deftest test-local-state (with-local-tmp [dir1 dir2] @@ -41,3 +43,13 @@ (.put ls2 "b" 8) (is (= 8 (.get ls2 "b"))) ))) + +(deftest empty-state + (with-local-tmp [dir] + (let [ls (LocalState. dir) + data (FileUtils/openOutputStream (File. dir "12345")) + version (FileUtils/openOutputStream (File. dir "12345.version"))] + (is (= nil (.get ls "c"))) + (.put ls "a" 1) + (is (= 1 (.get ls "a"))) + ))) From ccde2332da9ae2c4b43e38651cc0d08ae0ebdb18 Mon Sep 17 00:00:00 2001 From: Wurstmeister Date: Mon, 6 Oct 2014 10:17:26 +0100 Subject: [PATCH 2/2] STORM-307: fixed IOException retry logic Change-Id: Iffa4609ee9b8a0afbd437d2b2ee802cc83773a0a --- .../jvm/backtype/storm/utils/LocalState.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java index f58e79c0848..14a45dabb5a 100644 --- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java +++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java @@ -42,28 +42,32 @@ public LocalState(String backingDir) throws IOException { public synchronized Map snapshot() throws IOException { int attempts = 0; - Map result = new HashMap(); while(true) { - String latestPath = _vs.mostRecentVersionPath(); - if(latestPath != null) { - try { - byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath)); - if (serialized.length == 0) { - LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath); - } else { - result = (Map) Utils.deserialize(serialized); - } - } catch (IOException e) { - attempts++; - if (attempts >= 10) { - throw e; - } + try { + return deserializeLatestVersion(); + } catch (IOException e) { + attempts++; + if (attempts >= 10) { + throw e; } } - return result; } } - + + private Map deserializeLatestVersion() throws IOException { + String latestPath = _vs.mostRecentVersionPath(); + Map result = new HashMap(); + if (latestPath != null) { + byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath)); + if (serialized.length == 0) { + LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath); + } else { + result = (Map) Utils.deserialize(serialized); + } + } + return result; + } + public Object get(Object key) throws IOException { return snapshot().get(key); }