Skip to content

Commit

Permalink
Merge pull request #23947 from smorovic/newFileServer-10_2_X
Browse files Browse the repository at this point in the history
DAQ file broker client (10_2_X)
  • Loading branch information
cmsbuild committed Jul 25, 2018
2 parents 42cbdcc + 09f835c commit aa73fa4
Show file tree
Hide file tree
Showing 5 changed files with 974 additions and 204 deletions.
42 changes: 42 additions & 0 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Expand Up @@ -26,6 +26,8 @@
#include <cstdio>

#include <tbb/concurrent_hash_map.h>
#include <boost/filesystem.hpp>
#include <boost/asio.hpp>

class SystemBounds;
class GlobalContext;
Expand All @@ -43,6 +45,10 @@ namespace Json{
class Value;
}

namespace jsoncollector {
class DataPointDefinition;
}

namespace edm {
class ConfigurationDescriptions;
}
Expand Down Expand Up @@ -72,6 +78,7 @@ namespace evf{
std::string &baseRunDir(){return run_dir_;}
std::string &buBaseRunDir(){return bu_run_dir_;}
std::string &buBaseRunOpenDir(){return bu_run_open_dir_;}
bool useFileBroker() const {return useFileBroker_;}

std::string findCurrentRunDir(){ return dirManager_.findRunDir(run_);}
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
Expand All @@ -97,6 +104,7 @@ namespace evf{
std::string getBoLSFilePathOnFU(const unsigned int ls) const;
std::string getEoRFilePath() const;
std::string getEoRFilePathOnFU() const;
std::string getFFFParamsFilePathOnBU() const;
std::string getRunOpenDirPath() const {return run_dir_ +"/open";}
bool outputAdler32Recheck() const {return outputAdler32Recheck_;}
void removeFile(unsigned int ls, unsigned int index);
Expand All @@ -113,9 +121,21 @@ namespace evf{
void unlockFULocal();
void lockFULocal2();
void unlockFULocal2();
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS = true);
int grabNextJsonFile(std::string const& jsonSourcePath, std::string const& rawSourcePath, int64_t& fileSizeFromJson, bool& fileFound);
int grabNextJsonFileAndUnlock(boost::filesystem::path const& jsonSourcePath);

EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus, bool& serverState,
uint32_t& serverLS, uint32_t& closedServerLS,
std::string& nextFileJson, std::string& nextFileRaw, int maxLS);

FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int& ls, std::string& nextFile,
int& serverEventsInNewFile_, int64_t& fileSize, uint64_t& thisLockWaitTimeUs);
void createRunOpendirMaybe();
void createProcessingNotificationMaybe() const;
int readLastLSEntry(std::string const& file);
unsigned int getLumisectionToStart() const;
void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
fileDeleteLockPtr_=fileDeleteLock;
filesToDeletePtr_ = filesToDelete;
Expand All @@ -141,6 +161,12 @@ namespace evf{
std::string bu_base_dir_;
bool directorBu_;
unsigned int run_;
bool useFileBroker_;
std::string fileBrokerHost_;
std::string fileBrokerPort_;
bool fileBrokerKeepAlive_;
bool fileBrokerUseLocalLock_;
unsigned int startFromLS_ = 1;
bool outputAdler32Recheck_;
bool requireTSPSet_;
std::string selectedTransferMode_;
Expand All @@ -150,6 +176,8 @@ namespace evf{

std::string hostname_;
std::string run_string_;
std::string run_nstring_;
std::string pid_;
std::string run_dir_;
std::string bu_run_dir_;
std::string bu_run_open_dir_;
Expand Down Expand Up @@ -199,6 +227,20 @@ namespace evf{

//values initialized in .cc file
static const std::vector<std::string> MergeTypeNames_;


//json parser
jsoncollector::DataPointDefinition *dpd_;

boost::asio::io_service io_service_;
std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
//boost::asio::io_context io_context_;
//tcp::resolver resolver_;
//tcp::resolver::results_type endpoints_;

};
}

Expand Down
15 changes: 4 additions & 11 deletions EventFilter/Utilities/interface/FedRawDataInputSource.h
Expand Up @@ -33,11 +33,6 @@ namespace evf {
class FastMonitoringService;
}

namespace jsoncollector {
class DataPointDefinition;
}


class FedRawDataInputSource: public edm::RawInputSource {

friend struct InputFile;
Expand All @@ -57,12 +52,10 @@ friend struct InputChunk;
void rewind_() override;

void maybeOpenNewLumiSection(const uint32_t lumiSection);
void createBoLSFile(const uint32_t lumiSection,bool checkIfExists);
evf::EvFDaqDirector::FileStatus nextEvent();
evf::EvFDaqDirector::FileStatus getNextEvent();
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection&);
void deleteFile(std::string const&);
int grabNextJsonFile(boost::filesystem::path const&);

void readSupervisor();
void readWorker(unsigned int tid);
Expand Down Expand Up @@ -94,10 +87,12 @@ friend struct InputChunk;

// get LS from filename instead of event header
const bool getLSFromFilename_;
const bool alwaysStartFromFirstLS_;
const bool verifyAdler32_;
const bool verifyChecksum_;
const bool useL1EventID_;
std::vector<std::string> fileNames_;
bool useFileBroker_;
//std::vector<std::string> fileNamesSorted_;

const bool fileListMode_;
Expand All @@ -123,8 +118,6 @@ friend struct InputChunk;
unsigned int eventsThisLumi_;
unsigned long eventsThisRun_ = 0;

jsoncollector::DataPointDefinition *dpd_;

/*
*
* Multithreaded file reader
Expand Down Expand Up @@ -211,7 +204,7 @@ struct InputFile {
evf::EvFDaqDirector::FileStatus status_;
unsigned int lumi_;
std::string fileName_;
uint32_t fileSize_;
uint64_t fileSize_;
uint32_t nChunks_;
int nEvents_;
unsigned int nProcessed_;
Expand All @@ -223,7 +216,7 @@ struct InputFile {
unsigned int currentChunk_ = 0;

InputFile(evf::EvFDaqDirector::FileStatus status, unsigned int lumi = 0, std::string const& name = std::string(),
uint32_t fileSize =0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent = nullptr):
uint64_t fileSize =0, uint32_t nChunks=0, int nEvents=0, FedRawDataInputSource *parent = nullptr):
parent_(parent),
status_(status),
lumi_(lumi),
Expand Down

0 comments on commit aa73fa4

Please sign in to comment.