In [9]:
%%time
import sys
import gzip
import glob
import pandas as pd
import numpy as np
from joblib import load


# Column order of the header-less, space-separated log files.
COLUMN_NAMES = (
    "job_id queue job_type hn "
    "just_started just_finished js nc "
    "hsj hsm cpt rt "
    "owner ram img "
    "ts sn disk"
).split()
# Columns read directly as pandas "category" dtype.
CATEGORICAL_VARS = ["queue", "job_type", "just_started", "just_finished", "owner"]
# Identifier columns converted to the pandas "string" dtype after loading.
STRING_VARS = ["job_id", "hn", "sn"]
# Per-sample series that are collected per job and averaged over windows.
AGG_COLUMNS = ["ram", "img", "disk"]
# Queue-name substrings that mark a job as "lhc" (matched as a regex alternation).
LHC_QUEUES = ["alice", "atlas", "cms", "lhcb"]

 
class Preprocessor:
    """Builds one feature row per job from a folder of gzipped, space-separated logs."""

    # Half-open [start, end) sample ranges averaged for every column in AGG_COLUMNS;
    # window k becomes the output column ``<col>_k``.
    WINDOWS = ((0, 5), (5, 10), (10, 15), (15, 20))
    # A job needs at least this many samples so that every window is fully populated.
    MIN_SAMPLES = WINDOWS[-1][1]
    # Jobs whose first 'rt' value exceeds this are dropped.
    MAX_FIRST_RT = 180
    # Only jobs whose maximum 'js' value equals this are kept.
    REQUIRED_JS = 2

    @staticmethod
    def preprocess(inputf):
        """Load every ``*.gz`` log in ``inputf`` and return one feature row per job.

        A job is identified by ``job_id + "_" + sn``. Its per-sample ``ram``, ``img``
        and ``disk`` values are collected in file/row order. A job is kept only if it
        has at least ``MIN_SAMPLES`` samples, a first ``rt`` <= ``MAX_FIRST_RT`` and
        a maximum ``js`` equal to ``REQUIRED_JS``.

        Parameters
        ----------
        inputf : str or os.PathLike
            Directory containing the gzipped log files (previously ignored in favour
            of the global ``args[0]``).

        Returns
        -------
        pandas.DataFrame
            Columns ``job``, ``job_type`` and ``job_work_type`` (``"lhc"`` /
            ``"non-lhc"``), followed by ``<col>_0`` .. ``<col>_3`` window means for
            each column in AGG_COLUMNS.

        Raises
        ------
        FileNotFoundError
            If ``inputf`` contains no ``*.gz`` file.
        """
        # Sorted so that the per-job sample order does not depend on directory
        # listing order when a job's rows span several files.
        paths = sorted(glob.glob(f"{inputf}/*.gz"))
        if not paths:
            raise FileNotFoundError(f"no *.gz log files found in {str(inputf)!r}")

        categorical_dtypes = {c: "category" for c in CATEGORICAL_VARS}
        data = pd.concat(
            (
                pd.read_csv(
                    path,
                    sep=" ",
                    names=COLUMN_NAMES,
                    dtype=categorical_dtypes,
                    compression="gzip",
                    engine="pyarrow",
                )
                for path in paths
            ),
            ignore_index=True,  # unique row labels across files
        )

        data[STRING_VARS] = data[STRING_VARS].astype("string")
        data.insert(0, "job", data["job_id"] + "_" + data["sn"])
        data["job_work_type"] = (
            data["queue"]
            .str.contains("|".join(LHC_QUEUES))
            .map({True: "lhc", False: "non-lhc"})
            .astype("category")
        )

        agg_data = data.groupby("job").agg({
            "rt": list,
            "ram": list,
            "img": list,
            "disk": list,
            "js": "max",  # string alias: same result as builtin max, no FutureWarning
            "job_type": "first",
            "job_work_type": "first",
        })

        keep = (
            agg_data["rt"].map(lambda rt: rt[0] <= Preprocessor.MAX_FIRST_RT)
            & (agg_data["rt"].map(len) >= Preprocessor.MIN_SAMPLES)
            & (agg_data["js"] == Preprocessor.REQUIRED_JS)
        )
        kept = agg_data[keep].drop(columns=["rt", "js"]).reset_index()

        windows = Preprocessor.WINDOWS
        # One frame of window means per aggregated column; explicit column labels
        # keep the <col>_k columns present even when no job survives the filter.
        window_frames = [
            pd.DataFrame(
                [[np.mean(samples[start:end]) for start, end in windows] for samples in kept[col]],
                index=kept.index,
                columns=range(len(windows)),
            ).add_prefix(f"{col}_")
            for col in AGG_COLUMNS
        ]

        return pd.concat(
            [kept[kept.columns.difference(AGG_COLUMNS)], *window_frames],
            axis=1,
        )
    

