Skip to content

Commit

Permalink
Add output-archive-items and output-archive-bytes parameters to tscli…
Browse files Browse the repository at this point in the history
…ent application.

- Remove previous output-archive-size parameter.
  • Loading branch information
cuveland committed Nov 21, 2016
1 parent 93999ab commit 2a7c0d4
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
6 changes: 4 additions & 2 deletions app/tsclient/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ Application::Application(Parameters const& par) : par_(par)
}

if (!par_.output_archive().empty()) {
if (par_.output_archive_size() == SIZE_MAX) {
if (par_.output_archive_items() == SIZE_MAX &&
par_.output_archive_bytes() == SIZE_MAX) {
sinks_.push_back(std::unique_ptr<fles::TimesliceSink>(
new fles::TimesliceOutputArchive(par_.output_archive())));
} else {
sinks_.push_back(std::unique_ptr<fles::TimesliceSink>(
new fles::TimesliceOutputArchiveSequence(
par_.output_archive(), par_.output_archive_size())));
par_.output_archive(), par_.output_archive_items(),
par_.output_archive_bytes())));
}
}

Expand Down
6 changes: 5 additions & 1 deletion app/tsclient/Parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ void Parameters::parse_options(int argc, char* argv[])
"name of an input file archive to read");
desc_add("output-archive,o", po::value<std::string>(&output_archive_),
"name of an output file archive to write");
desc_add("output-archive-size", po::value<size_t>(&output_archive_size_),
desc_add("output-archive-items", po::value<size_t>(&output_archive_items_),
"limit number of timeslices per file to given number, create "
"sequence of output archive files (use placeholder %n in "
"output-archive parameter)");
desc_add("output-archive-bytes", po::value<size_t>(&output_archive_bytes_),
"limit number of bytes per file to given number, create "
"sequence of output archive files (use placeholder %n in "
"output-archive parameter)");
desc_add("publish,P", po::value<std::string>(&publish_address_)
->implicit_value("tcp://*:5556"),
"enable timeslice publisher on given address");
Expand Down
7 changes: 5 additions & 2 deletions app/tsclient/Parameters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class Parameters

std::string output_archive() const { return output_archive_; }

size_t output_archive_size() const { return output_archive_size_; }
size_t output_archive_items() const { return output_archive_items_; }

size_t output_archive_bytes() const { return output_archive_bytes_; }

bool analyze() const { return analyze_; }

Expand All @@ -53,7 +55,8 @@ class Parameters
std::string shm_identifier_;
std::string input_archive_;
std::string output_archive_;
size_t output_archive_size_ = SIZE_MAX;
size_t output_archive_items_ = SIZE_MAX;
size_t output_archive_bytes_ = SIZE_MAX;
bool analyze_ = false;
bool benchmark_ = false;
size_t verbosity_ = 0;
Expand Down
28 changes: 25 additions & 3 deletions lib/fles_ipc/OutputArchiveSequence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ class OutputArchiveSequence : public Sink<Base>
* \param items_per_file Number of items to store in each file
*/
OutputArchiveSequence(const std::string& filename_template,
std::size_t items_per_file = SIZE_MAX)
: filename_template_(filename_template), items_per_file_(items_per_file)
std::size_t items_per_file = SIZE_MAX,
std::size_t bytes_per_file = SIZE_MAX)
: filename_template_(filename_template),
items_per_file_(items_per_file), bytes_per_file_(bytes_per_file)
{
if (items_per_file_ == 0) {
items_per_file_ = SIZE_MAX;
}
if (bytes_per_file_ == 0) {
bytes_per_file_ = SIZE_MAX;
}

// append sequence number to file name if missing in template
if (items_per_file_ < SIZE_MAX &&
Expand Down Expand Up @@ -74,13 +79,14 @@ class OutputArchiveSequence : public Sink<Base>

std::string filename_template_;
std::size_t items_per_file_;
std::size_t bytes_per_file_;
std::size_t file_count_ = 0;
std::size_t file_item_count_ = 0;

// TODO(Jan): Solve this without the additional alloc/copy operation
void do_put(const Derived& item)
{
if (file_item_count_ == items_per_file_) {
if (file_limit_reached()) {
next_file();
}
*oarchive_ << item;
Expand All @@ -94,6 +100,22 @@ class OutputArchiveSequence : public Sink<Base>
return boost::replace_all_copy(filename_template_, "%n", number.str());
}

bool file_limit_reached()
{
// check item limit
if (file_item_count_ == items_per_file_) {
return true;
}
// check byte limit if set
if (bytes_per_file_ < SIZE_MAX) {
auto pos = ofstream_->tellp();
if (pos > 0 && static_cast<std::size_t>(pos) >= bytes_per_file_) {
return true;
}
}
return false;
}

void next_file()
{
oarchive_ = nullptr;
Expand Down

0 comments on commit 2a7c0d4

Please sign in to comment.