Skip to content
This repository has been archived by the owner on Jan 4, 2021. It is now read-only.

Commit

Permalink
Merge 449efa6 into f740160
Browse files Browse the repository at this point in the history
  • Loading branch information
ripienaar committed Nov 16, 2017
2 parents f740160 + 449efa6 commit 4a51c3d
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 39 deletions.
2 changes: 1 addition & 1 deletion lib/mcollective/agent/bolt_task.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ action "run_and_wait", :description => "Runs a Bolt ask that was previously down
:description => "JSON String containing input variables",
:type => :string,
:validation => '^.+$',
:optional => true,
:optional => false,
:default => "{}",
:maxlength => 102400

Expand Down
16 changes: 10 additions & 6 deletions lib/mcollective/agent/bolt_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
module MCollective
module Agent
class Bolt_task < RPC::Agent
activate_when do
Util::Choria.new.tasks_support.tasks_compatible?
end

action "download" do
reply[:downloads] = 0

Expand All @@ -29,6 +25,8 @@ class Bolt_task < RPC::Agent
action "run_and_wait" do
tasks = support_factory

reply.fail!("Cannot execute Bolt tasks as the node is not meed the compatability requirements") unless tasks.tasks_compatible?

reply[:task_id] = request.uniqid

task = {
Expand Down Expand Up @@ -59,6 +57,8 @@ class Bolt_task < RPC::Agent
action "run_no_wait" do
tasks = support_factory

reply.fail!("Cannot execute Bolt tasks as the node is not meed the compatability requirements") unless tasks.tasks_compatible?

reply[:task_id] = request.uniqid

task = {
Expand Down Expand Up @@ -119,15 +119,19 @@ def before_processing_hook(msg, connection)

def reply_task_status(status)
reply[:exitcode] = status["exitcode"]
reply[:stdout] = status["stdout"]
reply[:stdout] = status["stdout"].to_json
reply[:stderr] = status["stderr"]
reply[:completed] = status["completed"]
reply[:runtime] = status["runtime"]
reply[:start_time] = status["start_time"].to_i
reply[:task] = status["task"]
reply[:callerid] = status["caller"]

reply.fail("Task failed", 1) if status["exitcode"] != 0 && status["completed"]
if status["stdout"]["_error"]
reply.fail("%s: %s" % [status["stdout"]["_error"]["kind"], status["stdout"]["_error"]["msg"]])
elsif support_factory.task_failed?(status)
reply.fail("Task failed without any error details", 1)
end
end
end
end
Expand Down
131 changes: 102 additions & 29 deletions lib/mcollective/application/tasks.rb
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
module MCollective
class Application
class Tasks < Application
description "Bolt Task Orchestrator"
description "Puppet Task Orchestrator"

usage <<-USAGE
mco tasks [--detail]
mco tasks <TASK NAME>
mco tasks run <TASK NAME> [OPTIONS]
mco tasks status <REQUEST> [OPTIONS]
mco tasks status <REQUEST> [FLAGS]
The Bolt Task Orchestrator is designed to provide a consistent
management environment for Bolt Tasks.
The Bolt Task Orchestrator is designed to provide a consistent
management environment for Bolt Tasks.
It will download tasks from your Puppet Server onto all nodes
and after verifying they were able to correctly download the
same task across the entire fleet will run the task.
It will download tasks from your Puppet Server onto all nodes
and after verifying they were able to correctly download the
same task across the entire fleet will run the task.
Tasks are run in the backgroun, the CLI can wait for up to 60
seconds for your task to complete and show the status or you
can use the status comment to review a completed task later
Tasks are run in the background, the CLI can wait for up to 60
seconds for your task to complete and show the status or you
can use the status comment to review a completed task later.
USAGE

option :__environment,
Expand All @@ -43,6 +43,18 @@ def list_options
end

def status_options
application_options[:usage].clear

self.class.usage <<-USAGE
mco tasks status <REQUEST> [FLAGS]
Retrieves the status for a task you previously requested. It can be running or completed.
By default only failed exuecutions are shown, use --verbose to see them all.
USAGE

self.class.option :__summary,
:arguments => ["--summary"],
:description => "Only show a overall summary of the task",
Expand All @@ -57,6 +69,44 @@ def status_options
end

def run_options
application_options[:usage].clear

self.class.usage <<-USAGE
mco tasks run <TASK NAME> [OPTIONS]
Runs a task in the background and wait up to 50 seconds for it to complete.
Task inputs are handled using --argument=value for basic String, Numeric and Boolean
types, others can be passed using --input
Input can also be read from a file using "--input @file.json" or "--input @file.yaml".
For complex data types like Hashes, Arrays or Variants you have to supply input
as YAML or JSON.
Once a task is run the task ID will be displayed which can later be used with
the "mco tasks status" command to extract results.
Examples:
Run myapp::upgrade task in the background and wait for it to complete:
mco tasks run myapp::upgrade --version 1.0.0
Run myapp::upgrade task in the background and return immediately:
mco tasks run myapp::upgrade --version 1.0.0 --background
Supply complex data input to the task:
mco tasks run myapp::upgrade --version 1.0.0 --input \\
'{"source": {
"url": "http://repo/archive-1.0.0.tgz",
"hash": "68b329da9893e34099c7d8ad5cb9c940"}}'
USAGE

