Initial dart release #698
Conversation
def pre_write_entry_hook(packet)
  @sync_count += 1
  if @sync_count > @sync_count_limit
    @file.fsync
What is this doing?
Periodically syncing to the file system so that the data can be used by other Dart processes.
After running I see a dart_cmd and dart_tlm file being created in outputs/dart/data. I assume this is what is flushing these. What is going into these files? Why do they exist?
DART stores the raw packets for decommutation (and streaming) in regular COSMOS log files (which is what these are). They are not stored directly in the SQL database; SQL databases are really junk for binary data. Instead the packets are stored in regular COSMOS files, and the database just keeps track of which packets are in which files and the offset in the file to each packet. To get at the raw data, DART actually just reads it from a regular COSMOS log file.
So right now we're double booking stuff: outputs/logs and dart/logs. How do these log files differ?
It is double booking, which isn't necessarily bad. I think the ideal setup will be to have DART running on a different computer, and in that case you want double booking. For the same computer, you can set up CmdTlmServer to use the dart_packet_log_writer and then nothing will be double booked. I'm not sure of the performance of doing that though, and for cases where you are not using JRuby it probably won't work well. The files themselves don't actually differ in any way except that they may start/stop at different points in time.
Ok, we've been running our COSMOS bin archival tool on a different machine anyway, so that's our typical use case. We push the files over and then DART just grabs everything in its DART_DATA directory and processes it.
For DART, you should have the CmdTlmServer streaming the data, and never have to copy any files except to handle data loss due to DART not running for some reason.
lib/cosmos/dart/db/schema.rb
Outdated
t.string "name", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.boolean "ready", default: false
What does this ready indicate?
This ready indicates that the ItemToDecomTableMappings for the PacketConfig have been created and that the actual decommutated/reduced tables for the PacketConfig have been created.
So this is the signal that the decommutation workers can begin to push decommutated data by processing the log packet data.
Yes and no. All of this stuff is created by one of the decommutation workers. It is mainly a mutex to ensure that only one of the workers sets up the PacketConfig.
lib/cosmos/dart/db/schema.rb
Outdated
t.bigint "meta_id"
t.boolean "is_tlm", null: false
t.integer "decom_state", default: 0
t.boolean "ready", default: false
What does this ready indicate?
This ready indicates that the packet log entry has been flushed to the file system and can now be read.
build_lookups()

# Check if first and last packet in the log are already in the database
I don't understand this optimization. You already check for the binary filename in the DB. How could the first and last packets already be in there? Also how could this check be comprehensive enough? Isn't the first packet always the SYSTEM META packet?
First and last packets could already be in there because DART makes its own logs, and if you tried to import one of the files directly from the CmdTlmServer this could happen. The first packet is SYSTEM META, which is a good point, as that wasn't true when I first wrote this. It should probably check the first few packets and not just the first. It is also totally possible that only the middle set of packets in the log file is already in DART... A better check is probably to just check the time range of the packets in the file and see if DART already has data for that time range. I will reconsider this.
How can you import one of the files directly from the CmdTlmServer?
So say DART wasn't running and then you want to import some data from the regular CmdTlmServer into DART. You copy the regular COSMOS log file to System.paths['DART_DATA'] and then you run dart_import to get it into DART. All the import does is create PacketLogEntry rows in the database. The raw data is stored as is, where is, in the file being imported. The file is now a part of the DART "database".
  time_attribute = :min
when :HOUR
  time_delta = 3600.0
  model1_time_column = "start_time"
I assume this time vs start_time was done for a reason?
Yeah, and I keep debating changing them all to just be start_time and then not doing it. The decom table and the reduced table do have different forms: decom needs to track ple_id, and its time is the time of the packet, not the time of the first packet of the reduction (reduced tables track the time of the first item in the reduction and the number of samples in the reduction). For now I am happy with the way it is. Feel free to argue differently.
A comment somewhere which explains the above would help. :-)
Improved commenting is definitely needed...
rows = []
model1.where("reduced_state = 0").order("meta_id ASC, #{model1_time_column} ASC").find_each do |row|
  rows << row
  if (rows[-1].send(model1_time_column) - rows[0].send(model1_time_column)) > time_delta or rows[0].send(model1_time_column).send(time_attribute) != rows[-1].send(model1_time_column).send(time_attribute) or rows[0].meta_id != rows[-1].meta_id
What are we checking in this monstrosity? Can I get a newline? :-)
Do not insult my logical works of art ;)
It is checking whether it has all the data needed to be ready to make the reduction, i.e. whether:
- more than one unit of time (1 minute, 1 hour, or 1 day) has passed in the accumulated samples, OR
- the minute/hour/day number of the timestamp has changed (the normal case: a new row at the beginning of a minute/hour/day), OR
- the meta id changed
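The condition could hypothetically be restated on separate lines like this (`times` and `meta_ids` stand in for the accumulated rows; `time_delta`/`time_attribute` follow the :MINUTE/:HOUR/:DAY setup earlier in the method):

```ruby
# Returns true when the accumulated rows are ready to be reduced:
# enough wall time elapsed, a minute/hour/day boundary was crossed,
# or the META configuration changed mid-accumulation.
def reduction_ready?(times, meta_ids, time_delta, time_attribute)
  (times.last - times.first) > time_delta ||                               # > 1 unit of time accumulated
    times.first.send(time_attribute) != times.last.send(time_attribute) || # boundary crossed (normal case)
    meta_ids.first != meta_ids.last                                        # meta id changed
end
```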
  new_row.write_attribute(min_item_name, min_value)
  new_row.write_attribute(avg_item_name, avg_value)
end
# Need to transaction the below as it is possible to create reduced and not update original
Is this a note which has not been completed?
It's a TODO/Maybe. There is just a possible hole here if it gets shut down between creating the rows and marking the originals as being reduced.
unless system_config
  ple.decom_state = PacketLogEntry::NO_SYSTEM_CONFIG
  ple.save
  Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}")
There are a lot of things being logged here which is good. Does this go into the COSMOS server_messages log or some new log? We'll have to create some sort of help to interpret and fix any possible errors which are being logged.
I see the message logs under outputs/dart/logs now.
Every DART process has its own new log file.
We're also going to need Mac Apps for DART.
I don't think Mac Apps are required since DART is just a terminal program.
Any thought as to how to integrate this with the rest of the tool chain? I was thinking we modify the "Open Log" dialog to point to DART rather than actual log files. It already has the time period start / stop built in.
Haven't thought enough about integrating it in yet. Definitely some sort of dialog with start time and stop time; it also needs a way to filter based on meta data. Documentation and integrating DART are the goals for January. Hopefully the 4.2 release with this near the end of the month. Did you get it running on your computer?
when :UINT
  if bit_size <= 31
    db_type = :integer
  elsif bit_size <= 63
Why is it 32/64 for :INT, and 31/63 for :UINT?
Because the SQL database doesn't have an unsigned integer type. So a 32-bit unsigned number would not store correctly into the signed database type (integer or bigint); unsigned values get one less usable bit per column size.
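The resulting mapping might look like this (a sketch; the `:decimal` fallback for oversized values is an assumption on my part, not confirmed by the diff):

```ruby
# Choose a SQL column type for a telemetry item. SQL integer columns are
# signed, so unsigned values need one extra bit of headroom before they
# must be promoted: 31/63 bits for :UINT vs 32/64 bits for :INT.
def db_type_for(data_type, bit_size)
  case data_type
  when :INT
    return :integer if bit_size <= 32
    return :bigint  if bit_size <= 64
  when :UINT
    return :integer if bit_size <= 31
    return :bigint  if bit_size <= 63
  end
  :decimal # assumed fallback for anything larger
end
```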
Cosmos::Logger.level = Cosmos::Logger::DEBUG

start_time = Time.utc(1970, 1, 1, 0, 0, 0)
end_time = Time.utc(2020, 1, 1, 0, 0, 0)
Use Time.now.utc instead of fixed time (that is depressingly close?)
This is just an example script and meant to be hacked on, but that would be a good improvement.
def start_new_file_hook(packet)
  packet_log = PacketLog.new
  packet_log.filename = @filename.clone
This is recording the absolute path for the filename. Does that cause problems if someone changes the DART_DATA environment variable? Seems like it would have to re-import everything.
My cleanup code removes information from the database if files go missing. That may be overkill if all you have to do is remove an environment variable to destroy the raw data in the database. The DART_DATA value is new and moving to relative paths would probably be better at this point. I'll make this change.
Rethinking this I think the absolute paths are safer. Is there any legitimate reason why the log files would move? If they need to be moved then we should add a database command to do that.
They'd change because someone likes a new path better: "Data" vs "Archive" or something stupid like that. The main thing is that you wouldn't want the entire database to have to be rebuilt because of something like that. I think the safest thing is to abort the entire process with a warning message if that happens. Perhaps add command line options like --force to rebuild the whole thing and --relocate to note a new directory.
Codecov Report
@@ Coverage Diff @@
## master #698 +/- ##
=========================================
Coverage ? 85.73%
=========================================
Files ? 159
Lines ? 14990
Branches ? 0
=========================================
Hits ? 12851
Misses ? 2139
Partials ? 0
Continue to review full report at Codecov.
lib/cosmos/dart/lib/dart_common.rb
Outdated
table_index = 0
data_types.each_slice(MAX_COLUMNS_PER_TABLE) do |table_data_types|
  begin
    ActiveRecord::Base.connection.drop_table("t#{packet_config.id}_#{table_index}")
It looks like setup_packet_config is called by cleanup which happens every time DART starts. Isn't this code dropping the decommutation tables? How does this not clear the DB every time DART starts?
It is only called for PacketConfigs that are not ready.
require File.expand_path('../../config/environment', __FILE__)
require 'dart_common'
require 'dart_logging'

class DartStreamServer < Cosmos::TcpipServerInterface
The DART decom server uses the JsonDRb class from COSMOS which is the same code which powers the CmdTlmServer. Why is this just using the TcpipServerInterface?
Because it streams packets. JsonDRb is just a single command/response.
start_time = nil
end_time = nil
start_time = Time.at(start_time_sec, start_time_usec) if start_time_sec and start_time_usec
Should this be Time.at(sec, usec).sys? @donaldatball changed this throughout the code a while back.
I believe .sys only affects timestamps that are displayed, so I don't think so.
It converts the timestamp to UTC or not depending on whether TIME_ZONE_UTC is set in system.txt. If the users have this set and request time using UTC this code will currently return local time.
The sec/usec values should always be UTC when sent on the line. If not that is a bug.
item = request['item']
raise "Item \"#{item}\" invalid" if item.length != 3

reduction = request['reduction'].to_s.upcase
I get that you didn't want to put the reduction smarts in the server but how does a client know which table to use? DART is the only thing that knows how much data is in a given query. If you're collecting data at 1 per minute vs 10Hz you're talking a 600x difference in data.
Clients should know if they want reduced data or not.
Sure but which reduced table should they use? That's the part I don't think they know. Perhaps an API to get data volumes or something?
For the reduced data at least, you can assume 60 samples per hour, 24 samples per day, and 365 samples per year accordingly. For the non-reduced you don't necessarily know, but in general that should only be used for queries of time less than an hour or so. In general, the client needs to look at how much time is being requested and get the right data set.
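A client-side heuristic along those lines might be sketched like this (the span cutoffs are my own illustrative guesses, not values from DART):

```ruby
# Pick a reduction level from the requested time span: raw decommutated
# data for short spans, then minute/hour/day reductions as the span grows.
# Uses the rough rates above: 60 samples/hour (minute data),
# 24 samples/day (hour data), 365 samples/year (day data).
def pick_reduction(start_time, end_time)
  span = end_time - start_time # seconds
  case
  when span <= 3600        then 'NONE'   # under an hour: raw decom data
  when span <= 7 * 86_400  then 'MINUTE' # up to a week (assumed cutoff)
  when span <= 90 * 86_400 then 'HOUR'   # up to ~3 months (assumed cutoff)
  else                          'DAY'
  end
end
```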
An API to get sample count isn't a bad idea
Alright ✋
  raise "Unknown value_type: #{value_type}"
end

meta_ids = request['meta_ids']
meta_ids appear to be the PacketLogEntry database IDs. Is that correct? How does the user of this API know the meta_ids?
That's correct. You have to first query something about the SYSTEM META packet that you care about. For example you could get SYSTEM META SERIAL_NUMBER and then note the meta packets that have the serial number you care about.
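A rough sketch of that two-step flow (the request keys mirror the ones visible in this diff; SERIAL_NUMBER, the target/item names, and the ids are illustrative, since the exact response format isn't shown here):

```ruby
# Step 1: query the SYSTEM META item you care about over the full range.
# Assume each returned row carries its meta_id alongside the value.
meta_query = {
  'item'      => ['SYSTEM', 'META', 'SERIAL_NUMBER'], # illustrative item
  'reduction' => 'NONE'
}

# Step 2: after filtering step-1 rows down to the serial number you want,
# pass the surviving meta_ids to scope the real data query.
data_query = {
  'item'      => ['INST', 'HEALTH_STATUS', 'TEMP1'], # illustrative item
  'reduction' => 'NONE',
  'meta_ids'  => [3, 17] # made-up ids kept from step 1
}
```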
Can you create a concrete example of this in your dart_decom_client script because I'm still confused.
limit = request['limit'].to_i
limit = 10000 if limit <= 0 or limit > 10000

offset = request['offset'].to_i
What is offset and how is it used?
Offset is how you overcome the 10000 result max. You can offset 0, then 10000, then 20000, etc to get the entire dataset.
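That paging loop could be sketched like this (`fetch_page` is a made-up stand-in for the decom server query; here it just slices an array so the example is self-contained):

```ruby
MAX_LIMIT = 10_000 # server-side cap per request

# Stand-in for the decom server call: returns up to limit rows at offset.
def fetch_page(dataset, limit, offset)
  dataset[offset, limit] || []
end

# Walk offsets 0, 10000, 20000, ... until a short page signals the end.
def fetch_all(dataset)
  results = []
  offset = 0
  loop do
    page = fetch_page(dataset, MAX_LIMIT, offset)
    results.concat(page)
    break if page.length < MAX_LIMIT
    offset += MAX_LIMIT
  end
  results
end
```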
@ryanatball are you going to add the standard COSMOS copyright header to all these new DART files?
ple.time = packet.received_time
ple.packet_log_id = packet_log.id
ple.data_offset = data_offset
ple.meta_id = meta_id
Initially meta_id is set to nil. Does this work because below you check for SYSTEM META and since that's the first packet in a log file you then set the meta_id for the first packet as well as all subsequent packets?
Yes
# Update Current Value Table Used By Packet Log Writer
cvt_packet = Cosmos::System.telemetry.update!(packet.target_name, packet.packet_name, packet.buffer)
cvt_packet.received_time = packet.received_time
@packet_log_writer.start
So when a new SYSTEM META packet arrives we create a new log file. Is this just to prevent large files or do other parts of DART rely on this? For example the DartPacketLogWriter?
It's just to keep all the data in a file to the same META configuration. It is not technically required as the readers and stream receivers should change configuration if the META configuration changes mid stream.
if packet.target_name == 'SYSTEM'.freeze and packet.packet_name == 'META'.freeze
  Cosmos::Logger.info("#{@log_type}: #{packet.target_name} #{packet.packet_name}")
  # Update Current Value Table Used By Packet Log Writer
  cvt_packet = Cosmos::System.telemetry.update!(packet.target_name, packet.packet_name, packet.buffer)
What is the point of updating the CVT right here? I see code in PacketLogWriter in start_new_file_hook which calls System.telemetry.packet('SYSTEM','META'), but that's only if the first packet IS NOT a SYSTEM META packet, which doesn't seem possible considering how everything else works.
It is just more robust because PacketLogWriter does read the packet from System and it should be up to date. It may not be required but also shouldn't be removed.
def start
  begin
    while true
Doesn't this code continuously find the same tables each cycle and send them for processing? Then the worker thread has to do some work to determine that no work needs to be done and we go around again. Seems like there's a bunch of overhead processing the same things each cycle which don't need to be processed anymore.
It checks all the tables every minute. Could be optimized to only check the hour tables and the day tables on the hour/day transition (and probably always on startup).
end

rows = []
model1.where("reduced_state = 0").order("meta_id ASC, #{model1_time_column} ASC").find_each do |row|
Why do you order by meta_id?
  end
end

unless (max_sample.is_a?(Float) and (avg_sample.nan? or !avg_sample.finite?))
I don't understand this logic. If the max is a float and the avg is not a number we say min_nan_found. Otherwise we determine if the value should be the new maximum. Shouldn't the check simply be whether the max_sample is nan or not finite? What does the avg_sample have to do with it?
Ok I think this is a typo as well as the check for min. I'll fix in my documentation / refactor.
@ryanatball can you please set up spec/dart/dart_common_spec.rb which loads dart_common? I'm having problems getting this to work since it's trying to find rails.
DartCommon.handle_argv

Cosmos::Logger.level = Cosmos::Logger::INFO
dart_logging = DartLogging.new('dart_decom_server')
I'd like to move the logging into DartDecomQuery like with DartTcpipServerInterface but didn't know when / where to call the logging.stop method. Or maybe we move the logging out of DartTcpipServerInterface into dart_stream_server to match dart_decom_server. I don't like the inconsistency right now.
lib/cosmos/dart/lib/dart_common.rb
Outdated
@@ -524,7 +524,7 @@ def create_reduction_table(table_name, table_data_types, table_index)
    when :integer, :bigint, :float, :decimal
      t.column "i#{item_index}max", data_type
      t.column "i#{item_index}min", data_type
-     t.column "i#{item_index}avg", data_type
+     t.column "i#{item_index}avg", :float # Average is always floating point
@ryanatball I found this during testing. I assume it was an oversight. I think we want floating point averages.
@@ -13,27 +13,25 @@
require 'dart_logging'
require 'dart_decommutator'

module Dart
Not sure how this still got stuck in there.
avg_value = avg_value.to_f / total_samples if total_samples != 0
min_value = Float::NAN if min_nan_found and !min_value
max_value = Float::NAN if max_nan_found and !max_value
mean, stddev_value = Math.stddev_population(values)
@ryanatball I calculate the standard deviation on the population of the averaged values
@jasonatball @donaldatball I believe DART is ready for release. Please run it and play a little and see if there are any blatant issues. Thanks!
@jasonatball @donaldatball Please review
Needs:
Should be ready for you to install and start playing with.
Setup:
I will email out a very rough design document that I have now, and will be working on a complete design document next week.