In [1]:
# The jupyter magic command will install the package (this notebook was produced with the version 0.3.3)
%pip install --user CMSMonitoring
# As the package is installed in the user space we will need to update the sys.path list 
# to use it. This is only necessary if you don't want to restart the kernel.
# (i.e. you will not need to do this in a script.)
import sys
import site
sys.path.insert(0,site.getusersitepackages())
print(sys.path)

Collecting stomp.py==4.1.21
  Downloading stomp.py-4.1.21.tar.gz (49 kB)
[K     |████████████████████████████████| 49 kB 4.0 MB/s eta 0:00:011
Building wheels for collected packages: stomp.py
  Building wheel for stomp.py (setup.py) ... [?25ldone
[?25h  Created wheel for stomp.py: filename=stomp.py-4.1.21-py2.py3-none-any.whl size=38552 sha256=338c52eccc4064c488aa0fe802de879a9b700d32f61421aeef07179aa023459a
  Stored in directory: /tmp/gartung/.cache/pip/wheels/ba/30/2a/989957ebacf5e70e983563f756f871148a3f54b66ae41049c1
Successfully built stomp.py
Installing collected packages: stomp.py
Successfully installed stomp.py-4.1.21
Note: you may need to restart the kernel to use updated packages.
['/eos/user/g/gartung/.local/lib/python3.8/site-packages', '/cvmfs/sft.cern.ch/lcg/views/LCG_100/x86_64-centos7-gcc8-opt/python', '/cvmfs/sft.cern.ch/lcg/views/LCG_100/x86_64-centos7-gcc8-opt/lib', '', '/cvmfs/sft.cern.ch/lcg/views/LCG_100/x86_64-centos7-gcc8-opt/lib/python3.8/site-packages', '/cvm

In [2]:
import os
import json
import time
from itertools import islice
from CMSMonitoring.StompAMQ import StompAMQ

In [3]:
def splitall(path):
    """Split ``path`` into the list of all of its components.

    ``os.path.split`` is applied repeatedly, peeling one trailing component
    off per iteration, until the remainder can no longer be split.  For an
    absolute path the first element is the root (e.g. ``"/"``); for a
    relative path it is the first directory or file name.

    Parameters:
        path: the path string to decompose.

    Returns:
        list of components ordered from the outermost to the innermost.
    """
    components = []
    while True:
        head, tail = os.path.split(path)
        if head == path:
            # Nothing more can be removed: we reached the root of an absolute path.
            components.append(head)
            break
        if tail == path:
            # Nothing more can be removed: we reached the start of a relative path.
            components.append(tail)
            break
        components.append(tail)
        path = head
    # Components were collected innermost-first; present them outermost-first.
    components.reverse()
    return components


In [4]:
def IB2ts(release):
  """Return the build time encoded in an IB release name, in epoch milliseconds.

  The text after the last underscore of ``release`` is parsed with the format
  ``%Y-%m-%d-%H%M`` (e.g. ``CMSSW_12_0_X_2021-05-20-2300``).

  The parsed value is a naive datetime and is interpreted in the local time
  zone of the machine running the notebook, which is what the previous
  ``strftime('%s')`` implementation did on Linux.  ``datetime.timestamp()`` is
  used instead because ``%s`` is a platform-specific extension that Python
  does not document or guarantee.

  Parameters:
      release: release name whose last ``_``-separated field is the time stamp.

  Returns:
      int, milliseconds since the Unix epoch.

  Raises:
      ValueError: if the last field does not match ``%Y-%m-%d-%H%M``.
  """
  from datetime import datetime
  stamp = release.split("_")[-1]
  built = datetime.strptime(stamp, '%Y-%m-%d-%H%M')
  rel_msec = int(built.timestamp()) * 1000
  return rel_msec

def IB2relq(release):
    """Derive the release-queue name (ending in ``_X``) from a release name.

    If the name contains the marker ``_X_``, everything before its first
    occurrence is kept and ``_X`` is appended.  Otherwise the first three
    ``_``-separated fields are kept and ``_X`` is appended.

    Parameters:
        release: release name string.

    Returns:
        the release-queue string.
    """
    prefix, marker, _remainder = release.partition("_X_")
    if marker:
        return prefix + "_X"
    leading_fields = release.split("_")[:3]
    return "_".join(leading_fields) + "_X"

In [5]:
# Connection settings for the message broker.
# username/password are left empty; a key/cert pair is passed to StompAMQ instead
# (presumably certificate-based authentication -- confirm against the StompAMQ docs).
username = ""
password = ""
# Producer name and destination topic handed to StompAMQ.
producer = "cms-cmpwg"
topic = "/topic/cms.cmpwg"
# Broker endpoint, passed below as a single (host, port) pair.
host = "cms-mb.cern.ch"
port = 61323
# NOTE(review): these are absolute paths into one user's area; anyone else running the
# notebook has to edit them to point at their own certificate and key.
cert = "/eos/user/g/gartung/.globus/usercert.pem"
ckey = "/eos/user/g/gartung/.globus/userkey.pem"
# Client object used by the later cells to build (make_notification) and send documents.
# validation_schema=None: no schema is supplied.
stomp_amq = StompAMQ(username, password, producer, topic, key=ckey, cert=cert, validation_schema=None, host_and_ports=[(host, port)])




In [6]:
# Connect to the broker. In the recorded run the call returned a
# stomp.connect.StompConnection11 object, shown as the cell output.
stomp_amq.connect()

<stomp.connect.StompConnection11 at 0x7f0cddcd5b50>

In [7]:
# Read the grouping table from packages.json (looked up relative to the working directory).
# Its keys are later compiled as regular expressions and its values are split on '|'
# into the two group names attached to each document.
with open('packages.json', 'r') as packages_file:
    packages = json.load(packages_file)

In [8]:
# `typing.re` is deprecated and no longer exists in recent Python versions; the compiled
# pattern type is available directly from `re`.  `re` is imported here as well so that this
# cell does not depend on a later cell having been run first.
import re
from re import Pattern

def matchPattern(pattern,text):
    """Check ``text`` against ``pattern``, where the pattern may take three forms.

    Parameters:
        pattern: ``None`` (matches anything), a compiled regular expression
            (matched at the start of ``text`` with ``re.match``), or any other
            value (compared to ``text`` with ``==``).
        text: the string to test.

    Returns:
        ``True`` when ``pattern`` is ``None``; the ``re.Match`` object or
        ``None`` for a compiled pattern; a ``bool`` for the equality case.
        The result is meant to be used for its truth value.
    """
    if pattern is None:
        return True
    if isinstance(pattern, Pattern):
        return re.match(pattern, text)
    return (pattern == text)

In [9]:
import re

# Turn every key of `packages` into an entry [type_pattern, label_pattern, group].
# A key of the form "<type regex>|<label regex>" yields two compiled patterns; a key
# without '|' is a label regex only and None is stored as its type pattern.
# NOTE(review): a key containing more than one '|' raises ValueError on unpacking, and
# consumers must treat a None type pattern specially (re.match(None, ...) is a TypeError).
compiled = []
for key, group in packages.items():
    if '|' in key:
        type_source, label_source = key.split('|')
        type_regex = re.compile(type_source)
        label_regex = re.compile(label_source)
    else:
        type_regex = None
        label_regex = re.compile(key)
    compiled.append([type_regex, label_regex, group])
    
def findGroup(data):
    """Look up the group of a module from its type and label.

    The entries of the module-level ``compiled`` list are tried in order; each
    is ``[type_pattern, label_pattern, group]``.  An entry matches when the
    label pattern matches ``data['module_label']`` and the type pattern
    matches ``data['module_type']``.  A type pattern of ``None`` places no
    constraint on the type; previously such an entry made ``re.match`` raise
    ``TypeError``.  Matching uses ``re.match``, i.e. it is anchored at the
    start of the string.

    Parameters:
        data: mapping with at least the keys ``'module_type'`` and
            ``'module_label'``.

    Returns:
        the group string of the first matching entry split on ``'|'``, or
        ``['Unassigned', 'Unassigned']`` when nothing matches.
    """
    group='Unassigned|Unassigned'
    for [t,l,g] in compiled:
        type_ok = t is None or re.match(t,data['module_type'])
        if type_ok and re.match(l,data['module_label']):
            group=g
            break
    return group.split('|')

In [None]:
import glob
import hashlib


def _profiling_document(entry, release, release_queue, release_ts, workflow, arch):
    """Flatten one entry of a profile JSON file into a flat document (dict).

    ``entry`` is either the file's "total" section or one element of its
    "modules" list.  Missing fields fall back to the defaults given below.

    The document also gets one extra field whose *name* is the entry's own
    module type and whose value is the entry's own module label.  The module
    loop previously filled this field from the "total" entry by mistake.

    The "hash" field is the SHA-1 of release + arch + workflow + release_ts +
    module label.
    """
    doc = {}
    doc["module_label"] = str(entry.get("label", "no label"))
    doc["module_type"] = str(entry.get("type", "no type"))
    subsystem, package = findGroup(doc)
    doc["module_package"] = str(package)
    doc["module_subsystem"] = str(subsystem)
    # Dynamic field "<module type>": "<module label>"; set before the fixed fields
    # below so that a fixed field wins if the names ever collide.
    doc[doc["module_type"]] = doc["module_label"]
    doc["events"] = int(entry.get("events", 0))
    doc["time_thread"] = float(entry.get("time_thread", 0.))
    doc["time_real"] = float(entry.get("time_real", 0.))
    doc["mem_alloc"] = int(entry.get("mem_alloc", 0))
    doc["mem_free"] = int(entry.get("mem_free", 0))
    doc["release"] = release
    doc["release_queue"] = release_queue
    doc["release_ts"] = release_ts
    doc["workflow"] = workflow
    doc["arch"] = arch
    str2hash = release + arch + workflow + str(release_ts) + doc["module_label"]
    doc["hash"] = hashlib.sha1(str2hash.encode()).hexdigest()
    return doc


# Build one notification per "total" entry and per module of every profile JSON file.
documents = []
for f in glob.glob('/eos/project/c/cmsweb/www/cmscmp/circles/web/data/CMSSW_*_X_*/*amd64*/*/*.json'):
    # Only the parsing needs the file handle; close it before the processing starts.
    with open(f, 'r') as file:
        data = json.load(file)

    # Path layout taken from the glob pattern: .../<release>/<arch>/<workflow>/<file>.json
    dirs = splitall(f)
    release = str(dirs[-4])
    arch = str(dirs[-3])
    workflow = str(dirs[-2])
    release_queue = str(IB2relq(release))
    release_ts = int(IB2ts(release))

    # The "total" entry comes first, then the modules, as before.  A file lacking either
    # section contributes no documents for it instead of aborting the whole run.
    total = data.get("total")
    entries = [] if total is None else [total]
    entries.extend(data.get("modules") or [])

    for entry in entries:
        payload = _profiling_document(entry, release, release_queue, release_ts, workflow, arch)
        # Only the first of the three returned values is kept.  The other two are bound to
        # throwaway names rather than `_`, which IPython uses for the last cell output.
        notification, _ignored_1, _ignored_2 = stomp_amq.make_notification(payload,"profiling_document",dataSubfield=None)
        documents.append(notification)

In [None]:
# Spot-check a few of the generated notifications before anything is sent.
# documents[0] is the first notification appended by the cell that builds `documents`.
print(documents[0])

In [None]:
print(documents[1])

In [None]:
print(documents[2])

In [None]:
# NOTE(review): this and the following fixed indices raise IndexError when
# fewer documents than the index requires were built.
print(documents[10])

In [None]:
print(documents[100])

In [None]:
# Negative indices count from the end of the list; this one needs at least 1000 documents.
print(documents[-1000])

In [None]:
print(documents[-100])

In [None]:
print(documents[-10])

In [None]:
# The last notification that was appended.
print(documents[-1])

In [None]:
# Send every collected notification through the StompAMQ client and show what send() returned.
# NOTE(review): presumably the return value lists the notifications that failed to send --
# confirm against the CMSMonitoring StompAMQ documentation before relying on it.
results=stomp_amq.send(documents)
print("results", results)