self.class.option :__summary,
:arguments => ["--summary"],
:description => "Only show a overall summary of the task",
Expand All @@ -72,13 +122,7 @@ def run_options
self.class.option :__json_input,
:arguments => ["--input INPUT"],
:description => "JSON input to pass to the task",
:required => true,
:type => String

self.class.option :__json_input,
:arguments => ["--input INPUT"],
:description => "JSON input to pass to the task",
:required => true,
:required => false,
:type => String
end

Expand All @@ -100,10 +144,11 @@ def run_command

request = {
:task => task,
:files => meta["files"].to_json,
:input => configuration[:__json_input]
:files => meta["files"].to_json
}

request[:input] = configuration[:__json_input] if configuration[:__json_input]

puts

if configuration[:__background]
Expand All @@ -129,14 +174,20 @@ def download_files(task, files)
original_batch_size = bolt_task.batch_size
bolt_task.batch_size = 50

puts("Downloading and verifying %d file(s) from the Puppet Server to all Nodes" % [files.size])

failed = false

downloads = bolt_task.download(:environment => configuration[:__environment], :task => task, :files => files.to_json)
downloads = []
cnt = bolt_task.discover.size
idx = 0

downloads.select {|d| d[:statuscode] > 0}.each_with_index do |download, idx|
puts if idx == 0
bolt_task.download(:environment => configuration[:__environment], :task => task, :files => files.to_json) do |_, s|
twirl("Downloading and verifying %d file(s) from the Puppet Server to all nodes:" % [files.size], cnt, idx + 1)
idx += 1
downloads << s
end

downloads.select {|d| d[:statuscode] > 0}.each_with_index do |download, i|
puts if i == 0
failed = true
puts(" %s: %s" % [Util.colorize(:red, "Could not download files onto %s" % download[:sender]), download[:statusmsg]])
end
Expand Down Expand Up @@ -208,18 +259,28 @@ def status_command

def print_result(result)
status = result[:data]
stdout_text = status[:stdout]

unless options[:verbose]
begin
stdout_text = JSON.parse(status[:stdout])
stdout_text.delete("_error")
stdout_text = stdout_text.to_json
rescue # rubocop:disable Lint/HandleExceptions
end
end

if result[:statuscode] != 0
puts("%-40s %s" % [
Util.colorize(:red, result[:sender]),
Util.colorize(:yellow, result[:statusmsg])
])

puts(" %s" % status[:stdout])
puts(" %s" % stdout_text)
puts(" %s" % status[:stderr]) if status[:stderr]
elsif result[:statuscode] == 0 && options[:verbose]
puts(result[:sender])
puts(" %s" % status[:stdout])
puts(" %s" % stdout_text)
puts(" %s" % status[:stderr]) if status[:stderr]
end
end
Expand Down Expand Up @@ -309,7 +370,6 @@ def list_command
puts "Known tasks in the %s environment" % configuration[:__environment]
puts

