
Initial dart release #698

Merged
merged 63 commits into master from dart on Apr 5, 2018

Conversation

@ryanatball (Member) commented Dec 29, 2017

@jasonatball @donaldatball Please review

Needs:

  1. More Commenting
  2. More Testing

Should be ready for you to install and start playing with.

Setup:

  1. Install postgresql
  2. Using pgadmin, create a username/password for the dart user account
  3. Set environment variable DART_DB=dart
  4. Set environment variable DART_TEST_DB=dartTest
  5. Set environment variable DART_USERNAME=dart
  6. Set environment variable DART_PASSWORD=whatever
  7. From the demo folder run: bundle install
  8. From the demo folder run: rake db:setup
  9. Start a CmdTlmServer
  10. From the demo tools folder (or launcher), run: ruby Dart
  11. From the demo folder run: ruby ../lib/cosmos/dart/script/dart_stream_client.rb to test the streaming interface (hack away to try things)
  12. From the demo folder run: ruby ../lib/cosmos/dart/script/dart_decom_client.rb to test the decommutated and reduced interface (hack away to try things)
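Steps 3-6 above can be sanity checked with a few lines of Ruby before launching anything (a hypothetical preflight helper, not part of DART):

```ruby
# Hypothetical preflight check for the environment variables listed in
# the setup steps above; not part of DART itself.
REQUIRED_DART_VARS = %w(DART_DB DART_TEST_DB DART_USERNAME DART_PASSWORD)

# Returns the names of any required variables that are unset or empty
def missing_dart_vars(env = ENV)
  REQUIRED_DART_VARS.reject { |name| env[name] && !env[name].empty? }
end

missing = missing_dart_vars
if missing.empty?
  puts "DART environment looks good"
else
  puts "Missing environment variables: #{missing.join(', ')}"
end
```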

I will email out a very rough design document that I have now, and will be working on a complete design document next week.

def pre_write_entry_hook(packet)
  @sync_count += 1
  if @sync_count > @sync_count_limit
    @file.fsync
@jasonatball (Collaborator) Jan 2, 2018

What is this doing?

@ryanatball (Member Author) Jan 2, 2018

Periodically syncing to the file system so that the data can be used by other Dart processes.

@jasonatball (Collaborator) Jan 2, 2018

After running I see a dart_cmd and dart_tlm file being created in outputs/dart/data. I assume this is what is flushing these. What is going into these files? Why do they exist?

@ryanatball (Member Author) Jan 2, 2018

DART stores the raw packets for decommutation (and streaming) in regular COSMOS log files (which is what these are). They are not stored directly in the SQL database; SQL databases are really junk for binary data. So instead they are stored in regular COSMOS files, and the database just keeps track of which packets are in the files and the offset in the file to each packet. To get at the raw data, DART actually just reads it from a regular COSMOS log file.
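That storage scheme can be sketched in a few lines of plain Ruby — a flat file holds the raw buffers while an index (standing in for the PacketLogEntry rows) records only filename, offset, and length; all names here are illustrative:

```ruby
require 'tempfile'

# Toy version of the storage scheme described above: raw packet buffers
# live in a flat log file; the "database" records only where they are.
log = Tempfile.new('dart_demo')
index = []  # stand-in for the SQL rows tracking each packet

['PKT_A', 'LONGER_PKT_B', 'C'].each do |buffer|
  index << { filename: log.path, offset: log.pos, length: buffer.bytesize }
  log.write(buffer)
end
log.flush

# To get at the raw data, seek to the recorded offset and read it back
entry = index[1]
raw = File.open(entry[:filename], 'rb') do |f|
  f.seek(entry[:offset])
  f.read(entry[:length])
end
puts raw  # => LONGER_PKT_B
```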

@jasonatball (Collaborator) Jan 2, 2018

So right now we're double booking stuff: outputs/logs and dart/logs. How do these log files differ?

@ryanatball (Member Author) Jan 2, 2018

It is double booking, which isn't necessarily bad. I think the ideal setup will be to have DART running on a different computer, and in that case you want double booking. For the same computer, you can set up the CmdTlmServer to use the dart_packet_log_writer and then nothing will be double booked. Not sure of the performance of doing that though, and for cases where you are not using JRuby that probably won't work well. The files themselves don't actually differ in any way except that they may start/stop at different points in time.

@jasonatball (Collaborator) Jan 2, 2018

Ok, we've been running our COSMOS bin archival tool on a different machine anyway so that's our typical use case. We push the files over and then DART just grabs everything in its DART_DATA directory and processes it.

@ryanatball (Member Author) Jan 2, 2018

For DART, you should have the CmdTlmServer streaming the data, and never have to copy any files except to handle data loss due to DART not running for some reason.

t.string "name", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.boolean "ready", default: false
@jasonatball (Collaborator) Jan 2, 2018

What does this ready indicate?

@ryanatball (Member Author) Jan 2, 2018

This ready indicates that the ItemToDecomTableMappings for the PacketConfig have been created and that the actual decommutated/reduced tables for the PacketConfig have been created.

@jasonatball (Collaborator) Jan 2, 2018

So this is the signal that the decommutation workers can begin to push decommutated data by processing the log packet data.

@ryanatball (Member Author) Jan 2, 2018

Yes and no. All of this stuff is created by one of the decommutation workers. It is mainly a mutex to ensure that only one of the workers sets up the PacketConfig.
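The ready-flag-as-mutex idea might look like this in miniature (in-memory flag instead of a database row; all names are illustrative, not DART's actual code):

```ruby
# Miniature of the "only one worker sets up the PacketConfig" pattern
# described above, with an in-memory flag instead of a database row.
class FakePacketConfig
  attr_reader :ready, :built_by

  def initialize
    @ready = false
    @building = false
    @lock = Mutex.new
  end

  # The first worker in claims the build; the rest wait on the ready flag
  def ensure_setup(worker_id)
    claimed = @lock.synchronize do
      if @ready || @building
        false
      else
        @building = true
      end
    end
    if claimed
      @built_by = worker_id   # stand-in for creating the decom/reduced tables
      @ready = true
    else
      sleep 0.001 until @ready
    end
  end
end

config = FakePacketConfig.new
4.times.map { |i| Thread.new { config.ensure_setup(i) } }.each(&:join)
puts "set up exactly once, by worker #{config.built_by}"
```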

t.bigint "meta_id"
t.boolean "is_tlm", null: false
t.integer "decom_state", default: 0
t.boolean "ready", default: false
@jasonatball (Collaborator) Jan 2, 2018

What does this ready indicate?

@ryanatball (Member Author) Jan 2, 2018

This ready indicates that the packet log entry has been flushed to the file system and can now be read.


build_lookups()

# Check if first and last packet in the log are already in the database
@jasonatball (Collaborator) Jan 2, 2018

I don't understand this optimization. You already check for the binary filename in the DB. How could the first and last packets already be in there? Also how could this check be comprehensive enough? Isn't the first packet always the SYSTEM META packet?

@ryanatball (Member Author) Jan 2, 2018

First and last packets could already be in there because DART makes its own logs, and if you tried to import one of the files directly from the CmdTlmServer this could happen. The first packet is SYSTEM META, which is a good point, as that wasn't true when I first wrote this. It should probably check the first few packets and not just the first. It is also totally possible that only the middle set of packets in the log file is already in DART... A better check is probably to just check the time range of the packets in the file and see if DART already has data for that time range. I will reconsider this.

@jasonatball (Collaborator) Jan 2, 2018

How can you import one of the files directly from the CmdTlmServer?

@ryanatball (Member Author) Jan 2, 2018

So say DART wasn't running and then you want to import some data from the regular CmdTlmServer into DART. You copy the regular COSMOS log file to System.paths['DART_DATA'] and then you run dart_import to get it into DART. All the import does is create PacketLogEntry rows in the database. The raw data is stored as is, where is, in the file being imported. The file is now a part of the DART "database".

time_attribute = :min
when :HOUR
time_delta = 3600.0
model1_time_column = "start_time"
@jasonatball (Collaborator) Jan 2, 2018

I assume this time vs start_time was done for a reason?

@ryanatball (Member Author) Jan 2, 2018

Yeah, and I keep debating changing them all to just be start_time and then not doing it. The decom table and the reduced table do have different forms (decom needs to track ple_id, and the time is the time of the packet not the time of the first packet of the reduction --- reduced tables track the time of the first item in the reduction and the number of samples in the reduction). For now I am happy with the way it is. Feel free to argue differently.

@jasonatball (Collaborator) Jan 2, 2018

A comment somewhere which explains the above would help. :-)

@ryanatball (Member Author) Jan 2, 2018

Improved commenting is definitely needed...

rows = []
model1.where("reduced_state = 0").order("meta_id ASC, #{model1_time_column} ASC").find_each do |row|
rows << row
if (rows[-1].send(model1_time_column) - rows[0].send(model1_time_column)) > time_delta or rows[0].send(model1_time_column).send(time_attribute) != rows[-1].send(model1_time_column).send(time_attribute) or rows[0].meta_id != rows[-1].meta_id
@jasonatball (Collaborator) Jan 2, 2018

What are we checking in this monstrosity? Can I get a newline? :-)

@ryanatball (Member Author) Jan 2, 2018

Do not insult my logical works of art ;)

It is checking whether it has all the data needed to make the reduction, i.e. whether any of the following is true:

  1. More than one time unit (1 minute, 1 hour, or 1 day) has passed in the accumulated samples
  2. The minute/hour/day number of the timestamp has changed (normal case - a new row at the beginning of the next minute/hour/day)
  3. The meta id has changed
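The three checks above can be pulled apart into named predicates, which is roughly what the one-line condition is doing (a sketch over a simplified row shape; `Row` and `ready_to_reduce?` are illustrative names, not DART's):

```ruby
# The three reduction-readiness checks above, written as named predicates
# over a simplified row shape (illustrative names, not DART's code).
Row = Struct.new(:time, :meta_id)

def ready_to_reduce?(rows, time_delta, time_attribute)
  first, last = rows[0], rows[-1]
  span_exceeded  = (last.time - first.time) > time_delta
  boundary_cross = first.time.send(time_attribute) != last.time.send(time_attribute)
  meta_changed   = first.meta_id != last.meta_id
  span_exceeded || boundary_cross || meta_changed
end

t = Time.utc(2018, 1, 2, 12, 0, 30)
puts ready_to_reduce?([Row.new(t, 1), Row.new(t + 45, 1)], 60.0, :min)  # => true (minute boundary crossed)
puts ready_to_reduce?([Row.new(t, 1), Row.new(t + 10, 1)], 60.0, :min)  # => false
```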

new_row.write_attribute(min_item_name, min_value)
new_row.write_attribute(avg_item_name, avg_value)
end
# Need to transaction the below as it is possible to create reduced and not update original
@jasonatball (Collaborator) Jan 2, 2018

Is this a note which has not been completed?

@ryanatball (Member Author) Jan 2, 2018

It's a TODO/Maybe. There is just a possible hole here if it gets shut down between creating the rows and marking the originals as being reduced.

unless system_config
  ple.decom_state = PacketLogEntry::NO_SYSTEM_CONFIG
  ple.save
  Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}")
@jasonatball (Collaborator) Jan 2, 2018

There are a lot of things being logged here which is good. Does this go into the COSMOS server_messages log or some new log? We'll have to create some sort of help to interpret and fix any possible errors which are being logged.

@jasonatball (Collaborator) Jan 2, 2018

I see the message logs under outputs/dart/logs now.

@ryanatball (Member Author) Jan 2, 2018

Every DART process has its own new log file.

@jasonatball (Collaborator) commented Jan 2, 2018

We're also going to need Mac Apps for DART.

@ryanatball (Member Author) commented Jan 2, 2018

I don't think Mac Apps are required since DART is just a terminal program.

@jasonatball (Collaborator) commented Jan 2, 2018

Any thought as to how to integrate this with the rest of the tool chain? I was thinking we modify the "Open Log" dialog to point to DART rather than actual log files. It already has the time period start / stop built in.

@ryanatball (Member Author) commented Jan 2, 2018

Haven't thought enough about integrating it yet. Definitely some sort of dialog with start time and stop time; it also needs a way to filter based on meta data. Documentation and integrating DART are the goals for January. Hopefully the 4.2 release with this comes near the end of the month. Did you get it running on your computer?

when :UINT
  if bit_size <= 31
    db_type = :integer
  elsif bit_size <= 63
@donaldatball (Contributor) Jan 2, 2018

Why is it 32/64 for :INT, and 31/63 for :UINT?

@ryanatball (Member Author) Jan 2, 2018

Because the SQL database doesn't have an unsigned integer type, a 32-bit unsigned number would not store correctly into the signed database types (integer or bigint).
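The point being that a signed SQL integer tops out at 2**31 - 1, so an unsigned value needs the next size up; roughly (what happens above 63 bits is an assumption here, not from the source):

```ruby
# Sketch of the signed/unsigned column-type choice discussed above.
# SQL integer/bigint are signed, so unsigned values need one spare bit.
def db_type_for(data_type, bit_size)
  case data_type
  when :INT
    bit_size <= 32 ? :integer : :bigint
  when :UINT
    if bit_size <= 31
      :integer
    elsif bit_size <= 63
      :bigint
    else
      :decimal  # assumption: fall back to an arbitrary-precision column
    end
  end
end

puts db_type_for(:INT, 32)   # => integer
puts db_type_for(:UINT, 32)  # => bigint (2**32 - 1 overflows a signed integer)
```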

Cosmos::Logger.level = Cosmos::Logger::DEBUG

start_time = Time.utc(1970, 1, 1, 0, 0, 0)
end_time = Time.utc(2020, 1, 1, 0, 0, 0)
@donaldatball (Contributor) Jan 2, 2018

Use Time.now.utc instead of fixed time (that is depressingly close?)

@ryanatball (Member Author) Jan 2, 2018

This is just an example script and meant to be hacked on, but that would be a good improvement.


def start_new_file_hook(packet)
  packet_log = PacketLog.new
  packet_log.filename = @filename.clone
@jasonatball (Collaborator) Jan 2, 2018

This is recording the absolute path for the filename. Does that cause problems if someone changes the DART_DATA environment variable? Seems like it would have to re-import everything.

@ryanatball (Member Author) Jan 2, 2018

My cleanup code removes information from the database if files go missing. That may be overkill if all you have to do is remove an environment variable to destroy the raw data in the database. The DART_DATA value is new and moving to relative paths would probably be better at this point. I'll make this change.

@ryanatball (Member Author) Jan 3, 2018

Rethinking this I think the absolute paths are safer. Is there any legitimate reason why the log files would move? If they need to be moved then we should add a database command to do that.

@jasonatball (Collaborator) Jan 3, 2018

They'd change because someone likes a new path better: "Data" vs "Archive" or something stupid like that. The main thing is that you wouldn't want the entire database to have to be re-built because of something like that. I think the safest thing is to abort the entire process with a warning message if that happens. Perhaps a command line option like --force to rebuild the whole thing and --relocate to note a new directory.

@codecov-io commented Jan 3, 2018

Codecov Report

No coverage uploaded for pull request base (master@3f48053).
The diff coverage is 25.47%.

@@            Coverage Diff            @@
##             master     #698   +/-   ##
=========================================
  Coverage          ?   85.73%           
=========================================
  Files             ?      159           
  Lines             ?    14990           
  Branches          ?        0           
=========================================
  Hits              ?    12851           
  Misses            ?     2139           
  Partials          ?        0
Impacted Files Coverage Δ
lib/cosmos/tools/cmd_tlm_server/api.rb 92.59% <10%> (ø)
lib/cosmos/utilities/message_log.rb 100% <100%> (ø)
lib/cosmos/interfaces/tcpip_server_interface.rb 66.85% <100%> (ø)
lib/cosmos/system/system.rb 92.54% <100%> (ø)
lib/cosmos/packets/packet.rb 96.1% <100%> (ø)
lib/cosmos/script/cmd_tlm_server.rb 85.43% <50%> (ø)
lib/cosmos/tools/cmd_tlm_server/replay_backend.rb 18.83% <9.32%> (ø)

Last update 3f48053...380d26f.

table_index = 0
data_types.each_slice(MAX_COLUMNS_PER_TABLE) do |table_data_types|
  begin
    ActiveRecord::Base.connection.drop_table("t#{packet_config.id}_#{table_index}")
@jasonatball (Collaborator) Jan 3, 2018

It looks like setup_packet_config is called by cleanup which happens every time DART starts. Isn't this code dropping the decommutation tables? How does this not clear the DB every time DART starts?

@ryanatball (Member Author) Jan 3, 2018

It is only called for PacketConfigs that are not ready.

require File.expand_path('../../config/environment', __FILE__)
require 'dart_common'
require 'dart_logging'
class DartStreamServer < Cosmos::TcpipServerInterface
@jasonatball (Collaborator) Jan 9, 2018

The DART decom server uses the JsonDRb class from COSMOS which is the same code which powers the CmdTlmServer. Why is this just using the TcpipServerInterface?

@ryanatball (Member Author) Jan 9, 2018

Because it streams packets. JsonDRb is just a single command/response.


start_time = nil
end_time = nil
start_time = Time.at(start_time_sec, start_time_usec) if start_time_sec and start_time_usec
@jasonatball (Collaborator) Jan 9, 2018

Should this be Time.at(sec, usec).sys? @donaldatball changed this throughout the code a while back.

@ryanatball (Member Author) Jan 10, 2018

I believe .sys only affects timestamps that are displayed, so I don't think so.

@jasonatball (Collaborator) Jan 10, 2018

It converts the timestamp to UTC or not depending on whether TIME_ZONE_UTC is set in system.txt. If the users have this set and request time using UTC this code will currently return local time.

@ryanatball (Member Author) Jan 10, 2018

The sec/usec values should always be UTC when sent on the line. If not that is a bug.

item = request['item']
raise "Item \"#{item}\" invalid" if item.length != 3

reduction = request['reduction'].to_s.upcase
@jasonatball (Collaborator) Jan 9, 2018

I get that you didn't want to put the reduction smarts in the server but how does a client know which table to use? DART is the only thing that knows how much data is in a given query. If you're collecting data at 1 per minute vs 10Hz you're talking a 600x difference in data.

@ryanatball (Member Author) Jan 10, 2018

Clients should know if they want reduced data or not.

@jasonatball (Collaborator) Jan 10, 2018

Sure but which reduced table should they use? That's the part I don't think they know. Perhaps an API to get data volumes or something?

@ryanatball (Member Author) Jan 10, 2018

For the reduced data at least, you can assume 60 samples per hour, 24 samples per day, and 365 samples per year accordingly. For the non-reduced you don't necessarily know, but in general that should only be used for queries of time less than an hour or so. In general, the client needs to look at how much time is being requested and get the right data set.
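A client-side heuristic following that guidance might look like this (thresholds are judgment calls here, not part of the DART API):

```ruby
# Client-side choice of data set based on the requested time span,
# following the rough guidance above. Thresholds are judgment calls.
HOUR = 3600.0
DAY  = 86_400.0

def reduction_for(span_seconds)
  if    span_seconds <= HOUR     then :NONE    # raw decom data
  elsif span_seconds <= DAY      then :MINUTE  # ~60 samples per hour
  elsif span_seconds <= 30 * DAY then :HOUR    # ~24 samples per day
  else                                :DAY     # ~365 samples per year
  end
end

puts reduction_for(600)        # 10 minutes => NONE
puts reduction_for(7 * 86_400) # one week   => HOUR
```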

@ryanatball (Member Author) Jan 10, 2018

An API to get sample count isn't a bad idea

@jasonatball (Collaborator) Jan 10, 2018

Alright

raise "Unknown value_type: #{value_type}"
end

meta_ids = request['meta_ids']
@jasonatball (Collaborator) Jan 9, 2018

meta_ids appear to be the PacketLogEntry database IDs. Is that correct? How does the user of this API know the meta_ids?

@ryanatball (Member Author) Jan 10, 2018

That's correct. You have to first query something about the SYSTEM META packet that you care about. For example you could get SYSTEM META SERIAL_NUMBER and then note the meta packets that have the serial number you care about.
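In other words, something like this (toy in-memory rows; in reality both lookups go through the decom query API, and the row shapes here are illustrative):

```ruby
# Toy version of the meta_id filtering flow described above: find the
# SYSTEM META packets whose SERIAL_NUMBER matches, then use their ids
# to scope the data query. Row shapes are illustrative.
MetaRow = Struct.new(:meta_id, :serial_number)
DataRow = Struct.new(:meta_id, :value)

meta_rows = [MetaRow.new(1, 'SN001'), MetaRow.new(2, 'SN002'), MetaRow.new(3, 'SN001')]
data_rows = [DataRow.new(1, 10), DataRow.new(2, 20), DataRow.new(3, 30)]

wanted_ids = meta_rows.select { |m| m.serial_number == 'SN001' }.map(&:meta_id)
matching   = data_rows.select { |d| wanted_ids.include?(d.meta_id) }
puts matching.map(&:value).inspect  # => [10, 30]
```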

@jasonatball (Collaborator) Jan 10, 2018

Can you create a concrete example of this in your dart_decom_client script because I'm still confused.

limit = request['limit'].to_i
limit = 10000 if limit <= 0 or limit > 10000

offset = request['offset'].to_i
@jasonatball (Collaborator) Jan 9, 2018

What is offset and how is it used?

@ryanatball (Member Author) Jan 10, 2018

Offset is how you overcome the 10000 result max. You can offset 0, then 10000, then 20000, etc to get the entire dataset.
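So a client that wants the full result set pages by bumping the offset until it gets a short batch, roughly (sketched against a fake query standing in for the DART request):

```ruby
# Paging sketch for the 10000-row limit described above, against a fake
# query standing in for the DART decom request.
PAGE_LIMIT = 10_000
ROWS = (1..25_000).to_a  # pretend result set living on the server

def fake_query(limit, offset)
  ROWS[offset, limit] || []
end

all_rows = []
offset = 0
loop do
  batch = fake_query(PAGE_LIMIT, offset)
  all_rows.concat(batch)
  break if batch.length < PAGE_LIMIT  # a short batch means we're done
  offset += PAGE_LIMIT
end
puts all_rows.length  # => 25000
```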

@jasonatball (Collaborator) commented Jan 10, 2018

@ryanatball are you going to add the standard COSMOS copyright header to all these new DART files?

ple.time = packet.received_time
ple.packet_log_id = packet_log.id
ple.data_offset = data_offset
ple.meta_id = meta_id
@jasonatball (Collaborator) Jan 10, 2018

Initially meta_id is set to nil. Does this work because below you check for SYSTEM META and since that's the first packet in a log file you then set the meta_id for the first packet as well as all subsequent packets?

@ryanatball (Member Author) Jan 11, 2018

Yes

# Update Current Value Table Used By Packet Log Writer
cvt_packet = Cosmos::System.telemetry.update!(packet.target_name, packet.packet_name, packet.buffer)
cvt_packet.received_time = packet.received_time
@packet_log_writer.start
@jasonatball (Collaborator) Jan 10, 2018

So when a new SYSTEM META packet arrives we create a new log file. Is this just to prevent large files or do other parts of DART rely on this? For example the DartPacketLogWriter?

@ryanatball (Member Author) Jan 11, 2018

It's just to keep all the data in a file to the same META configuration. It is not technically required as the readers and stream receivers should change configuration if the META configuration changes mid stream.

if packet.target_name == 'SYSTEM'.freeze and packet.packet_name == 'META'.freeze
Cosmos::Logger.info("#{@log_type}: #{packet.target_name} #{packet.packet_name}")
# Update Current Value Table Used By Packet Log Writer
cvt_packet = Cosmos::System.telemetry.update!(packet.target_name, packet.packet_name, packet.buffer)
@jasonatball (Collaborator) Jan 10, 2018

What is the point of updating the CVT right here? I see code in PacketLogWriter in start_new_file_hook which calls System.telemetry.packet('SYSTEM','META'), but that's only if the first packet IS NOT a SYSTEM META packet, which doesn't seem possible considering how everything else works.

@ryanatball (Member Author) Jan 11, 2018

It is just more robust because PacketLogWriter does read the packet from System and it should be up to date. May not be required but also shouldn't be removed.


def start
  begin
    while true
@jasonatball (Collaborator) Jan 11, 2018

Doesn't this code continuously find the same tables each cycle and send them for processing? Then the worker thread has to do some work to determine that no work needs to be done and we go around again. Seems like there's a bunch of overhead processing the same things each cycle which don't need to be processed anymore.

@ryanatball (Member Author) Jan 11, 2018

It checks all the tables every minute. Could be optimized to only check the hour tables and the day tables on the hour/day transition (and probably always on startup).

end

rows = []
model1.where("reduced_state = 0").order("meta_id ASC, #{model1_time_column} ASC").find_each do |row|
@jasonatball (Collaborator) Jan 11, 2018

Why do you order by meta_id?

end
end

unless (max_sample.is_a?(Float) and (avg_sample.nan? or !avg_sample.finite?))
@jasonatball (Collaborator) Jan 11, 2018

I don't understand this logic. If the max is a float and the avg is not a number we say min_nan_found. Otherwise we determine if the value should be the new maximum. Shouldn't the check simply be whether the max_sample is nan or not finite? What does the avg_sample have to do with it?

@jasonatball (Collaborator) Jan 11, 2018

Ok I think this is a typo as well as the check for min. I'll fix in my documentation / refactor.

@jasonatball (Collaborator) commented Jan 12, 2018

@ryanatball can you please set up spec/dart/dart_common_spec.rb which loads dart_common? I'm having problems getting this to work since it's trying to find rails.

DartCommon.handle_argv

Cosmos::Logger.level = Cosmos::Logger::INFO
dart_logging = DartLogging.new('dart_decom_server')
@jasonatball (Collaborator) Feb 14, 2018

I'd like to move the logging into DartDecomQuery like with DartTcpipServerInterface but didn't know when / where to call the logging.stop method. Or maybe we move the logging out of DartTcpipServerInterface into dart_stream_server to match dart_decom_server. I don't like the inconsistency right now.

@jasonatball (Collaborator) commented on 896361d Feb 20, 2018

@ryanatball What are these first three statements cleaning? How can you get a REDUCED state but no reduced_id?

@@ -524,7 +524,7 @@ def create_reduction_table(table_name, table_data_types, table_index)
       when :integer, :bigint, :float, :decimal
         t.column "i#{item_index}max", data_type
         t.column "i#{item_index}min", data_type
-        t.column "i#{item_index}avg", data_type
+        t.column "i#{item_index}avg", :float # Average is always floating point
@jasonatball (Collaborator) Feb 22, 2018

@ryanatball I found this during testing. I assume it was an oversight. I think we want floating point averages.

@@ -13,27 +13,25 @@
require 'dart_logging'
require 'dart_decommutator'

module Dart
@jasonatball (Collaborator) Feb 22, 2018

Not sure how this still got stuck in there.

avg_value = avg_value.to_f / total_samples if total_samples != 0
min_value = Float::NAN if min_nan_found and !min_value
max_value = Float::NAN if max_nan_found and !max_value
mean, stddev_value = Math.stddev_population(values)
@jasonatball (Collaborator) Mar 6, 2018

@ryanatball I calculate the standard deviation on the population of the averaged values

@ryanatball (Member Author) commented Apr 3, 2018

@jasonatball @donaldatball I believe DART is ready for release. Please run it and play a little and see if there are any blatant issues. Thanks!

@ryanatball ryanatball merged commit 1ce692f into master Apr 5, 2018
0 of 3 checks passed
@ryanatball ryanatball deleted the dart branch Apr 5, 2018