Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAQ file broker client (10_2_X) #23947

Merged
merged 20 commits into from Jul 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ded3225
reorganizing source and service to support both old locking and new f…
smorovic Jul 3, 2018
0ce25b3
2nd part of modifications to support new FFF file service interface
smorovic Jul 5, 2018
28d786f
handle max LS (cloud stopping) and updates to the API
smorovic Jul 5, 2018
742ae6e
fixes to file locking and changes to the file service protocol
smorovic Jul 9, 2018
602af5a
back off mechanism to reduce number of open sockets in BU service
smorovic Jul 10, 2018
0f0e8bd
allow stop LS 0
smorovic Jul 10, 2018
7ade6d4
logging for stop LS
smorovic Jul 11, 2018
26b08bf
pid added to HTTP attributes (for possible use in renaming)
smorovic Jul 13, 2018
76b9f15
support for optional unlocking
smorovic Jul 13, 2018
969847d
rename fileService to fileBroker
smorovic Jul 15, 2018
406703b
write EoLS files before index files from next LS are written locally
smorovic Jul 15, 2018
958ca03
remove shared lock implementation which doesn't work correctly
smorovic Jul 16, 2018
aa41c87
removing unused lock file function
smorovic Jul 16, 2018
7e85cf1
proper error recovery (avoiding assertions, segmentation faults) in c…
smorovic Jul 18, 2018
7ef6400
Use HTTP 1.1 as it will be needed for keep alive
smorovic Jul 18, 2018
163d1da
instead of boost::filesystem high-level functions using low level file
smorovic Jul 19, 2018
bac7414
fix for keep-alive
smorovic Jul 22, 2018
1ee9ac4
2nd round of keep-alive fixes
smorovic Jul 23, 2018
c807ce6
catching const& exceptions
smorovic Jul 23, 2018
09f835c
remove double const
smorovic Jul 23, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 42 additions & 0 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Expand Up @@ -26,6 +26,8 @@
#include <cstdio>

#include <tbb/concurrent_hash_map.h>
#include <boost/filesystem.hpp>
#include <boost/asio.hpp>

class SystemBounds;
class GlobalContext;
Expand All @@ -43,6 +45,10 @@ namespace Json{
class Value;
}

namespace jsoncollector {
class DataPointDefinition;
}

namespace edm {
class ConfigurationDescriptions;
}
Expand Down Expand Up @@ -72,6 +78,7 @@ namespace evf{
std::string &baseRunDir(){return run_dir_;}
std::string &buBaseRunDir(){return bu_run_dir_;}
std::string &buBaseRunOpenDir(){return bu_run_open_dir_;}
bool useFileBroker() const {return useFileBroker_;}

std::string findCurrentRunDir(){ return dirManager_.findRunDir(run_);}
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
Expand All @@ -97,6 +104,7 @@ namespace evf{
std::string getBoLSFilePathOnFU(const unsigned int ls) const;
std::string getEoRFilePath() const;
std::string getEoRFilePathOnFU() const;
std::string getFFFParamsFilePathOnBU() const;
std::string getRunOpenDirPath() const {return run_dir_ +"/open";}
bool outputAdler32Recheck() const {return outputAdler32Recheck_;}
void removeFile(unsigned int ls, unsigned int index);
Expand All @@ -113,9 +121,21 @@ namespace evf{
void unlockFULocal();
void lockFULocal2();
void unlockFULocal2();
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS = true);
int grabNextJsonFile(std::string const& jsonSourcePath, std::string const& rawSourcePath, int64_t& fileSizeFromJson, bool& fileFound);
int grabNextJsonFileAndUnlock(boost::filesystem::path const& jsonSourcePath);

EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus, bool& serverState,
uint32_t& serverLS, uint32_t& closedServerLS,
std::string& nextFileJson, std::string& nextFileRaw, int maxLS);

FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int& ls, std::string& nextFile,
int& serverEventsInNewFile_, int64_t& fileSize, uint64_t& thisLockWaitTimeUs);
void createRunOpendirMaybe();
void createProcessingNotificationMaybe() const;
int readLastLSEntry(std::string const& file);
unsigned int getLumisectionToStart() const;
void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
fileDeleteLockPtr_=fileDeleteLock;
filesToDeletePtr_ = filesToDelete;
Expand All @@ -141,6 +161,12 @@ namespace evf{
std::string bu_base_dir_;
bool directorBu_;
unsigned int run_;
bool useFileBroker_;
std::string fileBrokerHost_;
std::string fileBrokerPort_;
bool fileBrokerKeepAlive_;
bool fileBrokerUseLocalLock_;
unsigned int startFromLS_ = 1;
bool outputAdler32Recheck_;
bool requireTSPSet_;
std::string selectedTransferMode_;
Expand All @@ -150,6 +176,8 @@ namespace evf{

std::string hostname_;
std::string run_string_;
std::string run_nstring_;
std::string pid_;
std::string run_dir_;
std::string bu_run_dir_;
std::string bu_run_open_dir_;
Expand Down Expand Up @@ -199,6 +227,20 @@ namespace evf{

//values initialized in .cc file
static const std::vector<std::string> MergeTypeNames_;


//json parser
jsoncollector::DataPointDefinition *dpd_;

boost::asio::io_service io_service_;
std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
//boost::asio::io_context io_context_;
//tcp::resolver resolver_;
//tcp::resolver::results_type endpoints_;

};
}

Expand Down
15 changes: 4 additions & 11 deletions EventFilter/Utilities/interface/FedRawDataInputSource.h
Expand Up @@ -33,11 +33,6 @@ namespace evf {
class FastMonitoringService;
}

namespace jsoncollector {
class DataPointDefinition;
}


class FedRawDataInputSource: public edm::RawInputSource {

friend struct InputFile;
Expand All @@ -57,12 +52,10 @@ friend struct InputChunk;
void rewind_() override;

void maybeOpenNewLumiSection(const uint32_t lumiSection);
void createBoLSFile(const uint32_t lumiSection,bool checkIfExists);
evf::EvFDaqDirector::FileStatus nextEvent();
evf::EvFDaqDirector::FileStatus getNextEvent();
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection&);
void deleteFile(std::string const&);
int grabNextJsonFile(boost::filesystem::path const&);

void readSupervisor();
void readWorker(unsigned int tid);
Expand Down Expand Up @@ -94,10 +87,12 @@ friend struct InputChunk;

// get LS from filename instead of event header
const bool getLSFromFilename_;
const bool alwaysStartFromFirstLS_;
const bool verifyAdler32_;
const bool verifyChecksum_;
const bool useL1EventID_;
std::vector<std::string> fileNames_;
bool useFileBroker_;
//std::vector<std::string> fileNamesSorted_;

const bool fileListMode_;
Expand All @@ -123,8 +118,6 @@ friend struct InputChunk;
unsigned int eventsThisLumi_;
unsigned long eventsThisRun_ = 0;

jsoncollector::DataPointDefinition *dpd_;

/*
*
* Multithreaded file reader
Expand Down Expand Up @@ -211,7 +204,7 @@ struct InputFile {
evf::EvFDaqDirector::FileStatus status_;
unsigned int lumi_;
std::string fileName_;
uint32_t fileSize_;
uint64_t fileSize_;
uint32_t nChunks_;
int nEvents_;
unsigned int nProcessed_;
Expand All @@ -223,7 +216,7 @@ struct InputFile {
unsigned int currentChunk_ = 0;

InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi = 0, std::string const& name = std::string(),
uint32_t fileSize =0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent = nullptr):
uint64_t fileSize =0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent = nullptr):
parent_(parent),
status_(status),
lumi_(lumi),
Expand Down