Skip to content

Commit

Permalink
Record job ad at abort or terminate time into a file. #4784
Browse files Browse the repository at this point in the history
This commit records the job ad at abort / terminate time before the
corresponding user log event is logged.  This guarantees that the
job's final(ish) ClassAd is available to a DAG's postjob script.

Note that this is slightly different from what is found in the history
file: the job may not leave the queue after any given abort/terminate.

If nothing else, the JobStatus is still running when this code path
is reached.
  • Loading branch information
bbockelm committed Dec 17, 2014
1 parent c97748d commit 4fca2c8
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 57 deletions.
1 change: 1 addition & 0 deletions src/condor_includes/condor_attributes.h
Expand Up @@ -674,6 +674,7 @@
#define ATTR_UID_DOMAIN "UidDomain"
#define ATTR_ULOG_FILE "UserLog"
#define ATTR_ULOG_USE_XML "UserLogUseXML"
#define ATTR_HISTORY_FILE "History"
#define ATTR_UPDATE_INTERVAL "UpdateInterval"
#define ATTR_CLASSAD_LIFETIME "ClassAdLifetime"
#define ATTR_UPDATE_PRIO "UpdatePrio"
Expand Down
43 changes: 43 additions & 0 deletions src/condor_schedd.V6/schedd.cpp
Expand Up @@ -3262,6 +3262,45 @@ namespace {
}
}
}

// Append the current ClassAd of job `job_id` to the file named by the job's
// ATTR_HISTORY_FILE attribute, unparsed in old-ClassAd syntax and followed
// by a newline.
//
// Returns true only if the ad was written and the file closed cleanly.
// Returns false if the job ad cannot be found, if the job has no
// ATTR_HISTORY_FILE string attribute, or if opening, writing or closing the
// file fails.
bool
Scheduler::WriteHistoryFile(PROC_ID job_id)
{
	const ClassAd *ad = GetJobAd(job_id.cluster, job_id.proc);
	if (!ad)
	{
		dprintf(D_FULLDEBUG, "Unable to find job ad for %d.%d in order to write history file.\n", job_id.cluster, job_id.proc);
		return false;
	}
	std::string history_file;
	if (!ad->EvaluateAttrString(ATTR_HISTORY_FILE, history_file)) {return false;}

	{
		// NOTE(review): presumably switches to the job owner's privileges
		// for the lifetime of this scope -- confirm against
		// TemporaryPrivSentry.
		TemporaryPrivSentry tps(*ad);

		// The flags do not include O_CREAT.
		int fd = safe_open_wrapper_follow(history_file.c_str(), O_APPEND|O_WRONLY);
		if (fd == -1)
		{
			dprintf(D_FULLDEBUG,
				"ERROR: Invalid log file: \"%s\" (errno=%d, %s)\n",
				history_file.c_str(), errno, strerror(errno));
			return false;
		}
		classad::ClassAdUnParser unparser;
		unparser.SetOldClassAd(true);
		std::string buf;
		unparser.Unparse(buf, ad);
		buf += "\n";
		if (-1 == full_write(fd, buf.c_str(), buf.size()))
		{
			// Log before closing so that close() cannot clobber errno.
			dprintf(D_ALWAYS,
				"ERROR: Unable to write to log file \"%s\" (errno=%d, %s).\n",
				history_file.c_str(), errno, strerror(errno));
			close(fd);
			return false;
		}
		// Always release the descriptor: this runs once per abort/terminate
		// in a long-lived daemon, so leaking it would eventually exhaust the
		// process's file descriptors.
		if (close(fd) != 0)
		{
			dprintf(D_ALWAYS,
				"ERROR: Unable to close log file \"%s\" (errno=%d, %s).\n",
				history_file.c_str(), errno, strerror(errno));
			return false;
		}
	}
	return true;
}

// Initialize a WriteUserLog object for a given job and return a pointer to
// the WriteUserLog object created. This object can then be used to write
// events and must be deleted when you're done. This returns NULL if
Expand Down Expand Up @@ -3395,11 +3434,13 @@ Scheduler::WriteSubmitToUserLog( PROC_ID job_id, bool do_fsync )
bool
Scheduler::WriteAbortToUserLog( PROC_ID job_id )
{
WriteHistoryFile(job_id);
WriteUserLog* ULog = this->InitializeUserLog( job_id );
if( ! ULog ) {
// User didn't want log
return true;
}

JobAbortedEvent event;

char* reason = NULL;
Expand Down Expand Up @@ -3566,11 +3607,13 @@ Scheduler::WriteEvictToUserLog( PROC_ID job_id, bool checkpointed )
bool
Scheduler::WriteTerminateToUserLog( PROC_ID job_id, int status )
{
WriteHistoryFile(job_id);
WriteUserLog* ULog = this->InitializeUserLog( job_id );
if( ! ULog ) {
// User didn't want log
return true;
}

JobTerminatedEvent event;
struct rusage r;
memset( &r, 0, sizeof(struct rusage) );
Expand Down
1 change: 1 addition & 0 deletions src/condor_schedd.V6/scheduler.h
Expand Up @@ -400,6 +400,7 @@ class Scheduler : public Service
bool availableTransferd( int cluster, int proc,
TransferDaemon *&td_ref );
bool startTransferd( int cluster, int proc );
bool WriteHistoryFile(PROC_ID job_id);
WriteUserLog* InitializeUserLog( PROC_ID job_id );
bool WriteSubmitToUserLog( PROC_ID job_id, bool do_fsync );
bool WriteAbortToUserLog( PROC_ID job_id );
Expand Down
34 changes: 33 additions & 1 deletion src/condor_shadow.V6.1/baseshadow.cpp
Expand Up @@ -936,6 +936,36 @@ static void set_usageAd (ClassAd* jobAd, ClassAd ** ppusageAd)
}
}

