In [1]:
# `spark` is not defined anywhere in this notebook; presumably the kernel
# pre-creates a SparkSession under this name — confirm before a fresh run.
spark

In [6]:
import os
import sys

# Point both PySpark interpreter variables at the Python running this kernel.
# NOTE(review): `spark` is already used in the first cell, so these may be set
# too late to affect an existing session — confirm.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import time
# from utils import (
#     _to_dict,
#     _donut,
#     _pie,
#     _line_graph,
#     _other_fields,
#     _exitcode_info,
#     _better_label
# )
from datetime import datetime, date, timedelta
from pyspark.sql.functions import (
    col,
    lit,
    when,
    sum as _sum,
    count as _count,
    first,
    date_format,
    from_unixtime
)
import numpy as np
import pandas as pd
from pyspark.sql.types import (
    StructType,
    LongType,
    StringType,
    StructField,
    DoubleType,
    IntegerType,
)
# spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


### Prepare condor file names and configuration

In [3]:
def _get_schema():
    """Return the Spark schema used to read the condor JSON records.

    The schema has a single top-level struct column, ``data``, whose members
    are the job attributes used by this notebook. Only ``RecordTime`` and
    ``GlobalJobId`` are declared non-nullable.
    """
    # (field name, Spark type class, nullable)
    field_specs = [
        ("RecordTime", LongType, False),
        ("CMSPrimaryDataTier", StringType, True),
        ("Status", StringType, True),
        ("WallClockHr", DoubleType, True),
        ("CoreHr", DoubleType, True),
        ("CpuTimeHr", DoubleType, True),
        ("Type", StringType, True),
        ("CRAB_DataBlock", StringType, True),
        ("GlobalJobId", StringType, False),
        ("ExitCode", LongType, True),
        ("CMS_SubmissionTool", StringType, True),
        ("CRAB_Workflow", StringType, True),
    ]
    record_fields = [
        StructField(field_name, spark_type(), nullable=is_nullable)
        for field_name, spark_type, is_nullable in field_specs
    ]
    return StructType([StructField("data", StructType(record_fields))])

In [1]:
# Root HDFS folder of the condor records; daily sub-folders are addressed as
# <root>/YYYY/MM/DD by get_candidate_files.
_DEFAULT_HDFS_FOLDER = "/project/monitoring/archive/condor/raw/metric"

In [3]:
# Check available files 
!hdfs dfs -ls /project/monitoring/archive/condor/raw/metric/2023/07/08

In [6]:
def get_candidate_files(start_date, end_date, spark, base=_DEFAULT_HDFS_FOLDER):
    """Return the daily folders under ``base`` that exist for the given window.

    One path ``{base}/YYYY/MM/DD`` is built for every day from ``start_date``
    (inclusive) up to ``end_date`` (exclusive). Each path is then checked with
    Hadoop's ``FileSystem.globStatus`` and kept only if the lookup returns a
    non-empty result.

    Args:
        start_date: first day of the window (inclusive).
        end_date: day after the last day of the window (exclusive).
        spark: object exposing ``sparkContext``; used to reach the JVM Hadoop
            classes.
        base: root folder that contains the ``YYYY/MM/DD`` sub-folders.

    Returns:
        list of str: the existing daily folder paths, in chronological order.
    """
    days = (end_date - start_date).days
    candidate_files = [
        f"{base}/{(start_date + timedelta(days=offset)).strftime('%Y/%m/%d')}"
        for offset in range(0, days)
    ]

    sc = spark.sparkContext
    jvm = sc._gateway.jvm
    FileSystem = jvm.org.apache.hadoop.fs.FileSystem
    URI = jvm.java.net.URI
    Path = jvm.org.apache.hadoop.fs.Path
    fs = FileSystem.get(URI("hdfs:///"), sc._jsc.hadoopConfiguration())
    # FIXME: only the exact day folder is checked; a sibling "<day>.tmp"
    # folder is not considered.
    candidate_files = [url for url in candidate_files if fs.globStatus(Path(url))]
    print("No. of Consisted files:", len(candidate_files))
    return candidate_files

