Remove std::memcpy while processing data in compressor task #5302

Merged · 1 commit · Jan 27, 2021
@@ -32,16 +32,14 @@ template <typename RDH, bool verbose>
 class CompressorTask : public Task
 {
  public:
-  CompressorTask() { mBufferOut = new char[mBufferOutSize]; };
-  ~CompressorTask() override { delete[] mBufferOut; };
+  CompressorTask() = default;
+  ~CompressorTask() override = default;
   void init(InitContext& ic) final;
   void run(ProcessingContext& pc) final;

  private:
   Compressor<RDH, verbose> mCompressor;

-  char* mBufferOut = nullptr;
-  const int mBufferOutSize = 33554432;
+  int mOutputBufferSize;
 };

 } // namespace tof
Detectors/TOF/compression/src/CompressorTask.cxx (9 additions, 6 deletions)
@@ -38,6 +38,7 @@ void CompressorTask<RDH, verbose>::init(InitContext& ic)
   auto decoderVerbose = ic.options().get<bool>("tof-compressor-decoder-verbose");
   auto encoderVerbose = ic.options().get<bool>("tof-compressor-encoder-verbose");
   auto checkerVerbose = ic.options().get<bool>("tof-compressor-checker-verbose");
+  mOutputBufferSize = ic.options().get<int>("tof-compressor-output-buffer-size");

   mCompressor.setDecoderCONET(decoderCONET);
   mCompressor.setDecoderVerbose(decoderVerbose);
@@ -56,10 +57,6 @@ void CompressorTask<RDH, verbose>::run(ProcessingContext& pc)
 {
   LOG(DEBUG) << "Compressor run";

-  /** set encoder output buffer **/
-  mCompressor.setEncoderBuffer(mBufferOut);
-  mCompressor.setEncoderBufferSize(mBufferOutSize);
-
   auto device = pc.services().get<o2::framework::RawDeviceService>().device();
   auto outputRoutes = pc.services().get<o2::framework::RawDeviceService>().spec().outputs;
   auto fairMQChannel = outputRoutes.at(0).channel;
@@ -76,18 +73,24 @@ void CompressorTask<RDH, verbose>::run(ProcessingContext& pc)
   /** loop over input parts **/
   for (auto const& ref : iit) {

+    /** input **/
     auto headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
     auto dataProcessingHeaderIn = DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(ref);
     auto payloadIn = ref.payload;
     auto payloadInSize = headerIn->payloadSize;
+
+    /** prepare **/
+    auto bufferSize = mOutputBufferSize > 0 ? mOutputBufferSize : payloadInSize;
@shahor02 (Collaborator):
@preghenella in which situation would you provide the buffer size explicitly, rather than relying on the automatic one?
Isn't this prone to providing too small a size?

@preghenella (Collaborator, Author) replying to @shahor02:
Normal operation in principle works better with an explicit buffer size (via a command-line argument) in case the automatic size turns out to be too small.
It does come with the risk of providing too small a size, but we will check carefully (and ERRORS will be issued if it is too small).

@preghenella (Collaborator, Author):
By the way, let me add that this PR is part of an incremental update.
Eventually we want to avoid creating this many messages (one for each input part) and possibly lower the number of shm allocations.
That will mean the automatic buffer allocation is no longer usable (we do not know beforehand how large the multi-part messages are), and we must rely on an externally configured buffer that is large enough to accommodate the output data.

@shahor02 (Collaborator):
@preghenella thanks for the explanations. Isn't the output size guaranteed to be smaller than the input, in which case the input payload size would be a safe bet? As for merging multiple input parts into one output: since all input headers are available, one can estimate the output size upper limit in the same way as for a single part.

@preghenella (Collaborator, Author):
Yes, the automatic buffer size is indeed a safe bet (it is currently the default setting, in fact).
Regarding multi-part, you are probably right and an automatic buffer could be used in that case as well.

Maybe in the future we can simply remove the possibility to define the buffer size from the command line.
But for the time being, given that we have little freedom to update code quickly during tests on the FLP, we keep it so that we have an adaptable solution.
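
A hedged sketch of what the multi-part variant could look like (hypothetical, not part of this PR; it reuses the iit input-part iterator and the header access already shown in the diff, and relies on the assumption above that compressed output does not exceed the input):

    // hypothetical sizing for a future single-output-message version:
    // sum the payload sizes declared by all input headers to obtain a
    // safe upper limit for the compressed output of the whole multi-part
    size_t totalInputSize = 0;
    for (auto const& ref : iit) {
      auto headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
      totalInputSize += headerIn->payloadSize;
    }
    auto bufferSize = mOutputBufferSize > 0 ? mOutputBufferSize : totalInputSize;
    auto payloadMessage = device->NewMessage(bufferSize);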

@shahor02 (Collaborator):
Sure, I just wanted to understand whether it is really necessary.

+    auto payloadMessage = device->NewMessage(bufferSize);
     mCompressor.setDecoderBuffer(payloadIn);
     mCompressor.setDecoderBufferSize(payloadInSize);
+    mCompressor.setEncoderBuffer((char*)payloadMessage->GetData());
+    mCompressor.setEncoderBufferSize(bufferSize);

     /** run **/
     mCompressor.run();
     auto payloadOutSize = mCompressor.getEncoderByteCounter();
-    auto payloadMessage = device->NewMessage(payloadOutSize);
-    std::memcpy(payloadMessage->GetData(), mBufferOut, payloadOutSize);
+    payloadMessage->SetUsedSize(payloadOutSize);

     /** output **/
     auto headerOut = *headerIn;
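Putting the hunks above together, the per-part flow after this change looks roughly like the following (a condensed sketch of the diff, with the header forwarding and the output snapshot omitted):

    // size the output message either from the explicit option or from the input payload
    auto bufferSize = mOutputBufferSize > 0 ? mOutputBufferSize : payloadInSize;
    auto payloadMessage = device->NewMessage(bufferSize);

    // point the encoder directly at the message memory, so no intermediate buffer is needed
    mCompressor.setDecoderBuffer(payloadIn);
    mCompressor.setDecoderBufferSize(payloadInSize);
    mCompressor.setEncoderBuffer((char*)payloadMessage->GetData());
    mCompressor.setEncoderBufferSize(bufferSize);

    // compress and shrink the message to the bytes actually written;
    // this replaces the old NewMessage(payloadOutSize) + std::memcpy
    mCompressor.run();
    auto payloadOutSize = mCompressor.getEncoderByteCounter();
    payloadMessage->SetUsedSize(payloadOutSize);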
Detectors/TOF/compression/src/tof-compressor.cxx (1 addition, 0 deletions)
@@ -93,6 +93,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
     outputs,
     algoSpec,
     Options{
+      {"tof-compressor-output-buffer-size", VariantType::Int, 0, {"Encoder output buffer size (in bytes). Zero = automatic (careful)."}},
       {"tof-compressor-conet-mode", VariantType::Bool, false, {"Decoder CONET flag"}},
       {"tof-compressor-decoder-verbose", VariantType::Bool, false, {"Decoder verbose flag"}},
       {"tof-compressor-encoder-verbose", VariantType::Bool, false, {"Encoder verbose flag"}},
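As a usage note, the new option would be passed on the workflow command line, for example (the executable name is assumed here; check the build target generated from tof-compressor.cxx):

    o2-tof-compressor --tof-compressor-output-buffer-size 1048576

This forces a fixed 1 MiB encoder buffer per message, while leaving the option at its default of 0 keeps the automatic, input-sized buffer.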