print("Retrieving tasks....")
known_tasks = task_list(configuration[:__detail], configuration[:__environment])

print("\r")
Expand Down Expand Up @@ -344,12 +404,15 @@ def run
def task_list(descriptions, environment)
tasks = {}

print("Retrieving tasks....")

known_tasks = tasks_support.tasks(environment)

known_tasks.each do |task|
known_tasks.each_with_index do |task, idx|
description = nil

if descriptions
twirl("Retrieving tasks....", known_tasks.size, idx)
meta = tasks_support.task_metadata(task["name"], environment)
description = meta["metadata"]["description"]
end
Expand All @@ -374,7 +437,7 @@ def show_task_help(task)
puts("%s - %s" % [Util.colorize(:bold, task), meta["metadata"]["description"]])
puts

if meta["metadata"]["parameters"].empty?
if meta["metadata"]["parameters"].nil? || meta["metadata"]["parameters"].empty?
puts("The task takes no parameters or have none defined")
puts
else
Expand All @@ -395,6 +458,16 @@ def show_task_help(task)
puts("Use 'mco tasks run %s' to run this task" % [task])
end

def twirl(msg, max, current)
charset = ["▖", "▘", "▝", "▗"]
index = current % charset.size
char = charset[index]
char = Util.colorize(:green, "✓") if max == current

format = "\r%s %s %#{@max.to_s.size}d / %d"
print(format % [msg, char, current, max])
end

def bolt_task
@__bolt_task ||= rpcclient("bolt_task")
end
Expand Down
73 changes: 73 additions & 0 deletions lib/mcollective/util/tasks_support.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def task_command(task)
def task_environment(task)
environment = {}

return environment unless task["input"]
return environment unless ["both", "environment"].include?(task_input_method(task))

JSON.parse(task["input"]).each do |k, v|
Expand Down Expand Up @@ -305,6 +306,71 @@ def task_runtime(requestid)
end
end

# Parses the stdout and turns it into a JSON object
#
# If the output is JSON parsable the output is returned else
# it's wrapped in _output as per the Tasks spec version 1
#
# @note https://github.com/puppetlabs/puppet-specifications/blob/730a2aa23e58b93387d194dbac64af508bdeab01/tasks/README.md#output-handling
# @param stdout [String] the stdout from the script
# @param completed [Boolean] if the task is done running
# @param exitcode [Integer] the exitcode from the script
# @param wrapper_output [String] the wrapper output
# @return [Object] the new stdout according to spec and the stdout object, not JSON encoded
def create_task_stdout(stdout, completed, exitcode, wrapper_output)
result = {}

unless wrapper_output.empty?
result["_error"] = {
"kind" => "choria.tasks/wrapper-error",
"msg" => "The task wrapper failed to run",
"details" => {
"wrapper_output" => wrapper_output
}
}

return result.to_json
end

begin
data = JSON.parse(stdout)

if data.is_a?(Hash)
result = data
else
result["_output"] = stdout
end
rescue
result["_output"] = stdout
end

if exitcode != 0 && completed && !result["_error"]
result["_error"] = {
"kind" => "choria.tasks/task-error",
"msg" => "The task errored with a code %d" % exitcode,
"details" => {
"exitcode" => exitcode
}
}
end

result
end

# Determines if a task failed based on its status
#
# @param status [Hash] the status as produced by {#task_status}
# @return [Boolean]
def task_failed?(status)
return true unless status["wrapper_spawned"]
return true unless status["wrapper_pid"]
return true unless status["wrapper_error"].empty?
return true if status["exitcode"] != 0 && status["completed"]
return true if status["stdout"].include?("_error")

false
end

# Determines the task status for given request
#
# @param requestid [String] request id for the task
Expand Down Expand Up @@ -360,6 +426,13 @@ def task_status(requestid)
result["task"] = choria_metadata["task"]
end

result["stdout"] = create_task_stdout(
result["stdout"],
result["completed"],
result["exitcode"],
result["wrapper_error"]
)

result
end

Expand Down
Loading

0 comments on commit 4a51c3d

Please sign in to comment.