// Append `ad` to the file named by its ATTR_HISTORY_FILE attribute, unparsed
// in old-ClassAd syntax and followed by a newline.
//
// Returns true only if the ad was written and the file closed cleanly.
// Returns false if the ad has no ATTR_HISTORY_FILE string attribute, or if
// opening, writing or closing the file fails.
static bool
WriteHistoryFile(const classad::ClassAd &ad)
{
	std::string history_file;
	if (!ad.EvaluateAttrString(ATTR_HISTORY_FILE, history_file)) {return false;}

	// NOTE(review): presumably switches to the job owner's privileges until
	// the function returns -- confirm against TemporaryPrivSentry.
	TemporaryPrivSentry tps(ad);

	// The flags do not include O_CREAT.
	int fd = safe_open_wrapper_follow(history_file.c_str(), O_APPEND|O_WRONLY);
	if (fd == -1)
	{
		dprintf(D_ALWAYS,
			"ERROR: Invalid log file: \"%s\" (errno=%d, %s)\n",
			history_file.c_str(), errno, strerror(errno));
		return false;
	}
	classad::ClassAdUnParser unparser;
	unparser.SetOldClassAd(true);
	std::string buf;
	unparser.Unparse(buf, &ad);
	buf += "\n";
	if (-1 == full_write(fd, buf.c_str(), buf.size()))
	{
		// Log before closing so that close() cannot clobber errno.
		dprintf(D_ALWAYS,
			"ERROR: Unable to write to log file \"%s\" (errno=%d, %s).\n",
			history_file.c_str(), errno, strerror(errno));
		close(fd);
		return false;
	}
	// Release the descriptor on the success path too; the original never
	// closed it.
	if (close(fd) != 0)
	{
		dprintf(D_ALWAYS,
			"ERROR: Unable to close log file \"%s\" (errno=%d, %s).\n",
			history_file.c_str(), errno, strerror(errno));
		return false;
	}
	return true;
}

// kind defaults to US_NORMAL.
void
BaseShadow::logTerminateEvent( int exitReason, update_style_t kind )
Expand Down Expand Up @@ -1003,6 +1033,7 @@ BaseShadow::logTerminateEvent( int exitReason, update_style_t kind )
event.setCoreFile( corefile.Value() );
}

WriteHistoryFile(*jobAd);
if (!uLog.writeEvent (&event,jobAd)) {
dprintf (D_ALWAYS,"Unable to log "
"ULOG_JOB_TERMINATED event\n");
Expand Down Expand Up @@ -1083,7 +1114,8 @@ BaseShadow::logTerminateEvent( int exitReason, update_style_t kind )
event.pusageAd = puAd;
}
#endif


WriteHistoryFile(*jobAd);
if (!uLog.writeEvent (&event,jobAd)) {
dprintf (D_ALWAYS,"Unable to log "
"ULOG_JOB_TERMINATED event\n");
Expand Down
148 changes: 92 additions & 56 deletions src/condor_submit.V6/submit.cpp
Expand Up @@ -269,6 +269,7 @@ const char *NotifyUser = "notify_user";
const char *EmailAttributes = "email_attributes";
const char *ExitRequirements = "exit_requirements";
const char *UserLogFile = "log";
const char *HistoryFile = "history";
const char *UseLogUseXML = "log_xml";
const char *DagmanLogFile = "dagman_log";
const char *CoreSize = "coresize";
Expand Down Expand Up @@ -510,6 +511,7 @@ void SetIWD();
void ComputeIWD();
void SetUserLog();
void SetUserLogXML();
void SetHistoryFile();
void SetCoreSize();
void SetFileOptions();
#if !defined(WIN32)
Expand Down Expand Up @@ -5081,6 +5083,89 @@ check_iwd( char const *iwd )
}
}

// Enforce the LOG_ON_NFS_IS_ERROR setting for the log file at `log`.
//
// Does nothing when LOG_ON_NFS_IS_ERROR is false (the default passed to
// param_boolean). Otherwise it prints a warning if fs_detect_nfs() fails,
// and cleans up and exits with status 1 if the file is reported to be on NFS.
static void
WarnAboutNFS(const std::string &log)
{
	if (!param_boolean("LOG_ON_NFS_IS_ERROR", false)) {
		return;
	}

	const char *log_path = log.c_str();
	bool on_nfs = false;

	if (fs_detect_nfs(log_path, &on_nfs) != 0) {
		// Detection itself failed: warn, but let the submit continue.
		fprintf(stderr,
			"\nWARNING: Can't determine whether log file %s is on NFS\n",
			log_path);
		return;
	}

	if (!on_nfs) {
		return;
	}

	fprintf(stderr,
		"\nERROR: Log file %s is on NFS.\nThis could cause"
		" log file corruption. Condor has been configured to"
		" prohibit log files on NFS.\n",
		log_path);
	DoCleanup(0,0,NULL);
	exit( 1 );
}

