Skip to content
This repository has been archived by the owner on Jan 4, 2021. It is now read-only.

Commit

Permalink
Merge pull request #429 from ripienaar/423
Browse files Browse the repository at this point in the history
(#423) Remove the federation observe command
  • Loading branch information
ripienaar committed Mar 8, 2018
2 parents eb1ddd2 + d78c9b7 commit 5cee942
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 104 deletions.
72 changes: 2 additions & 70 deletions lib/mcollective/application/federation.rb
Expand Up @@ -8,7 +8,6 @@ class Federation < Application
The ACTION can be one of the following:
observe - view the published Federation Broker statistics
trace - trace the path to a client
broker - start a Federation Broker instance
Expand All @@ -18,12 +17,12 @@ class Federation < Application

option :cluster,
:arguments => ["--cluster CLUSTER"],
:description => "Cluster name to observe or serve",
:description => "Cluster name to serve",
:type => String

option :instance,
:arguments => ["--instance INSTANCE"],
:description => "Instance name to observe or serve",
:description => "Instance name to serve",
:type => String

option :stats_port,
Expand Down Expand Up @@ -227,69 +226,6 @@ def broker_command
exit(1)
end

def observe_command
abort("Cannot observe using a client that is not configured for Federation, please set choria.federation.collectives or CHORIA_FED_COLLECTIVE") unless choria.federated?

puts "Waiting for cluster stats to be published ...."

choria.federation_broker(configuration[:cluster]).observe_stats do |stats|
next if stats.empty?

print "\e[H\e[2J"

puts "Federation Broker: %s" % Util.colorize(:bold, configuration[:cluster])
puts

["federation", "collective"].each do |type|
type_stats = {"sent" => 0, "received" => 0, "instances" => {}}

stats.keys.sort.each do |instance|
type_stats["sent"] += stats[instance][type]["sent"]
type_stats["received"] += stats[instance][type]["received"]
type_stats["instances"][instance] ||= {}
type_stats["instances"][instance][type] = {
"sent" => stats[instance][type]["sent"],
"received" => stats[instance][type]["received"],
"last" => stats[instance][type]["lasst_message"]
}
end

puts "%s" % Util.colorize(:bold, type.capitalize)
puts " Totals:"
puts " Received: %d Sent: %d" % [type_stats["received"], type_stats["sent"]]
puts
puts " Instances:"

padding = type_stats["instances"].keys.map(&:length).max + 4

type_stats["instances"].keys.sort.each do |instance|
puts "%#{padding}s: Received: %d (%.1f%%) Sent: %d (%.1f%%)" % [
instance,
type_stats["instances"][instance][type]["received"],
type_stats["instances"][instance][type]["received"] / type_stats["received"].to_f * 100,
type_stats["instances"][instance][type]["sent"],
type_stats["instances"][instance][type]["sent"] / type_stats["sent"].to_f * 100
]
end

puts
end

puts Util.colorize(:bold, "Instances:")

stats.keys.sort.each do |instance|
puts " %s: version %s started %s" % [
instance,
stats[instance]["version"],
Time.at(stats[instance]["start_time"]).strftime("%F %T")
]
end

puts
puts "Updated: %s" % Time.now.strftime("%F %T")
end
end

# Creates and cache a Choria helper class
#
# @return [Util::Choria]
Expand Down Expand Up @@ -323,10 +259,6 @@ def validate_configuration(configuration)
if !choria.has_client_public_cert? && !["request_cert", "show_config"].include?(configuration[:command])
abort("A certificate is needed from the Puppet CA for `%s`, please use the `request_cert` command" % choria.certname)
end

if configuration[:command] == "observe"
abort("When observing a Federation Broker the --cluster option is required") unless configuration[:cluster]
end
end

def main
Expand Down
34 changes: 0 additions & 34 deletions lib/mcollective/util/federation_broker.rb
Expand Up @@ -88,40 +88,6 @@ def thread_status
end
end

# Connect to the federation and observes published stats
#
# This will yield a hash of stats for every instance in the
# Federation Broker in {cluster_name}
def observe_stats
ENV["CHORIA_FED_COLLECTIVE"] = @cluster_name

servers = Choria.new(nil, nil, false).middleware_servers("puppet", "4222").map do |host, port|
URI("nats://%s:%s" % [host, port])
end.map(&:to_s)

lock = Mutex.new

cluster_stats = {}

federation = FederationProcessor.new(self)
federation.start_connection(servers)

Thread.new do
federation.consume_from(:name => stats.stats_target) do |msg|
stats = JSON.parse(msg)

if stats["cluster"] == @cluster_name
lock.synchronize { cluster_stats[stats["instance"]] = stats }
end
end
end

loop do
lock.synchronize { yield(cluster_stats) }
sleep 1
end
end

# Starts the broker
#
# @note this method is non blocking, the federation continues to run in the background in threads
Expand Down

0 comments on commit 5cee942

Please sign in to comment.