Skip to content

Commit

Permalink
Adding v0.0.3
Browse files Browse the repository at this point in the history
 - Refactoring some names like: next_workers
  • Loading branch information
dperezrada committed Jul 15, 2015
1 parent c86e115 commit f13aec0
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
14 changes: 8 additions & 6 deletions poliglo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ def write_outputs(connection, data, process_data, worker_script_data):
data = prepare_write_output(data, process_data, data['process']['worker_id'])
update_process(connection, data['process']['type'], data['process']['id'])
pipe = connection.pipeline()
workers_outputs_types = worker_script_data.get('__outputs_types', [])
for i, output_worker_id in enumerate(worker_script_data.get('outputs', [])):
workers_outputs_types = worker_script_data.get('__next_workers_types', [])
for i, output_worker_id in enumerate(worker_script_data.get('next_workers', [])):
write_one_output(connection, workers_outputs_types[i], output_worker_id, data)
pipe.execute()

Expand Down Expand Up @@ -233,9 +233,9 @@ def default_main_inside(connection, worker_scripts, queue_message, process_func,
nodata = False
if not process_data:
process_data = {}
if process_data.get('__outputs'):
worker_script_data['outputs'] = process_data.get('__outputs', [])
if len(worker_script_data.get('outputs', [])) == 0:
if process_data.get('__next_workers'):
worker_script_data['next_workers'] = process_data.get('__next_workers', [])
if len(worker_script_data.get('next_workers', [])) == 0:
write_finalized_job(data, process_data, worker_id, connection)
continue
write_outputs(connection, data, process_data, worker_script_data)
Expand Down Expand Up @@ -264,10 +264,12 @@ def pre_default_main_inside(
def default_main(master_mind_url, worker_type, process_func, *args, **kwargs):
worker_scripts, connection = prepare_worker(master_mind_url, worker_type)
if os.environ.get('TRY_INPUT'):
import pprint
script_id = os.environ.get('SCRIPT_ID')
raw_data = open(os.environ.get('TRY_INPUT')).read()
data = get_job_data(raw_data)
print list(process_func(worker_scripts.get(script_id), data, *args, **kwargs))
worker_script_data = get_worker_script_data(worker_scripts, data, data['process']['worker_id'])
pprint.pprint(list(process_func(worker_script_data, data, *args, **kwargs)))
return None
print ' [*] Waiting for data. To exit press CTRL+C'
while True:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages
import sys, os

version = '0.0.2'
version = '0.0.3'

setup(name='poliglo',
version=version,
Expand Down
16 changes: 8 additions & 8 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def setUp(self):
"template_file": "workers_output.assign_variable.template_file"
}
},
"outputs": ["worker1"]
"next_workers": ["worker1"]
}
self.data = {
'workers_output': {
Expand Down Expand Up @@ -146,8 +146,8 @@ def setUp(self):
}
self.process_data = {'message': 'hello'}
self.worker_script_data = {
'__outputs_types': ['write'],
'outputs': ['worker_2']
'__next_workers_types': ['write'],
'next_workers': ['worker_2']
}
self.connection = Mock()

Expand All @@ -165,7 +165,7 @@ def test_set_process_variables(self, mock_add_data_to_next_worker):
data_for_next_worker = json_loads(mock_add_data_to_next_worker.call_args[0][2])
self.assertEqual([self.worker_id, ], data_for_next_worker['process']['workers'])
self.assertEqual(
self.worker_script_data['outputs'][0],
self.worker_script_data['next_workers'][0],
data_for_next_worker['process']['worker_id']
)

Expand Down Expand Up @@ -252,8 +252,8 @@ def setUp(self):
self.worker_scripts = {
'example_process': {
'worker_1': {
'__outputs_types': ['worker'],
'outputs': ['worker_2']
'__next_workers_types': ['worker'],
'next_workers': ['worker_2']
}
}
}
Expand All @@ -275,13 +275,13 @@ def my_func(_, data):
def test_with_process_change_output(self, write_outputs_mock):
def my_func(_, data):
return [
{'value': data['process']['id'], '__outputs': ['worker_3']},
{'value': data['process']['id'], '__next_workers': ['worker_3']},
]
poliglo.pre_default_main_inside(
self.connection, self.worker_scripts, self.worker_type, my_func
)
worker_script_data = write_outputs_mock.call_args[0][3]
self.assertEqual(['worker_3',], worker_script_data['outputs'])
self.assertEqual(['worker_3',], worker_script_data['next_workers'])

@patch('poliglo.write_outputs')
def test_with_no_process_data(self, write_outputs_mock):
Expand Down

0 comments on commit f13aec0

Please sign in to comment.