#     all_candidate_files = []
#     candidate_files = [
#         f"{base}/{(st_date + timedelta(days=i)).strftime('%Y/%m/%d')}"
#         for i in range(0, days)
#     ]
    
#     URI           = sc._gateway.jvm.java.net.URI
#     Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
#     FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
#     Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
#     fs = FileSystem.get(URI("hdfs:///"), Configuration())

#     for fileNames in candidate_files:
#         status = fs.listStatus(Path(fileNames))
#         candidate_files_day_i = [
#             str(fileStatus.getPath()).replace('hdfs://analytix', '')
#             for fileStatus in status
#         ]
#         all_candidate_files.extend(candidate_files_day_i)
#     print("Files Directory:", candidate_files, "\nNo. of Consisted files:", len(all_candidate_files))
#     return all_candidate_files

def group_files(files, n=16):
    """Split ``files`` into consecutive chunks of at most ``n`` items.

    Args:
        files: sequence of file paths (any sliceable sequence).
        n: maximum chunk size; must be >= 1 unless ``files`` is empty.

    Returns:
        list: the chunks in order; ``[]`` when ``files`` is empty.

    Raises:
        ValueError: if ``files`` is non-empty and ``n`` is smaller than 1.
            A zero step would make ``range`` fail with an unrelated message,
            and a negative step would silently drop every file.
    """
    if len(files) == 0:
        all_group = []
    elif n < 1:
        raise ValueError(f"chunk size n must be >= 1, got {n}")
    else:
        all_group = [files[i:i + n] for i in range(0, len(files), n)]
    print("There are", len(all_group), "chunks of files")
    return all_group

## Load dataset

In [8]:
schema = _get_schema()
# Query window. get_candidate_files and the RecordTime filter in spark_exec
# both treat start_date as inclusive and end_date as exclusive.
start_date = datetime(2023, 6, 29)
end_date = datetime(2023, 7, 5)

In [26]:
# Existing daily condor folders for the window; the bare name on the last
# line displays the list as the cell output.
candidate_files = get_candidate_files(start_date, end_date, spark, base=_DEFAULT_HDFS_FOLDER)
candidate_files

No. of Consisted files: 6


['/project/monitoring/archive/condor/raw/metric/2023/06/29',
 '/project/monitoring/archive/condor/raw/metric/2023/06/30',
 '/project/monitoring/archive/condor/raw/metric/2023/07/01',
 '/project/monitoring/archive/condor/raw/metric/2023/07/02',
 '/project/monitoring/archive/condor/raw/metric/2023/07/03',
 '/project/monitoring/archive/condor/raw/metric/2023/07/04']

### Prepare CRAB data file name

In [9]:
# The CRAB tasks folder is selected by the end of the query window formatted
# as YYYY-MM-DD. Note that TODAY is derived from end_date, not the current date.
TODAY = end_date.strftime("%Y-%m-%d")
wa_date = TODAY
HDFS_CRAB_part = f'/project/awg/cms/crab/tasks/{wa_date}/'

'2023-06-29'

### Load raw condor job records and join them with CRAB task data

In [28]:
# Use UTC as the Spark SQL session time zone.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Only the task name (join key in spark_exec) and the ignore-locality flag
# are kept from the CRAB tasks data.
crab_df = (
    spark.read.format('avro')
    .load(HDFS_CRAB_part)
    .select('TM_TASKNAME', 'TM_IGNORE_LOCALITY')
)