class Classifier:
    """Thin wrapper that loads a joblib model dump and delegates predictions to it."""

    def __init__(self, inputf):
        # joblib.load unpickles the file: only load model dumps from trusted sources.
        loaded_model = load(inputf)
        self.model = loaded_model

    def predict(self, X):
        """Return whatever the wrapped model's ``predict`` returns for ``X``."""
        estimator = self.model
        return estimator.predict(X)
        

if __name__ == '__main__':
    # Under a Jupyter kernel, sys.argv holds the kernel launcher's own arguments,
    # not a log folder and model path, so use the notebook's fixed paths there.
    # From a shell, both positional arguments are required and are honoured
    # (previously they were always overwritten).
    if "ipykernel" in sys.modules:
        args = ["./test", "./model.joblib"]
    else:
        args = sys.argv[1:]
        if len(args) < 2:
            print("Usage: python classifier.py <log folder> <model dump>", file=sys.stderr)
            sys.exit(1)

    data = Preprocessor.preprocess(args[0])
    # TODO(review): classification is currently disabled; re-enable to write results.txt.
    # clf = Classifier(args[1])
    # np.savetxt('results.txt', np.c_[data['job'], clf.predict(data)], fmt=('%s', '%d'))

CPU times: user 33.6 s, sys: 1.8 s, total: 35.4 s
Wall time: 31.9 s


In [15]:
from dask.distributed import Client
# Re-running this cell would otherwise start another LocalCluster beside the first
# one (a dashboard port clash is what prints the "Perhaps you already have a
# cluster running?" notice). A Client that started its own LocalCluster also shuts
# that cluster down when it is closed.
if isinstance(globals().get("client"), Client):
    client.close()

# 2 worker processes x 2 threads each. memory_limit applies per worker:
# 8 GB (decimal) is reported by the dashboard as 7.45 GiB.
client = Client(n_workers=2, threads_per_worker=2, memory_limit='8GB')
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 35959 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:35959/status,

0,1
Dashboard: http://127.0.0.1:35959/status,Workers: 2
Total threads: 4,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34227,Workers: 2
Dashboard: http://127.0.0.1:35959/status,Total threads: 4
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:39297,Total threads: 2
Dashboard: http://127.0.0.1:35605/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:40945,
Local directory: /tmp/dask-worker-space/worker-ccf8da5k,Local directory: /tmp/dask-worker-space/worker-ccf8da5k
GPU: Tesla T4,GPU memory: 15.00 GiB

0,1
Comm: tcp://127.0.0.1:40721,Total threads: 2
Dashboard: http://127.0.0.1:34487/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:40057,
Local directory: /tmp/dask-worker-space/worker-j85zsrfl,Local directory: /tmp/dask-worker-space/worker-j85zsrfl
GPU: Tesla T4,GPU memory: 15.00 GiB


In [14]:
%%time
import sys
import dask.dataframe as dd
import pandas as pd
import numpy as np

# Folder holding the gzipped logs. Inside a notebook kernel, sys.argv carries the
# kernel launcher's own arguments rather than user input, so it is not consulted
# (indexing it also raised IndexError when it was empty).
LOG_DIR = "./test"

COLUMN_NAMES = [
    "job_id", "queue", "job_type", "hn",
    "just_started", "just_finished", "js", "nc",
    "hsj", "hsm", "cpt", "rt",
    "owner", "ram", "img",
    "ts", "sn", "disk",
]
# Same categorical set as the pandas Preprocessor cell, which also includes "owner".
CATEGORICAL_VARS = [*COLUMN_NAMES[1:3], *COLUMN_NAMES[4:6], "owner"]
STRING_VARS = ["job_id", "hn", "sn"]
AGG_COLUMNS = ["ram", "img", "disk"]
LHC_QUEUES = ["alice", "atlas", "cms", "lhcb"]
# Half-open [start, end) sample windows averaged into <col>_0 .. <col>_3.
WINDOWS = [(0, 5), (5, 10), (10, 15), (15, 20)]
MIN_SAMPLES = WINDOWS[-1][1]  # every window must be fully populated
MAX_FIRST_RT = 180            # drop jobs whose first 'rt' value exceeds this
REQUIRED_JS = 2               # keep only jobs whose maximum 'js' equals this

