From 1ef04f56b805f003d172024f7a8e7cc84a0e0b09 Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 10 Sep 2014 11:32:28 -0700 Subject: [PATCH] fix platform-dependent paths for Windows build --- .../clj/backtype/storm/daemon/supervisor.clj | 4 +- storm-core/src/dev/resources/storm.py | 248 +++++++++++++++++- storm-core/src/dev/resources/storm.rb | 228 +++++++++++++++- .../clj/backtype/storm/supervisor_test.clj | 12 +- 4 files changed, 482 insertions(+), 10 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 9c6e3f8240a..1f96a5382e3 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -502,7 +502,7 @@ :distributed [supervisor storm-id port worker-id] (let [conf (:conf supervisor) storm-home (System/getProperty "storm.home") - storm-log-dir (or (System/getProperty "storm.log.dir") (str storm-home "/logs")) + storm-log-dir (or (System/getProperty "storm.log.dir") (str storm-home file-path-separator "logs")) stormroot (supervisor-stormdist-root conf storm-id) jlp (jlp stormroot conf) stormjar (supervisor-stormjar-path stormroot) @@ -529,7 +529,7 @@ (str "-Dlogfile.name=" logfilename) (str "-Dstorm.home=" storm-home) (str "-Dstorm.log.dir=" storm-log-dir) - (str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml") + (str "-Dlogback.configurationFile=" storm-home file-path-separator "logback" file-path-separator "cluster.xml") (str "-Dstorm.id=" storm-id) (str "-Dworker.id=" worker-id) (str "-Dworker.port=" port) diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py index 5e7311133f5..e143974072f 120000 --- a/storm-core/src/dev/resources/storm.py +++ b/storm-core/src/dev/resources/storm.py @@ -1 +1,247 @@ -../../multilang/py/storm.py \ No newline at end of file +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import os +import traceback +from collections import deque + +try: + import simplejson as json +except ImportError: + import json + +json_encode = lambda x: json.dumps(x) +json_decode = lambda x: json.loads(x) + +#reads lines and reconstructs newlines appropriately +def readMsg(): + msg = "" + while True: + line = sys.stdin.readline() + if not line: + raise Exception('Read EOF from stdin') + if line[0:-1] == "end": + break + msg = msg + line + return json_decode(msg[0:-1]) + +MODE = None +ANCHOR_TUPLE = None + +#queue up commands we read while trying to read taskids +pending_commands = deque() + +def readTaskIds(): + if pending_taskids: + return pending_taskids.popleft() + else: + msg = readMsg() + while type(msg) is not list: + pending_commands.append(msg) + msg = readMsg() + return msg + +#queue up taskids we read while trying to read commands/tuples +pending_taskids = deque() + +def readCommand(): + if pending_commands: + return pending_commands.popleft() + else: + msg = readMsg() + while type(msg) is list: + pending_taskids.append(msg) + msg = readMsg() + return msg + +def readTuple(): + cmd = readCommand() + return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"]) + +def sendMsgToParent(msg): + print json_encode(msg) + print "end" + sys.stdout.flush() + +def sync(): + sendMsgToParent({'command':'sync'}) + +def sendpid(heartbeatdir): + pid = os.getpid() + sendMsgToParent({'pid':pid}) + open(heartbeatdir + "/" + str(pid), "w").close() + +def emit(*args, **kwargs): + __emit(*args, **kwargs) + return readTaskIds() + +def emitDirect(task, *args, **kwargs): + kwargs["directTask"] = task + __emit(*args, **kwargs) + +def __emit(*args, **kwargs): + global MODE + if MODE == Bolt: + emitBolt(*args, **kwargs) + elif MODE == Spout: + emitSpout(*args, **kwargs) + +def emitBolt(tup, stream=None, anchors = [], directTask=None): + global ANCHOR_TUPLE + if ANCHOR_TUPLE is not None: + anchors = [ANCHOR_TUPLE] + m = {"command": "emit"} + if stream is not None: + m["stream"] = stream + m["anchors"] = map(lambda a: a.id, anchors) + if directTask is not None: + m["task"] = directTask + m["tuple"] = tup + sendMsgToParent(m) + +def emitSpout(tup, stream=None, id=None, directTask=None): + m = {"command": "emit"} + if id is not None: + m["id"] = id + if stream is not None: + m["stream"] = stream + if directTask is not None: + m["task"] = directTask + m["tuple"] = tup + sendMsgToParent(m) + +def ack(tup): + sendMsgToParent({"command": "ack", "id": tup.id}) + +def fail(tup): + sendMsgToParent({"command": "fail", "id": tup.id}) + +def reportError(msg): + sendMsgToParent({"command": "error", "msg": msg}) + +def log(msg, level=2): + sendMsgToParent({"command": "log", "msg": msg, "level":level}) + +def logTrace(msg): + log(msg, 0) + +def logDebug(msg): + log(msg, 1) + +def logInfo(msg): + log(msg, 2) + +def logWarn(msg): + log(msg, 3) + +def logError(msg): + log(msg, 4) + +def rpcMetrics(name, params): + sendMsgToParent({"command": "metrics", "name": name, "params": params}) + +def initComponent(): + setupInfo = readMsg() + sendpid(setupInfo['pidDir']) + return [setupInfo['conf'], setupInfo['context']] + +class Tuple(object): + def __init__(self, id, component, stream, task, values): + self.id = id + self.component = component + self.stream = stream + self.task = task + self.values = values + + def __repr__(self): + return '<%s%s>' % ( + self.__class__.__name__, + ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) + +class Bolt(object): + def initialize(self, stormconf, context): + pass + + def process(self, tuple): + pass + + def run(self): + global MODE + MODE = Bolt + conf, context = initComponent() + try: + self.initialize(conf, context) + while True: + tup = readTuple() + self.process(tup) + except Exception, e: + reportError(traceback.format_exc(e)) + +class BasicBolt(object): + def initialize(self, stormconf, context): + pass + + def process(self, tuple): + pass + + def run(self): + global MODE + MODE = Bolt + global ANCHOR_TUPLE + conf, context = initComponent() + try: + self.initialize(conf, context) + while True: + tup = readTuple() + ANCHOR_TUPLE = tup + self.process(tup) + ack(tup) + except Exception, e: + reportError(traceback.format_exc(e)) + +class Spout(object): + def initialize(self, conf, context): + pass + + def ack(self, id): + pass + + def fail(self, id): + pass + + def nextTuple(self): + pass + + def run(self): + global MODE + MODE = Spout + conf, context = initComponent() + try: + self.initialize(conf, context) + while True: + msg = readCommand() + if msg["command"] == "next": + self.nextTuple() + if msg["command"] == "ack": + self.ack(msg["id"]) + if msg["command"] == "fail": + self.fail(msg["id"]) + sync() + except Exception, e: + reportError(traceback.format_exc(e)) diff --git a/storm-core/src/dev/resources/storm.rb b/storm-core/src/dev/resources/storm.rb index 96db018a4aa..c632fb3425b 120000 --- a/storm-core/src/dev/resources/storm.rb +++ b/storm-core/src/dev/resources/storm.rb @@ -1 +1,227 @@ -../../multilang/rb/storm.rb \ No newline at end of file +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "rubygems" +require "json" + +module Storm + module Protocol + class << self + attr_accessor :mode, :pending_taskids, :pending_commands + end + + self.pending_taskids = [] + self.pending_commands = [] + + def read_message + msg = "" + loop do + line = STDIN.readline.chomp + break if line == "end" + msg << line + msg << "\n" + end + JSON.parse msg.chomp + end + + def read_task_ids + Storm::Protocol.pending_taskids.shift || + begin + msg = read_message + until msg.is_a? Array + Storm::Protocol.pending_commands.push(msg) + msg = read_message + end + msg + end + end + + def read_command + Storm::Protocol.pending_commands.shift || + begin + msg = read_message + while msg.is_a? Array + Storm::Protocol.pending_taskids.push(msg) + msg = read_message + end + msg + end + end + + def send_msg_to_parent(msg) + puts msg.to_json + puts "end" + STDOUT.flush + end + + def sync + send_msg_to_parent({'command' => 'sync'}) + end + + def send_pid(heartbeat_dir) + pid = Process.pid + send_msg_to_parent({'pid' => pid}) + File.open("#{heartbeat_dir}/#{pid}", "w").close + end + + def emit_bolt(tup, args = {}) + stream = args[:stream] + anchors = args[:anchors] || args[:anchor] || [] + anchors = [anchors] unless anchors.is_a? Enumerable + direct = args[:direct_task] + m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup} + m[:stream] = stream if stream + m[:task] = direct if direct + send_msg_to_parent m + read_task_ids unless direct + end + + def emit_spout(tup, args = {}) + stream = args[:stream] + id = args[:id] + direct = args[:direct_task] + m = {:command => :emit, :tuple => tup} + m[:id] = id if id + m[:stream] = stream if stream + m[:task] = direct if direct + send_msg_to_parent m + read_task_ids unless direct + end + + def emit(*args) + case Storm::Protocol.mode + when 'spout' + emit_spout(*args) + when 'bolt' + emit_bolt(*args) + end + end + + def ack(tup) + send_msg_to_parent :command => :ack, :id => tup.id + end + + def fail(tup) + send_msg_to_parent :command => :fail, :id => tup.id + end + + def reportError(msg) + send_msg_to_parent :command => :error, :msg => msg.to_s + end + + def log(msg, level=2) + send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level + end + + def logTrace(msg) + log(msg, 0) + end + + def logDebug(msg) + log(msg, 1) + end + + def logInfo(msg) + log(msg, 2) + end + + def logWarn(msg) + log(msg, 3) + end + + def logError(msg) + log(msg, 4) + end + + def handshake + setup_info = read_message + send_pid setup_info['pidDir'] + [setup_info['conf'], setup_info['context']] + end + end + + class Tuple + attr_accessor :id, :component, :stream, :task, :values + + def initialize(id, component, stream, task, values) + @id = id + @component = component + @stream = stream + @task = task + @values = values + end + + def self.from_hash(hash) + Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple")) + end + end + + class Bolt + include Storm::Protocol + + def prepare(conf, context); end + + def process(tuple); end + + def run + Storm::Protocol.mode = 'bolt' + prepare(*handshake) + begin + while true + process Tuple.from_hash(read_command) + end + rescue Exception => e + reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n') + end + end + end + + class Spout + include Storm::Protocol + + def open(conf, context); end + + def nextTuple; end + + def ack(id); end + + def fail(id); end + + def run + Storm::Protocol.mode = 'spout' + open(*handshake) + + begin + while true + msg = read_command + case msg['command'] + when 'next' + nextTuple + when 'ack' + ack(msg['id']) + when 'fail' + fail(msg['id']) + end + sync + end + rescue Exception => e + reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n') + end + end + end +end diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj index 5abfa8905a9..f98eeed1557 100644 --- a/storm-core/test/clj/backtype/storm/supervisor_test.clj +++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj @@ -247,7 +247,7 @@ (let [mock-port "42" mock-storm-id "fake-storm-id" mock-worker-id "fake-worker-id" - mock-cp "/base:/stormjar.jar" + mock-cp (str file-path-separator "base" class-path-separator file-path-separator "stormjar.jar") exp-args-fn (fn [opts topo-opts classpath] (concat [(supervisor/java-cmd) "-server"] opts @@ -255,8 +255,8 @@ ["-Djava.library.path=" (str "-Dlogfile.name=worker-" mock-port ".log") "-Dstorm.home=" - "-Dstorm.log.dir=/logs" - "-Dlogback.configurationFile=/logback/cluster.xml" + (str "-Dstorm.log.dir=" file-path-separator "logs") + (str "-Dlogback.configurationFile=" file-path-separator "logback" file-path-separator "cluster.xml") (str "-Dstorm.id=" mock-storm-id) (str "-Dworker.id=" mock-worker-id) (str "-Dworker.port=" mock-port) @@ -306,14 +306,14 @@ [0] exp-args)))) (testing "testing topology.classpath is added to classpath" - (let [topo-cp "/any/path" + (let [topo-cp (str file-path-separator "any" file-path-separator "path") exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp])) mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}] (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp} supervisor-stormdist-root nil supervisor/jlp nil launch-process nil - current-classpath "/base"] + current-classpath (str file-path-separator "base")] (supervisor/launch-worker mock-supervisor mock-storm-id mock-port @@ -329,7 +329,7 @@ supervisor-stormdist-root nil supervisor/jlp nil launch-process nil - current-classpath "/base"] + current-classpath (str file-path-separator "base")] (supervisor/launch-worker mock-supervisor mock-storm-id mock-port