In [8]:

from __future__ import division
from __future__ import print_function
import argparse
import logging
from logging import FileHandler
from logging.handlers import TimedRotatingFileHandler
import os
import traceback
import sys
import json
import pickle
import tempfile
from datetime import datetime
import time
from multiprocessing import Process
from WMCore.Configuration import loadConfigurationFile
from WMCore.Services.Requests import Requests

from RESTInteractions import CRABRest
from ServerUtilities import getColumn, encodeRequest, oracleOutputMapping, executeCommand
from ServerUtilities import SERVICE_INSTANCES
from ServerUtilities import getProxiedWebDir


In [9]:
# Load the 'General' section of the Publisher configuration found in the
# current working directory.
confFile = os.path.abspath('PublisherConfig.py')
config = loadConfigurationFile(confFile).General

# Send log records to stdout so they show up in the notebook output.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger('Logger')

# Look up the REST host and DB instance for the configured service instance.
instance = config.instance
serviceInfo = SERVICE_INSTANCES[instance]
restHost, dbInstance = serviceInfo['restHost'], serviceInfo['dbInstance']

# Path of the proxy file, taken from the environment (None when unset).
myp = os.environ.get('X509_USER_PROXY')
print(myp)

/tmp/x509up_u8516


In [10]:
# Emit a throw-away warning through the logger and print the logs directory
# taken from the loaded configuration.
logger.warning('rewre')
print(config.logsDir)

/tmp/belforte/data/srv/Publisher/logs


In [10]:
# Build the REST client for the CRAB server. The proxy file path is passed as
# both certificate and key; retries are disabled (retry=0).
crabServer = CRABRest(hostname=restHost, localcert=myp, localkey=myp, retry=0,
                      userAgent='StefanoTest')
# Point the client at the DB instance selected from SERVICE_INSTANCES.
crabServer.setDbInstance(dbInstance=dbInstance)
# Name of the task that the following cells inspect.
workflow='211118_173745:belforte_crab_20211118_183738'

In [11]:
# Query the REST 'task' API (subresource 'search') for the workflow and pull
# the 'tm_task_status' column out of the reply.
crabDBInfo, _, _ = crabServer.get(api='task', data={'subresource':'search', 'workflow':workflow})
dbStatus = getColumn(crabDBInfo, 'tm_task_status')
print(dbStatus)

SUBMITTED


In [12]:
# Ask for the task's proxied web directory; the printed value is a URL.
proxiedWebDir = getProxiedWebDir(crabserver=crabServer, task=workflow)
print(proxiedWebDir)


https://cmsweb.cern.ch:8443/scheddmon/059/cms1627/211118_173745:belforte_crab_20211118_183738


In [9]:
# Download status_cache.pkl from the task's proxied web directory into a
# temporary file and unpickle it into `statusCache`.
fd, local_status_cache_pkl = tempfile.mkstemp(dir='/tmp', prefix='status-cache-', suffix='.pkl')
# mkstemp hands back an already-open descriptor; only the path is needed here,
# so close it right away instead of leaking it.
os.close(fd)
url = proxiedWebDir + "/status_cache.pkl"
host = 'https://cmsweb.cern.ch'  # this is actually dummy but WMCore.Requests needs it
cdict = {'cert':myp, 'key':myp}
req = Requests(url=host, idict=cdict)
_, ret = req.downloadFile(local_status_cache_pkl, url)
if ret.status != 200:
    raise Exception('download attempt returned HTTP code %d' % ret.status)
# NOTE(review): pickle.load can execute arbitrary code; this is only acceptable
# as long as the remote status_cache.pkl comes from a trusted source.
with open(local_status_cache_pkl, 'rb') as fp:
    statusCache = pickle.load(fp)

In [11]:
# Dump the whole unpickled status cache for inspection (the output is long).
print(statusCache)

