Skip to content

Commit

Permalink
Add support for stale packets #138
Browse files Browse the repository at this point in the history
  • Loading branch information
jmthomas committed May 28, 2015
1 parent d218b7c commit ad79e86
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 170 deletions.
9 changes: 6 additions & 3 deletions lib/cosmos/packets/limits.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,12 @@ def get_packet(target_name, packet_name)
return packet
end

def includes_item?(array, target_name, packet_name, item_name)
array.each do |array_target_name, array_packet_name, array_item_name|
if ((array_target_name == target_name) && (array_packet_name == packet_name) && (array_item_name == item_name))
def includes_item?(ignored_items, target_name, packet_name, item_name)
ignored_items.each do |array_target_name, array_packet_name, array_item_name|
if ((array_target_name == target_name) &&
(array_packet_name == packet_name) &&
# If the item name is nil we're ignoring an entire packet
(array_item_name == item_name || array_item_name.nil?))
return true
end
end
Expand Down
10 changes: 8 additions & 2 deletions lib/cosmos/packets/telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,20 @@ def limits_change_callback=(limits_change_callback)

# Iterates through all the telemetry packets and marks them stale if they
# haven't been received for over the System.staleness_seconds value.
#
# @return [Array(Packet)] Array of the stale packets
def check_stale
stale = []
time = Time.now
@config.telemetry.each do |target_name, target_packets|
target_packets.each do |packet_name, packet|
packet.set_stale if packet.received_time and (!packet.stale) and (time - packet.received_time > System.staleness_seconds)
if packet.received_time and (!packet.stale) and (time - packet.received_time > System.staleness_seconds)
packet.set_stale
stale << packet
end
end
end
stale
end

# @param with_limits_only [Boolean] Return only the stale packets
Expand All @@ -262,7 +269,6 @@ def stale(with_limits_only = false, target = nil)
if target && !target_names.include?(target)
raise "Telemetry target '#{target.upcase}' does not exist"
end
check_stale() # Update stale status
stale = []
@config.telemetry.each do |target_name, target_packets|
next if (target && target != target_name)
Expand Down
2 changes: 2 additions & 0 deletions lib/cosmos/script/limits.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def get_limits_event(id, non_block = false)
result[1][4] = result[1][4].to_s.intern if result[1][4]
elsif result[0] == :LIMITS_SETTINGS
result[1][3] = result[1][3].to_s.intern if result[1][3]
elsif result[0] == :STALE_PACKET
# Nothing extra to do
else
result[1] = result[1].to_s.intern
end
Expand Down
12 changes: 8 additions & 4 deletions lib/cosmos/tools/cmd_tlm_server/cmd_tlm_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ def start(production = false)
@staleness_monitor_thread = Thread.new do
begin
while true
System.telemetry.check_stale
stale = System.telemetry.check_stale
stale.each do |packet|
post_limits_event(:STALE_PACKET, [packet.target_name, packet.packet_name])
end
broken = @sleeper.sleep(10)
break if broken
end
Expand Down Expand Up @@ -281,8 +284,9 @@ def limits_change_callback(packet, item, old_limits_state, value, log_change)
#
# @param event_type [Symbol] The type of limits event that occurred. Must
# be one of :LIMITS_SET which means the system limits set has changed,
# :LIMITS_CHANGE which means an individual item has changed limits state, or
# :LIMITS_SETTINGS which means an individual item has new settings
# :LIMITS_CHANGE which means an individual item has changed limits state,
# :LIMITS_SETTINGS which means an individual item has new settings, or
# :STALE_PACKET which means a packet with limits has gone stale
# @param event_data [Symbol|Array<String,String,String,Symbol,Symbol>]
# Returns the current limits set name for event_type == :LIMITS_SET.
# Returns an array containing the target name, packet name, item name,
Expand Down Expand Up @@ -354,7 +358,7 @@ def self.unsubscribe_limits_events(id)
# Get a limits event from the queue created by {#subscribe_limits_events}.
#
# Each limits event consists of an Array with two elements:
# \[:LIMITS_CHANGE, [target, packet, item, old state, current state]]
# The Symbol name of the event and an Array of data
#
# @param id [Integer] The queue ID received from calling
# {#subscribe_limits_events}
Expand Down
Loading

0 comments on commit ad79e86

Please sign in to comment.