Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output adler32 checksumming propagated to JSON file #5236

Merged
merged 1 commit into from Sep 17, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion DQMServices/Components/src/DQMFileSaver.cc
Expand Up @@ -263,7 +263,7 @@ DQMFileSaver::fillJson(int run, int lumi, const std::string& dataFilePathName, b
std::string dataFileName = bfs::path(dataFilePathName).filename().string();
// The availability test of the FastMonitoringService was done in the ctor.
bpt::ptree data;
bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize, inputFiles;
bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize, inputFiles, fileAdler32;

processedEvents.put("", fms_ ? (fms_->getEventsProcessedForLumi(lumi)) : -1); // Processed events
acceptedEvents.put("", fms_ ? (fms_->getEventsProcessedForLumi(lumi)) : -1); // Accepted events, same as processed for our purposes
Expand All @@ -273,6 +273,7 @@ DQMFileSaver::fillJson(int run, int lumi, const std::string& dataFilePathName, b
fileList.put("", dataFileName); // Data file the information refers to
fileSize.put("", dataFileStat.st_size); // Size in bytes of the data file
inputFiles.put("", ""); // We do not care about input files!
fileAdler32.put("", -1); // placeholder to match output json definition

data.push_back(std::make_pair("", processedEvents));
data.push_back(std::make_pair("", acceptedEvents));
Expand All @@ -281,6 +282,7 @@ DQMFileSaver::fillJson(int run, int lumi, const std::string& dataFilePathName, b
data.push_back(std::make_pair("", fileList));
data.push_back(std::make_pair("", fileSize));
data.push_back(std::make_pair("", inputFiles));
data.push_back(std::make_pair("", fileAdler32));

pt.add_child("data", data);

Expand Down
1 change: 1 addition & 0 deletions EventFilter/Utilities/interface/DataPointDefinition.h
Expand Up @@ -67,6 +67,7 @@ class DataPointDefinition: public JsonSerializable {
static const std::string CAT;
static const std::string MERGE;
static const std::string BINARYOR;
static const std::string ADLER32;

// JSON field names
static const std::string LEGEND;
Expand Down
3 changes: 3 additions & 0 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Expand Up @@ -62,6 +62,7 @@ namespace evf{
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
std::string getInitFilePath(std::string const& stream) const;
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
Expand All @@ -74,6 +75,7 @@ namespace evf{
std::string getEoRFilePath() const;
std::string getEoRFilePathOnFU() const;
std::string getRunOpenDirPath() const {return run_dir_ +"/open";}
bool outputAdler32Recheck() const {return outputAdler32Recheck_;}
void removeFile(unsigned int ls, unsigned int index);
void removeFile(std::string );

Expand Down Expand Up @@ -120,6 +122,7 @@ namespace evf{
std::string bu_base_dir_;
bool directorBu_;
unsigned int run_;
bool outputAdler32Recheck_;

std::string hostname_;
std::string run_string_;
Expand Down
9 changes: 9 additions & 0 deletions EventFilter/Utilities/interface/FFFNamingSchema.h
Expand Up @@ -79,6 +79,15 @@ namespace fffnaming {
return ss.str();
}

inline std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const& stream, std::string const& instance) {
std::stringstream ss;
runLumiPrefixFill(ss,run,ls);
ss << "_" << stream
<< "_" << instance
<< ".checksum";
return ss.str();
}

inline std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const& stream) {
std::stringstream ss;
runLumiPrefixFill(ss,run,ls);
Expand Down
1 change: 1 addition & 0 deletions EventFilter/Utilities/plugins/BuildFile.xml
Expand Up @@ -3,6 +3,7 @@
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Framework"/>
<use name="FWCore/Sources"/>
<use name="FWCore/Utilities"/>
<use name="IOPool/Streamer"/>
<use name="EventFilter/Utilities"/>
<use name="DataFormats/FEDRawData"/>
Expand Down
61 changes: 56 additions & 5 deletions EventFilter/Utilities/plugins/RecoEventOutputModuleForFU.h
Expand Up @@ -8,13 +8,15 @@

#include <sstream>
#include <iomanip>
#include "boost/filesystem.hpp"
#include <boost/filesystem.hpp>
#include <zlib.h>

#include "EventFilter/Utilities/interface/JsonMonitorable.h"
#include "EventFilter/Utilities/interface/FastMonitor.h"
#include "EventFilter/Utilities/interface/JSONSerializer.h"
#include "EventFilter/Utilities/interface/FileIO.h"
#include "EventFilter/Utilities/interface/FastMonitoringService.h"
#include "FWCore/Utilities/interface/Adler32Calculator.h"


namespace evf {
Expand Down Expand Up @@ -47,17 +49,20 @@ namespace evf {
std::auto_ptr<Consumer> c_;
std::string stream_label_;
boost::filesystem::path openDatFilePath_;
boost::filesystem::path openDatChecksumFilePath_;
IntJ processed_;
mutable IntJ accepted_;
IntJ errorEvents_;
IntJ retCodeMask_;
StringJ filelist_;
IntJ filesize_;
StringJ inputFiles_;
IntJ fileAdler32_;
boost::shared_ptr<FastMonitor> jsonMonitor_;
evf::FastMonitoringService *fms_;
DataPointDefinition outJsonDef_;
unsigned char* outBuf_=0;
bool readAdler32Check_=false;


}; //end-of-class-def
Expand All @@ -74,9 +79,11 @@ namespace evf {
filelist_(),
filesize_(0),
inputFiles_(),
fileAdler32_(1),
outBuf_(new unsigned char[1024*1024])
{
std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
// create open dir if not already there
edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
Expand All @@ -95,6 +102,7 @@ namespace evf {
filelist_.setName("Filelist");
filesize_.setName("Filesize");
inputFiles_.setName("InputFiles");
fileAdler32_.setName("FileAdler32");

outJsonDef_.setDefaultGroup("data");
outJsonDef_.addLegendItem("Processed","integer",DataPointDefinition::SUM);
Expand All @@ -104,6 +112,7 @@ namespace evf {
outJsonDef_.addLegendItem("Filelist","string",DataPointDefinition::MERGE);
outJsonDef_.addLegendItem("Filesize","integer",DataPointDefinition::SUM);
outJsonDef_.addLegendItem("InputFiles","string",DataPointDefinition::CAT);
outJsonDef_.addLegendItem("FileAdler32","integer",DataPointDefinition::ADLER32);
std::stringstream tmpss,ss;
tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
Expand All @@ -130,6 +139,7 @@ namespace evf {
jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
jsonMonitor_->commit(nullptr);
}

Expand Down Expand Up @@ -182,6 +192,7 @@ namespace evf {
{
//edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
c_->setOutputFile(openDatFilePath_.string());
filelist_ = openDatFilePath_.filename().string();
}
Expand All @@ -191,22 +202,45 @@ namespace evf {
{
//edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
long filesize=0;
fileAdler32_.value() = c_->get_adler32();
c_->closeOutputFile();
processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock());


if(processed_.value()!=0){
//int b;
// move dat file to one level up - this is VERRRRRY inefficient, come up with a smarter idea

//lock
FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
FILE *src = fopen(openDatFilePath_.string().c_str(),"r");

std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);

struct stat istat;
FILE * cf = NULL;
uint32_t mergedAdler32=1;
//get adler32 accumulated checksum for the merged file
if (!stat(deschecksum.c_str(), &istat)) {
std::cout << "checksum size " << istat.st_size << std::endl;
if (istat.st_size) {
cf = fopen(deschecksum.c_str(),"r");
if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
fscanf(cf,"%u",&mergedAdler32);
fclose(cf);
}
else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
}

FILE *src = fopen(openDatFilePath_.string().c_str(),"r");

stat(openDatFilePath_.string().c_str(), &istat);
off_t readInput=0;
uint32_t adlera=1;
uint32_t adlerb=0;
while (readInput<istat.st_size) {
unsigned long toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
fread(outBuf_,toRead,1,src);
fwrite(outBuf_,toRead,1,des);
if (readAdler32Check_)
cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
readInput+=toRead;
filesize+=toRead;
}
Expand All @@ -218,8 +252,25 @@ namespace evf {
// }
//}

//write new string representation of the checksum value
cf = fopen(deschecksum.c_str(),"w");
if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();

//write adler32 combine to checksum file
mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);

fprintf(cf,"%u",mergedAdler32);
fclose(cf);

edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
fclose(src);

if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {

throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
<< openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
}

}
//remove file
remove(openDatFilePath_.string().c_str());
Expand Down
5 changes: 5 additions & 0 deletions EventFilter/Utilities/plugins/output.jsd
Expand Up @@ -34,6 +34,11 @@
"name" : "InputFiles",
"operation" : "cat",
"type" : "string"
},
{
"name" : "FileAdler32",
"operation" : "sum",
"type" : "integer"
}
]
}
3 changes: 2 additions & 1 deletion EventFilter/Utilities/python/EvFDaqDirector_cfi.py
Expand Up @@ -3,6 +3,7 @@
EvFDaqDirector = cms.Service( "EvFDaqDirector",
buBaseDir = cms.untracked.string(""),
baseDir = cms.untracked.string(""),
runNumber = cms.untracked.uint32(0)
runNumber = cms.untracked.uint32(0),
outputAdler32Recheck=cms.untracked.bool(False)
)

1 change: 1 addition & 0 deletions EventFilter/Utilities/src/DataPointDefinition.cc
Expand Up @@ -20,6 +20,7 @@ const std::string DataPointDefinition::HISTO = "histo";
const std::string DataPointDefinition::CAT = "cat";
const std::string DataPointDefinition::BINARYOR = "binaryOr";
const std::string DataPointDefinition::MERGE = "merge";
const std::string DataPointDefinition::ADLER32 = "adler32";

const std::string DataPointDefinition::LEGEND = "legend";
const std::string DataPointDefinition::DATA = "data";
Expand Down
6 changes: 5 additions & 1 deletion EventFilter/Utilities/src/EvFDaqDirector.cc
Expand Up @@ -48,6 +48,7 @@ namespace evf {
pset.getUntrackedParameter<bool> ("directorIsBu", false)
),
run_(pset.getUntrackedParameter<unsigned int> ("runNumber",0)),
outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck",false)),
hostname_(""),
bu_readlock_fd_(-1),
bu_writelock_fd_(-1),
Expand Down Expand Up @@ -304,7 +305,6 @@ namespace evf {
return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_,ls,index);
}


std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_,ls,stream);
}
Expand All @@ -321,6 +321,10 @@ namespace evf {
return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_,ls,stream,hostname_);
}

std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_,ls,stream,hostname_);
}

std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_,0,stream);
}
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/test/startFU.py
Expand Up @@ -88,7 +88,7 @@
getLSFromFilename = cms.untracked.bool(True),
testModeNoBuilderUnit = cms.untracked.bool(False),
verifyAdler32 = cms.untracked.bool(True),
useL1EventID = cms.untracked.bool(False),
useL1EventID = cms.untracked.bool(True),
eventChunkSize = cms.untracked.uint32(16),
numBuffers = cms.untracked.uint32(2),
eventChunkBlock = cms.untracked.uint32(1)
Expand Down