{u'fjrParseResCheckpoint': 1441, u'nodeMap': {(8968425, 0): u'42', (8968384, 0): u'1', (8968405, 0): u'22', (8968410, 0): u'27', (8968431, 0): u'49', (8968390, 0): u'7', (8968395, 0): u'12', (8968400, 0): u'17', (8968421, 0): u'38', (8968426, 0): u'43', (8968385, 0): u'2', (8968406, 0): u'23', (8968411, 0): u'28', (8968416, 0): u'33', (8968391, 0): u'8', (8968396, 0): u'13', (8968401, 0): u'18', (8968422, 0): u'39', (8968427, 0): u'45', (8968386, 0): u'3', (8968432, 0): u'50', (8968407, 0): u'24', (8968412, 0): u'29', (8968417, 0): u'34', (8968397, 0): u'14', (8968402, 0): u'19', (8968423, 0): u'40', (8968428, 0): u'46', (8968387, 0): u'4', (8968433, 0): u'44', (8968392, 0): u'9', (8968413, 0): u'30', (8968418, 0): u'35', (8968398, 0): u'15', (8968403, 0): u'20', (8968408, 0): u'25', (8968429, 0): u'47', (8968388, 0): u'5', (8968393, 0): u'10', (8968414, 0): u'31', (8968419, 0): u'36', (8968424, 0): u'41', (8968399, 0): u'16', (8968404, 0): u'21', (8968409, 0): u'26', (8968430, 0): u'4

In [15]:
def translateStatus(statusToTr, dbstatus):
    """Map a DAGMan internal integer status onto a status string.

    :param statusToTr: integer DAG status; must be one of 1..6, anything else
        raises KeyError
    :param dbstatus: task status from the database, only used to tell a
        user-killed task apart from a DAG that finished with failed jobs
    :return: 'SUBMITTED', 'COMPLETED', 'FAILED' or 'FAILED (KILLED)'
    """
    # Codes 1-4 all translate to 'SUBMITTED'; 5 and 6 are the final states.
    dagStateNames = dict.fromkeys((1, 2, 3, 4), 'SUBMITTED')
    dagStateNames.update({5: 'COMPLETED', 6: 'FAILED'})
    label = dagStateNames[statusToTr]

    # The DAG code for a killed task is 6, the same as for a finished DAG with
    # failed jobs, so the database status is needed to relabel the kill case.
    if label == 'FAILED' and dbstatus == 'KILLED':
        return 'FAILED (KILLED)'
    return label


In [16]:
def collapseDAGStatus(dagInfo, dbstatus):
    """Reduce the status of one or several DAGs to a single status string.

    Sub-DAGs may already be submitted to the schedd queue without having
    started, in which case the overall status is still 'SUBMITTED'.

    :param dagInfo: dict with a 'DagStatus' entry and, optionally, 'SubDags'
        and 'SubDagStatus' entries
    :param dbstatus: task status from the database, forwarded to
        translateStatus and used to skip the queued check for killed tasks
    :return: the collapsed status string
    """
    subDagInfos = dagInfo.get('SubDags', {})
    subDagStatus = dagInfo.get('SubDagStatus', {})
    nSubDags = len(subDagInfos)

    # Regular splitting: there is just the one DAG, report its status.
    if nSubDags == 0 and len(subDagStatus) == 0:
        return translateStatus(dagInfo['DagStatus'], dbstatus)

    def check_queued(statusOrSUBMITTED):
        # 99 marks a DAG that is expected to be submitted. When fewer DAG
        # status records exist than expected DAGs, at least one DAG is still
        # queued, so the answer is 'SUBMITTED' (unless the task was killed).
        if dbstatus == 'KILLED':
            return statusOrSUBMITTED
        expected = sum(1 for key in subDagStatus if subDagStatus[key] == 99)
        return 'SUBMITTED' if nSubDags < expected else statusOrSUBMITTED

    if nSubDags > 0:
        # While the processing DAG (index 0) is running we are 'SUBMITTED'.
        processingState = translateStatus(subDagInfos[0]['DagStatus'], dbstatus)
        if processingState == 'SUBMITTED':
            return processingState
        if nSubDags > 1:
            # Tail DAGs exist: report the most active tail status, trying the
            # candidates from most to least active.
            tailStates = [translateStatus(subDagInfos[idx]['DagStatus'], dbstatus)
                          for idx in subDagInfos if idx > 0]
            for candidate in ('SUBMITTED', 'FAILED', 'FAILED (KILLED)', 'COMPLETED'):
                if candidate in tailStates:
                    return check_queued(candidate)
        # No tail decided the outcome: fall back to the processing DAG.
        return check_queued(processingState)
    return check_queued(translateStatus(dagInfo['DagStatus'], dbstatus))


In [19]:
# Take the DAG status record stored under statusCache['nodes']['DagStatus']
# and collapse it into a single status string using the DB status from above.
statusCacheInfo = statusCache['nodes']
dagInfo = statusCacheInfo['DagStatus']
print(dagInfo)
dagStatus = collapseDAGStatus(dagInfo, dbStatus)
print(dagStatus)
status = dagStatus

{u'SubDagStatus': {}, u'Timestamp': 1637357050, u'NodesTotal': 50, u'SubDags': {}, u'DagStatus': 5}
COMPLETED


In [20]:
def getTaskStatusFromSched(self, workflow, logger):
    """Return the status of a task, combining the DB status and the DAG status.

    The task is first looked up through the REST 'task' API; when its
    'tm_task_status' is 'KILLED' that is returned immediately. Otherwise
    status_cache.pkl is downloaded from the task's proxied web directory and
    the DAG status record found in it is collapsed into one status string.

    :param self: object providing `crabServer` (REST client) and
        `config.serviceCert` / `config.serviceKey` (used for the download)
    :param workflow: task name, sent as 'workflow' to the REST 'task' API
    :param logger: forwarded to getProxiedWebDir as its `logFunction`
    :return: 'KILLED', or the string produced by collapsing the DAG status
    :raises Exception: when the download does not return HTTP code 200
    """

    def translateStatus(statusToTr, dbstatus):
        """Translate from DAGMan internal integer status to a string.

        Uses parameter `dbstatus` to clarify if the state is due to the
        user killing the task.
        """
        statusToTr = {1: 'SUBMITTED', 2: 'SUBMITTED', 3: 'SUBMITTED', 4: 'SUBMITTED', 5: 'COMPLETED', 6: 'FAILED'}[
            statusToTr]
        # Unfortunately the DAG code for a killed task is 6, just like for finished DAGs with failed jobs.
        # Relabel the status from 'FAILED' to 'FAILED (KILLED)' if a successful kill command was issued.
        if statusToTr == 'FAILED' and dbstatus == 'KILLED':
            statusToTr = 'FAILED (KILLED)'
        return statusToTr

    def collapseDAGStatus(dagInfo, dbstatus):
        """Collapse the status of one or several DAGs to a single one.

        Take into account that subdags can be submitted to the queue on the
        schedd, but not yet started.
        """
        status_order = ['SUBMITTED', 'FAILED', 'FAILED (KILLED)', 'COMPLETED']

        subDagInfos = dagInfo.get('SubDags', {})
        subDagStatus = dagInfo.get('SubDagStatus', {})
        # Regular splitting, return status of DAG
        if len(subDagInfos) == 0 and len(subDagStatus) == 0:
            return translateStatus(dagInfo['DagStatus'], dbstatus)

        def check_queued(statusOrSUBMITTED):
            # 99 is the status to expect a submitted DAG. If there are fewer
            # actual DAG status records than expected DAGs, at least one
            # DAG has to be queued.
            if dbstatus != 'KILLED' and len(subDagInfos) < len([k for k in subDagStatus if subDagStatus[k] == 99]):
                return 'SUBMITTED'
            return statusOrSUBMITTED

        # If the processing DAG is still running, we are 'SUBMITTED',
        # still.
        if len(subDagInfos) > 0:
            state = translateStatus(subDagInfos[0]['DagStatus'], dbstatus)
            if state == 'SUBMITTED':
                return state
        # Tails active: return most active tail status according to
        # `status_order`
        if len(subDagInfos) > 1:
            states = [translateStatus(subDagInfos[k]['DagStatus'], dbstatus) for k in subDagInfos if k > 0]
            for iStatus in status_order:
                if states.count(iStatus) > 0:
                    return check_queued(iStatus)
        # If no tails are active, return the status of the processing DAG.
        if len(subDagInfos) > 0:
            return check_queued(translateStatus(subDagInfos[0]['DagStatus'], dbstatus))
        return check_queued(translateStatus(dagInfo['DagStatus'], dbstatus))

    crabDBInfo, _, _ = self.crabServer.get(api='task', data={'subresource':'search', 'workflow':workflow})
    dbStatus = getColumn(crabDBInfo, 'tm_task_status')
    if dbStatus == 'KILLED':
        return 'KILLED'
    proxiedWebDir = getProxiedWebDir(crabserver=self.crabServer, task=workflow, logFunction=logger)

    # Download the status_cache file into a temporary file. mkstemp returns an
    # already-open descriptor that is not needed (the file is written by path),
    # so close it immediately to avoid leaking one descriptor per call.
    fd, local_status_cache_pkl = tempfile.mkstemp(dir='/tmp', prefix='status-cache-', suffix='.pkl')
    os.close(fd)
    try:
        url = proxiedWebDir + "/status_cache.pkl"
        host = 'https://cmsweb.cern.ch'  # this is actually dummy but WMCore.Requests needs it
        cdict = {'cert':self.config.serviceCert, 'key':self.config.serviceKey}
        req = Requests(url=host, idict=cdict)
        _, ret = req.downloadFile(local_status_cache_pkl, url)
        if ret.status != 200:
            raise Exception('download attempt returned HTTP code %d' % ret.status)
        # NOTE(review): pickle.load can execute arbitrary code; acceptable only
        # while the remote status_cache.pkl comes from a trusted source.
        with open(local_status_cache_pkl, 'rb') as fp:
            statusCache = pickle.load(fp)
    finally:
        # The temporary copy is only needed for unpickling; do not let these
        # files pile up in /tmp.
        if os.path.exists(local_status_cache_pkl):
            os.remove(local_status_cache_pkl)

    dagInfo = statusCache['nodes']['DagStatus']
    # collapseDAGStatus is the helper nested above, not an attribute of `self`.
    return collapseDAGStatus(dagInfo, dbStatus)


In [2]:
# Resolve the absolute path of PublisherConfig.py (relative to the current
# working directory) and keep it under the name used when building the Master.
import os
confFile = os.path.abspath('PublisherConfig.py')
configurationFile = confFile

In [3]:
from Publisher.PublisherMaster import Master

In [3]:
# Instantiate the Publisher Master from the configuration file path.
master = Master(confFile=configurationFile)
