From 627765c2bd48e659c16d3944581ca5f524c9dcc1 Mon Sep 17 00:00:00 2001 From: Kyle Sun Date: Thu, 3 Sep 2015 17:10:10 -0700 Subject: [PATCH] initial commit --- LICENSE | 202 +++++++++++++++++++++++++++++++++++++++++++++++ README.md | 87 ++++++++++++++++++++ index.coffee | 12 +++ package.json | 25 ++++++ script/bootstrap | 18 +++++ script/test | 6 ++ src/luigi.coffee | 202 +++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 552 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 index.coffee create mode 100644 package.json create mode 100644 script/bootstrap create mode 100644 script/test create mode 100644 src/luigi.coffee diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..cbe17d8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2015 Houzz Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..667d2c5 --- /dev/null +++ b/README.md @@ -0,0 +1,87 @@ +# Hubot Luigi Script + +Interact with luigi central scheduler, to get job status, worker status, etc. + + +## Installation + +Add the package `hubot-luigi` entry to the `external-scripts.json` file +(you may need to create this file). + + "dependencies": { + "hubot-luigi": "1.0.x" + } + +Run the following command to make sure the module is installed. + + #npm install hubot-luigi + +To enable the script, add the `hubot-luigi` entry to the `packages.json` +file (you may need to create this file). + + ["hubots-luigi"] + + +## Configuration +* HUBOT_LUIGI_ENDPOINT - specify luigi scheduler api endpoint, like 'http://localhost:8082/api/' + +## Usage Examples +### stats + user1> hubot luigi stats + hubot> 8 jobs running, 1315 jobs pending, 76 jobs failed, 5737 jobs disabled + + +### show + user1> hubot luigi show pending + hubot> Wed Sep 02 2015 13:27:36 GMT-0700 (PDT) Task(param1=value) p=50 + Wed Sep 02 2015 13:27:36 GMT-0700 (PDT) WrapperTask(run=all_tasks) p=50 + Wed Sep 02 2015 13:27:36 GMT-0700 (PDT) HadoopHourlyJob(jar=process_logs.jar, time=2015-08-30T00) p=100 + Wed Sep 02 2015 13:27:36 GMT-0700 (PDT) ScpTask(file=filename.csv, target=server) p=10 + +Show can be run with any task status, including pending, running, disabled, failed. If more than 20 +tasks are to be shown, it will truncate the list and state how many more there are. + + +### search + user1> hubot luigi search Hadoop + hubot> DONE Sun Aug 30 2015 00:12:21 GMT-0700 (PDT) HadoopHourlyJob(jar=process_logs.jar, time=2015-08-30T01) p=100 + DONE Sun Aug 30 2015 00:12:21 GMT-0700 (PDT) HadoopHourlyJob(jar=process_logs.jar, time=2015-08-30T00) p=100 + DONE Sun Aug 30 2015 00:12:21 GMT-0700 (PDT) HadoopHourlyJob(jar=process_logs.jar, time=2015-08-30T02) p=100 + DONE Sun Aug 30 2015 01:12:18 GMT-0700 (PDT) HadoopHourlyJob(jar=analytics_reports.jar, time=2015-08-30T00) p=15 + +Search matches the query against task ids and is case sensitive. Results are truncated as in show. + + +### resources + user1> hubot luigi resources + hubot> impala: 3/4 + hadoop_xl: 2/2 + mysql_access: 0/5 + +This will only show resources that are defined in luigi.cfg and not ones that are created in response +to task scheduling. + + +### refresh resources + user1> hubot luigi refreshresources + hubot> impala: 3/2 + hadoop_xl: 2/3 + mysql_access: 0/4 + +This causes the scheduler to dynamically reload the resource constraints from luigi.cfg. The current +resource usage is then displayed, which can violate the new constraints until existing jobs have had +a chance to finish. + + +### workers + user1> hubot luigi workers + hubot> 781608491 AllTasksFrontFill(days=1, end_time=2015-09-02T13:30) [10] 1 running, 637 pending + 377433553 AllTasksFrontFill(days=1, end_time=2015-09-02T13:00) [10] 6 running, 486 pending + 042179238 AllTasksFrontFill(days=1, end_time=2015-09-01T16:00) [10] 0 running, 46 pending + 208362896 AllTasksFrontFill(days=5, end_time=2015-09-01T03:00) [10] 2 running, 496 pending + +### worker + user1> hubot luigi worker 208362896 + hubot> 208362896 AllTasksFrontFill(days=5, end_time=2015-09-01T03:00) [10] 3 running, 496 pending, 478 unique pending + + running tasks: NumberedTask(n=7), NumberedTask(n=18), NumberedTask(n=3) diff --git a/index.coffee b/index.coffee new file mode 100644 index 0000000..d00a198 --- /dev/null +++ b/index.coffee @@ -0,0 +1,12 @@ +fs = require 'fs' +path = require 'path' + +module.exports = (robot, scripts) -> + scriptsPath = path.resolve(__dirname, 'src') + fs.exists scriptsPath, (exists) -> + if exists + for script in fs.readdirSync(scriptsPath) + if scripts? and '*' not in scripts + robot.loadFile(scriptsPath, script) if script in scripts + else + robot.loadFile(scriptsPath, script) diff --git a/package.json b/package.json new file mode 100644 index 0000000..0feb177 --- /dev/null +++ b/package.json @@ -0,0 +1,25 @@ +{ + "name": "hubot-luigi", + "description": "A hubot script that interact with luigi scheduler", + "version": "1.0.0", + "author": "Kyle Sun ", + "license": "MIT", + "keywords": "hubot, hubot-scripts, luigi", + "repository": { + "type": "git", + "url": "git://github.com/Houzz/hubot-luigi.git" + }, + "bugs": { + "url": "https://github.com/Houzz/hubot-luigi/issues" + }, + "dependencies": {}, + "peerDependencies": { + }, + "devDependencies": { + }, + "main": "index.coffee", + "scripts": { + "test": "grunt test" + } +} + diff --git a/script/bootstrap b/script/bootstrap new file mode 100644 index 0000000..30e87e1 --- /dev/null +++ b/script/bootstrap @@ -0,0 +1,18 @@ +#!/bin/bash + +# Make sure everything is development forever +export NODE_ENV=development + +# Load environment specific environment variables +if [ -f .env ]; then + source .env +fi + +if [ -f .env.${NODE_ENV} ]; then + source .env.${NODE_ENV} +fi + +npm install + +# Make sure coffee and mocha are on the path +export PATH="node_modules/.bin:$PATH" diff --git a/script/test b/script/test new file mode 100644 index 0000000..538f08b --- /dev/null +++ b/script/test @@ -0,0 +1,6 @@ +#!/bin/bash + +# bootstrap environment +source script/bootstrap + +mocha --compilers coffee:coffee-script diff --git a/src/luigi.coffee b/src/luigi.coffee new file mode 100644 index 0000000..14b61c6 --- /dev/null +++ b/src/luigi.coffee @@ -0,0 +1,202 @@ +# Description: +# +# Viewing luigi stats +# +# Commands: +# hubot luigi stats - Show overall stats +# hubot luigi show - List RUNNING/PENDING/DONE tasks +# hubot luigi search - Search task by task id +# hubot luigi resources - Luigi resource summary +# hubot luigi refresh resources - Luigi refresh resources from disk +# hubot luigi workers - Luigi worker summary +# hubot luigi worker - Show worker details +# +# Configuration: +# HUBOT_LUIGI_ENDPOINT - luigi scheduler api endpoint, like 'http://localhost:8082/api/' +# +# URLS: +# https://github.com/spotify/luigi/ +# +# Author: +# interskh + + +luigiApiEndpoint = process.env.HUBOT_LUIGI_ENDPOINT + +module.exports = (robot) -> + robot.respond /luigi statu?s(\s*)$/i, (msg) -> + callLuigiTaskList msg, "RUNNING", (res) -> + running = numberOfTask(res) + callLuigiTaskList msg, "PENDING", (res) -> + pending = numberOfTask(res) + callLuigiTaskList msg, "FAILED", (res) -> + failed = numberOfTask(res) + callLuigiTaskList msg, "DISABLED", (res) -> + disabled = numberOfTask(res) + msg.send running + " jobs running, " + pending + " jobs pending, " + failed + " jobs failed, " + disabled + " jobs disabled" + + robot.respond /luigi show( all)? (.*)(\s*)/i, (msg) -> + status = msg.match[2].toUpperCase() + callLuigiTaskList msg, status, (res) -> + results = [] + for t in sortTask(res) + results.push(formatTask(t[0], t[1])) + sendLimitedResult(msg, results, 20) + + robot.respond /luigi search (.*)(\s*)/i, (msg) -> + callLuigiTaskSearch msg, msg.match[1], (res) -> + results = [] + counts = {} + for status, d of res + counts[status] = Object.keys(res[status]).length + for t in sortTask(d) + results.push(status + " " + formatTask(t[0], t[1])) + if results.length > 0 + counts_message = "summary: " + for status, count of counts + counts_message += count + " " + status + ", " + msg.send counts_message + sendLimitedResult(msg, results, 20) + + robot.respond /luigi resources(\s*)/i, (msg) -> + callLuigiResources msg, (res) -> + results = [] + for r in sortResource(res) + resource = r[0] + d = res[resource] + results.push(resource + " : " + d.used + "/" + d.total) + msg.send results.join("\n") + + robot.respond /luigi workers(\s*)/i, (msg) -> + callLuigiWorkers msg, (res) -> + results = [] + for w in res + results.push(w.salt + " " + w.first_task + " [" + w.workers + "] " + w.num_running + " running, " + w.num_pending + " pending") + msg.send results.join("\n") + + robot.respond /luigi worker (.*)(\s*)/i, (msg) -> + search = msg.match[1] + callLuigiWorkers msg, (res) -> + for w in res + if w.salt == search + msg.send(w.salt + " " + w.first_task + " [" + w.workers + "] " + w.num_running + " running, " + w.num_pending + " pending, " + w.num_uniques + " uniq pending") + if w.num_running > 0 + tasks = [] + for task_id, task of w.running + tasks.push(task_id) + msg.send("running tasks: " + tasks.join(", ")) + + robot.respond /luigi refresh resources(\s*)/i, (msg) -> + callLuigiUpdateResources msg, (res) -> + callLuigiResources msg, (res) -> + results = [] + for r in sortResource(res) + resource = r[0] + d = res[resource] + results.push(resource + " : " + d.used + "/" + d.total) + msg.send results.join("\n") + +sendLimitedResult = (msg, results, n=0) -> + if results.length > 0 + if n > 0 + if results.length > n + msg.send results.slice(0, n).join("\n") + "... and " + (results.length - n) + " more" + else + msg.send results.slice(0, n).join("\n") + else + msg.send results.join("\n") + +callLuigiTaskList = (msg, jobType, cb) -> + msg.http(luigiApiEndpoint + "task_list") + .query(data: JSON.stringify({status: jobType, upstream_status: ""})) + .get() (err, res, body) -> + try + ret = JSON.parse body + cb ret.response + catch error + console.log body + console.log error + cb {} + +numberOfTask = (res) -> + if res.hasOwnProperty("num_tasks") + return res.num_tasks + else + return Object.keys(res).length + +callLuigiTaskSearch = (msg, str, cb) -> + msg.http(luigiApiEndpoint + "task_search") + .query(data: JSON.stringify({task_str: str})) + .get() (err, res, body) -> + try + ret = JSON.parse body + cb ret.response + catch error + console.log body + console.log error + cb {} + +callLuigiResources= (msg, cb) -> + msg.http(luigiApiEndpoint + "resources") + .get() (err, res, body) -> + try + ret = JSON.parse body + cb ret.response + catch error + console.log body + console.log error + cb {} + +callLuigiUpdateResources= (msg, cb) -> + msg.http(luigiApiEndpoint + "update_resources") + .get() (err, res, body) -> + try + ret = JSON.parse body + cb ret.response + catch error + console.log body + console.log error + cb {} + +callLuigiWorkers= (msg, cb) -> + msg.http(luigiApiEndpoint + "worker_list") + .get() (err, res, body) -> + try + ret = JSON.parse body + cb ret.response + catch error + console.log body + console.log error + cb {} + +sortTask = (taskDict) -> + # tasks in taskDict should be in the same status + sortable = [] + for task_id, task of taskDict + status = task.status + sortable.push([task_id, task]) + if status == 'RUNNING' + sortable.sort (a,b) -> a[1].time_running - b[1].time_running + else + sortable.sort (a,b) -> a[1].start_time - b[1].start_time + +sortResource = (resourcesDict) -> + sortable = [] + for resource, d of resourcesDict + if d.total == 0 + n = 0 + else if d.used == 0 + n = 0.1 / d.total + else + n = d.used / d.total + sortable.push([resource, n]) + sortable.sort (a,b) -> a[1] - b[1] + +formatTask = (task_id, task) -> + if task.status == 'RUNNING' + formatTime(task.time_running) + " " + task_id + " p=" + task.priority + else + formatTime(task.start_time) + " " + task_id + " p=" + task.priority + +formatTime = (ts) -> + new Date(Math.floor(ts*1000)).toLocaleString()