From ca697785338aac67bf71c659112fca932b3f649d Mon Sep 17 00:00:00 2001
From: Brett Slatkin
Date: Mon, 18 Feb 2013 15:31:56 -0800
Subject: [PATCH] Updated to use PEP8 styling

---
 LICENSE                        | 202 ++++
 client/capture.js              |  46 +-
 client/capture.py              |  94 ++--
 client/capture_worker.py       |  59 +--
 client/config.js               |  10 +-
 client/pdiff_worker.py         |  64 +--
 client/site_diff.py            | 422 ++++++++---------
 client/tests/site_diff_test.py | 541 +++++++++++-----------
 client/tests/workers_test.py   | 150 +++---
 client/workers.py              | 813 +++++++++++++++++----------------
 server/dpxdt/__init__.py       |   6 +-
 server/dpxdt/api.py            | 495 ++++++++++----------
 server/dpxdt/utils.py          |  48 +-
 server/dpxdt/work_queue.py     | 399 ++++++++--------
 server/runserver.py            |   8 +-
 15 files changed, 1788 insertions(+), 1569 deletions(-)
 create mode 100644 LICENSE

diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner.
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/client/capture.js b/client/capture.js
index 839585a..947d11e 100644
--- a/client/capture.js
+++ b/client/capture.js
@@ -1,12 +1,12 @@
 /*
  * Copyright 2013 Brett Slatkin
-
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
-
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
-
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,34 +22,34 @@
 var system = require('system');

 var configPath = null;
 var outputPath = null;
 if (system.args.length == 3) {
-  configPath = system.args[1];
-  outputPath = system.args[2];
+    configPath = system.args[1];
+    outputPath = system.args[2];
 } else {
-  console.log('Usage: phantomjs capture.js <configPath> <outputPath>');
-  phantom.exit(1);
+    console.log('Usage: phantomjs capture.js <configPath> <outputPath>');
+    phantom.exit(1);
 }

 try {
-  var config = JSON.parse(fs.read(configPath));
+    var config = JSON.parse(fs.read(configPath));
 } catch (e) {
-  console.log('Could not read config at "' + configPath + '":\n' + e);
-  phantom.exit(1);
+    console.log('Could not read config at "' + configPath + '":\n' + e);
+    phantom.exit(1);
 }

 ['targetUrl'].forEach(function(field) {
-  if (!config[field]) {
-    console.log('Missing required field: ' + field);
-    phantom.exit(1);
-  }
+    if (!config[field]) {
+        console.log('Missing required field: ' + field);
+        phantom.exit(1);
+    }
 });

 // Configure the page.
var page = require('webpage').create(); if (config.viewportSize) { - page.viewportSize = { - width: config.viewportSize.width, - height: config.viewportSize.height - }; + page.viewportSize = { + width: config.viewportSize.width, + height: config.viewportSize.height + }; } // TODO: Custom headers @@ -62,9 +62,9 @@ if (config.viewportSize) { // Screenshot page.open(config.targetUrl, function(status) { - // Inject code - // Wait for completion - // Check status - page.render(outputPath); - phantom.exit(0); + // Inject code + // Wait for completion + // Check status + page.render(outputPath); + phantom.exit(0); }); diff --git a/client/capture.py b/client/capture.py index 1719713..bbbd40f 100755 --- a/client/capture.py +++ b/client/capture.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - +# # http://www.apache.org/licenses/LICENSE-2.0 - +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,59 +29,59 @@ class PdiffWorkflow(workers.WorkflowItem): - """TODO""" - - def run(self): - ref = yield workers.CaptureItem( - '/tmp/test_ref.log', - '/Users/bslatkin/projects/hostedpdiff/client/config.js', - '/tmp/test_ref.png') - run = yield workers.CaptureItem( - '/tmp/test_run.log', - '/Users/bslatkin/projects/hostedpdiff/client/config.js', - '/tmp/test_run.png') - diff = yield workers.DiffItem( - '/tmp/test_diff.log', ref.output_path, run.output_path, - '/tmp/test_diff.png') - # If the diff has an error and the output file is present, then - # we found a pixel diff. Otherwise if there is no error there is no diff. + """TODO""" + + def run(self): + ref = yield workers.CaptureItem( + '/tmp/test_ref.log', + '/Users/bslatkin/projects/hostedpdiff/client/config.js', + '/tmp/test_ref.png') + run = yield workers.CaptureItem( + '/tmp/test_run.log', + '/Users/bslatkin/projects/hostedpdiff/client/config.js', + '/tmp/test_run.png') + diff = yield workers.DiffItem( + '/tmp/test_diff.log', ref.output_path, run.output_path, + '/tmp/test_diff.png') + # If the diff has an error and the output file is present, then + # we found a pixel diff. Otherwise if there is no error there is no diff. 
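PdiffWorkflow.run above is a generator-based coroutine: each yield hands one or more work items to the coordinator and suspends the workflow until a worker thread finishes them. A minimal single-threaded sketch of that control flow; the names Capture, workflow, and drive are illustrative only, not the real workers.py API:

    class Capture(object):
        """Stand-in work item; a worker fills in output_path."""

        def __init__(self, url):
            self.url = url
            self.output_path = None


    def workflow(url):
        # Yielding suspends this workflow until the item is done.
        capture = yield Capture(url)
        print 'Captured %s -> %s' % (url, capture.output_path)


    def drive(gen):
        """Drives a workflow generator, doing each yielded item inline."""
        try:
            item = gen.next()
            while True:
                item.output_path = '/tmp/shot.png'  # Pretend to do the work.
                item = gen.send(item)  # Resume the workflow with the result.
        except StopIteration:
            pass


    drive(workflow('http://www.example.com'))

The real coordinator does the same dance across threads: yielded items are routed to per-type queues, and the generator is resumed only once the matching worker thread reports the item back.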
def main(argv): - try: - argv = FLAGS(argv) - except gflags.FlagsError, e: - print '%s\nUsage: %s ARGS\n%s' % (e, sys.argv[0], FLAGS) - sys.exit(1) + try: + argv = FLAGS(argv) + except gflags.FlagsError, e: + print '%s\nUsage: %s ARGS\n%s' % (e, sys.argv[0], FLAGS) + sys.exit(1) - logging.getLogger().setLevel(logging.DEBUG) + logging.getLogger().setLevel(logging.DEBUG) - capture_queue = Queue.Queue() - diff_queue = Queue.Queue() - workflow_queue = Queue.Queue() - complete_queue = Queue.Queue() + capture_queue = Queue.Queue() + diff_queue = Queue.Queue() + workflow_queue = Queue.Queue() + complete_queue = Queue.Queue() - coordinator = workers.WorkflowThread(workflow_queue, complete_queue) - coordinator.register(workers.CaptureItem, capture_queue) - coordinator.register(workers.DiffItem, diff_queue) + coordinator = workers.WorkflowThread(workflow_queue, complete_queue) + coordinator.register(workers.CaptureItem, capture_queue) + coordinator.register(workers.DiffItem, diff_queue) - worker_threads = [ - coordinator, - workers.CaptureThread(capture_queue, workflow_queue), - workers.DiffThread(diff_queue, workflow_queue), - ] - for thread in worker_threads: - thread.start() + worker_threads = [ + coordinator, + workers.CaptureThread(capture_queue, workflow_queue), + workers.DiffThread(diff_queue, workflow_queue), + ] + for thread in worker_threads: + thread.start() - # Add in pdiff workers for new jobs - # Retire them on error + # Add in pdiff workers for new jobs + # Retire them on error - item = PdiffWorkflow() - workflow_queue.put(item) - result = complete_queue.get() - if result.error: - raise result.error[0], result.error[1], result.error[2] + item = PdiffWorkflow() + workflow_queue.put(item) + result = complete_queue.get() + if result.error: + raise result.error[0], result.error[1], result.error[2] if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/client/capture_worker.py b/client/capture_worker.py index 029620a..9aa885b 100644 --- a/client/capture_worker.py +++ b/client/capture_worker.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - +# # http://www.apache.org/licenses/LICENSE-2.0 - +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -42,38 +42,39 @@ class CaptureItem(workers.ProcessItem): - """Work item for capturing a website screenshot using PhantomJs.""" + """Work item for capturing a website screenshot using PhantomJs.""" - def __init__(self, log_path, config_path, output_path): - """Initializer. + def __init__(self, log_path, config_path, output_path): + """Initializer. - Args: - log_path: Where to write the verbose logging output. - config_path: Path to the screenshot config file to pass to PhantomJs. - output_path: Where the output screenshot should be written. - """ - workers.ProcessItem.__init__(self, log_path) - self.config_path = config_path - self.output_path = output_path + Args: + log_path: Where to write the verbose logging output. + config_path: Path to the screenshot config file to pass + to PhantomJs. + output_path: Where the output screenshot should be written. 
+ """ + workers.ProcessItem.__init__(self, log_path) + self.config_path = config_path + self.output_path = output_path class CaptureThread(workers.ProcessThread): - """Worker thread that runs PhantomJs.""" + """Worker thread that runs PhantomJs.""" - def get_args(self, item): - return [ - FLAGS.phantomjs_binary, - '--disk-cache=false', - '--debug=true', - FLAGS.phantomjs_script, - item.config_path, - item.output_path, - ] + def get_args(self, item): + return [ + FLAGS.phantomjs_binary, + '--disk-cache=false', + '--debug=true', + FLAGS.phantomjs_script, + item.config_path, + item.output_path, + ] def register(coordinator): - """Registers this module as a worker with the given coordinator.""" - capture_queue = Queue.Queue() - coordinator.register(CaptureItem, capture_queue) - coordinator.worker_threads.append( - CaptureThread(capture_queue, coordinator.input_queue)) + """Registers this module as a worker with the given coordinator.""" + capture_queue = Queue.Queue() + coordinator.register(CaptureItem, capture_queue) + coordinator.worker_threads.append( + CaptureThread(capture_queue, coordinator.input_queue)) diff --git a/client/config.js b/client/config.js index b76b8fa..f18a075 100644 --- a/client/config.js +++ b/client/config.js @@ -1,7 +1,7 @@ { - "targetUrl": "http://www.google.com", - "viewportSize": { - "width": 1024, - "height": 768 - } + "targetUrl": "http://www.google.com", + "viewportSize": { + "width": 1024, + "height": 768 + } } diff --git a/client/pdiff_worker.py b/client/pdiff_worker.py index 69d1fad..10d1d68 100644 --- a/client/pdiff_worker.py +++ b/client/pdiff_worker.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - +# # http://www.apache.org/licenses/LICENSE-2.0 - +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -39,41 +39,41 @@ class PdiffItem(workers.ProcessItem): - """Work item for doing perceptual diffs using pdiff.""" + """Work item for doing perceptual diffs using pdiff.""" - def __init__(self, log_path, ref_path, run_path, output_path): - """Initializer. + def __init__(self, log_path, ref_path, run_path, output_path): + """Initializer. - Args: - log_path: Where to write the verbose logging output. - ref_path: Path to reference screenshot to diff. - run_path: Path to the most recent run screenshot to diff. - output_path: Where the diff image should be written, if any. - """ - workers.ProcessItem.__init__(self, log_path) - self.ref_path = ref_path - self.run_path = run_path - self.output_path = output_path + Args: + log_path: Where to write the verbose logging output. + ref_path: Path to reference screenshot to diff. + run_path: Path to the most recent run screenshot to diff. + output_path: Where the diff image should be written, if any. 
+ """ + workers.ProcessItem.__init__(self, log_path) + self.ref_path = ref_path + self.run_path = run_path + self.output_path = output_path class PdiffThread(workers.ProcessThread): - """Worker thread that runs pdiff.""" + """Worker thread that runs pdiff.""" - def get_args(self, item): - return [ - FLAGS.pdiff_binary, - '-fov', - '85', - '-output', - item.output_path, - item.ref_path, - item.run_path, - ] + def get_args(self, item): + return [ + FLAGS.pdiff_binary, + '-fov', + '85', + '-output', + item.output_path, + item.ref_path, + item.run_path, + ] def register(coordinator): - """Registers this module as a worker with the given coordinator.""" - pdiff_queue = Queue.Queue() - coordinator.register(PdiffItem, pdiff_queue) - coordinator.worker_threads.append( - PdiffThread(pdiff_queue, coordinator.input_queue)) + """Registers this module as a worker with the given coordinator.""" + pdiff_queue = Queue.Queue() + coordinator.register(PdiffItem, pdiff_queue) + coordinator.worker_threads.append( + PdiffThread(pdiff_queue, coordinator.input_queue)) diff --git a/client/site_diff.py b/client/site_diff.py index aa185a1..d677685 100755 --- a/client/site_diff.py +++ b/client/site_diff.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - +# # http://www.apache.org/licenses/LICENSE-2.0 - +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,11 +18,11 @@ Example usage: ./site_diff.py \ - --phantomjs_binary=path/to/phantomjs-1.8.1-macosx/bin/phantomjs \ - --phantomjs_script=path/to/client/capture.js \ - --pdiff_binary=path/to/pdiff/perceptualdiff \ - --output_dir=path/to/your/output \ - http://www.example.com/my/website/here + --phantomjs_binary=path/to/phantomjs-1.8.1-macosx/bin/phantomjs \ + --phantomjs_script=path/to/client/capture.js \ + --pdiff_binary=path/to/pdiff/perceptualdiff \ + --output_dir=path/to/your/output \ + http://www.example.com/my/website/here """ @@ -67,10 +67,10 @@ class Error(Exception): - """Base class for exceptions in this module.""" + """Base class for exceptions in this module.""" class CaptureFailedError(Error): - """Capturing a page screenshot failed.""" + """Capturing a page screenshot failed.""" # URL regex rewriting code originally from mirrorrr @@ -98,71 +98,72 @@ class CaptureFailedError(Error): TAG_START + r"(?P(http(s?):)?//[^\"'> \t]+)") REPLACEMENT_REGEXES = [ - (TAG_START + SAME_DIR_URL_REGEX, + (TAG_START + SAME_DIR_URL_REGEX, "\g\g\g%(accessed_dir)s\g"), - (TAG_START + TRAVERSAL_URL_REGEX, + (TAG_START + TRAVERSAL_URL_REGEX, "\g\g\g%(accessed_dir)s/\g/\g"), - (TAG_START + BASE_RELATIVE_URL_REGEX, + (TAG_START + BASE_RELATIVE_URL_REGEX, "\g\g\g%(base)s/\g"), - (TAG_START + ROOT_DIR_URL_REGEX, + (TAG_START + ROOT_DIR_URL_REGEX, "\g\g\g%(base)s/"), - (TAG_START + ABSOLUTE_URL_REGEX, + (TAG_START + ABSOLUTE_URL_REGEX, "\g\g\g\g"), ] def clean_url(url, force_scheme=None): - """Cleans the given URL.""" - # Collapse ../../ and related - url_parts = urlparse.urlparse(url) - path_parts = [] - for part in url_parts.path.split('/'): - if part == '.': - continue - elif part == '..': - if path_parts: - path_parts.pop() - else: - path_parts.append(part) - - url_parts = list(url_parts) - if force_scheme: - 
url_parts[0] = force_scheme - url_parts[2] = '/'.join(path_parts) - url_parts[4] = '' # No query string - url_parts[5] = '' # No path - - # Always have a trailing slash - if not url_parts[2]: - url_parts[2] = '/' - - return urlparse.urlunparse(url_parts) + """Cleans the given URL.""" + # Collapse ../../ and related + url_parts = urlparse.urlparse(url) + path_parts = [] + for part in url_parts.path.split('/'): + if part == '.': + continue + elif part == '..': + if path_parts: + path_parts.pop() + else: + path_parts.append(part) + + url_parts = list(url_parts) + if force_scheme: + url_parts[0] = force_scheme + url_parts[2] = '/'.join(path_parts) + url_parts[4] = '' # No query string + url_parts[5] = '' # No path + + # Always have a trailing slash + if not url_parts[2]: + url_parts[2] = '/' + + return urlparse.urlunparse(url_parts) def extract_urls(url, data, unescape=HTMLParser.HTMLParser().unescape): - """Extracts the URLs from an HTML document.""" - parts = urlparse.urlparse(url) - prefix = '%s://%s' % (parts.scheme, parts.netloc) + """Extracts the URLs from an HTML document.""" + parts = urlparse.urlparse(url) + prefix = '%s://%s' % (parts.scheme, parts.netloc) - accessed_dir = os.path.dirname(parts.path) - if not accessed_dir.endswith('/'): - accessed_dir += '/' + accessed_dir = os.path.dirname(parts.path) + if not accessed_dir.endswith('/'): + accessed_dir += '/' - for pattern, replacement in REPLACEMENT_REGEXES: - fixed = replacement % { - 'base': prefix, - 'accessed_dir': accessed_dir, - } - data = re.sub(pattern, fixed, data) + for pattern, replacement in REPLACEMENT_REGEXES: + fixed = replacement % { + 'base': prefix, + 'accessed_dir': accessed_dir, + } + data = re.sub(pattern, fixed, data) - result = set() - for match in re.finditer(MAYBE_HTML_URL_REGEX, data): - found_url = unescape(match.groupdict()['absurl']) - # Use the main page's scheme - found_url = clean_url(found_url, force_scheme=parts[0]) - result.add(found_url) + result = set() + for match in re.finditer(MAYBE_HTML_URL_REGEX, data): + found_url = unescape(match.groupdict()['absurl']) + found_url = clean_url( + found_url, + force_scheme=parts[0]) # Use the main page's scheme + result.add(found_url) - return result + return result IGNORE_SUFFIXES = frozenset([ @@ -170,193 +171,192 @@ def extract_urls(url, data, unescape=HTMLParser.HTMLParser().unescape): def prune_urls(url_set, start_url, allowed_list, ignored_list): - """Prunes URLs that should be ignored.""" - result = set() + """Prunes URLs that should be ignored.""" + result = set() - for url in url_set: - allowed = False - for allow_url in allowed_list: - if url.startswith(allow_url): - allowed = True - break + for url in url_set: + allowed = False + for allow_url in allowed_list: + if url.startswith(allow_url): + allowed = True + break - if not allowed: - continue + if not allowed: + continue - ignored = False - for ignore_url in ignored_list: - if url.startswith(ignore_url): - ignored = True - break + ignored = False + for ignore_url in ignored_list: + if url.startswith(ignore_url): + ignored = True + break - if ignored: - continue + if ignored: + continue - prefix, suffix = (url.rsplit('.', 1) + [''])[:2] - if suffix.lower() in IGNORE_SUFFIXES: - continue + prefix, suffix = (url.rsplit('.', 1) + [''])[:2] + if suffix.lower() in IGNORE_SUFFIXES: + continue - result.add(url) + result.add(url) - return result + return result class PdiffWorkflow(workers.WorkflowItem): - """Workflow for generating Pdiffs.""" - - def run(self, url, output_dir, reference_dir): - parts = 
urlparse.urlparse(url) - clean_url = ( - parts.path.replace('/', '_').replace('\\', '_') - .replace(':', '_').replace('.', '_')) - output_base = os.path.join(output_dir, clean_url) - - config_path = output_base + '_config.js' - with open(config_path, 'w') as config_file: - # TODO: Take the base config from a standard file or flags. - config_file.write(json.dumps({ - 'targetUrl': url, - 'viewportSize': { - 'width': 1024, - 'height': 768, - } - })) - - capture = yield capture_worker.CaptureItem( - output_base + '_run.log', - config_path, - output_base + '_run.png') - - if capture.returncode != 0: - raise CaptureFailedError('Failed to capture url=%r' % url) - - print 'Captured: %s' % url - - if not reference_dir: - return - - ref_base = os.path.join(reference_dir, clean_url) - last_run = ref_base + '_run.png' - if not os.path.exists(last_run): - return - - last_log = ref_base + '_run.log' - if not os.path.exists(last_log): - return - - ref_output = output_base + '_ref.png' - ref_log = output_base + '_ref.log' - shutil.copy(last_run, ref_output) - shutil.copy(last_log, ref_log) - - diff_output = output_base + '_diff.png' - diff = yield pdiff_worker.PdiffItem( - output_base + '_diff.log', - ref_output, - capture.output_path, - diff_output) - - if diff.returncode != 0 and os.path.exists(diff_output): - print 'Found diff for path=%r, diff=%r' % (parts.path, diff_output) + """Workflow for generating Pdiffs.""" + + def run(self, url, output_dir, reference_dir): + parts = urlparse.urlparse(url) + clean_url = ( + parts.path.replace('/', '_').replace('\\', '_') + .replace(':', '_').replace('.', '_')) + output_base = os.path.join(output_dir, clean_url) + + config_path = output_base + '_config.js' + with open(config_path, 'w') as config_file: + # TODO: Take the base config from a standard file or flags. 
+ config_file.write(json.dumps({ + 'targetUrl': url, + 'viewportSize': { + 'width': 1024, + 'height': 768, + } + })) + + capture = yield capture_worker.CaptureItem( + output_base + '_run.log', + config_path, + output_base + '_run.png') + + if capture.returncode != 0: + raise CaptureFailedError('Failed to capture url=%r' % url) + + print 'Captured: %s' % url + + if not reference_dir: + return + + ref_base = os.path.join(reference_dir, clean_url) + last_run = ref_base + '_run.png' + if not os.path.exists(last_run): + return + + last_log = ref_base + '_run.log' + if not os.path.exists(last_log): + return + + ref_output = output_base + '_ref.png' + ref_log = output_base + '_ref.log' + shutil.copy(last_run, ref_output) + shutil.copy(last_log, ref_log) + + diff_output = output_base + '_diff.png' + diff = yield pdiff_worker.PdiffItem( + output_base + '_diff.log', + ref_output, + capture.output_path, + diff_output) + + if diff.returncode != 0 and os.path.exists(diff_output): + print 'Found diff for path=%r, diff=%r' % (parts.path, diff_output) class SiteDiff(workers.WorkflowItem): - """Workflow for coordinating the site diff.""" + """Workflow for coordinating the site diff.""" - def run(self, start_url, ignore_prefixes, output_dir, reference_dir): - if not os.path.isdir(output_dir): - os.mkdir(output_dir) + def run(self, start_url, ignore_prefixes, output_dir, reference_dir): + if not os.path.isdir(output_dir): + os.mkdir(output_dir) - pending_urls = set([clean_url(start_url)]) - seen_urls = set() - good_urls = set() + pending_urls = set([clean_url(start_url)]) + seen_urls = set() + good_urls = set() - sys.stdout.write('Scanning for content') - sys.stdout.flush() + sys.stdout.write('Scanning for content') + sys.stdout.flush() - while pending_urls: - seen_urls.update(pending_urls) - output = yield [workers.FetchItem(u) for u in pending_urls] - pending_urls.clear() + while pending_urls: + seen_urls.update(pending_urls) + output = yield [workers.FetchItem(u) for u in pending_urls] + pending_urls.clear() - sys.stdout.write('.') - sys.stdout.flush() + sys.stdout.write('.') + sys.stdout.flush() - for item in output: - if not item.data: - logging.info('No data from url=%r', item.url) - continue + for item in output: + if not item.data: + logging.info('No data from url=%r', item.url) + continue - if item.headers.gettype() != 'text/html': - logging.info('Skipping non-HTML document url=%r', item.url) - continue + if item.headers.gettype() != 'text/html': + logging.info('Skipping non-HTML document url=%r', item.url) + continue - good_urls.add(item.url) - found = extract_urls(item.url, item.data) - pruned = prune_urls( - found, start_url, [start_url], ignore_prefixes) - new = pruned - seen_urls - pending_urls.update(new) + good_urls.add(item.url) + found = extract_urls(item.url, item.data) + pruned = prune_urls( + found, start_url, [start_url], ignore_prefixes) + new = pruned - seen_urls + pending_urls.update(new) - print - print 'Found %d total URLs, %d good HTML pages; starting screenshots' % ( - len(seen_urls), len(good_urls)) + print + print ('Found %d total URLs, %d good HTML pages; starting ' + 'screenshots' % (len(seen_urls), len(good_urls))) - found_urls = os.path.join(output_dir, 'url_paths.txt') - good_paths = set(urlparse.urlparse(u).path for u in good_urls) - with open(found_urls, 'w') as urls_file: - urls_file.write('\n'.join(sorted(good_paths))) + found_urls = os.path.join(output_dir, 'url_paths.txt') + good_paths = set(urlparse.urlparse(u).path for u in good_urls) + with open(found_urls, 'w') 
as urls_file: + urls_file.write('\n'.join(sorted(good_paths))) - results = [] - for url in good_urls: - results.append(PdiffWorkflow(url, output_dir, reference_dir)) - yield results + results = [] + for url in good_urls: + results.append(PdiffWorkflow(url, output_dir, reference_dir)) + yield results - print 'Results in %s' % output_dir + print 'Results in %s' % output_dir def real_main(url, ignore_prefixes, output_dir, reference_dir, - coordinator=None): - """Runs the site_diff.""" - if not coordinator: - coordinator = workers.GetCoordinator() - capture_worker.register(coordinator) - pdiff_worker.register(coordinator) - coordinator.start() - - try: - item = SiteDiff(url, ignore_prefixes, output_dir, reference_dir) - item.root = True - coordinator.input_queue.put(item) - result = coordinator.output_queue.get() - if result.error: - raise result.error[0], result.error[1], result.error[2] - finally: - coordinator.stop() + coordinator=None): + """Runs the site_diff.""" + if not coordinator: + coordinator = workers.GetCoordinator() + capture_worker.register(coordinator) + pdiff_worker.register(coordinator) + coordinator.start() + + try: + item = SiteDiff(url, ignore_prefixes, output_dir, reference_dir) + item.root = True + coordinator.input_queue.put(item) + result = coordinator.output_queue.get() + result.check_result() + finally: + coordinator.stop() def main(argv): - gflags.MarkFlagAsRequired('phantomjs_binary') - gflags.MarkFlagAsRequired('phantomjs_script') - gflags.MarkFlagAsRequired('pdiff_binary') - gflags.MarkFlagAsRequired('output_dir') + gflags.MarkFlagAsRequired('phantomjs_binary') + gflags.MarkFlagAsRequired('phantomjs_script') + gflags.MarkFlagAsRequired('pdiff_binary') + gflags.MarkFlagAsRequired('output_dir') - try: - argv = FLAGS(argv) - except gflags.FlagsError, e: - print '%s\nUsage: %s ARGS\n%s' % (e, sys.argv[0], FLAGS) - sys.exit(1) + try: + argv = FLAGS(argv) + except gflags.FlagsError, e: + print '%s\nUsage: %s ARGS\n%s' % (e, sys.argv[0], FLAGS) + sys.exit(1) - if len(argv) != 2: - print 'Must supply a website URL as the first argument.' - sys.exit(1) + if len(argv) != 2: + print 'Must supply a website URL as the first argument.' + sys.exit(1) - if FLAGS.verbose: - logging.getLogger().setLevel(logging.DEBUG) + if FLAGS.verbose: + logging.getLogger().setLevel(logging.DEBUG) - real_main( - argv[1], FLAGS.ignore_prefixes, FLAGS.output_dir, FLAGS.reference_dir) + real_main( + argv[1], FLAGS.ignore_prefixes, FLAGS.output_dir, FLAGS.reference_dir) if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/client/tests/site_diff_test.py b/client/tests/site_diff_test.py index f25a666..2e6bbe0 100755 --- a/client/tests/site_diff_test.py +++ b/client/tests/site_diff_test.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - +# +# http://www.apache.org/licenses/LICENSE-2.0 +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
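HtmlRewritingTest below pins down how site_diff.extract_urls and site_diff.clean_url behave. For a concrete feel, a small usage sketch (assuming site_diff.py and its gflags and worker dependencies import cleanly; the expected output comes from the test assertions that follow):

    import site_diff

    base = 'http://www.example.com/my-url/here'
    html = '<a href="/my/path/01/13/../../over/here.html">my link here</a>'
    print site_diff.extract_urls(base, html)
    # set(['http://www.example.com/my/path/over/here.html'])

    print site_diff.clean_url('http://www.example.com/a/?q=1')
    # http://www.example.com/a/

Note how extract_urls absolutizes the relative href against the page URL, collapses the dot segments, and drops the query string, exactly the behaviors the assertions exercise one variation at a time.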
@@ -18,9 +18,9 @@ To run: ./tests/site_diff_test.py \ - --phantomjs_binary=path/to/phantomjs-1.8.1-macosx/bin/phantomjs \ - --phantomjs_script=path/to/client/capture.js \ - --pdiff_binary=path/to/pdiff/perceptualdiff + --phantomjs_binary=path/to/phantomjs-1.8.1-macosx/bin/phantomjs \ + --phantomjs_script=path/to/client/capture.js \ + --pdiff_binary=path/to/pdiff/perceptualdiff """ import BaseHTTPServer @@ -47,296 +47,299 @@ class HtmlRewritingTest(unittest.TestCase): - """Tests the HTML rewriting functions.""" + """Tests the HTML rewriting functions.""" - def testAll(self): - """Tests all the variations.""" - base_url = 'http://www.example.com/my-url/here' - def test(test_url): - data = 'my link here' % test_url - result = site_diff.extract_urls(base_url, data) - if not result: - return None - return list(result)[0] + def testAll(self): + """Tests all the variations.""" + base_url = 'http://www.example.com/my-url/here' + def test(test_url): + data = 'my link here' % test_url + result = site_diff.extract_urls(base_url, data) + if not result: + return None + return list(result)[0] - self.assertEquals('http://www.example.com/', - test('/')) + self.assertEquals('http://www.example.com/', + test('/')) - self.assertEquals('http://www.example.com/mypath-here', - test('/mypath-here')) + self.assertEquals('http://www.example.com/mypath-here', + test('/mypath-here')) - self.assertEquals(None, test('#fragment-only')) + self.assertEquals(None, test('#fragment-only')) - self.assertEquals('http://www.example.com/my/path/over/here.html', - test('/my/path/01/13/../../over/here.html')) + self.assertEquals('http://www.example.com/my/path/over/here.html', + test('/my/path/01/13/../../over/here.html')) - self.assertEquals('http://www.example.com/my/path/01/over/here.html', - test('/my/path/01/13/./../over/here.html')) + self.assertEquals('http://www.example.com/my/path/01/over/here.html', + test('/my/path/01/13/./../over/here.html')) - self.assertEquals('http://www.example.com/my-url/same-directory.html', - test('same-directory.html')) + self.assertEquals('http://www.example.com/my-url/same-directory.html', + test('same-directory.html')) - self.assertEquals('http://www.example.com/relative-but-no/child', - test('../../relative-but-no/child')) + self.assertEquals('http://www.example.com/relative-but-no/child', + test('../../relative-but-no/child')) - self.assertEquals('http://www.example.com/too/many/relative/paths', - test('../../../../too/many/relative/paths')) + self.assertEquals('http://www.example.com/too/many/relative/paths', + test('../../../../too/many/relative/paths')) - self.assertEquals('http://www.example.com/this/is/scheme-relative.html', - test('//www.example.com/this/is/scheme-relative.html')) + self.assertEquals( + 'http://www.example.com/this/is/scheme-relative.html', + test('//www.example.com/this/is/scheme-relative.html')) - self.assertEquals('http://www.example.com/okay-then', # Scheme changed - test('https://www.example.com/okay-then#blah')) + self.assertEquals( + 'http://www.example.com/okay-then', # Scheme changed + test('https://www.example.com/okay-then#blah')) - self.assertEquals('http://www.example.com/another-one', - test('http://www.example.com/another-one')) + self.assertEquals('http://www.example.com/another-one', + test('http://www.example.com/another-one')) - self.assertEquals('http://www.example.com/this-has/a', - test('/this-has/a?query=string')) + self.assertEquals('http://www.example.com/this-has/a', + test('/this-has/a?query=string')) - 
self.assertEquals('http://www.example.com/this-also-has/a/', - test('/this-also-has/a/?query=string&but=more-complex')) + self.assertEquals( + 'http://www.example.com/this-also-has/a/', + test('/this-also-has/a/?query=string&but=more-complex')) - self.assertEquals( - 'http://www.example.com/relative-with/some-(parenthesis%20here)', - test('/relative-with/some-(parenthesis%20here)')) + self.assertEquals( + 'http://www.example.com/relative-with/some-(parenthesis%20here)', + test('/relative-with/some-(parenthesis%20here)')) - self.assertEquals( - 'http://www.example.com/relative-with/some-(parenthesis%20here)', - test('//www.example.com/relative-with/some-(parenthesis%20here)')) + self.assertEquals( + 'http://www.example.com/relative-with/some-(parenthesis%20here)', + test('//www.example.com/relative-with/some-(parenthesis%20here)')) - self.assertEquals( - 'http://www.example.com/relative-with/some-(parenthesis%20here)', - test('http://www.example.com/relative-with/some-(parenthesis%20here)')) + self.assertEquals( + 'http://www.example.com/relative-with/some-(parenthesis%20here)', + test('http://www.example.com/relative-with/some-' + '(parenthesis%20here)')) def webserver(func): - """Runs the given function as a webserver. - - Function should take one argument, the path of the request. Should - return tuple (status, content_type, content) or Nothing if there is no - response. - """ - class HandlerClass(BaseHTTPServer.BaseHTTPRequestHandler): - def do_GET(self): - output = func(self.path) - if output: - code, content_type, result = output - else: - code, content_type, result = 404, 'text/plain', 'Not found!' - - self.send_response(code) - self.send_header('Content-Type', content_type) - self.end_headers() - if result: - self.wfile.write(result) - - server = BaseHTTPServer.HTTPServer(('', 0), HandlerClass) - thread = threading.Thread(target=server.serve_forever) - thread.daemon = True - thread.start() - return server + """Runs the given function as a webserver. + + Function should take one argument, the path of the request. Should + return tuple (status, content_type, content) or Nothing if there is no + response. + """ + class HandlerClass(BaseHTTPServer.BaseHTTPRequestHandler): + def do_GET(self): + output = func(self.path) + if output: + code, content_type, result = output + else: + code, content_type, result = 404, 'text/plain', 'Not found!' + + self.send_response(code) + self.send_header('Content-Type', content_type) + self.end_headers() + if result: + self.wfile.write(result) + + server = BaseHTTPServer.HTTPServer(('', 0), HandlerClass) + thread = threading.Thread(target=server.serve_forever) + thread.daemon = True + thread.start() + return server class SiteDiffTest(unittest.TestCase): - """Tests for the SiteDiff workflow.""" - - def setUp(self): - """Sets up the test harness.""" - FLAGS.fetch_frequency = 100 - FLAGS.polltime = 0.01 - self.test_dir = tempfile.mkdtemp('site_diff_test') - self.output_dir = join(self.test_dir, 'output') - self.reference_dir = join(self.test_dir, 'reference') - self.coordinator = workers.GetCoordinator() - - def output_readlines(self, path): - """Reads the lines of an output file, stripping newlines.""" - return [ - x.strip() for x in open(join(self.output_dir, path)).xreadlines()] - - def testFirstSnapshot(self): - """Tests taking the very first snapshot.""" - @webserver - def test(path): - if path == '/': - return 200, 'text/html', 'Hello world!' 
- - site_diff.real_main( - 'http://%s:%d/' % test.server_address, [], self.output_dir, None, - coordinator=self.coordinator) - test.shutdown() - - self.assertTrue(exists(join(self.output_dir, '__run.log'))) - self.assertTrue(exists(join(self.output_dir, '__run.png'))) - self.assertTrue(exists(join(self.output_dir, '__config.js'))) - self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) - - self.assertEquals( - ['/'], - self.output_readlines('url_paths.txt')) - - def testNoDifferences(self): - """Tests crawling the site end-to-end.""" - @webserver - def test(path): - if path == '/': - return 200, 'text/html', 'Hello world!' - - site_diff.real_main( - 'http://%s:%d/' % test.server_address, [], self.reference_dir, None, - coordinator=self.coordinator) - - self.coordinator = workers.GetCoordinator() - site_diff.real_main( - 'http://%s:%d/' % test.server_address, [], - self.output_dir, self.reference_dir, - coordinator=self.coordinator) - test.shutdown() - - self.assertTrue(exists(join(self.reference_dir, '__run.log'))) - self.assertTrue(exists(join(self.reference_dir, '__run.png'))) - self.assertTrue(exists(join(self.reference_dir, '__config.js'))) - self.assertTrue(exists(join(self.reference_dir, 'url_paths.txt'))) - - self.assertTrue(exists(join(self.output_dir, '__run.log'))) - self.assertTrue(exists(join(self.output_dir, '__run.png'))) - self.assertTrue(exists(join(self.output_dir, '__ref.log'))) - self.assertTrue(exists(join(self.output_dir, '__ref.png'))) - self.assertFalse(exists(join(self.output_dir, '__diff.png'))) # No diff - self.assertTrue(exists(join(self.output_dir, '__diff.log'))) - self.assertTrue(exists(join(self.output_dir, '__config.js'))) - self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) - - def testOneDifference(self): - """Tests when there is one found difference.""" - @webserver - def test(path): - if path == '/': - return 200, 'text/html', 'Hello world!' - - site_diff.real_main( - 'http://%s:%d/' % test.server_address, [], self.reference_dir, None, - coordinator=self.coordinator) - test.shutdown() - - @webserver - def test(path): - if path == '/': - return 200, 'text/html', 'Hello world a little different!' - - self.coordinator = workers.GetCoordinator() - site_diff.real_main( - 'http://%s:%d/' % test.server_address, [], - self.output_dir, self.reference_dir, - coordinator=self.coordinator) - test.shutdown() - - self.assertTrue(exists(join(self.reference_dir, '__run.log'))) - self.assertTrue(exists(join(self.reference_dir, '__run.png'))) - self.assertTrue(exists(join(self.reference_dir, '__config.js'))) - self.assertTrue(exists(join(self.reference_dir, 'url_paths.txt'))) - - self.assertTrue(exists(join(self.output_dir, '__run.log'))) - self.assertTrue(exists(join(self.output_dir, '__run.png'))) - self.assertTrue(exists(join(self.output_dir, '__ref.log'))) - self.assertTrue(exists(join(self.output_dir, '__ref.png'))) - self.assertTrue(exists(join(self.output_dir, '__diff.png'))) # Diff!! - self.assertTrue(exists(join(self.output_dir, '__diff.log'))) - self.assertTrue(exists(join(self.output_dir, '__config.js'))) - self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) - - def testCrawler(self): - """Tests that the crawler behaves well. - - Specifically: - - Finds new links in HTML data - - Avoids non-HTML pages - - Respects ignore patterns specified on flags - """ - @webserver - def test(path): - if path == '/': - return 200, 'text/html', ( - 'Hello world! 
x y') - elif path == '/stuff': - return 200, 'text/html', 'Stuff page x' - elif path == '/avoid': - return 200, 'text/plain', 'Ignore me!' - - site_diff.real_main( - 'http://%s:%d/' % test.server_address, ['/ignore'], - self.output_dir, None, - coordinator=self.coordinator) - test.shutdown() - - self.assertTrue(exists(join(self.output_dir, '__run.log'))) - self.assertTrue(exists(join(self.output_dir, '__run.png'))) - self.assertTrue(exists(join(self.output_dir, '__config.js'))) - self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) - - self.assertEquals( - ['/', '/stuff'], - self.output_readlines('url_paths.txt')) - - def testNotFound(self): - """Tests when a URL in the crawl is not found.""" - @webserver - def test(path): - if path == '/': - return 200, 'text/html', ( - 'Hello world! x') - elif path == '/missing': - return 404, 'text/plain', 'Nope' - - site_diff.real_main( - 'http://%s:%d/' % test.server_address, ['/ignore'], - self.output_dir, None, - coordinator=self.coordinator) - test.shutdown() - - self.assertTrue(exists(join(self.output_dir, '__run.log'))) - self.assertTrue(exists(join(self.output_dir, '__run.png'))) - self.assertTrue(exists(join(self.output_dir, '__config.js'))) - self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) - - self.assertEquals( - ['/'], - self.output_readlines('url_paths.txt')) - - self.fail() - - def testDiffNotLinkedUrlsFound(self): - """Tests when a URL in the old set exists but is not linked.""" - self.fail() - - def testDiffNotFound(self): - """Tests when a URL in the old set is a 404 in the new set.""" - self.fail() - - def testSuccessAfterRetry(self): - """Tests that URLs that return errors will be retried.""" - self.fail() - - def testFailureAfterRetry(self): - """Tests when repeated retries of a URL fail.""" - self.fail() + """Tests for the SiteDiff workflow.""" + + def setUp(self): + """Sets up the test harness.""" + FLAGS.fetch_frequency = 100 + FLAGS.polltime = 0.01 + self.test_dir = tempfile.mkdtemp('site_diff_test') + self.output_dir = join(self.test_dir, 'output') + self.reference_dir = join(self.test_dir, 'reference') + self.coordinator = workers.GetCoordinator() + + def output_readlines(self, path): + """Reads the lines of an output file, stripping newlines.""" + return [ + x.strip() for x in open(join(self.output_dir, path)).xreadlines()] + + def testFirstSnapshot(self): + """Tests taking the very first snapshot.""" + @webserver + def test(path): + if path == '/': + return 200, 'text/html', 'Hello world!' + + site_diff.real_main( + 'http://%s:%d/' % test.server_address, [], self.output_dir, None, + coordinator=self.coordinator) + test.shutdown() + + self.assertTrue(exists(join(self.output_dir, '__run.log'))) + self.assertTrue(exists(join(self.output_dir, '__run.png'))) + self.assertTrue(exists(join(self.output_dir, '__config.js'))) + self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) + + self.assertEquals( + ['/'], + self.output_readlines('url_paths.txt')) + + def testNoDifferences(self): + """Tests crawling the site end-to-end.""" + @webserver + def test(path): + if path == '/': + return 200, 'text/html', 'Hello world!' 
+ + site_diff.real_main( + 'http://%s:%d/' % test.server_address, [], self.reference_dir, + None, coordinator=self.coordinator) + + self.coordinator = workers.GetCoordinator() + site_diff.real_main( + 'http://%s:%d/' % test.server_address, [], + self.output_dir, self.reference_dir, + coordinator=self.coordinator) + test.shutdown() + + self.assertTrue(exists(join(self.reference_dir, '__run.log'))) + self.assertTrue(exists(join(self.reference_dir, '__run.png'))) + self.assertTrue(exists(join(self.reference_dir, '__config.js'))) + self.assertTrue(exists(join(self.reference_dir, 'url_paths.txt'))) + + self.assertTrue(exists(join(self.output_dir, '__run.log'))) + self.assertTrue(exists(join(self.output_dir, '__run.png'))) + self.assertTrue(exists(join(self.output_dir, '__ref.log'))) + self.assertTrue(exists(join(self.output_dir, '__ref.png'))) + self.assertFalse(exists(join(self.output_dir, '__diff.png'))) # No diff + self.assertTrue(exists(join(self.output_dir, '__diff.log'))) + self.assertTrue(exists(join(self.output_dir, '__config.js'))) + self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) + + def testOneDifference(self): + """Tests when there is one found difference.""" + @webserver + def test(path): + if path == '/': + return 200, 'text/html', 'Hello world!' + + site_diff.real_main( + 'http://%s:%d/' % test.server_address, [], self.reference_dir, + None, coordinator=self.coordinator) + test.shutdown() + + @webserver + def test(path): + if path == '/': + return 200, 'text/html', 'Hello world a little different!' + + self.coordinator = workers.GetCoordinator() + site_diff.real_main( + 'http://%s:%d/' % test.server_address, [], + self.output_dir, self.reference_dir, + coordinator=self.coordinator) + test.shutdown() + + self.assertTrue(exists(join(self.reference_dir, '__run.log'))) + self.assertTrue(exists(join(self.reference_dir, '__run.png'))) + self.assertTrue(exists(join(self.reference_dir, '__config.js'))) + self.assertTrue(exists(join(self.reference_dir, 'url_paths.txt'))) + + self.assertTrue(exists(join(self.output_dir, '__run.log'))) + self.assertTrue(exists(join(self.output_dir, '__run.png'))) + self.assertTrue(exists(join(self.output_dir, '__ref.log'))) + self.assertTrue(exists(join(self.output_dir, '__ref.png'))) + self.assertTrue(exists(join(self.output_dir, '__diff.png'))) # Diff!! + self.assertTrue(exists(join(self.output_dir, '__diff.log'))) + self.assertTrue(exists(join(self.output_dir, '__config.js'))) + self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) + + def testCrawler(self): + """Tests that the crawler behaves well. + + Specifically: + - Finds new links in HTML data + - Avoids non-HTML pages + - Respects ignore patterns specified on flags + """ + @webserver + def test(path): + if path == '/': + return 200, 'text/html', ( + 'Hello world! x ' + 'y') + elif path == '/stuff': + return 200, 'text/html', 'Stuff page x' + elif path == '/avoid': + return 200, 'text/plain', 'Ignore me!' 
+ + site_diff.real_main( + 'http://%s:%d/' % test.server_address, ['/ignore'], + self.output_dir, None, coordinator=self.coordinator) + test.shutdown() + + self.assertTrue(exists(join(self.output_dir, '__run.log'))) + self.assertTrue(exists(join(self.output_dir, '__run.png'))) + self.assertTrue(exists(join(self.output_dir, '__config.js'))) + self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) + + self.assertEquals( + ['/', '/stuff'], + self.output_readlines('url_paths.txt')) + + def testNotFound(self): + """Tests when a URL in the crawl is not found.""" + @webserver + def test(path): + if path == '/': + return 200, 'text/html', ( + 'Hello world! x') + elif path == '/missing': + return 404, 'text/plain', 'Nope' + + site_diff.real_main( + 'http://%s:%d/' % test.server_address, ['/ignore'], + self.output_dir, None, coordinator=self.coordinator) + test.shutdown() + + self.assertTrue(exists(join(self.output_dir, '__run.log'))) + self.assertTrue(exists(join(self.output_dir, '__run.png'))) + self.assertTrue(exists(join(self.output_dir, '__config.js'))) + self.assertTrue(exists(join(self.output_dir, 'url_paths.txt'))) + + self.assertEquals( + ['/'], + self.output_readlines('url_paths.txt')) + + self.fail() + + def testDiffNotLinkedUrlsFound(self): + """Tests when a URL in the old set exists but is not linked.""" + self.fail() + + def testDiffNotFound(self): + """Tests when a URL in the old set is a 404 in the new set.""" + self.fail() + + def testSuccessAfterRetry(self): + """Tests that URLs that return errors will be retried.""" + self.fail() + + def testFailureAfterRetry(self): + """Tests when repeated retries of a URL fail.""" + self.fail() def main(argv): - gflags.MarkFlagAsRequired('phantomjs_binary') - gflags.MarkFlagAsRequired('phantomjs_script') - gflags.MarkFlagAsRequired('pdiff_binary') + gflags.MarkFlagAsRequired('phantomjs_binary') + gflags.MarkFlagAsRequired('phantomjs_script') + gflags.MarkFlagAsRequired('pdiff_binary') - try: - argv = FLAGS(argv) - except gflags.FlagsError, e: - print '%s\nUsage: %s ARGS\n%s' % (e, sys.argv[0], FLAGS) - sys.exit(1) + try: + argv = FLAGS(argv) + except gflags.FlagsError, e: + print '%s\nUsage: %s ARGS\n%s' % (e, sys.argv[0], FLAGS) + sys.exit(1) - logging.getLogger().setLevel(logging.DEBUG) - unittest.main(argv=argv) + logging.getLogger().setLevel(logging.DEBUG) + unittest.main(argv=argv) if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/client/tests/workers_test.py b/client/tests/workers_test.py index 3d4c12b..e398591 100755 --- a/client/tests/workers_test.py +++ b/client/tests/workers_test.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - +# +# http://www.apache.org/licenses/LICENSE-2.0 +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
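The Echo classes below exercise the WorkerThread contract from workers.py: pull an item from an input queue, process it with handle_item(), push it to an output queue, and record any exception on the item itself. A rough sketch of that loop, simplified from what the real class presumably does (the real thread would also need poll and shutdown handling):

    import sys
    import threading


    class WorkerThreadSketch(threading.Thread):
        """Simplified version of the workers.WorkerThread loop."""

        def __init__(self, input_queue, output_queue):
            threading.Thread.__init__(self)
            self.daemon = True
            self.input_queue = input_queue
            self.output_queue = output_queue

        def run(self):
            while True:
                item = self.input_queue.get()
                try:
                    item = self.handle_item(item)
                except Exception:
                    # Surfaces later through item.check_result().
                    item.error = sys.exc_info()
                self.output_queue.put(item)

        def handle_item(self, item):
            raise NotImplementedError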
@@ -30,92 +30,92 @@ class EchoThread(workers.WorkerThread): - def handle_item(self, item): - if item.should_die: - raise Exception('Dying on %d' % item.input_number) - item.output_number = item.input_number - return item + def handle_item(self, item): + if item.should_die: + raise Exception('Dying on %d' % item.input_number) + item.output_number = item.input_number + return item class EchoItem(workers.WorkItem): - def __init__(self, number, should_die=False): - workers.WorkItem.__init__(self) - self.input_number = number - self.output_number = None - self.should_die = should_die + def __init__(self, number, should_die=False): + workers.WorkItem.__init__(self) + self.input_number = number + self.output_number = None + self.should_die = should_die class EchoChild(workers.WorkflowItem): - def run(self, number, should_die=False): - item = yield EchoItem(number, should_die=should_die) - self.result = item.output_number + def run(self, number, should_die=False): + item = yield EchoItem(number, should_die=should_die) + self.result = item.output_number class RootWorkflow(workers.WorkflowItem): - def run(self, child_count, die_on=-1): - total = 0 - for i in xrange(child_count): - workflow = yield EchoChild(i, should_die=(die_on == i)) - assert workflow.result == i - total += workflow.result - self.result = total + def run(self, child_count, die_on=-1): + total = 0 + for i in xrange(child_count): + workflow = yield EchoChild(i, should_die=(die_on == i)) + assert workflow.result == i + total += workflow.result + self.result = total class WorkflowThreadTest(unittest.TestCase): - """Tests for the WorkflowThread worker.""" - - def setUp(self): - """Sets up the test harness.""" - FLAGS.fetch_frequency = 100 - FLAGS.polltime = 0.01 - self.coordinator = workers.GetCoordinator() - - def tearDown(self): - """Cleans up the test harness.""" - self.coordinator.stop() - - def testMultiLevelWorkflow(self): - """Tests a multi-level workflow.""" - echo_queue = Queue.Queue() - self.coordinator.register(EchoItem, echo_queue) - self.coordinator.worker_threads.append( - EchoThread(echo_queue, self.coordinator.input_queue)) - self.coordinator.start() - - work = RootWorkflow(5) - work.root = True - self.coordinator.input_queue.put(work) - finished = self.coordinator.output_queue.get() - - self.assertTrue(work is finished) - finished.check_result() # Did not raise - self.assertEquals(4 + 3 + 2 + 1 + 0, work.result) - - def testMultiLevelWorkflowException(self): - """Tests when a child of a child raises an exception.""" - echo_queue = Queue.Queue() - self.coordinator.register(EchoItem, echo_queue) - self.coordinator.worker_threads.append( - EchoThread(echo_queue, self.coordinator.input_queue)) - self.coordinator.start() - - work = RootWorkflow(5, die_on=3) - work.root = True - self.coordinator.input_queue.put(work) - finished = self.coordinator.output_queue.get() - - self.assertTrue(work is finished) - try: - finished.check_result() - except Exception, e: - self.assertEquals('Dying on 3', str(e)) + """Tests for the WorkflowThread worker.""" + + def setUp(self): + """Sets up the test harness.""" + FLAGS.fetch_frequency = 100 + FLAGS.polltime = 0.01 + self.coordinator = workers.GetCoordinator() + + def tearDown(self): + """Cleans up the test harness.""" + self.coordinator.stop() + + def testMultiLevelWorkflow(self): + """Tests a multi-level workflow.""" + echo_queue = Queue.Queue() + self.coordinator.register(EchoItem, echo_queue) + self.coordinator.worker_threads.append( + EchoThread(echo_queue, 
self.coordinator.input_queue)) + self.coordinator.start() + + work = RootWorkflow(5) + work.root = True + self.coordinator.input_queue.put(work) + finished = self.coordinator.output_queue.get() + + self.assertTrue(work is finished) + finished.check_result() # Did not raise + self.assertEquals(4 + 3 + 2 + 1 + 0, work.result) + + def testMultiLevelWorkflowException(self): + """Tests when a child of a child raises an exception.""" + echo_queue = Queue.Queue() + self.coordinator.register(EchoItem, echo_queue) + self.coordinator.worker_threads.append( + EchoThread(echo_queue, self.coordinator.input_queue)) + self.coordinator.start() + + work = RootWorkflow(5, die_on=3) + work.root = True + self.coordinator.input_queue.put(work) + finished = self.coordinator.output_queue.get() + + self.assertTrue(work is finished) + try: + finished.check_result() + except Exception, e: + self.assertEquals('Dying on 3', str(e)) def main(argv): - logging.getLogger().setLevel(logging.DEBUG) - argv = FLAGS(argv) - unittest.main(argv=argv) + logging.getLogger().setLevel(logging.DEBUG) + argv = FLAGS(argv) + unittest.main(argv=argv) if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/client/workers.py b/client/workers.py index 991a5f4..a636ca3 100644 --- a/client/workers.py +++ b/client/workers.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - +# # http://www.apache.org/licenses/LICENSE-2.0 - +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -42,451 +42,462 @@ class Error(Exception): - """Base class for exceptions in this module.""" + """Base class for exceptions in this module.""" class TimeoutError(Exception): - """Subprocess has taken too long to complete and was terminated.""" + """Subprocess has taken too long to complete and was terminated.""" class WorkItem(object): - """Base work item that can be handled by a worker thread.""" - - def __init__(self): - self.error = None - - @staticmethod - def _print_tree(obj): - if isinstance(obj, dict): - result = [] - for key, value in obj.iteritems(): - result.append("'%s': %s" % (key, WorkItem._print_tree(value))) - return '{%s}' % ', '.join(result) - else: - value_str = repr(obj) - if len(value_str) > 100: - return '%s...%s' % (value_str[:100], value_str[-1]) - else: - return value_str - - def __repr__(self): - return '%s.%s(%s)' % ( - self.__class__.__module__, - self.__class__.__name__, - self._print_tree(self.__dict__)) - - def check_result(self): - # TODO: For WorkflowItems, remove generator.throw(*item.error) from - # the stack trace since it's noise. 
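The hunk below rewrites the body of workers.py, whose central pattern the tests above exercise: a WorkflowItem subclass yields WorkItems from run() and receives them back with their results filled in, and yielding a list fans the items out in parallel behind a Barrier. A condensed sketch of a parallel workflow under those semantics (a hypothetical subclass, not part of this patch):

class FetchPair(workers.WorkflowItem):
    """Fetches two URLs in parallel and records their status codes."""

    def run(self, url_one, url_two):
        # Yielding a list runs both fetches concurrently; the coordinator
        # resumes this generator once every item in the barrier is done.
        items = yield [workers.FetchItem(url_one),
                       workers.FetchItem(url_two)]
        self.result = [item.status_code for item in items]

Driving it looks just like RootWorkflow in the tests: mark an instance as root, put it on the coordinator's input queue, and take the finished item off the output queue.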
- if self.error: - raise self.error[0], self.error[1], self.error[2] - - -class WorkerThread(threading.Thread): - """Base worker thread that handles items one at a time.""" + """Base work item that can be handled by a worker thread.""" + + def __init__(self): + self.error = None + + @staticmethod + def _print_tree(obj): + if isinstance(obj, dict): + result = [] + for key, value in obj.iteritems(): + result.append("'%s': %s" % (key, WorkItem._print_tree(value))) + return '{%s}' % ', '.join(result) + else: + value_str = repr(obj) + if len(value_str) > 100: + return '%s...%s' % (value_str[:100], value_str[-1]) + else: + return value_str - def __init__(self, input_queue, output_queue): - """Initializer. + def __repr__(self): + return '%s.%s(%s)' % ( + self.__class__.__module__, + self.__class__.__name__, + self._print_tree(self.__dict__)) - Args: - input_queue: Queue this worker consumes work from. - output_queue: Queue where this worker puts new work items, if any. - """ - threading.Thread.__init__(self) - self.daemon = True - self.input_queue = input_queue - self.output_queue = output_queue - self.interrupted = False - - def run(self): - while not self.interrupted: - try: - item = self.input_queue.get(True, FLAGS.polltime) - except Queue.Empty: - continue - - try: - next_item = self.handle_item(item) - except Exception, e: - item.error = sys.exc_info() - logging.debug('%s error item=%r', self.worker_name, item) - self.output_queue.put(item) - else: - logging.debug('%s processed item=%r', self.worker_name, item) - if next_item: - self.output_queue.put(next_item) - finally: - self.input_queue.task_done() - - @property - def worker_name(self): - return '%s:%s' % (self.__class__.__name__, self.ident) - - def handle_item(self, item): - """Handles a single item. + def check_result(self): + # TODO: For WorkflowItems, remove generator.throw(*item.error) from + # the stack trace since it's noise. + if self.error: + raise self.error[0], self.error[1], self.error[2] - Args: - item: WorkItem to process. - Returns: - A WorkItem that should go on the output queue. If None, then the provided - work item is considered finished and no additional work is needed. - """ - raise NotImplemented +class WorkerThread(threading.Thread): + """Base worker thread that handles items one at a time.""" + + def __init__(self, input_queue, output_queue): + """Initializer. + + Args: + input_queue: Queue this worker consumes work from. + output_queue: Queue where this worker puts new work items, if any. + """ + threading.Thread.__init__(self) + self.daemon = True + self.input_queue = input_queue + self.output_queue = output_queue + self.interrupted = False + + def run(self): + while not self.interrupted: + try: + item = self.input_queue.get(True, FLAGS.polltime) + except Queue.Empty: + continue + + try: + next_item = self.handle_item(item) + except Exception, e: + item.error = sys.exc_info() + logging.debug('%s error item=%r', self.worker_name, item) + self.output_queue.put(item) + else: + logging.debug('%s processed item=%r', self.worker_name, item) + if next_item: + self.output_queue.put(next_item) + finally: + self.input_queue.task_done() + + @property + def worker_name(self): + return '%s:%s' % (self.__class__.__name__, self.ident) + + def handle_item(self, item): + """Handles a single item. + + Args: + item: WorkItem to process. + + Returns: + A WorkItem that should go on the output queue. If None, then + the provided work item is considered finished and no + additional work is needed. 
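check_result, continued below, rethrows a failure that a worker thread captured earlier. The stored error is a sys.exc_info() triple, and the three-argument raise statement reraises it with the original traceback intact; a standalone illustration of the idiom in Python 2 syntax:

import sys


def flaky():
    raise ValueError('boom')


error = None
try:
    flaky()
except Exception:
    error = sys.exc_info()  # (type, value, traceback)

if error:
    # Rethrows ValueError('boom') with the traceback from flaky().
    raise error[0], error[1], error[2]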
+ """ + raise NotImplemented class FetchItem(WorkItem): - """Work item that is handled by fetching a URL.""" - - def __init__(self, url, post=None, timeout_seconds=30): - """Initializer. - - Args: - url: URL to fetch. - post: Optional. When supplied, a dictionary of post parameters to - include in the request. - timeout_seconds: Optional. How long until the fetch should timeout. - """ - WorkItem.__init__(self) - self.url = url - self.post = post - self.timeout_seconds = timeout_seconds - self.status_code = None - self.data = None - self.headers = None - self._data_json = None - - @property - def json(self): - """Returns the data de-JSONed or None if it's the wrong content type.""" - if self._data_json: - return self._data_json - - if not self.data or self.headers.gettype() != 'application/json': - return None - - self._data_json = json.loads(self.data) - return self._data_json + """Work item that is handled by fetching a URL.""" + + def __init__(self, url, post=None, timeout_seconds=30): + """Initializer. + + Args: + url: URL to fetch. + post: Optional. When supplied, a dictionary of post parameters to + include in the request. + timeout_seconds: Optional. How long until the fetch should timeout. + """ + WorkItem.__init__(self) + self.url = url + self.post = post + self.timeout_seconds = timeout_seconds + self.status_code = None + self.data = None + self.headers = None + self._data_json = None + + @property + def json(self): + """Returns de-JSONed data or None if it's a different content type.""" + if self._data_json: + return self._data_json + + if not self.data or self.headers.gettype() != 'application/json': + return None + + self._data_json = json.loads(self.data) + return self._data_json class FetchThread(WorkerThread): - """Worker thread for fetching URLs.""" - - def handle_item(self, item): - start_time = time.time() - data = None - if item.post: - data = urllib.urlencode(item.post) - - try: - try: - conn = urllib2.urlopen( - item.url, data=data, timeout=item.timeout_seconds) - item.status_code = conn.getcode() - item.headers = conn.info() - if item.status_code == 200: - item.data = conn.read() - except urllib2.HTTPError, e: - item.status_code = e.code - except urllib2.URLError: - # TODO: Do something smarter here, like report a 400 error. - pass - - return item - finally: - end_time = time.time() - wait_duration = (1.0 / FLAGS.fetch_frequency) - (end_time - start_time) - if wait_duration > 0: - logging.debug('Rate limiting URL fetch for %f seconds', wait_duration) - time.sleep(wait_duration) + """Worker thread for fetching URLs.""" + + def handle_item(self, item): + start_time = time.time() + data = None + if item.post: + data = urllib.urlencode(item.post) + + try: + try: + conn = urllib2.urlopen( + item.url, data=data, timeout=item.timeout_seconds) + item.status_code = conn.getcode() + item.headers = conn.info() + if item.status_code == 200: + item.data = conn.read() + except urllib2.HTTPError, e: + item.status_code = e.code + except urllib2.URLError: + # TODO: Do something smarter here, like report a 400 error. 
+ pass + + return item + finally: + end_time = time.time() + wait_duration = (1.0 / FLAGS.fetch_frequency) - ( + end_time - start_time) + if wait_duration > 0: + logging.debug('Rate limiting URL fetch for %f seconds', + wait_duration) + time.sleep(wait_duration) class ProcessItem(WorkItem): - """Work item that is handled by running a subprocess.""" + """Work item that is handled by running a subprocess.""" - def __init__(self, log_path, timeout_seconds=30): - """Initializer. + def __init__(self, log_path, timeout_seconds=30): + """Initializer. - Args: - log_path: Path to where output from this subprocess should be written. - timeout_seconds: How long before the process should be force killed. - """ - WorkItem.__init__(self) - self.log_path = log_path - self.timeout_seconds = timeout_seconds - self.return_code = None + Args: + log_path: Path to where output from this subprocess should be + written. + timeout_seconds: How long before the process should be force + killed. + """ + WorkItem.__init__(self) + self.log_path = log_path + self.timeout_seconds = timeout_seconds + self.return_code = None class ProcessThread(WorkerThread): - """Worker thread that runs subprocesses.""" - - def get_args(self, item): - raise NotImplemented + """Worker thread that runs subprocesses.""" + + def get_args(self, item): + raise NotImplemented + + def handle_item(self, item): + start_time = time.time() + with open(item.log_path, 'w') as output_file: + args = self.get_args(item) + logging.debug('%s item=%r Running subprocess: %r', + self.worker_name, item, args) + process = subprocess.Popen( + args, + stderr=subprocess.STDOUT, + stdout=output_file, + close_fds=True) + + while True: + process.poll() + if process.returncode is None: + now = time.time() + run_time = now - start_time + if run_time > item.timeout_seconds or self.interrupted: + process.kill() + raise TimeoutError( + 'Sent SIGKILL to item=%r, pid=%s, run_time=%s' % + (item, process.pid, run_time)) + + time.sleep(FLAGS.polltime) + continue + + item.returncode = process.returncode + + return item - def handle_item(self, item): - start_time = time.time() - with open(item.log_path, 'w') as output_file: - args = self.get_args(item) - logging.debug('%s item=%r Running subprocess: %r', - self.worker_name, item, args) - process = subprocess.Popen( - args, - stderr=subprocess.STDOUT, - stdout=output_file, - close_fds=True) - while True: - process.poll() - if process.returncode is None: - now = time.time() - run_time = now - start_time - if run_time > item.timeout_seconds or self.interrupted: - process.kill() - raise TimeoutError('Sent SIGKILL to item=%r, pid=%s, run_time=%s' % - (item, process.pid, run_time)) - - time.sleep(FLAGS.polltime) - continue - - item.returncode = process.returncode +class WorkflowItem(WorkItem): + """Work item for coordinating other work items. + + To use: Sub-class and override run(). Yield WorkItems you want processed + as part of this workflow. Exceptions in child workflows will be reinjected + into the run() generator at the yield point. Results will be available on + the WorkItems returned by yield statements. Yield a list of WorkItems + to do them in parallel. The first error encountered for the whole list + will be raised if there's an exception. 
+ """ - return item + def __init__(self, *args, **kwargs): + WorkItem.__init__(self) + self.args = args + self.kwargs = kwargs + self.result = None + self.done = False + self.root = False + def run(self, *args, **kwargs): + yield 'Yo dawg' -class WorkflowItem(WorkItem): - """Work item for coordinating other work items. - To use: Sub-class and override run(). Yield WorkItems you want processed - as part of this workflow. Exceptions in child workflows will be reinjected - into the run() generator at the yield point. Results will be available on - the WorkItems returned by yield statements. Yield a list of WorkItems - to do them in parallel. The first error encountered for the whole list - will be raised if there's an exception. - """ +class Barrier(list): + """Barrier for running multiple WorkItems in parallel.""" + + def __init__(self, workflow, generator, work): + """Initializer. + + Args: + workflow: WorkflowItem instance this is for. + generator: Current state of the WorkflowItem's generator. + work: Next set of work to do. May be a single WorkItem object or + a list or tuple that contains a set of WorkItems to run in + parallel. + """ + list.__init__(self) + self.workflow = workflow + self.generator = generator + if isinstance(work, (list, tuple)): + self[:] = list(work) + self.was_list = True + else: + self[:] = [work] + self.was_list = False + self.remaining = len(self) + self.error = None + + def get_item(self): + """Returns the item to send back into the workflow generator.""" + if self.was_list: + return self + else: + return self[0] - def __init__(self, *args, **kwargs): - WorkItem.__init__(self) - self.args = args - self.kwargs = kwargs - self.result = None - self.done = False - self.root = False + def finish(self, item): + """Marks the given item that is part of the barrier as done.""" + self.remaining -= 1 + if item.error and not self.error: + self.error = item.error - def run(self, *args, **kwargs): - yield 'Yo dawg' +class WorkflowThread(WorkerThread): + """Worker thread for running workflows.""" + + def __init__(self, input_queue, output_queue): + """Initializer. + + Args: + input_queue: Queue this worker consumes work from. These should be + WorkflowItems to process, or any WorkItems registered with this + class using the register() method. + output_queue: Queue where this worker puts finished work items, + if any. + """ + WorkerThread.__init__(self, input_queue, output_queue) + self.pending = {} + self.work_map = {} + self.worker_threads = [] + self.register(WorkflowItem, input_queue) + + # TODO: Implement drain, to let all existing work finish but no new work + # allowed at the top of the funnel. + + def start(self): + """Starts the coordinator thread and all related worker threads.""" + assert not self.interrupted + for thread in self.worker_threads: + thread.start() + WorkerThread.start(self) + + def stop(self): + """Stops the coordinator thread and all related threads.""" + if self.interrupted: + return + for thread in self.worker_threads: + thread.interrupted = True + self.interrupted = True + for thread in self.worker_threads: + thread.join() + self.join() + + def register(self, work_type, queue): + """Registers where work for a specific type can be executed. + + Args: + work_type: Sub-class of WorkItem to register. + queue: Queue instance where WorkItems of the work_type should be + enqueued when they are yielded by WorkflowItems being run by + this worker. 
+ """ + self.work_map[work_type] = queue + + def handle_item(self, item): + if isinstance(item, WorkflowItem) and not item.done: + workflow = item + generator = item.run(*item.args, **item.kwargs) + item = None + else: + barrier = self.pending.pop(item) + barrier.finish(item) + if barrier.remaining and not barrier.error: + return + item = barrier.get_item() + workflow = barrier.workflow + generator = barrier.generator + + while True: + logging.debug('Transitioning workflow=%r, generator=%r, item=%r', + workflow, generator, item) + try: + if item is not None and item.error: + next_item = generator.throw(*item.error) + else: + next_item = generator.send(item) + except StopIteration: + workflow.done = True + if workflow.root: + return workflow + else: + self.input_queue.put(workflow) + return + except Exception, e: + # Sub-workflow re-raised an exception. Reinject it into the + # workflow so a pending parent can catch it. + workflow.done = True + workflow.error = sys.exc_info() + if workflow.root: + return workflow + else: + self.input_queue.put(workflow) + return + + # If a returned barrier is empty, immediately progress the + # workflow. + barrier = Barrier(workflow, generator, next_item) + if barrier: + break + else: + item = None + + for item in barrier: + if isinstance(item, WorkflowItem): + target_queue = self.input_queue + else: + target_queue = self.work_map[type(item)] + self.pending[item] = barrier + target_queue.put(item) -class Barrier(list): - """Barrier for running multiple WorkItems in parallel.""" - def __init__(self, workflow, generator, work): - """Initializer. +class RemoteQueueWorkflow(WorkflowItem): + """Runs a local workflow based on work items in a remote queue. Args: - workflow: WorkflowItem instance this is for. - generator: Current state of the WorkflowItem's generator. - work: Next set of work to do. May be a single WorkItem object or - a list or tuple that contains a set of WorkItems to run in parallel. + queue_lease_url: URL to POST to for leasing new tasks. + queue_finish_url: URL to POST to for finishing an existing task. + local_queue_workflow: WorkflowItem sub-class to create using parameters + from the remote work payload that will execute the task. """ - list.__init__(self) - self.workflow = workflow - self.generator = generator - if isinstance(work, (list, tuple)): - self[:] = list(work) - self.was_list = True - else: - self[:] = [work] - self.was_list = False - self.remaining = len(self) - self.error = None - - def get_item(self): - """Returns the item to send back into the workflow generator.""" - if self.was_list: - return self - else: - return self[0] - - def finish(self, item): - """Marks the given item that is part of the barrier as done.""" - self.remaining -= 1 - if item.error and not self.error: - self.error = item.error + def run(self, queue_lease_url, queue_finish_url, local_queue_workflow): + while True: + next_item = yield workers.FetchItem(queue_lease_url, post={}) -class WorkflowThread(WorkerThread): - """Worker thread for running workflows.""" + if next_item.json and next_item.json['error']: + logging.error('Could not fetch work from queue_lease_url=%r. ' + '%s', queue_lease_url, next_item.json['error']) - def __init__(self, input_queue, output_queue): - """Initializer. + if (not next_item.json or + not next_item.json['tasks'] or + next_item.json['error']): + # TODO: yield a delay, sleep + continue - Args: - input_queue: Queue this worker consumes work from. 
These should be - WorkflowItems to process, or any WorkItems registered with this - class using the register() method. - output_queue: Queue where this worker puts finished work items, if any. - """ - WorkerThread.__init__(self, input_queue, output_queue) - self.pending = {} - self.work_map = {} - self.worker_threads = [] - self.register(WorkflowItem, input_queue) - - # TODO: Implement drain, to let all existing work finish but no new work - # allowed at the top of the funnel. - - def start(self): - """Starts the coordinator thread and all related worker threads.""" - assert not self.interrupted - for thread in self.worker_threads: - thread.start() - WorkerThread.start(self) - - def stop(self): - """Stops the coordinator thread and all related threads.""" - if self.interrupted: - return - for thread in self.worker_threads: - thread.interrupted = True - self.interrupted = True - for thread in self.worker_threads: - thread.join() - self.join() - - def register(self, work_type, queue): - """Registers where work for a specific type can be executed. + task_list = next_item.json['tasks'] + assert len(task_list) == 1 + task = task_list[0] - Args: - work_type: Sub-class of WorkItem to register. - queue: Queue instance where WorkItems of the work_type should be - enqueued when they are yielded by WorkflowItems being run by - this worker. - """ - self.work_map[work_type] = queue - - def handle_item(self, item): - if isinstance(item, WorkflowItem) and not item.done: - workflow = item - generator = item.run(*item.args, **item.kwargs) - item = None - else: - barrier = self.pending.pop(item) - barrier.finish(item) - if barrier.remaining and not barrier.error: - return - item = barrier.get_item() - workflow = barrier.workflow - generator = barrier.generator - - while True: - logging.debug('Transitioning workflow=%r, generator=%r, item=%r', - workflow, generator, item) - try: - if item is not None and item.error: - next_item = generator.throw(*item.error) - else: - next_item = generator.send(item) - except StopIteration: - workflow.done = True - if workflow.root: - return workflow - else: - self.input_queue.put(workflow) - return - except Exception, e: - # Sub-workflow re-raised an exception. Reinject it into the workflow - # so a pending parent can catch it. - workflow.done = True - workflow.error = sys.exc_info() - if workflow.root: - return workflow - else: - self.input_queue.put(workflow) - return + task_id = task.pop('task_id') + logging.debug('Starting work item from queue_lease_url=%r, ' + 'task_id=%r, payload=%r, workflow=%r', + queue_finish_url, task_id, task, + local_queue_workflow) - # If a returned barrier is empty, immediately progress the workflow. - barrier = Barrier(workflow, generator, next_item) - if barrier: - break - else: - item = None + try: + yield local_queue_workflow(**task) + except Exception: + logging.exception('Exception while processing work from ' + 'queue_lease_url=%r, task_id=%r', + queue_lease_url, task_id) + continue - for item in barrier: - if isinstance(item, WorkflowItem): - target_queue = self.input_queue - else: - target_queue = self.work_map[type(item)] - self.pending[item] = barrier - target_queue.put(item) + finish_item = yield workers.FetchItem( + queue_finish_url, post={'task_id': task_id}) + if finish_item.json and finish_item.json['error']: + logging.error('Could not finish work with ' + 'queue_finish_url=%r, task_id=%r. 
%s', + queue_finish_url, finish_item.json['error'], + task_id) - -class RemoteQueueWorkflow(WorkflowItem): - """Runs a local workflow based on work items in a remote queue. - - Args: - queue_lease_url: URL to POST to for leasing new tasks. - queue_finish_url: URL to POST to for finishing an existing task. - local_queue_workflow: WorkflowItem sub-class to create using parameters - from the remote work payload that will execute the task. - """ - - def run(self, queue_lease_url, queue_finish_url, local_queue_workflow): - while True: - next_item = yield workers.FetchItem(queue_lease_url, post={}) - - if next_item.json and next_item.json['error']: - logging.error('Could not fetch work from queue_lease_url=%r. %s', - next_item.json['error']) - - if (not next_item.json or - not next_item.json['tasks'] or - next_item.json['error']): - # TODO: yield a delay, sleep - continue - - task_list = next_item.json['tasks'] - assert len(task_list) == 1 - task = task_list[0] - - task_id = task.pop('task_id') - logging.debug('Starting work item from queue_lease_url=%r, task_id=%r, ' - 'payload=%r, workflow=%r', - queue_finish_url, task_id, task, local_queue_workflow) - - try: - yield local_queue_workflow(**task) - except Exception: - logging.exception('Exception while processing work from ' - 'queue_lease_url=%r, task_id=%r', - queue_lease_url, task_id) - continue - - finish_item = yield workers.FetchItem( - queue_finish_url, post={'task_id': task_id}) - if finish_item.json and finish_item.json['error']: - logging.error('Could not finish work with queue_finish_url=%r, ' - 'task_id=%r. %s', - queue_finish_url, finish_item.json['error'], task_id) - - logging.debug('Finished work item with queue_finish_url=%r, task_id=%r', - queue_finish_url, task_id) + logging.debug('Finished work item with queue_finish_url=%r, ' + 'task_id=%r', queue_finish_url, task_id) def GetCoordinator(): - """Creates a coordinator and returns it.""" - fetch_queue = Queue.Queue() - workflow_queue = Queue.Queue() - complete_queue = Queue.Queue() - - coordinator = WorkflowThread(workflow_queue, complete_queue) - coordinator.register(FetchItem, fetch_queue) - - # TODO: Make number of threads configurable. - # TODO: Enable multiple coodinator threads. - coordinator.worker_threads = [ - FetchThread(fetch_queue, workflow_queue), - FetchThread(fetch_queue, workflow_queue), - ] - - return coordinator + """Creates a coordinator and returns it.""" + fetch_queue = Queue.Queue() + workflow_queue = Queue.Queue() + complete_queue = Queue.Queue() + + coordinator = WorkflowThread(workflow_queue, complete_queue) + coordinator.register(FetchItem, fetch_queue) + + # TODO: Make number of threads configurable. + # TODO: Enable multiple coodinator threads. + coordinator.worker_threads = [ + FetchThread(fetch_queue, workflow_queue), + FetchThread(fetch_queue, workflow_queue), + ] + + return coordinator diff --git a/server/dpxdt/__init__.py b/server/dpxdt/__init__.py index e5a3447..014bea6 100644 --- a/server/dpxdt/__init__.py +++ b/server/dpxdt/__init__.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at - +# # http://www.apache.org/licenses/LICENSE-2.0 - +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/server/dpxdt/api.py b/server/dpxdt/api.py index e1e9ceb..08e65fb 100644 --- a/server/dpxdt/api.py +++ b/server/dpxdt/api.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - +# # http://www.apache.org/licenses/LICENSE-2.0 - +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -69,7 +69,7 @@ the last release is marked as good if there is no other good candidate. This lets the API establish a "baseline" release easily for first-time users. -- Only one release candidate may be receiving runs for a build at a time. +- Only one release candidate may be receiving runs for a build at a time. """ import datetime @@ -94,323 +94,324 @@ class Build(db.Model): - """A single repository of artifacts and diffs owned by someone. + """A single repository of artifacts and diffs owned by someone. - Queries: - - Get all builds for a specific owner. - - Can this user read this build. - - Can this user write this build. - """ + Queries: + - Get all builds for a specific owner. + - Can this user read this build. + - Can this user write this build. + """ - id = db.Column(db.Integer, primary_key=True) - created = db.Column(db.DateTime, default=datetime.datetime.utcnow) - name = db.Column(db.String) - # TODO: Add owner + id = db.Column(db.Integer, primary_key=True) + created = db.Column(db.DateTime, default=datetime.datetime.utcnow) + name = db.Column(db.String) + # TODO: Add owner class Release(db.Model): - """A set of runs that are part of a build, grouped by a user-supplied name. - - Queries: - - For a build, find me the active release with this name. - - Mark this release as abandoned. - - Show me all active releases for this build by unique name in order - of creation date descending. - """ - - RECEIVING = 'receiving' - PROCESSING = 'processing' - REVIEWING = 'reviewing' - BAD = 'bad' - GOOD = 'good' - STATES = frozenset([RECEIVING, PROCESSING, REVIEWING, BAD, GOOD]) - - id = db.Column(db.Integer, primary_key=True) - name = db.Column(db.String, nullable=False) - number = db.Column(db.Integer, nullable=False) - created = db.Column(db.DateTime, default=datetime.datetime.utcnow) - status = db.Column(db.Enum(*STATES), default=RECEIVING, nullable=False) - build_id = db.Column(db.Integer, db.ForeignKey('build.id'), nullable=False) + """A set of runs that are part of a build, grouped by a user-supplied name. + + Queries: + - For a build, find me the active release with this name. + - Mark this release as abandoned. + - Show me all active releases for this build by unique name in order + of creation date descending. 
+ """ + + RECEIVING = 'receiving' + PROCESSING = 'processing' + REVIEWING = 'reviewing' + BAD = 'bad' + GOOD = 'good' + STATES = frozenset([RECEIVING, PROCESSING, REVIEWING, BAD, GOOD]) + + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String, nullable=False) + number = db.Column(db.Integer, nullable=False) + created = db.Column(db.DateTime, default=datetime.datetime.utcnow) + status = db.Column(db.Enum(*STATES), default=RECEIVING, nullable=False) + build_id = db.Column(db.Integer, db.ForeignKey('build.id'), nullable=False) class Artifact(db.Model): - """Contains a single file uploaded by a diff worker.""" + """Contains a single file uploaded by a diff worker.""" - id = db.Column(db.String(40), primary_key=True) - created = db.Column(db.DateTime, default=datetime.datetime.utcnow) - data = db.Column(db.LargeBinary) - content_type = db.Column(db.String) + id = db.Column(db.String(40), primary_key=True) + created = db.Column(db.DateTime, default=datetime.datetime.utcnow) + data = db.Column(db.LargeBinary) + content_type = db.Column(db.String) class Run(db.Model): - """Contains a set of screenshot records uploaded by a diff worker. + """Contains a set of screenshot records uploaded by a diff worker. - Queries: - - Show me all runs for the given release. - - Show me all runs with the given name for all releases that are live. - """ + Queries: + - Show me all runs for the given release. + - Show me all runs with the given name for all releases that are live. + """ - id = db.Column(db.Integer, primary_key=True) - release_id = db.Column(db.Integer, nullable=False) - name = db.Column(db.String, nullable=False) + id = db.Column(db.Integer, primary_key=True) + release_id = db.Column(db.Integer, nullable=False) + name = db.Column(db.String, nullable=False) - created = db.Column(db.DateTime, default=datetime.datetime.utcnow) - image = db.Column(db.String, db.ForeignKey('artifact.id')) - log = db.Column(db.String, db.ForeignKey('artifact.id')) - config = db.Column(db.String, db.ForeignKey('artifact.id')) + created = db.Column(db.DateTime, default=datetime.datetime.utcnow) + image = db.Column(db.String, db.ForeignKey('artifact.id')) + log = db.Column(db.String, db.ForeignKey('artifact.id')) + config = db.Column(db.String, db.ForeignKey('artifact.id')) - previous_id = db.Column(db.Integer, db.ForeignKey('run.id')) + previous_id = db.Column(db.Integer, db.ForeignKey('run.id')) - needs_diff = db.Column(db.Boolean) - diff_image = db.Column(db.String, db.ForeignKey('artifact.id')) - diff_log = db.Column(db.String, db.ForeignKey('artifact.id')) + needs_diff = db.Column(db.Boolean) + diff_image = db.Column(db.String, db.ForeignKey('artifact.id')) + diff_log = db.Column(db.String, db.ForeignKey('artifact.id')) @app.route('/api/build', methods=['POST']) def create_build(): - """Creates a new build for a user.""" - # TODO: Make sure the requesting user is logged in - name = request.form.get('name') - utils.jsonify_assert(name, 'name required') + """Creates a new build for a user.""" + # TODO: Make sure the requesting user is logged in + name = request.form.get('name') + utils.jsonify_assert(name, 'name required') - build = Build(name=name) - db.session.add(build) - db.session.commit() + build = Build(name=name) + db.session.add(build) + db.session.commit() - logging.info('Created build: build_id=%s, name=%r', build.id, name) + logging.info('Created build: build_id=%s, name=%r', build.id, name) - return flask.jsonify(build_id=build.id, name=name) + return flask.jsonify(build_id=build.id, 
name=name) @app.route('/api/release', methods=['POST']) def create_release(): - """Creates a new release candidate for a build.""" - build_id = request.form.get('build_id', type=int) - utils.jsonify_assert(build_id is not None, 'build_id required') - name = request.form.get('name') - utils.jsonify_assert(name, 'name required') - # TODO: Make sure build_id exists - # TODO: Make sure requesting user is owner of the build_id - - release = Release( - name=name, - number=1, - build_id=build_id) - - last_candidate = ( - Release.query - .filter_by(build_id=build_id, name=name) - .order_by(Release.number.desc()) - .first()) - if last_candidate: - release.number += last_candidate.number + """Creates a new release candidate for a build.""" + build_id = request.form.get('build_id', type=int) + utils.jsonify_assert(build_id is not None, 'build_id required') + name = request.form.get('name') + utils.jsonify_assert(name, 'name required') + # TODO: Make sure build_id exists + # TODO: Make sure requesting user is owner of the build_id + + release = Release( + name=name, + number=1, + build_id=build_id) + + last_candidate = ( + Release.query + .filter_by(build_id=build_id, name=name) + .order_by(Release.number.desc()) + .first()) + if last_candidate: + release.number += last_candidate.number - db.session.add(release) - db.session.commit() + db.session.add(release) + db.session.commit() - logging.info('Created release: build_id=%s, name=%r, number=%d', - build_id, name, release.number) + logging.info('Created release: build_id=%s, name=%r, number=%d', + build_id, name, release.number) - return flask.jsonify(build_id=build_id, name=name, number=release.number) + return flask.jsonify(build_id=build_id, name=name, number=release.number) def _check_release_done_processing(release_id): - """Moves a release candidate to reviewing if all runs are done processing.""" - release = Release.query.get(release_id) - if not release: - logging.error('Could not find release_id=%s', release_id) - return False + """Moves a release candidate to reviewing if all runs are done processing.""" + release = Release.query.get(release_id) + if not release: + logging.error('Could not find release_id=%s', release_id) + return False - if release.status != Release.PROCESSING: - logging.error('Already done processing: release_id=%s', release_id) - return False + if release.status != Release.PROCESSING: + logging.error('Already done processing: release_id=%s', release_id) + return False - query = Run.query.filter_by(release_id=release.id) - for run in query: - if run.needs_diff: - return False + query = Run.query.filter_by(release_id=release.id) + for run in query: + if run.needs_diff: + return False - logging.info('Release done processing, now reviewing: build_id=%s, name=%s, ' - 'number=%d', release.build_id, release.name, release.number) + logging.info('Release done processing, now reviewing: build_id=%s, ' + 'name=%s, number=%d', release.build_id, release.name, + release.number) - release.status = Release.REVIEWING - db.session.add(release) - return True + release.status = Release.REVIEWING + db.session.add(release) + return True def _get_release_params(): - """Gets the release params from the current request.""" - build_id = request.form.get('build_id', type=int) - utils.jsonify_assert(build_id is not None, 'build_id required') - name = request.form.get('name') - utils.jsonify_assert(name, 'name required') - number = request.form.get('number', type=int) - utils.jsonify_assert(number is not None, 'number required') - return build_id, 
name, number + """Gets the release params from the current request.""" + build_id = request.form.get('build_id', type=int) + utils.jsonify_assert(build_id is not None, 'build_id required') + name = request.form.get('name') + utils.jsonify_assert(name, 'name required') + number = request.form.get('number', type=int) + utils.jsonify_assert(number is not None, 'number required') + return build_id, name, number @app.route('/api/report_run', methods=['POST']) def report_run(): - """Reports a new run for a release candidate.""" - build_id, name, number = _get_release_params() - - release = ( - Release.query - .filter_by(build_id=build_id, name=name, number=number) - .first()) - utils.jsonify_assert(release, 'release does not exist') - # TODO: Make sure requesting user is owner of the build_id - - current_image = request.form.get('image', type=str) - utils.jsonify_assert(current_image, 'image must be supplied') - current_log = request.form.get('log', type=str) - current_config = request.form.get('config', type=str) - no_diff = request.form.get('no_diff') - diff_image = request.form.get('diff_image', type=str) - diff_log = request.form.get('diff_log', type=str) - needs_diff = not (no_diff or diff_image or diff_log) - - # Find the previous corresponding run and automatically connect it. - last_good_release = ( - Release.query - .filter_by(build_id=build_id, name=name, status=Release.GOOD) - .order_by(Release.created.desc()) - .first()) - previous_id = None - if last_good_release: - last_good_run = ( - Run.query - .filter_by(release_id=last_good_release.id, name=name) - .first()) - if last_good_run: - previous_id = last_good_run.id - - fields = dict( - name=name, - release_id=release.id, - image=current_image, - log=current_log, - config=current_config, - previous_id=previous_id, - needs_diff=needs_diff, - diff_image=diff_image, - diff_log=diff_log) - run = Run(**fields) - db.session.add(run) - db.session.flush() - - fields.update(run_id=run.id) + """Reports a new run for a release candidate.""" + build_id, name, number = _get_release_params() - # Schedule pdiff if there isn't already an image. - if needs_diff: - work_queue.add('run-pdiff', dict(run_id=run.id)) - - db.session.commit() - - logging.info('Created run: build_id=%s, name=%r, number=%d', - build_id, name, number) - - return flask.jsonify(**fields) + release = ( + Release.query + .filter_by(build_id=build_id, name=name, number=number) + .first()) + utils.jsonify_assert(release, 'release does not exist') + # TODO: Make sure requesting user is owner of the build_id + + current_image = request.form.get('image', type=str) + utils.jsonify_assert(current_image, 'image must be supplied') + current_log = request.form.get('log', type=str) + current_config = request.form.get('config', type=str) + no_diff = request.form.get('no_diff') + diff_image = request.form.get('diff_image', type=str) + diff_log = request.form.get('diff_log', type=str) + needs_diff = not (no_diff or diff_image or diff_log) + + # Find the previous corresponding run and automatically connect it. 
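For orientation, here is how a worker might call this endpoint once its artifacts are uploaded. This is a sketch only: the host and SHA-1 values are made-up examples, but the form fields match _get_release_params and the handler above:

import urllib
import urllib2

params = urllib.urlencode(dict(
    build_id=1,
    name='release-name',
    number=1,
    image='0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33',  # from /api/upload
    log='62cdb7020ff920e5aa642c3d4066950dd1f01f4d'))
response = urllib2.urlopen('http://localhost:5000/api/report_run', params)
print response.read()

The handler continues below by locating the previous good run to diff against, as the comment above describes.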
+ last_good_release = ( + Release.query + .filter_by(build_id=build_id, name=name, status=Release.GOOD) + .order_by(Release.created.desc()) + .first()) + previous_id = None + if last_good_release: + last_good_run = ( + Run.query + .filter_by(release_id=last_good_release.id, name=name) + .first()) + if last_good_run: + previous_id = last_good_run.id + + fields = dict( + name=name, + release_id=release.id, + image=current_image, + log=current_log, + config=current_config, + previous_id=previous_id, + needs_diff=needs_diff, + diff_image=diff_image, + diff_log=diff_log) + run = Run(**fields) + db.session.add(run) + db.session.flush() + + fields.update(run_id=run.id) + + # Schedule pdiff if there isn't already an image. + if needs_diff: + work_queue.add('run-pdiff', dict(run_id=run.id)) + + db.session.commit() + + logging.info('Created run: build_id=%s, name=%r, number=%d', + build_id, name, number) + + return flask.jsonify(**fields) @app.route('/api/report_pdiff', methods=['POST']) def report_pdiff(): - """Reports a pdiff for a run. + """Reports a pdiff for a run. - When there is no diff to report, supply the "no_diff" parameter. - """ - run_id = request.form.get('run_id', type=int) - utils.jsonify_assert(run_id is not None, 'run_id required') - no_diff = request.form.get('no_diff') + When there is no diff to report, supply the "no_diff" parameter. + """ + run_id = request.form.get('run_id', type=int) + utils.jsonify_assert(run_id is not None, 'run_id required') + no_diff = request.form.get('no_diff') - run = Run.query.get(run_id) - utils.jsonify_assert(run, 'Run does not exist') + run = Run.query.get(run_id) + utils.jsonify_assert(run, 'Run does not exist') - run.needs_diff = not (no_diff or run.diff_image or run.diff_log) - run.diff_image = request.form.get('diff_image', type=int) - run.diff_log = request.form.get('diff_log', type=int) + run.needs_diff = not (no_diff or run.diff_image or run.diff_log) + run.diff_image = request.form.get('diff_image', type=int) + run.diff_log = request.form.get('diff_log', type=int) - db.session.add(run) + db.session.add(run) - logging.info('Saved pdiff: run_id=%s, no_diff=%s, diff_image=%s, ' - 'diff_log=%s', run_id, no_diff, run.diff_image, run.diff_log) + logging.info('Saved pdiff: run_id=%s, no_diff=%s, diff_image=%s, ' + 'diff_log=%s', run_id, no_diff, run.diff_image, run.diff_log) - _check_release_done_processing(run.release_id) - db.session.commit() + _check_release_done_processing(run.release_id) + db.session.commit() - return flask.jsonify(success=True) + return flask.jsonify(success=True) @app.route('/api/runs_done', methods=['POST']) def runs_done(): - """Marks a release candidate as having all runs reported.""" - build_id, name, number = _get_release_params() + """Marks a release candidate as having all runs reported.""" + build_id, name, number = _get_release_params() - release = ( - Release.query - .filter_by(build_id=build_id, name=name, number=number) - .first()) - utils.jsonify_assert(release, 'Release does not exist') + release = ( + Release.query + .filter_by(build_id=build_id, name=name, number=number) + .first()) + utils.jsonify_assert(release, 'Release does not exist') - release.status = Release.PROCESSING - db.session.add(release) - _check_release_done_processing(release) - db.session.commit() + release.status = Release.PROCESSING + db.session.add(release) + _check_release_done_processing(release) + db.session.commit() - logging.info('Runs done for release: build_id=%s, name=%s, number=%d', - build_id, name, number) + logging.info('Runs 
done for release: build_id=%s, name=%s, number=%d', + build_id, name, number) - return flask.jsonify(success=True) + return flask.jsonify(success=True) @app.route('/api/release_done', methods=['POST']) def release_done(): - """Marks a release candidate as good or bad.""" - build_id, name, number = _get_release_params() - status = request.form.get('status') - valid_statuses = (Release.GOOD, Release.BAD) - utils.jsonify_assert(status in valid_statuses, - 'status must be in %r' % valid_statuses) - - release = ( - Release.query - .filter_by(build_id=build_id, name=name, number=number) - .first()) - utils.jsonify_assert(release, 'Release does not exist') + """Marks a release candidate as good or bad.""" + build_id, name, number = _get_release_params() + status = request.form.get('status') + valid_statuses = (Release.GOOD, Release.BAD) + utils.jsonify_assert(status in valid_statuses, + 'status must be in %r' % valid_statuses) + + release = ( + Release.query + .filter_by(build_id=build_id, name=name, number=number) + .first()) + utils.jsonify_assert(release, 'Release does not exist') - release.status = status - db.session.add(release) - db.session.commit() + release.status = status + db.session.add(release) + db.session.commit() - logging.info('Release marked as %s: build_id=%s, name=%s, number=%d', - status, build_id, name, number) + logging.info('Release marked as %s: build_id=%s, name=%s, number=%d', + status, build_id, name, number) - return flask.jsonify(success=True) + return flask.jsonify(success=True) @app.route('/api/upload', methods=['POST']) def upload(): - """Uploads an artifact referenced by a run.""" - # TODO: Require an API key on the basic auth header - utils.jsonify_assert(len(request.files) == 1, - 'Need exactly one uploaded file') - - file_storage = request.files.values()[0] - data = file_storage.read() - sha1sum = hashlib.sha1(data).hexdigest() - exists = Artifact.query.filter_by(id=sha1sum).first() - if exists: - logging.info('Upload already exists: artifact_id=%s', sha1sum) + """Uploads an artifact referenced by a run.""" + # TODO: Require an API key on the basic auth header + utils.jsonify_assert(len(request.files) == 1, + 'Need exactly one uploaded file') + + file_storage = request.files.values()[0] + data = file_storage.read() + sha1sum = hashlib.sha1(data).hexdigest() + exists = Artifact.query.filter_by(id=sha1sum).first() + if exists: + logging.info('Upload already exists: artifact_id=%s', sha1sum) + return flask.jsonify(sha1sum=sha1sum) + + content_type, _ = mimetypes.guess_type(file_storage.filename) + artifact = Artifact( + id=sha1sum, + content_type=content_type, + data=data) + db.session.add(artifact) + db.session.commit() + + logging.info('Upload received: artifact_id=%s, content_type=%s', + sha1sum, content_type) return flask.jsonify(sha1sum=sha1sum) - - content_type, _ = mimetypes.guess_type(file_storage.filename) - artifact = Artifact( - id=sha1sum, - content_type=content_type, - data=data) - db.session.add(artifact) - db.session.commit() - - logging.info('Upload received: artifact_id=%s, content_type=%s', - sha1sum, content_type) - return flask.jsonify(sha1sum=sha1sum) diff --git a/server/dpxdt/utils.py b/server/dpxdt/utils.py index 35c41f2..45bbd01 100644 --- a/server/dpxdt/utils.py +++ b/server/dpxdt/utils.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # Copyright 2013 Brett Slatkin - +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
 # You may obtain a copy of the License at
-
+#
 #     http://www.apache.org/licenses/LICENSE-2.0
-
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,27 +23,27 @@


 def jsonify_assert(asserted, message, status_code=400):
-  """Asserts something is true, aborts the request if not."""
-  if asserted:
-    return
-  try:
-    raise AssertionError(message)
-  except AssertionError, e:
-    stack = traceback.extract_stack()
-    stack.pop()
-    logging.error('Assertion failed: %s\n%s',
-                  str(e), ''.join(traceback.format_list(stack)))
-    flask.abort(jsonify_error(e, status_code=status_code))
+    """Asserts something is true, aborts the request if not."""
+    if asserted:
+        return
+    try:
+        raise AssertionError(message)
+    except AssertionError, e:
+        stack = traceback.extract_stack()
+        stack.pop()
+        logging.error('Assertion failed: %s\n%s',
+                      str(e), ''.join(traceback.format_list(stack)))
+        flask.abort(jsonify_error(e, status_code=status_code))


 def jsonify_error(message_or_exception, status_code=400):
-  """Returns a JSON payload that indicates the request had an error."""
-  if isinstance(message_or_exception, Exception):
-    message = '%s: %s' % (
-        message_or_exception.__class__.__name__, message_or_exception)
-  else:
-    message = message_or_exception
-
-  response = flask.jsonify(error=message)
-  response.status_code = status_code
-  return response
+    """Returns a JSON payload that indicates the request had an error."""
+    if isinstance(message_or_exception, Exception):
+        message = '%s: %s' % (
+            message_or_exception.__class__.__name__, message_or_exception)
+    else:
+        message = message_or_exception
+
+    response = flask.jsonify(error=message)
+    response.status_code = status_code
+    return response
diff --git a/server/dpxdt/work_queue.py b/server/dpxdt/work_queue.py
index b1fe173..c839a03 100644
--- a/server/dpxdt/work_queue.py
+++ b/server/dpxdt/work_queue.py
@@ -1,12 +1,12 @@
 #!/usr/bin/env python
 # Copyright 2013 Brett Slatkin
-
+#
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
-
+#
 #     http://www.apache.org/licenses/LICENSE-2.0
-
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,244 +32,245 @@


 class Error(Exception):
-  """Base class for exceptions in this module."""
+    """Base class for exceptions in this module."""


 class TaskDoesNotExistError(Error):
-  """Task with the given ID does not exist and cannot be finished."""
+    """Task with the given ID does not exist and cannot be finished."""


 class LeaseExpiredError(Error):
-  """Owner's lease on the task has expired, not completing task."""
+    """Owner's lease on the task has expired, not completing task."""


 class NotOwnerError(Error):
-  """Requestor is no longer the owner of the task."""
+    """Requestor is no longer the owner of the task."""


 class WorkQueue(db.Model):
-  """Represents a single item of work to do in a specific queue.
+    """Represents a single item of work to do in a specific queue.

-  Queries:
-  - By task_id for finishing a task or extending a lease.
-  - By Index(queue_name, live, eta) for finding the oldest task for a queue
-    that is still pending.
-  - By Index(live, create) for finding old tasks that should be deleted from
-    the table periodically to free up space.
-  """
+    Queries:
+    - By task_id for finishing a task or extending a lease.
+    - By Index(queue_name, live, eta) for finding the oldest task for a queue
+      that is still pending.
+    - By Index(live, create) for finding old tasks that should be deleted from
+      the table periodically to free up space.
+    """

-  task_id = db.Column(db.String(100), primary_key=True, nullable=False)
-  queue_name = db.Column(db.String(100), primary_key=True, nullable=False)
-  live = db.Column(db.Boolean, default=True, nullable=False)
-  eta = db.Column(db.DateTime, default=datetime.datetime.utcnow,
-                  nullable=False)
+    task_id = db.Column(db.String(100), primary_key=True, nullable=False)
+    queue_name = db.Column(db.String(100), primary_key=True, nullable=False)
+    live = db.Column(db.Boolean, default=True, nullable=False)
+    eta = db.Column(db.DateTime, default=datetime.datetime.utcnow,
+                    nullable=False)

-  source = db.Column(db.String)
-  created = db.Column(db.DateTime, default=datetime.datetime.utcnow)
+    source = db.Column(db.String)
+    created = db.Column(db.DateTime, default=datetime.datetime.utcnow)

-  lease_attempts = db.Column(db.Integer, default=0, nullable=False)
-  last_owner = db.Column(db.String)
-  last_lease = db.Column(db.DateTime, default=datetime.datetime.utcnow)
+    lease_attempts = db.Column(db.Integer, default=0, nullable=False)
+    last_owner = db.Column(db.String)
+    last_lease = db.Column(db.DateTime, default=datetime.datetime.utcnow)

-  payload = db.Column(db.LargeBinary)
-  content_type = db.Column(db.String)
+    payload = db.Column(db.LargeBinary)
+    content_type = db.Column(db.String)

-  __table_args__ = (
-      db.Index('lease_index', 'queue_name', 'live', 'eta'),
-      db.Index('reap_index', 'live', 'created'),
-  )
+    __table_args__ = (
+        db.Index('lease_index', 'queue_name', 'live', 'eta'),
+        db.Index('reap_index', 'live', 'created'),
+    )


 def add(queue_name, payload=None, content_type=None,
-        source=None, task_id=None):
-  """Adds a work item to a queue.
-
-  Args:
-    queue_name: Name of the queue to add the work item to.
-    payload: Optional. Payload that describes the work to do as a string.
-      If not a string and content_type is not provided, then this function
-      assumes the payload is a JSON-able Python object.
-    content_type: Optional. Content type of the payload.
-    source: Optional. Who or what originally created the task.
-    task_id: Optional. When supplied, only enqueue this task if a task
-      with this ID does not already exist. If a task with this ID already
-      exists, then this function will do nothing.
-
-  Returns:
-    ID of the task that was added.
-  """
-  if task_id:
-    task = WorkQueue.query.filter_by(task_id=task_id).first()
-    if task:
-      return task.task_id
-
-  if payload and not content_type and not isinstance(payload, basestring):
-    payload = json.dumps(payload)
-    content_type = 'application/json'
-
-  now = datetime.datetime.utcnow()
-  task = WorkQueue(
-      task_id=uuid.uuid4().hex,
-      queue_name=queue_name,
-      eta=now,
-      source=source,
-      payload=payload,
-      content_type=content_type)
-  db.session.add(task)
-
-  return task.task_id
+        source=None, task_id=None):
+    """Adds a work item to a queue.
+
+    Args:
+        queue_name: Name of the queue to add the work item to.
+        payload: Optional. Payload that describes the work to do as a string.
+            If not a string and content_type is not provided, then this
+            function assumes the payload is a JSON-able Python object.
+        content_type: Optional. Content type of the payload.
+        source: Optional. Who or what originally created the task.
+        task_id: Optional. When supplied, only enqueue this task if a task
+            with this ID does not already exist. If a task with this ID already
+            exists, then this function will do nothing.
+
+    Returns:
+        ID of the task that was added.
+    """
+    if task_id:
+        task = WorkQueue.query.filter_by(task_id=task_id).first()
+        if task:
+            return task.task_id
+
+    if payload and not content_type and not isinstance(payload, basestring):
+        payload = json.dumps(payload)
+        content_type = 'application/json'
+
+    now = datetime.datetime.utcnow()
+    task = WorkQueue(
+        task_id=uuid.uuid4().hex,
+        queue_name=queue_name,
+        eta=now,
+        source=source,
+        payload=payload,
+        content_type=content_type)
+    db.session.add(task)
+
+    return task.task_id


 def _datetime_to_epoch_seconds(dt):
-  """Converts a datetime.datetime to seconds since the epoch."""
-  if dt is None:
-    return None
-  return int(time.mktime(dt.utctimetuple()))
+    """Converts a datetime.datetime to seconds since the epoch."""
+    if dt is None:
+        return None
+    return int(time.mktime(dt.utctimetuple()))


 def _task_to_dict(task):
-  """Converts a WorkQueue to a JSON-able dictionary."""
-  return dict(
-      task_id=task.task_id,
-      queue_name=task.queue_name,
-      eta=_datetime_to_epoch_seconds(task.eta),
-      source=task.source,
-      created=_datetime_to_epoch_seconds(task.created),
-      lease_attempts=task.lease_attempts,
-      last_lease=_datetime_to_epoch_seconds(task.last_lease),
-      payload=task.payload,
-      content_type=task.content_type)
+    """Converts a WorkQueue to a JSON-able dictionary."""
+    return dict(
+        task_id=task.task_id,
+        queue_name=task.queue_name,
+        eta=_datetime_to_epoch_seconds(task.eta),
+        source=task.source,
+        created=_datetime_to_epoch_seconds(task.created),
+        lease_attempts=task.lease_attempts,
+        last_lease=_datetime_to_epoch_seconds(task.last_lease),
+        payload=task.payload,
+        content_type=task.content_type)


 def lease(queue_name, owner, timeout):
-  """Leases a work item from a queue.
-
-  Args:
-    queue_name: Name of the queue to lease work from.
-    owner: Who or what is leasing the task.
-    timeout: Number of seconds to lock the task for before allowing
-      another owner to lease it.
-
-  Returns:
-    Dictionary representing the task that was leased, or None if no
-    task is available to be leased.
-  """
-  now = datetime.datetime.utcnow()
-  query = (
-      WorkQueue.query
-      .filter_by(queue_name=queue_name, live=True)
-      .filter(WorkQueue.eta <= now))
-  task = query.first()
-  if not task:
-    return None
-
-  task.eta += datetime.timedelta(seconds=timeout)
-  task.lease_attempts += 1
-  task.last_owner = owner
-  task.last_lease = now
-  db.session.add(task)
-
-  return _task_to_dict(task)
+    """Leases a work item from a queue.
+
+    Args:
+        queue_name: Name of the queue to lease work from.
+        owner: Who or what is leasing the task.
+        timeout: Number of seconds to lock the task for before allowing
+            another owner to lease it.
+
+    Returns:
+        Dictionary representing the task that was leased, or None if no
+        task is available to be leased.
+    """
+    now = datetime.datetime.utcnow()
+    query = (
+        WorkQueue.query
+        .filter_by(queue_name=queue_name, live=True)
+        .filter(WorkQueue.eta <= now))
+    task = query.first()
+    if not task:
+        return None
+
+    task.eta += datetime.timedelta(seconds=timeout)
+    task.lease_attempts += 1
+    task.last_owner = owner
+    task.last_lease = now
+    db.session.add(task)
+
+    return _task_to_dict(task)


 def finish(queue_name, task_id, owner):
-  """Marks a work item on a queue as finished.
-
-  Args:
-    queue_name: Name of the queue the work item is on.
-    task_id: ID of the task that is finished.
-    owner: Who or what has the current lease on the task.
-
-  Returns:
-    True if the task has been finished for the first time; False if the
-    task was already finished.
-
-  Raises:
-    TaskDoesNotExistError if the task does not exist.
-    LeaseExpiredError if the lease is no longer active.
-    NotOwnerError if the specified owner no longer owns the task.
-  """
-  now = datetime.datetime.utcnow()
-  task = WorkQueue.query.filter_by(queue_name=queue_name,
-                                   task_id=task_id).first()
-  if not task:
-    raise TaskDoesNotExistError('task_id=%s' % task_id)
-
-  delta = task.eta - now
-  if delta < datetime.timedelta(0):
-    raise LeaseExpiredError('queue=%s, task_id=%s expired %s' % (
-        task.queue_name, task_id, delta))
-
-  if task.last_owner != owner:
-    raise NotOwnerError('queue=%s, task_id=%s, owner=%s' % (
-        task.queue_name, task_id, task.last_owner))
-
-  if not task.live:
-    logging.warning('Finishing already dead task. queue=%s, task_id=%s, '
-                    'owner=%s', task.queue_name, task_id, owner)
-    return False
-
-  task.live = False
-  db.session.add(task)
-  return True
+    """Marks a work item on a queue as finished.
+
+    Args:
+        queue_name: Name of the queue the work item is on.
+        task_id: ID of the task that is finished.
+        owner: Who or what has the current lease on the task.
+
+    Returns:
+        True if the task has been finished for the first time; False if the
+        task was already finished.
+
+    Raises:
+        TaskDoesNotExistError if the task does not exist.
+        LeaseExpiredError if the lease is no longer active.
+        NotOwnerError if the specified owner no longer owns the task.
+    """
+    now = datetime.datetime.utcnow()
+    task = WorkQueue.query.filter_by(
+        queue_name=queue_name,
+        task_id=task_id).first()
+    if not task:
+        raise TaskDoesNotExistError('task_id=%s' % task_id)
+
+    delta = task.eta - now
+    if delta < datetime.timedelta(0):
+        raise LeaseExpiredError('queue=%s, task_id=%s expired %s' % (
+            task.queue_name, task_id, delta))
+
+    if task.last_owner != owner:
+        raise NotOwnerError('queue=%s, task_id=%s, owner=%s' % (
+            task.queue_name, task_id, task.last_owner))
+
+    if not task.live:
+        logging.warning('Finishing already dead task. queue=%s, task_id=%s, '
+                        'owner=%s', task.queue_name, task_id, owner)
+        return False
+
+    task.live = False
+    db.session.add(task)
+    return True


 @app.route('/api/work_queue/<queue_name>/add', methods=['POST'])
 def handle_add(queue_name):
-  """Adds a task to a queue."""
-  # TODO: Require an API key on the basic auth header
-  try:
-    task_id = add(
-        queue_name,
-        payload=request.form.get('payload', type=str),
-        content_type=request.form.get('content_type', type=str),
-        source=request.form.get('source', request.remote_addr, type=str),
-        task_id=request.form.get('task_id', type=str))
-  except Error, e:
-    error = '%s: %s' % (e.__class__.__name__, e)
-    logging.error('Could not add task request=%r. %s', request, error)
-    response = flask.jsonify(error=error)
-    response.status_code = 400
-    return response
-
-  db.session.commit()
-  logging.info('Task added: queue=%s, task_id=%s', queue_name, task_id)
-  return flask.jsonify(task_id=task_id)
+    """Adds a task to a queue."""
+    # TODO: Require an API key on the basic auth header
+    try:
+        task_id = add(
+            queue_name,
+            payload=request.form.get('payload', type=str),
+            content_type=request.form.get('content_type', type=str),
+            source=request.form.get('source', request.remote_addr, type=str),
+            task_id=request.form.get('task_id', type=str))
+    except Error, e:
+        error = '%s: %s' % (e.__class__.__name__, e)
+        logging.error('Could not add task request=%r. %s', request, error)
+        response = flask.jsonify(error=error)
+        response.status_code = 400
+        return response
+
+    db.session.commit()
+    logging.info('Task added: queue=%s, task_id=%s', queue_name, task_id)
+    return flask.jsonify(task_id=task_id)


 @app.route('/api/work_queue/<queue_name>/lease', methods=['POST'])
 def handle_lease(queue_name):
-  """Leases a task from a queue."""
-  # TODO: Require an API key on the basic auth header
-  task = lease(
-      queue_name,
-      request.form.get('owner', request.remote_addr, type=str),
-      request.form.get('timeout', 60, type=int))
+    """Leases a task from a queue."""
+    # TODO: Require an API key on the basic auth header
+    task = lease(
+        queue_name,
+        request.form.get('owner', request.remote_addr, type=str),
+        request.form.get('timeout', 60, type=int))

-  if not task:
-    return flask.jsonify(tasks=[])
+    if not task:
+        return flask.jsonify(tasks=[])

-  if task['payload'] and task['content_type'] == 'application/json':
-    task['payload'] = json.loads(task['payload'])
+    if task['payload'] and task['content_type'] == 'application/json':
+        task['payload'] = json.loads(task['payload'])

-  db.session.commit()
-  logging.info('Task leased: queue=%s, task_id=%s',
-               queue_name, task['task_id'])
-  return flask.jsonify(tasks=[task])
+    db.session.commit()
+    logging.info('Task leased: queue=%s, task_id=%s',
+                 queue_name, task['task_id'])
+    return flask.jsonify(tasks=[task])


 @app.route('/api/work_queue/<queue_name>/finish', methods=['POST'])
 def handle_finish(queue_name):
-  """Marks a task on a queue as finished."""
-  # TODO: Require an API key on the basic auth header
-  try:
-    finish(
-        queue_name,
-        request.form.get('task_id', type=str),
-        request.form.get('owner', request.remote_addr, type=str))
-  except Error, e:
-    error = '%s: %s' % (e.__class__.__name__, e)
-    logging.error('Could not add task request=%r. %s', request, error)
-    response = flask.jsonify(error=error)
-    response.status_code = 400
-    return response
-
-  db.session.commit()
-  return flask.jsonify(success=True)
+    """Marks a task on a queue as finished."""
+    # TODO: Require an API key on the basic auth header
+    try:
+        finish(
+            queue_name,
+            request.form.get('task_id', type=str),
+            request.form.get('owner', request.remote_addr, type=str))
+    except Error, e:
+        error = '%s: %s' % (e.__class__.__name__, e)
+        logging.error('Could not add task request=%r. %s', request, error)
+        response = flask.jsonify(error=error)
+        response.status_code = 400
+        return response
+
+    db.session.commit()
+    return flask.jsonify(success=True)
diff --git a/server/runserver.py b/server/runserver.py
index 7af7a0c..020a0ff 100755
--- a/server/runserver.py
+++ b/server/runserver.py
@@ -1,12 +1,12 @@
 #!/usr/bin/env python
 # Copyright 2013 Brett Slatkin
-
+#
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
-
+#
 #     http://www.apache.org/licenses/LICENSE-2.0
-
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,4 +22,4 @@
 import dpxdt

 if __name__ == '__main__':
-  dpxdt.app.run(debug=True, use_debugger=True)
+    dpxdt.app.run(debug=True, use_debugger=True)
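
Taken together, the add, lease, and finish handlers in work_queue.py form a
complete HTTP work-queue API. As a client-side illustration of enqueuing
through the /add endpoint, here is a minimal Python 2 sketch; the
localhost:5000 address, queue name, payload, and source label are all
hypothetical. Note that form fields arrive at the server as strings, so an
HTTP client must JSON-encode the payload and set content_type itself; the
automatic JSON encoding in add() only fires for in-process callers that pass
a non-string payload.

    import json
    import urllib
    import urllib2

    def add_task(queue_name, payload, base_url='http://localhost:5000'):
        """Enqueues a JSON payload via /add and returns the new task ID."""
        data = urllib.urlencode({
            'payload': json.dumps(payload),
            'content_type': 'application/json',
            'source': 'example-client',  # Hypothetical source label.
        })
        url = '%s/api/work_queue/%s/add' % (base_url, queue_name)
        result = json.loads(urllib2.urlopen(url, data).read())
        return result['task_id']

    task_id = add_task('capture', {'url': 'http://example.com/'})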
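A worker would pair /lease and /finish: lease a task as some owner, do the
work before the lease runs out (lease() pushes the task's eta forward by the
timeout, so the task becomes leasable again once that window passes), then
report completion as the same owner. A sketch under the same assumptions;
do_work() is a placeholder and the 60-second timeout mirrors the server-side
default in handle_lease():

    import json
    import time
    import urllib
    import urllib2

    BASE = 'http://localhost:5000/api/work_queue'

    def do_work(payload):
        print 'Working on %r' % payload  # Placeholder for the real work.

    def poll_queue(queue_name, owner='example-worker'):
        while True:
            params = urllib.urlencode({'owner': owner, 'timeout': 60})
            url = '%s/%s/lease' % (BASE, queue_name)
            result = json.loads(urllib2.urlopen(url, params).read())
            if not result['tasks']:
                time.sleep(5)  # Queue empty or all leases outstanding.
                continue
            task = result['tasks'][0]
            do_work(task['payload'])  # Must complete within the 60s lease.
            params = urllib.urlencode({
                'task_id': task['task_id'],
                'owner': owner,
            })
            urllib2.urlopen('%s/%s/finish' % (BASE, queue_name), params)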
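The underlying add(), lease(), and finish() functions can also be called in
process; the HTTP handlers are thin wrappers whose main additions are form
parsing and the db.session.commit() calls, so direct callers must commit
themselves. Below is a sketch of one full task lifecycle. The
"from dpxdt import db" import path is an assumption about the package layout;
runserver.py only shows that the dpxdt package exposes app.

    # Assumed import paths; only dpxdt.app is visible in this patch.
    from dpxdt import db
    from dpxdt import work_queue

    # A dict payload is JSON-encoded automatically by add() in process.
    task_id = work_queue.add('capture', payload={'url': 'http://example.com/'})
    db.session.commit()  # add() only stages the new row on the session.

    task = work_queue.lease('capture', owner='worker-1', timeout=60)
    db.session.commit()  # Persist the lease: eta += 60s, lease_attempts += 1.

    try:
        work_queue.finish('capture', task['task_id'], owner='worker-1')
    except work_queue.LeaseExpiredError:
        pass  # eta has passed: the lease ran out before finish was called.
    except work_queue.NotOwnerError:
        pass  # Another owner leased the task after this lease expired.
    else:
        db.session.commit()  # Persist live=False so it cannot be leased again.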