Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ o2_add_library(Framework
src/ResourcesMonitoringHelper.cxx
src/ServiceRegistry.cxx
src/SimpleResourceManager.cxx
src/SimpleRawDeviceService.cxx
src/StreamOperators.cxx
src/TMessageSerializer.cxx
src/TableBuilder.cxx
Expand Down
18 changes: 10 additions & 8 deletions Framework/Core/include/Framework/RawDeviceService.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_RAWDEVICESERVICE_H
#define FRAMEWORK_RAWDEVICESERVICE_H
#ifndef O2_FRAMEWORK_RAWDEVICESERVICE_H_
#define O2_FRAMEWORK_RAWDEVICESERVICE_H_

#include "Framework/ServiceHandle.h"

class FairMQDevice;

namespace o2
{
namespace framework
namespace o2::framework
{
class DeviceSpec;

Expand All @@ -33,8 +31,12 @@ class RawDeviceService
virtual FairMQDevice* device() = 0;
virtual void setDevice(FairMQDevice* device) = 0;
virtual DeviceSpec const& spec() = 0;
/// Expose FairMQDevice::WaitFor method to avoid having to include
/// FairMQDevice.h.
///
/// @a time in millisecond to sleep
virtual void waitFor(unsigned int time) = 0;
};

} // namespace framework
} // namespace o2
#endif // FRAMEWORK_RAWDEVICESERVICE_H
} // namespace o2::framework
#endif // O2_FRAMEWORK_RAWDEVICESERVICE_H_
15 changes: 7 additions & 8 deletions Framework/Core/include/Framework/SimpleRawDeviceService.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_SIMPLERAWDEVICESERVICE_H
#define FRAMEWORK_SIMPLERAWDEVICESERVICE_H
#ifndef O2_FRAMEWORK_SIMPLERAWDEVICESERVICE_H_
#define O2_FRAMEWORK_SIMPLERAWDEVICESERVICE_H_

#include "Framework/RawDeviceService.h"
#include "Framework/DeviceSpec.h"

namespace o2
{
namespace framework
namespace o2::framework
{

/// Fairly unsophisticated service which simply stores and returns the
Expand Down Expand Up @@ -43,11 +41,12 @@ class SimpleRawDeviceService : public RawDeviceService
return mSpec;
}

void waitFor(unsigned int ms) final;

private:
FairMQDevice* mDevice;
DeviceSpec const& mSpec;
};

} // namespace framework
} // namespace o2
#endif // FRAMEWORK_SIMPLERAWDEVICESERVICE_H
} // namespace o2::framework
#endif // O2_FRAMEWORK_SIMPLERAWDEVICESERVICE_H__
1 change: 0 additions & 1 deletion Framework/Core/src/AODReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "Framework/ChannelInfo.h"
#include "Framework/Logger.h"

#include <FairMQDevice.h>
#include <ROOT/RDataFrame.hxx>
#include <TGrid.h>
#include <TFile.h>
Expand Down
23 changes: 23 additions & 0 deletions Framework/Core/src/SimpleRawDeviceService.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/SimpleRawDeviceService.h"

#include <FairMQDevice.h>

namespace o2::framework
{

void SimpleRawDeviceService::waitFor(unsigned int ms)
{
mDevice->WaitFor(std::chrono::milliseconds(ms));
}

} // namespace o2::framework
3 changes: 1 addition & 2 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "Framework/RawDeviceService.h"
#include "Framework/StringHelpers.h"

#include "fairmq/FairMQDevice.h"
#include "Headers/DataHeader.h"
#include <algorithm>
#include <list>
Expand Down Expand Up @@ -166,7 +165,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext

return [](ProcessingContext& pc) {
// this callback is never called since there is no expiring input
pc.services().get<RawDeviceService>().device()->WaitFor(std::chrono::seconds(2));
pc.services().get<RawDeviceService>().waitFor(2000);
};
}};

Expand Down
3 changes: 1 addition & 2 deletions Framework/Core/test/test_SimpleDataProcessingDevice01.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "Framework/RawDeviceService.h"
#include "Framework/runDataProcessing.h"
#include <Monitoring/Monitoring.h>
#include <FairMQDevice.h>

using namespace o2::framework;

Expand All @@ -40,7 +39,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
{OutputSpec{"TPC", "CLUSTERS"},
OutputSpec{"ITS", "CLUSTERS"}},
adaptStateless([](DataAllocator& outputs, ControlService& control, RawDeviceService& service) {
service.device()->WaitFor(std::chrono::milliseconds(1000));
service.waitFor(1000);
// Creates a new message of size 1000 which
// has "TPC" as data origin and "CLUSTERS" as data description.
auto& tpcClusters = outputs.make<FakeCluster>(Output{"TPC", "CLUSTERS", 0}, 1000);
Expand Down
6 changes: 2 additions & 4 deletions Framework/Core/test/test_StaggeringWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
#include "Framework/DispatchPolicy.h"
#include "Framework/DeviceSpec.h"
#include "Framework/Output.h"
#include <FairMQDevice.h>
#include <chrono>
#include <cstring>
#include <iostream>
#include <regex>
Expand Down Expand Up @@ -88,9 +86,9 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
// because of the CompletionPolicy trigger matcher. This message will be
// sent together with the second message.
outputs.snapshot(Output{"PROD", "CHANNEL", subspec, Lifetime::Timeframe}, subspec);
device.device()->WaitFor(std::chrono::milliseconds(100));
device.waitFor(100);
outputs.snapshot(Output{"PROD", "TRIGGER", subspec, Lifetime::Timeframe}, subspec);
device.device()->WaitFor(std::chrono::milliseconds(100));
device.waitFor(100);
}
control.endOfStream();
control.readyToQuit(QuitRequest::Me);
Expand Down