In [29]:
def spark_exec(candidate_files):
    """Load completed analysis jobs and return them joined with CRAB task info.

    Steps:
      1. Read the condor JSON records in ``candidate_files`` with ``schema``.
      2. Keep rows with Status 'Completed', Type 'analysis' and RecordTime in
         [start_date, end_date), then drop duplicate ``GlobalJobId`` rows.
      3. Write the result to a staging parquet folder and read it back.
      4. Inner-join with ``crab_df`` on CRAB_Workflow == TM_TASKNAME.

    Reads the notebook globals ``spark``, ``schema``, ``start_date``,
    ``end_date``, ``crab_df`` and ``_DEFAULT_HDFS_FOLDER``.

    Args:
        candidate_files: path or list of paths to read.

    Returns:
        pandas.DataFrame with the columns selected after the join.
    """
    # Staging folder, overwritten on every call.
    staging_path = "/cms/users/eatthaph/condor_vir_data"

    # RecordTime is compared against these values as epoch milliseconds.
    # NOTE(review): start_date/end_date are naive datetimes, so .timestamp()
    # uses the driver's local time zone while the Spark session is set to UTC
    # — confirm the window boundaries are the intended ones.
    start_ms = start_date.timestamp() * 1000
    end_ms = end_date.timestamp() * 1000

    # No .cache() here: the frame is consumed once by the write below and then
    # re-read from parquet, so caching only held executor memory that was
    # never reused or released.
    condor_df = (
        spark.read.option("basePath", _DEFAULT_HDFS_FOLDER)
        .json(
            candidate_files,
            schema=schema,
        )
        .select("data.*")
        .filter(
            f"""Status IN ('Completed')
                AND Type IN ('analysis')
                AND RecordTime >= {start_ms}
                AND RecordTime < {end_ms}
                """
        )
        .drop_duplicates(["GlobalJobId"])
    )
    condor_df.write.mode('overwrite').parquet(staging_path, compression='zstd')
    condor_df = spark.read.format('parquet').load(staging_path)

    result_df = condor_df.join(
        crab_df, crab_df["TM_TASKNAME"] == condor_df["CRAB_Workflow"]
    ).select(
        'RecordTime', 'CMSPrimaryDataTier', 'WallClockHr', 'CoreHr', 'ExitCode',
        "CRAB_DataBlock", "TM_IGNORE_LOCALITY", "GlobalJobId",
    )
    # Collects the whole joined result onto the driver.
    sub_docs = result_df.toPandas()
    return sub_docs

def loop_excute(candidate_files, initial_n=None):
    """Run ``spark_exec`` chunk by chunk, retrying the chunks that fail.

    The files are split with ``group_files`` into chunks of ``initial_n``
    files. Every chunk is passed to ``spark_exec``; files of chunks that raise
    are collected and re-chunked with the same size for the next round. At
    most 10 rounds are run.

    Args:
        candidate_files: list of paths to process.
        initial_n: chunk size. ``None`` (default) means one chunk holding all
            of ``candidate_files``. It is computed at call time from the
            argument, not once at definition time from a global.

    Returns:
        list: one ``spark_exec`` result per successfully processed chunk;
        ``[]`` when ``candidate_files`` is empty.
    """
    df_list = []
    # Bound up front so the final report also works when no round is run.
    df_err_list = []
    if not candidate_files:
        print("Fail excuted files :", df_err_list)
        return df_list

    n = len(candidate_files) if initial_n is None else initial_n
    r = 0
    file_chunk = group_files(candidate_files, n)
    while len(file_chunk) != 0 and r < 10:
        print("=================================\n round :", r+1, "\n=================================")
        df_err_list = []
        for i, chunk in enumerate(file_chunk):
            print("=================================\n", i+1, "out of", len(file_chunk), "\n=================================")
            try:
                df_list.append(spark_exec(chunk))
            except Exception as ex:
                # Best effort: report the error and queue the chunk's files
                # for the next round instead of aborting the whole run.
                print("=====", ex)
                df_err_list.extend(chunk)
        file_chunk = group_files(df_err_list, n)
        r += 1
        print("")
    print("Fail excuted files :", df_err_list)
    return df_list

In [None]:
# useful_df = loop_excute(candidate_files)
# NOTE: despite its name, df_list is the single pandas DataFrame returned by
# spark_exec, not a list of DataFrames.
df_list = spark_exec(candidate_files)

In [13]:
len(df_list)

3493715

In [14]:
# One plain dict per row; this list is the payload passed to client.send below.
docs = df_list.to_dict('records')

In [16]:
# Label every record in place: rows whose CRAB_DataBlock is 'MCFakeBlock'
# become 'PrivateMC', all others 'Analysis'.
for doc in docs:
    doc['CRAB_Type'] = 'PrivateMC' if doc['CRAB_DataBlock'] == 'MCFakeBlock' else 'Analysis'

