# CRAB Spark condor job

This notebook joins the Condor job metrics with the CRAB task DB to answer the following questions:
- How many jobs use ignorelocality?
- How much wall-clock time is spent by each CMS data tier and each job type?
- What is the success rate of the Analysis job type?


In [None]:
from datetime import datetime, timedelta, timezone
import os
import time
import pandas as pd

from pyspark import SparkContext, StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    current_user,
    col, collect_list, concat_ws, greatest, lit, lower, when,
    avg as _avg,
    count as _count,
    hex as _hex,
    max as _max,
    min as _min,
    round as _round,
    sum as _sum,
)
from pyspark.sql.types import (
    StructType,
    LongType,
    StringType,
    StructField,
    DoubleType,
    IntegerType,
)

In [None]:
# try to import libs from current directory, fallback to $PWD/../workdir if not found
try:
    from crabspark_utils import get_candidate_files, send_os, send_os_parallel
except ModuleNotFoundError:
    import sys
    sys.path.insert(0, f'{os.getcwd()}/../workdir')
    from crabspark_utils import get_candidate_files, send_os, send_os_parallel


In [None]:
# Create the Spark session for this job, or reuse the active one when the
# notebook cell is re-run.
spark = SparkSession.builder.appName('condor-job').getOrCreate()
# bare expression so the notebook renders the session summary as cell output
spark

In [None]:
# Drop anything still cached from an earlier run of this notebook session;
# this is harmless at the start of a fresh cronjob run.
spark.catalog.clearCache()

In [None]:
# Path to the OpenSearch credentials file; fail fast when it is missing.
secretpath = os.environ.get('OPENSEARCH_SECRET_PATH', f'{os.getcwd()}/../workdir/secret_opensearch.txt')
if not os.path.isfile(secretpath):
    # FileNotFoundError subclasses Exception, so any existing `except Exception` still catches it.
    raise FileNotFoundError(f'OS secrets file {secretpath} does not exists')
# if PROD, the index prefix will be `crab-prod-*`, otherwise `crab-test-*`
PROD = os.environ.get('PROD', 'false').lower() in ('true', '1', 't')
# Optional date range from the cronjob environment (START_DATE / END_DATE),
# expected in "%Y-%m-%d" format; None when the variable is not set.
START = os.environ.get('START_DATE', None)
END = os.environ.get('END_DATE', None)

In [None]:
# Default date range for running the notebook by hand (START_DATE inclusive,
# END_DATE exclusive). A cronjob overrides it through the START_DATE/END_DATE
# environment variables, but only when BOTH are set.
START_DATE, END_DATE = (START, END) if START and END else ("2024-10-01", "2024-10-02")

In [None]:
# OpenSearch index name: `crab-prod-condor-taskdb` for production runs,
# `crab-test-condor-taskdb` otherwise.
_index_env = 'prod' if PROD else 'test'
index_name = f'crab-{_index_env}-condor-taskdb'

In [None]:
# Parse the requested range as UTC midnights and derive the epoch values used
# by the filters (milliseconds) and by send_os_parallel (seconds).
_DATE_FMT = "%Y-%m-%d"
start_datetime, end_datetime = (
    datetime.strptime(day, _DATE_FMT).replace(tzinfo=timezone.utc)
    for day in (START_DATE, END_DATE)
)
# sanity check: an inverted range is a configuration error
if start_datetime > end_datetime:
    raise Exception(f"end date ({END_DATE}) is less than start date ({START_DATE})")
start_epochmilis, end_epochmilis = (
    int(moment.timestamp()) * 1000 for moment in (start_datetime, end_datetime)
)
# epoch seconds of the day before END_DATE
yesterday_epoch = int((end_datetime - timedelta(days=1)).timestamp())

In [None]:
# debug: echo the resolved date range and target index, one value per line
print('\n'.join([START_DATE, END_DATE, index_name]))

In [None]:
# Read the CRAB task table snapshot stored for END_DATE (avro on HDFS).
# Tasks are intentionally not filtered by date: most jobs in the window belong
# to tasks created earlier. If the table grows too large, filtering out tasks
# older than 30+7 days ago might be safe.
HDFS_CRAB_part = f'/project/awg/cms/crab/tasks/{END_DATE}/'
crab_df = (
    spark.read.format('avro')
    .load(HDFS_CRAB_part)
    .select('TM_TASKNAME', 'TM_IGNORE_LOCALITY')  # only the columns the join uses
    .cache()
)
crab_df.createOrReplaceTempView("tasks")

