Skip to content

Commit

Permalink
test for basicbolt
Browse files Browse the repository at this point in the history
  • Loading branch information
kbourgoin committed Jun 18, 2014
1 parent c67d4ce commit 557f5a2
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 1 deletion.
1 change: 1 addition & 0 deletions dev-requirements.txt
Expand Up @@ -2,3 +2,4 @@ pyflakes
pep8
nose
sphinx_rtd_theme
mock
2 changes: 1 addition & 1 deletion streamparse/bolt.py
Expand Up @@ -142,7 +142,7 @@ class BasicBolt(Bolt):

def emit(self, tup, stream=None, anchors=[], direct_task=None):
"""Override to anchor to the current tuple if no anchors are specified"""
anchors = anchors of [self.__current_tup]
anchors = anchors or [self.__current_tup]
super(BasicBolt, self).emit(
tup, stream=stream, anchors=anchors, direct_task=direct_task
)
Expand Down
19 changes: 19 additions & 0 deletions test/ipc/dummy_basic_bolt.py
@@ -0,0 +1,19 @@
import sys
import os

here = os.path.split(os.path.abspath(__file__))[0]
root = os.path.abspath(os.path.join(here, '../../'))
sys.path[0:0] = [root]

from streamparse.bolt import BasicBolt


class DummyBolt(BasicBolt):

def process(self, tup):
if tup.id == "emit":
self.emit(tup.values)


if __name__ == '__main__':
DummyBolt().run()
80 changes: 80 additions & 0 deletions test/ipc/test_basic_bolt.py
@@ -0,0 +1,80 @@
from __future__ import print_function, absolute_import
import subprocess
import unittest
import os
import sys
import time

from util import ShellProcess


_ROOT = os.path.dirname(os.path.realpath(__file__))
def here(*x):
return os.path.join(_ROOT, *x)


class BasicBoltTester(unittest.TestCase):

@classmethod
def setUpClass(cls):
args = ["python", here("dummy_basic_bolt.py")]
cls.proc = subprocess.Popen(args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
print("Waiting for subprocess to start...")
time.sleep(1) # time for the subprocess to start
if cls.proc.poll() is not None:
raise Exception("Could not create subprocess.\n{}"
.format("".join(cls.proc.stderr.readlines())))
cls.shell_proc = ShellProcess(cls.proc.stdout, cls.proc.stdin)

def test_1_initial_handshake(self):
msg = {
"conf": {},
"context": {},
"pidDir": here()
}
BasicBoltTester.shell_proc.write_message(msg)
res = BasicBoltTester.shell_proc.read_message()

self.assertIsInstance(res, dict)
self.assertEqual(res.get("pid"), BasicBoltTester.proc.pid)
pid = str(res["pid"])
self.assertTrue(os.path.exists(here(pid)))
self.assertTrue(os.path.isfile(here(pid)))

def test_2_auto_ack(self):
msg = {
"id": "noop",
"comp": "word-spout",
"stream": "default",
"task": 0,
"tuple": ["snow white and the seven dwarfs", "field2", 3, 4.252]
}
BasicBoltTester.shell_proc.write_message(msg)
res = BasicBoltTester.shell_proc.read_message()
self.assertEqual(res, {"command": "ack", "id": msg["id"]})

def test_3_auto_anchor(self):
msg = {
"id": "emit",
"comp": "word-spout",
"stream": "default",
"task": 0,
"tuple": ["snow white and the seven dwarfs", "field2", 3, 4.252]
}

BasicBoltTester.shell_proc.write_message(msg)
res = BasicBoltTester.shell_proc.read_message()
self.assertEqual(res.get("command"), "emit")
self.assertEqual(msg["tuple"], res.get("tuple"))
self.assertEqual([msg["id"],], res.get("anchors"))


@classmethod
def tearDownClass(cls):
os.remove(here(str(cls.proc.pid)))
cls.proc.kill()

if __name__ == '__main__':
unittest.main()

0 comments on commit 557f5a2

Please sign in to comment.