// Sanity-check the log file path `log` at submit time.
//
// All checks are skipped when DumpClassAdToFile is set. Otherwise, unless
// DisableFileChecks is set, the file is opened in "a+" mode to prove the
// path is usable; failure prints an error and exits with status 1. The NFS
// check then runs regardless of DisableFileChecks.
static void
LogFileChecks(const std::string &log)
{
	if (DumpClassAdToFile) {
		return;
	}

	if (!DisableFileChecks) {
		// Probe only: the handle is closed again straight away.
		FILE *probe = safe_fopen_wrapper_follow(log.c_str(), "a+", 0664);
		if (probe == NULL) {
			fprintf(stderr,
				"\nERROR: Invalid log file: \"%s\" (%s)\n", log.c_str(),
				strerror(errno));
			exit(1);
		}
		fclose(probe);
	}

	WarnAboutNFS(log);
}

static bool
HandleLogFile(char *input_log, std::string &log_fname)
{
if (!input_log) {return false;}
const char *input_log_path_char = full_path(input_log);
if (!input_log_path_char)
{
free(input_log);
return false;
}
std::string log_path(input_log_path_char);
free(input_log);
LogFileChecks(log_path);

MyString mlog(log_path.c_str());
check_and_universalize_path(mlog);
log_fname = mlog;
return true;
}

// Process the "history" submit command (job attribute ATTR_HISTORY_FILE).
// If a value is present and passes HandleLogFile(), insert
// `<ATTR_HISTORY_FILE> = "<path>"` into the job ad; otherwise insert nothing.
void
SetHistoryFile()
{
	char *raw_history = condor_param(HistoryFile, ATTR_HISTORY_FILE);
	std::string resolved_history;

	// HandleLogFile() takes ownership of raw_history and frees it.
	if (!HandleLogFile(raw_history, resolved_history)) {
		return;
	}

	std::string expr(ATTR_HISTORY_FILE);
	expr += " = ";
	expr += "\"";
	expr += resolved_history;
	expr += "\"";
	InsertJobExpr(expr.c_str());
}

void
SetUserLog()
{
Expand All @@ -5090,62 +5175,12 @@ SetUserLog()
*p && *q; ++p, ++q) {
char *ulog_entry = condor_param( *p, *q );

if(ulog_entry) {
std::string buffer;
std::string current_userlog(ulog_entry);
const char* ulog_pcc = full_path(current_userlog.c_str());
if(ulog_pcc) {
std::string ulog(ulog_pcc);
if ( !DumpClassAdToFile ) {
// check that the log is a valid path
if ( !DisableFileChecks ) {
FILE* test = safe_fopen_wrapper_follow(ulog.c_str(), "a+", 0664);
if (!test) {
fprintf(stderr,
"\nERROR: Invalid log file: \"%s\" (%s)\n", ulog.c_str(),
strerror(errno));
exit( 1 );
} else {
fclose(test);
}
}

// Check that the log file isn't on NFS
bool nfs_is_error = param_boolean("LOG_ON_NFS_IS_ERROR", false);
bool nfs = false;

if ( nfs_is_error ) {
if ( fs_detect_nfs( ulog.c_str(), &nfs ) != 0 ) {
fprintf(stderr,
"\nWARNING: Can't determine whether log file %s is on NFS\n",
ulog.c_str() );
} else if ( nfs ) {

fprintf(stderr,
"\nERROR: Log file %s is on NFS.\nThis could cause"
" log file corruption. Condor has been configured to"
" prohibit log files on NFS.\n",
ulog.c_str() );

DoCleanup(0,0,NULL);
exit( 1 );

}
}
}

MyString mulog(ulog.c_str());
check_and_universalize_path(mulog);
buffer += mulog.Value();
UserLogSpecified = true;
}
std::string logExpr(*q);
logExpr += " = ";
logExpr += "\"";
logExpr += buffer;
logExpr += "\"";
InsertJobExpr(logExpr.c_str());
free(ulog_entry);
std::string log_fname;
if (HandleLogFile(ulog_entry, log_fname))
{
std::stringstream ss;
ss << *q << " = " << "\"" << log_fname << "\"";
InsertJobExpr(ss.str().c_str());
}
}
}
Expand Down Expand Up @@ -6656,6 +6691,7 @@ queue(int num)
// really a command, needs to happen before any calls to check_open
SetJobDisableFileChecks();

SetHistoryFile();
SetUserLog();
SetUserLogXML();
SetCoreSize();
Expand Down

0 comments on commit 4fca2c8

Please sign in to comment.