In [None]:
# Read the Condor job metrics.
# The candidate-file window is wider than START_DATE..END_DATE (the original note:
# 2 days before the start date to 1 day after the end date, inclusive; driven by
# `day_delta`). Flume (the Condor log aggregator) can deliver metrics up to 2 days
# late and sometimes stamps them with future timestamps, so the wider window makes
# sure every record for the requested dates is read (as suggested by CMSMONIT).
# Compacted and non-compacted files are both read even when they overlap;
# duplicates are removed later by GlobalJobId.
_DEFAULT_HDFS_FOLDER = "/project/monitoring/archive/condor/raw/metric"
candidate_files = get_candidate_files(
    start_datetime,
    end_datetime,
    spark=spark,
    base=_DEFAULT_HDFS_FOLDER,
    day_delta=2,
)

# Fields kept from each JSON document's `data` object: (name, Spark type, nullable).
_condor_data_fields = [
    ("RecordTime", LongType(), False),
    ("CMSPrimaryDataTier", StringType(), True),
    ("Status", StringType(), True),
    ("WallClockHr", DoubleType(), True),
    ("CoreHr", DoubleType(), True),
    ("CpuTimeHr", DoubleType(), True),
    ("Type", StringType(), True),
    ("CRAB_DataBlock", StringType(), True),
    ("GlobalJobId", StringType(), False),
    ("ExitCode", LongType(), True),
    ("CRAB_Workflow", StringType(), True),
    ("CommittedCoreHr", StringType(), True),
    ("CommittedWallClockHr", StringType(), True),
]
# Spark schema mirroring the document layout {"data": {<fields above>}}
read_schema = StructType([
    StructField(
        "data",
        StructType([
            StructField(field_name, field_type, nullable=is_nullable)
            for field_name, field_type, is_nullable in _condor_data_fields
        ]),
    ),
])

_banner = "==============================================="
print(
    _banner,
    "Condor Matrix and CRAB Table",
    _banner,
    "File Directory:", _DEFAULT_HDFS_FOLDER, candidate_files,
    "Work Directory:", os.getcwd(),
    _banner,
    _banner,
    sep='\n',
)

In [None]:
# Spark's current_user(); used to build the per-user HDFS scratch path below.
_user_records = spark.sql("""SELECT current_user() AS user""").toPandas().to_dict('records')
crab_username = _user_records[0]['user']

In [None]:
# Extract only the slice of Condor metrics this job needs and persist it to a
# scratch area on HDFS. Processing everything in one pass does not fit in
# memory (one day works, one month ran Spark out of memory). The slice keeps:
#   - the columns listed in `read_schema`
#   - records with start_epochmilis <= RecordTime < end_epochmilis
#   - only Status 'Completed' and Type 'analysis'
#   - one row per GlobalJobId (the drop_duplicates call below)
_condor_row_filter = f"""Status IN ('Completed')
            AND Type IN ('analysis')
            AND RecordTime >= {start_epochmilis}
            AND RecordTime < {end_epochmilis}
            """
_condor_scratch_path = f"/cms/users/{crab_username}/condor_vir_data"
_condor_raw_df = (
    spark.read.option("basePath", _DEFAULT_HDFS_FOLDER)
    .json(candidate_files, schema=read_schema)
    .select("data.*")
)
# 'overwrite' reuses the same path so stale data from earlier runs is removed;
# the downside is that two runs for the same user cannot execute in parallel.
(
    _condor_raw_df.filter(_condor_row_filter)
    .drop_duplicates(["GlobalJobId"])
    .write.mode('overwrite')
    .parquet(_condor_scratch_path, compression='zstd')
)
spark.catalog.clearCache()

In [None]:
# Load the deduplicated scratch copy back, cache it, and expose it to SQL as `condor`.
condor_df = (
    spark.read.format('parquet')
    .load(f"/cms/users/{crab_username}/condor_vir_data")
    .cache()
)
condor_df.createOrReplaceTempView("condor")

In [None]:
# query
# Join each Condor job record with its CRAB task (CRAB_Workflow = TM_TASKNAME)
# and derive the fields sent to OpenSearch:
#   filter_tb   - re-applies the [start_epochmilis, end_epochmilis) RecordTime window
#   join_tb     - INNER JOIN, so jobs whose task is not in `tasks` are dropped
#   finalize_tb - adds CRAB_Type ('PrivateMC' when CRAB_DataBlock = 'MCFakeBlock',
#                 otherwise 'Analysis'), a constant type = 'condor', and
#                 timestamp = RecordTime
query = f"""\
WITH filter_tb AS (
SELECT *
FROM condor
WHERE 1=1
AND RecordTime >= {start_epochmilis}
AND RecordTime < {end_epochmilis}
),
join_tb AS (
SELECT RecordTime, CMSPrimaryDataTier, WallClockHr, CoreHr, CpuTimeHr, ExitCode, CRAB_DataBlock, TM_IGNORE_LOCALITY, GlobalJobId, CommittedCoreHr, CommittedWallClockHr
FROM filter_tb
INNER JOIN tasks 
ON filter_tb.CRAB_Workflow = tasks.TM_TASKNAME 
), 
finalize_tb AS (
SELECT RecordTime, CMSPrimaryDataTier, WallClockHr, CoreHr, CpuTimeHr, ExitCode, CRAB_DataBlock, TM_IGNORE_LOCALITY, GlobalJobId, CommittedCoreHr, CommittedWallClockHr, 
       CASE 
           WHEN CRAB_DataBlock = 'MCFakeBlock' THEN 'PrivateMC'  
           ELSE 'Analysis'
       END AS CRAB_Type,        --- to differentiate between analysis and mc
       'condor' AS type,        --- use to match specific data when use wildcard index pattern on grafana side
       RecordTime AS timestamp  --- use `RecordTime` as timestamp
FROM join_tb
)
SELECT * 
FROM finalize_tb 
"""
tmpdf = spark.sql(query)
# preview the first 10 rows of the joined result
tmpdf.show(10)



