Skip to content

Commit

Permalink
Merge branch '0.7.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed May 9, 2012
2 parents 3d0692a + 45d0640 commit 3e048a1
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 13 deletions.
2 changes: 1 addition & 1 deletion conf/defaults.yaml
Expand Up @@ -50,7 +50,7 @@ supervisor.slots.ports:
- 6703
supervisor.childopts: "-Xmx1024m"
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 240
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs: 30
#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
Expand Down
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject storm "0.7.2-SNAPSHOT"
(defproject storm "0.7.2-rc1"
:source-path "src/clj"
:test-path "test/clj"
:java-source-path "src/jvm"
Expand Down
7 changes: 4 additions & 3 deletions src/clj/backtype/storm/daemon/supervisor.clj
Expand Up @@ -259,16 +259,17 @@
storm-code-map (read-storm-code-locations storm-cluster-state sync-callback)
assigned-storm-ids (set (keys storm-code-map))
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
new-assignment (->>
(read-assignments
all-assignment (read-assignments
storm-cluster-state
(:supervisor-id supervisor)
sync-callback)
(filter-key #(.confirmAssigned isupervisor %)))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
(log-debug "Downloaded storm ids: " downloaded-storm-ids)
(log-debug "All assignment: " all-assignment)
(log-debug "New assignment: " new-assignment)
;; download code first
;; This might take awhile
Expand Down
14 changes: 7 additions & 7 deletions src/jvm/backtype/storm/task/TopologyContext.java
@@ -1,25 +1,21 @@
package backtype.storm.task;

import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
import backtype.storm.hooks.ITaskHook;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.ThriftTopologyUtils;
import backtype.storm.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.NotImplementedException;
import org.json.simple.JSONValue;

/**
* A TopologyContext is given to bolts and spouts in their "prepare" and "open"
Expand All @@ -45,7 +41,11 @@ public TopologyContext(StormTopology topology, Map stormConf,
super(topology, stormConf, taskToComponent, stormId);
_workerPort = workerPort;
_taskId = taskId;
_pidDir = pidDir;
try {
_pidDir = new File(pidDir).getCanonicalPath();
} catch (IOException e) {
throw new RuntimeException("Could not get canonical path for " + _pidDir, e);
}
_codeDir = codeDir;
_workerTasks = new ArrayList<Integer>(workerTasks);
Collections.sort(_workerTasks);
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/utils/ShellProcess.java
Expand Up @@ -70,7 +70,7 @@ private String readString() throws IOException {
while (true) {
String subline = processOut.readLine();
if(subline==null)
throw new RuntimeException("Pipe to subprocess seems to be broken!");
throw new RuntimeException("Pipe to subprocess seems to be broken! Currently read output: " + line.toString());
if(subline.equals("end")) {
break;
}
Expand Down

0 comments on commit 3e048a1

Please sign in to comment.