# gzip files cannot be split, so blocksize=None gives one partition per file.
dask_df = dd.read_csv(
    f"{LOG_DIR}/*.gz",
    sep=" ",
    names=COLUMN_NAMES,
    dtype={c: "category" for c in CATEGORICAL_VARS},
    compression="gzip",
    blocksize=None,
)
dask_df[STRING_VARS] = dask_df[STRING_VARS].astype("string")
dask_df["job"] = dask_df["job_id"] + "_" + dask_df["sn"]
dask_df["job_work_type"] = (
    dask_df["queue"]
    .str.contains("|".join(LHC_QUEUES))
    .map({True: "lhc", False: "non-lhc"})
    .astype("category")
)

# Everything from here on is plain pandas on the materialised frame. Dask-only
# options such as split_out therefore do not apply: pandas either ignores them or
# forwards them to the aggregation function (list() then raises TypeError).
pandas_df = dask_df.compute()

agg_df = pandas_df.groupby("job").agg({
    "rt": list,
    "ram": list,
    "img": list,
    "disk": list,
    "js": "max",
    "job_type": "first",
    "job_work_type": "first",
})

keep = (
    agg_df["rt"].map(lambda rt: rt[0] <= MAX_FIRST_RT)
    & (agg_df["rt"].map(len) >= MIN_SAMPLES)
    & (agg_df["js"] == REQUIRED_JS)
)
filtered_agg_df = agg_df[keep].drop(["rt", "js"], axis=1).reset_index(drop=False)

# One frame of window means per aggregated column; explicit column labels keep the
# <col>_k columns present even when no job survives the filter.
window_frames = [
    pd.DataFrame(
        [
            [np.mean(samples[start:end]) for start, end in WINDOWS]
            for samples in filtered_agg_df[col]
        ],
        index=filtered_agg_df.index,
        columns=range(len(WINDOWS)),
    ).add_prefix(f"{col}_")
    for col in AGG_COLUMNS
]

features_df = pd.concat(
    [filtered_agg_df[filtered_agg_df.columns.difference(AGG_COLUMNS)], *window_frames],
    axis=1,
)
features_df

CPU times: user 24.4 s, sys: 2.11 s, total: 26.5 s
Wall time: 36 s


Unnamed: 0,job,job_type,job_work_type,ram_0,ram_1,ram_2,ram_3,img_0,img_1,img_2,img_3,disk_0,disk_1,disk_2,disk_3
0,3809554.0_ce07-htc,grid,lhc,0.702459,1.184681,1.188026,1.201397,1.953869,3.270308,3.273641,3.286972,0.120448,0.212754,0.223301,0.237722
1,3809555.0_ce07-htc,grid,lhc,0.730270,1.219301,1.219508,1.219700,1.983569,3.307896,3.307896,3.307897,0.219560,0.385122,0.406156,0.428848
2,3809556.0_ce07-htc,grid,lhc,0.715608,1.201982,1.204282,1.204952,1.967448,3.289028,3.293028,3.293029,0.159653,0.277913,0.290914,0.306437
3,3809557.0_ce07-htc,grid,lhc,0.703479,1.172676,1.172697,1.173039,1.955298,3.258828,3.258828,3.259125,0.136811,0.240585,0.254299,0.269178
4,3809579.0_ce07-htc,grid,lhc,0.716405,1.194057,1.194060,1.194083,1.968221,3.280364,3.280364,3.280366,0.118676,0.207800,0.219872,0.233689
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2069,9160317.0_ce02-htc,grid,lhc,0.000000,9.992448,10.669488,10.751572,0.000027,24.657512,25.608090,25.739288,0.000027,28.422060,29.319966,29.987654
2070,9160318.0_ce02-htc,grid,lhc,0.000000,7.692836,10.320858,10.613844,0.000027,35.678412,35.678412,35.678412,0.000027,28.346516,28.922177,29.378898
2071,9160319.0_ce02-htc,grid,lhc,0.000000,9.204488,10.326972,10.715896,0.000027,24.269204,25.403715,25.614608,0.000027,28.339826,29.017761,29.559704
2072,9160320.0_ce02-htc,grid,lhc,0.000000,10.077300,11.149810,11.440580,0.000027,24.600748,27.179490,27.257240,0.000027,28.363018,28.962276,29.486329