In [None]:
# Number of joined rows; shown as the cell output when run as a notebook.
# tmpdf itself is not cached, so this Spark action re-runs the query.
tmpdf.count()

In [None]:
# OpenSearch index settings and field mappings for the condor-taskdb documents.
# The *Hr fields hold fractional hours: WallClockHr/CoreHr/CpuTimeHr are Spark
# DoubleType, and the Committed* fields are read as strings. They are mapped
# as `double` because an integer (`long`) mapping coerces values by truncating
# the fraction, so short jobs would be indexed as 0 hours and sums/averages
# in dashboards would be wrong.
# Mappings only apply when an index is created; an existing index keeps the
# field types it was created with.
schema = {
    "settings": {"index": {"number_of_shards": "1", "number_of_replicas": "1"}},
    "mappings": {
        "properties": {
            "RecordTime": {"format": "epoch_millis", "type": "date"},
            "CMSPrimaryDataTier": {"ignore_above": 2048, "type": "keyword"},
            "GlobalJobId": {"ignore_above": 2048, "type": "keyword"},
            "WallClockHr": {"type": "double"},
            "CoreHr": {"type": "double"},
            "CpuTimeHr": {"type": "double"},
            "ExitCode": {"ignore_above": 2048, "type": "keyword"},
            "TM_IGNORE_LOCALITY": {"ignore_above": 2048, "type": "keyword"},
            "CRAB_Type": {"ignore_above": 2048, "type": "keyword"},
            "CRAB_DataBlock": {"ignore_above": 2048, "type": "keyword"},
            "CommittedCoreHr": {"type": "double"},
            "CommittedWallClockHr": {"type": "double"},
            "type": {"ignore_above": 2048, "type": "keyword"},
            "timestamp": {"format": "epoch_millis", "type": "date"},
        }
    },
}

In [None]:
# Simple workaround for osearch state that persists across notebook cell runs:
#   - it loads the secret once and reuses it from then on
#   - get_or_create_index() creates the index+schema only the first time it runs
# Reloading the module resets that state. It is also safe to run in cronjobs.
# NOTE(review): osearch is not referenced directly elsewhere in this notebook;
# presumably it is used inside crabspark_utils.send_os_parallel. Confirm that
# reloading here actually affects the module objects that code path uses.
import importlib
import osearch
importlib.reload(osearch)

In [None]:
# Repartition so each partition is small enough to be pulled back into the
# Python kernel, converted to dicts and sent to OpenSearch in one go.
# Data point: 12M rows (27 days) split into 51 partitions (~250k rows each)
# worked; 20 partitions made the Python kernel run out of memory. So keep
# partitions at roughly 200k rows or fewer.
ROWS_PER_PARTITION = 200000
row_count = tmpdf.count()
# Ceiling division, floored at 1. The plain `row_count // ROWS_PER_PARTITION`
# is 0 for any result with fewer than 200k rows (including an empty one),
# and DataFrame.repartition() rejects a non-positive partition count. Floor
# division would also allow partitions of up to ~400k rows.
partition_num = max(1, (row_count + ROWS_PER_PARTITION - 1) // ROWS_PER_PARTITION)
tmpdf = tmpdf.repartition(partition_num, 'RecordTime')
total_part = tmpdf.rdd.getNumPartitions()

print(f"Number of partition: {total_part}")

In [None]:
# Ship the data to OpenSearch one RDD partition at a time. Each partition is
# turned into a single list of row dicts and pulled to the driver through
# toLocalIterator(), so only one partition's documents are materialized in
# Python at once. They are then sent with batch_size=20000 (an arbitrary size).
part = 0
_partition_docs = tmpdf.rdd.mapPartitions(lambda rows: [[row.asDict() for row in rows]])
for part, docs in enumerate(_partition_docs.toLocalIterator(), start=1):
    print(f"Partition: {part}/{total_part}, Length of partition: {len(docs)}")
    send_os_parallel(docs, index_name, schema, secretpath, yesterday_epoch, 20000)

In [None]:
# Reaching this line means every partition was sent without an exception.
print("Done!", end="\n")