In [17]:
docs[:5]

[{'RecordTime': 1688643025000,
  'CMSPrimaryDataTier': 'USER',
  'WallClockHr': 0.82,
  'CoreHr': 0.82,
  'ExitCode': 8028,
  'CRAB_DataBlock': '/HNL_majorana_ntau1_ctau1p0e01_massHNL2p0_Vall2p871e-02/mkomm-miniaod17v2_221221-b63beb1ae05c0e254c43785544367ee5/USER#8a0215fd-9dc1-4d15-bf81-dd69133cae03',
  'TM_IGNORE_LOCALITY': 'F',
  'GlobalJobId': 'crab3@vocms0195.cern.ch#90888137.0#1688639963',
  'CRAB_Type': 'Analysis'},
 {'RecordTime': 1688639711000,
  'CMSPrimaryDataTier': 'USER',
  'WallClockHr': 0.5169444444444444,
  'CoreHr': 0.5169444444444444,
  'ExitCode': 8028,
  'CRAB_DataBlock': '/HNL_majorana_ntau1_ctau1p0e01_massHNL2p0_Vall2p871e-02/mkomm-miniaod17v2_221221-b63beb1ae05c0e254c43785544367ee5/USER#8a0215fd-9dc1-4d15-bf81-dd69133cae03',
  'TM_IGNORE_LOCALITY': 'F',
  'GlobalJobId': 'crab3@vocms0195.cern.ch#90887492.0#1688637658',
  'CRAB_Type': 'Analysis'},
 {'RecordTime': 1688644985000,
  'CMSPrimaryDataTier': 'USER',
  'WallClockHr': 1.9816666666666667,
  'CoreHr': 1.981666

In [18]:
import osearch

In [19]:
def get_index_schema():
    """Return the index settings and field mappings for the job documents.

    ``WallClockHr`` and ``CoreHr`` are mapped as ``double``: they are read as
    ``DoubleType`` from Spark and hold fractional hours (e.g. 0.82), which an
    integer ``long`` mapping cannot represent.

    NOTE(review): an index that was already created with the previous ``long``
    mapping presumably keeps it — confirm whether it needs to be re-created.

    Returns:
        dict: with ``"settings"`` and ``"mappings"`` keys.
    """
    return {
        "settings": {"index": {"number_of_shards": "1", "number_of_replicas": "1"}},
        "mappings": {
            "properties": {
                "RecordTime": {"format": "epoch_millis", "type": "date"},
                "CMSPrimaryDataTier": {"ignore_above": 2048, "type": "keyword"},
                "GlobalJobId": {"ignore_above": 2048, "type": "keyword"},
                "WallClockHr": {"type": "double"},
                "CoreHr": {"type": "double"},
                "ExitCode": {"ignore_above": 2048, "type": "keyword"},
                "TM_IGNORE_LOCALITY": {"ignore_above": 2048, "type": "keyword"},
                "CRAB_Type": {"ignore_above": 2048, "type": "keyword"},
            }
        }
    }

In [20]:
_index_template = 'crab-condor-ekong'
# `osearch` comes from outside this notebook; the notes below describe only
# the arguments passed here. The client is created with the host, a local
# secret file name and the mapping from get_index_schema().
client = osearch.get_es_client("es-cms1.cern.ch/es", 'secret_opensearch.txt', get_index_schema())
# index_mod="": 'test-foo', index_mod="Y": 'test-foo-YYYY', index_mod="M": 'test-foo-YYYY-MM', index_mod="D": 'test-foo-YYYY-MM-DD',
# NOTE(review): the index is chosen from the current wall-clock time, not from
# the documents' RecordTime — confirm this is intended when loading past data.
idx = client.get_or_create_index(timestamp=time.time(), index_template=_index_template, index_mod="M")
# NOTE(review): `nono` is whatever client.send returns; presumably the number
# of documents that failed to be sent — confirm against osearch.
nono = client.send(idx, docs, metadata=None, batch_size=10000, drop_nulls=False)

































In [21]:
nono

0