Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw a meaninful error if LFN isn't found in FileCatalog #27695

Merged
merged 2 commits into from Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 9 additions & 5 deletions IOPool/Streamer/interface/StreamerInputFile.h
Expand Up @@ -15,14 +15,18 @@

namespace edm {
class EventSkipperByID;
class FileCatalogItem;
class StreamerInputFile {
public:
/**Reads a Streamer file */
explicit StreamerInputFile(std::string const& name,
std::string const& LFN,
std::shared_ptr<EventSkipperByID> eventSkipperByID = std::shared_ptr<EventSkipperByID>());
explicit StreamerInputFile(std::string const& name,
std::shared_ptr<EventSkipperByID> eventSkipperByID = std::shared_ptr<EventSkipperByID>());

/** Multiple Streamer files */
explicit StreamerInputFile(std::vector<std::string> const& names,
explicit StreamerInputFile(std::vector<FileCatalogItem> const& names,
std::shared_ptr<EventSkipperByID> eventSkipperByID = std::shared_ptr<EventSkipperByID>());

~StreamerInputFile();
Expand All @@ -45,7 +49,7 @@ namespace edm {
void closeStreamerFile();

private:
void openStreamerFile(std::string const& name);
void openStreamerFile(std::string const& name, std::string const& LFN);
IOSize readBytes(char* buf, IOSize nBytes);
IOOffset skipBytes(IOSize nBytes);

Expand All @@ -65,9 +69,9 @@ namespace edm {
std::vector<char> headerBuf_; /** Buffer to store file Header */
std::vector<char> eventBuf_; /** Buffer to store Event Data */

unsigned int currentFile_; /** keeps track of which file is in use at the moment*/
std::vector<std::string> streamerNames_; /** names of Streamer files */
bool multiStreams_; /** True if Multiple Streams are Read */
unsigned int currentFile_; /** keeps track of which file is in use at the moment*/
std::vector<FileCatalogItem> streamerNames_; /** names of Streamer files */
bool multiStreams_; /** True if Multiple Streams are Read */
std::string currentFileName_;
bool currentFileOpen_;

Expand Down
6 changes: 3 additions & 3 deletions IOPool/Streamer/src/StreamerFileReader.cc
Expand Up @@ -13,13 +13,12 @@ namespace edm {

StreamerFileReader::StreamerFileReader(ParameterSet const& pset, InputSourceDescription const& desc)
: StreamerInputSource(pset, desc),
streamerNames_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames")),
streamReader_(),
eventSkipperByID_(EventSkipperByID::create(pset).release()),
initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")) {
InputFileCatalog catalog(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
pset.getUntrackedParameter<std::string>("overrideCatalog"));
streamerNames_ = catalog.fileNames();
streamerNames_ = catalog.fileCatalogItems();
reset_();
}

Expand All @@ -29,7 +28,8 @@ namespace edm {
if (streamerNames_.size() > 1) {
streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_, eventSkipperByID());
} else if (streamerNames_.size() == 1) {
streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_.at(0), eventSkipperByID());
streamReader_ = std::make_unique<StreamerInputFile>(
streamerNames_.at(0).fileName(), streamerNames_.at(0).logicalFileName(), eventSkipperByID());
} else {
throw Exception(errors::FileReadError, "StreamerFileReader::StreamerFileReader")
<< "No fileNames were specified\n";
Expand Down
3 changes: 2 additions & 1 deletion IOPool/Streamer/src/StreamerFileReader.h
Expand Up @@ -15,6 +15,7 @@ namespace edm {
class ConfigurationDescriptions;
class EventPrincipal;
class EventSkipperByID;
class FileCatalogItem;
struct InputSourceDescription;
class ParameterSet;
class StreamerInputFile;
Expand All @@ -37,7 +38,7 @@ namespace edm {
std::shared_ptr<EventSkipperByID const> eventSkipperByID() const { return get_underlying_safe(eventSkipperByID_); }
std::shared_ptr<EventSkipperByID>& eventSkipperByID() { return get_underlying_safe(eventSkipperByID_); }

std::vector<std::string> streamerNames_; // names of Streamer files
std::vector<FileCatalogItem> streamerNames_; // names of Streamer files
edm::propagate_const<std::unique_ptr<StreamerInputFile>> streamReader_;
edm::propagate_const<std::shared_ptr<EventSkipperByID>> eventSkipperByID_;
int initialNumberOfEventsToSkip_;
Expand Down
33 changes: 25 additions & 8 deletions IOPool/Streamer/src/StreamerInputFile.cc
Expand Up @@ -6,6 +6,7 @@
#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"
#include "FWCore/Catalog/interface/InputFileCatalog.h"

#include "Utilities/StorageFactory/interface/IOFlags.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"
Expand All @@ -17,7 +18,9 @@ namespace edm {

StreamerInputFile::~StreamerInputFile() { closeStreamerFile(); }

StreamerInputFile::StreamerInputFile(std::string const& name, std::shared_ptr<EventSkipperByID> eventSkipperByID)
StreamerInputFile::StreamerInputFile(std::string const& name,
std::string const& LFN,
std::shared_ptr<EventSkipperByID> eventSkipperByID)
: startMsg_(),
currentEvMsg_(),
headerBuf_(1000 * 1000),
Expand All @@ -33,11 +36,14 @@ namespace edm {
newHeader_(false),
storage_(),
endOfFile_(false) {
openStreamerFile(name);
openStreamerFile(name, LFN);
readStartMessage();
}

StreamerInputFile::StreamerInputFile(std::vector<std::string> const& names,
StreamerInputFile::StreamerInputFile(std::string const& name, std::shared_ptr<EventSkipperByID> eventSkipperByID)
: StreamerInputFile(name, name, eventSkipperByID) {}

StreamerInputFile::StreamerInputFile(std::vector<FileCatalogItem> const& names,
std::shared_ptr<EventSkipperByID> eventSkipperByID)
: startMsg_(),
currentEvMsg_(),
Expand All @@ -53,17 +59,28 @@ namespace edm {
currProto_(0),
newHeader_(false),
endOfFile_(false) {
openStreamerFile(names.at(0));
openStreamerFile(names.at(0).fileName(), names.at(0).logicalFileName());
++currentFile_;
readStartMessage();
currRun_ = startMsg_->run();
currProto_ = startMsg_->protocolVersion();
}

void StreamerInputFile::openStreamerFile(std::string const& name) {
void StreamerInputFile::openStreamerFile(std::string const& name, std::string const& LFN) {
closeStreamerFile();

currentFileName_ = name;

// Check if the logical file name was found.
if (currentFileName_.empty()) {
// LFN not found in catalog.
throw cms::Exception("LogicalFileNameNotFound", "StreamerInputFile::openStreamerFile()\n")
<< "Logical file name '" << LFN << "' was not found in the file catalog.\n"
<< "If you wanted a local file, you forgot the 'file:' prefix\n"
<< "before the file name in your configuration file.\n";
return;
}

logFileAction(" Initiating request to open file ");

IOOffset size = -1;
Expand Down Expand Up @@ -173,9 +190,9 @@ namespace edm {

bool StreamerInputFile::openNextFile() {
if (currentFile_ <= streamerNames_.size() - 1) {
FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).c_str() << std::endl;
FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).fileName().c_str() << std::endl;

openStreamerFile(streamerNames_.at(currentFile_));
openStreamerFile(streamerNames_.at(currentFile_).fileName(), streamerNames_.at(currentFile_).logicalFileName());

// If start message was already there, then compare the
// previous and new headers
Expand All @@ -198,7 +215,7 @@ namespace edm {
//Values from new Header should match up
if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
<< "File " << streamerNames_.at(currentFile_)
<< "File " << streamerNames_.at(currentFile_).fileName()
<< "\nhas different run number or protocol version than previous\n";
return false;
}
Expand Down
9 changes: 7 additions & 2 deletions IOPool/Streamer/test/BuildFile.xml
Expand Up @@ -5,13 +5,18 @@
</bin>
<bin file="ReadStreamerFile.cpp">
<use name="IOPool/Streamer"/>
<flags TEST_RUNNER_ARGS=" bin/bash teststreamfile.dat"/>
<use name="FWCore/Catalog"/>
<use name="FWCore/PluginManager"/>
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Services"/>
<use name="FWCore/ParameterSetReader"/>
<flags TEST_RUNNER_ARGS="all"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is all a special value known by scram?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that it is an option known by ReadStreamerFile

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it's just an arg to the test program, scram doesn't know anything about the value. Alternatively we could make all the default and get rid of the arg, which might be safer--previously the arg was set to a nonsense string and as a result the tests weren't even being run.

</bin>
<bin file="WriteStreamerFile.cpp">
<use name="IOPool/Streamer"/>
</bin>
<bin file="RunThis_t.cpp" name="NewStreamerUNCOMPRESSED">
<flags TEST_RUNNER_ARGS=" /bin/bash IOPool/Streamer/test RunUNCOMPRESSED.sh"/>
<flags TEST_RUNNER_ARGS="/bin/bash IOPool/Streamer/test RunUNCOMPRESSED.sh"/>
</bin>
<bin file="RunThis_t.cpp" name="NewStreamerZLIB">
<flags TEST_RUNNER_ARGS=" /bin/bash IOPool/Streamer/test RunZLIB.sh"/>
Expand Down
117 changes: 97 additions & 20 deletions IOPool/Streamer/test/ReadStreamerFile.cpp
Expand Up @@ -28,60 +28,85 @@ Disclaimer: Most of the code here is randomly written during
#include "IOPool/Streamer/interface/EventMessage.h"
#include "IOPool/Streamer/interface/InitMessage.h"
#include "IOPool/Streamer/interface/StreamerInputFile.h"
#include "FWCore/Catalog/interface/InputFileCatalog.h"
#include "FWCore/Catalog/interface/SiteLocalConfig.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/PluginManager/interface/PluginManager.h"
#include "FWCore/PluginManager/interface/standard.h"
#include "FWCore/Services/src/SiteLocalConfigService.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"

#include <iostream>

void readSingleStream() {
int readSingleStream(bool verbose) {
try {
// ----------- init
std::string initfilename = "teststreamfile.dat";
edm::StreamerInputFile stream_reader(initfilename);

std::cout << "Trying to Read The Init message from Streamer File: " << initfilename << std::endl;
InitMsgView const* init = stream_reader.startMessage();
std::cout << "\n\n-------------INIT---------------------" << std::endl;
std::cout << "Dump the Init Message from Streamer:-" << std::endl;
dumpInitView(init);
if (verbose) {
std::cout << "\n\n-------------INIT---------------------" << std::endl;
std::cout << "Dump the Init Message from Streamer:-" << std::endl;
dumpInitView(init);
}

// ------- event

while (stream_reader.next()) {
std::cout << "----------EVENT-----------" << std::endl;
EventMsgView const* eview = stream_reader.currentRecord();
dumpEventView(eview);
if (verbose) {
std::cout << "----------EVENT-----------" << std::endl;
dumpEventView(eview);
}
}

} catch (cms::Exception& e) {
std::cerr << "Exception caught: " << e.what() << std::endl;
return 1;
}
return 0;
}

int readMultipleStreams() {
int readMultipleStreams(bool verbose) {
try {
std::unique_ptr<edm::SiteLocalConfig> slcptr =
std::make_unique<edm::service::SiteLocalConfigService>(edm::ParameterSet());
auto slc = std::make_shared<edm::serviceregistry::ServiceWrapper<edm::SiteLocalConfig> >(std::move(slcptr));
edm::ServiceToken slcToken = edm::ServiceRegistry::createContaining(slc);
edm::ServiceRegistry::Operate operate(slcToken);

int evCount = 0;
std::vector<std::string> streamFiles;
streamFiles.push_back("teststreamfile0.dat");
streamFiles.push_back("teststreamfile1.dat");
streamFiles.push_back("file:teststreamfile.dat");
streamFiles.push_back("file:teststreamfile.dat");

edm::StreamerInputFile stream_reader(streamFiles);
edm::InputFileCatalog catalog(streamFiles, "");

edm::StreamerInputFile stream_reader(catalog.fileCatalogItems());

std::cout << "Trying to Read The Init message from Streamer File: "
<< "teststreamfile0.dat" << std::endl;
<< "teststreamfile.dat" << std::endl;

InitMsgView const* init = stream_reader.startMessage();
std::cout << "\n\n-------------INIT---------------------" << std::endl;
std::cout << "Dump the Init Message from Streamer:-" << std::endl;
dumpInitView(init);
if (verbose) {
std::cout << "\n\n-------------INIT---------------------" << std::endl;
std::cout << "Dump the Init Message from Streamer:-" << std::endl;
dumpInitView(init);
}

while (stream_reader.next()) {
if (stream_reader.newHeader()) {
std::cout << "File Boundary has just been crossed, a new file is read" << std::endl;
std::cout << "A new INIT Message is available" << std::endl;
std::cout << "Event from next file is also avialble" << std::endl;
}
std::cout << "----------EVENT-----------" << std::endl;
EventMsgView const* eview = stream_reader.currentRecord();
dumpEventView(eview);
if (verbose) {
std::cout << "----------EVENT-----------" << std::endl;
dumpEventView(eview);
}
++evCount;
}

Expand All @@ -93,9 +118,58 @@ int readMultipleStreams() {
return 0;
}

int readInvalidLFN(bool verbose) {
try {
std::unique_ptr<edm::SiteLocalConfig> slcptr =
std::make_unique<edm::service::SiteLocalConfigService>(edm::ParameterSet());
auto slc = std::make_shared<edm::serviceregistry::ServiceWrapper<edm::SiteLocalConfig> >(std::move(slcptr));
edm::ServiceToken slcToken = edm::ServiceRegistry::createContaining(slc);
edm::ServiceRegistry::Operate operate(slcToken);

int evCount = 0;
std::vector<std::string> streamFiles;
streamFiles.push_back("teststreamfile.dat");

edm::InputFileCatalog catalog(streamFiles, "");

edm::StreamerInputFile stream_reader(catalog.fileCatalogItems());

std::cout << "Trying to Read The Init message from Streamer File: "
<< "teststreamfile.dat" << std::endl;

InitMsgView const* init = stream_reader.startMessage();
if (verbose) {
std::cout << "\n\n-------------INIT---------------------" << std::endl;
std::cout << "Dump the Init Message from Streamer:-" << std::endl;
dumpInitView(init);
}

while (stream_reader.next()) {
if (stream_reader.newHeader()) {
std::cout << "File Boundary has just been crossed, a new file is read" << std::endl;
std::cout << "A new INIT Message is available" << std::endl;
std::cout << "Event from next file is also avialble" << std::endl;
}
EventMsgView const* eview = stream_reader.currentRecord();
if (verbose) {
std::cout << "----------EVENT-----------" << std::endl;
dumpEventView(eview);
}
++evCount;
}

std::cout << " TOTAL Events Read: " << evCount << std::endl;
} catch (cms::Exception& e) {
std::cerr << "Exception caught: " << e.what() << std::endl;
if (e.category() == "LogicalFileNameNotFound")
return 0;
}
return 1;
}

void help() {
std::cout << "Valid options are: " << std::endl;
std::cout << "single, multi, all" << std::endl;
std::cout << "single, multi, invalid, all" << std::endl;
}

int main(int argc, char* argv[]) {
Expand All @@ -107,11 +181,14 @@ int main(int argc, char* argv[]) {

std::string doThis(argv[1]);

int ret(0);
if (doThis == "all" || doThis == "single")
readSingleStream();
ret += readSingleStream(false);
if (doThis == "all" || doThis == "multi")
readMultipleStreams();
ret += readMultipleStreams(false);
if (doThis == "all" || doThis == "invalid")
ret += readInvalidLFN(false);
std::cout << "\n\nReadStreamerFile TEST DONE\n" << std::endl;

return 0;
return ret;
}