In [104]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ceyhun Uzunoglu <ceyhunuzngl AT gmail [DOT] com>
"""Get last access times of datasets by joining Rucio's REPLICAS, DIDS and CONTENTS tables"""
import pickle
import sys
from datetime import datetime

import pandas as pd
from dateutil.relativedelta import relativedelta
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg as _avg,
    col,
    count as _count,
    countDistinct,
    hex as _hex,
    lit,
    lower,
    max as _max,
    min as _min,
    round as _round,
    sum as _sum,
    when,
)
from pyspark.sql.types import (
    LongType,
)

# Notebook display settings: floats with thousands separators and 8 decimals,
# and never truncate long cell values (dataset names are long).
pd.set_option("display.float_format", "{:,.8f}".format)
pd.options.display.max_colwidth = None

# HDFS locations of the Rucio table dumps (avro files).
HDFS_RUCIO_CONTENTS = "/project/awg/cms/rucio_contents/*/part*.avro"
HDFS_RUCIO_DIDS = "/project/awg/cms/rucio_dids/*/part*.avro"
# REPLICAS is read from today's dated directory (YYYY-MM-DD).
HDFS_RUCIO_REPLICAS = "/project/awg/cms/rucio/{}/replicas/part*.avro".format(
    datetime.today().strftime("%Y-%m-%d")
)


def get_ts_thresholds(months=(3, 6, 12)):
    """Return epoch-millisecond timestamps of local midnight ``n`` months ago.

    For each ``n`` in ``months``, take today minus ``n`` calendar months (via
    ``relativedelta``), truncate it to midnight, and convert it to
    milliseconds since the unix epoch.

    Args:
        months: iterable of month counts; defaults to (3, 6, 12).

    Returns:
        dict mapping each month count to an int timestamp in milliseconds.
    """
    # Evaluate "today" once so all thresholds share the same reference day.
    today = datetime.today()
    timestamps = {}
    for num_month in months:
        dt = today + relativedelta(months=-num_month)
        # Drop the time of day; a naive datetime's .timestamp() is interpreted as local time.
        timestamps[num_month] = int(datetime(dt.year, dt.month, dt.day).timestamp()) * 1000
    return timestamps


def get_disk_rse_ids(pickle_path="rses.pickle"):
    r"""Return the ids of Disk RSEs from a pickled ``{rse_name: rse_id}`` map.

    RSEs whose name contains "Tape", "Test" or "Temp" are excluded; the ids of
    the remaining RSEs are returned in the dictionary's iteration order.

    Args:
        pickle_path: path of the pickle file holding the ``{rse_name: rse_id}``
            dict. Defaults to "rses.pickle" in the current working directory.

    Returns:
        list of rse ids.

    NOTE: ``pickle.load`` can execute arbitrary code; only load a file you
    produced yourself (e.g. with the script below).

    TODO: Get the rse:rse_id map via the Rucio python library. I could not run
    the Rucio python library unfortunately.

    Code used in LxPlus to produce the pickle file (author: David Lange)::

        #!/usr/bin/env python
        from subprocess import Popen,PIPE
        import os,sys,pickle
        def runCommand(comm):
            p = Popen(comm,stdout=PIPE,stderr=PIPE,shell=True)
            pipe=p.stdout.read()
            errpipe=p.stderr.read()
            tupleP=os.waitpid(p.pid,0)
            eC=tupleP[1]
            return eC,pipe.decode(encoding='UTF-8'),errpipe.decode(encoding='UTF-8')
        comm="rucio list-rses"
        ec,cOut,cErr = runCommand(comm)
        rses={}
        for l in cOut.split():
            rse=str(l.strip())
            print(rse)
            comm="rucio-admin rse info "+rse
            ec2,cOut2,cErr2 = runCommand(comm)
            id=None
            for l2 in cOut2.split('\n'):
                if "id: " in l2:
                    id=l2.split()[1]
                    break
            print(id)
            rses[rse]=id
        with open("rses.pickle", "wb+") as f:
            pickle.dump(rses, f)
    """
    excluded_markers = ("Tape", "Test", "Temp")
    # Read-only mode: the file is never written here, so don't require write permission.
    with open(pickle_path, "rb") as f:
        rses = pickle.load(f)
    return [
        rse_id
        for rse_name, rse_id in rses.items()
        if not any(marker in rse_name for marker in excluded_markers)
    ]


def get_spark_session(yarn=True, verbose=False):
    """Get or create the Spark session for this job.

    ``SparkSession.builder.getOrCreate()`` reuses an already running
    session/context (e.g. inside a notebook) instead of failing, as
    constructing ``SparkContext(...)`` directly does when one already exists.
    If a session already exists, the app name below is not applied to it.

    Args:
        yarn: unused; kept for backward compatibility.
        verbose: unused; kept for backward compatibility.

    Returns:
        pyspark.sql.SparkSession
    """
    return SparkSession.builder \
        .appName("cms-monitoring-rucio-last_access-ts") \
        .getOrCreate()


def prepare_spark_dataframes(spark):
    """Prepare Spark dataframes using the DIDS, REPLICAS and CONTENTS Rucio tables in HDFS.

    --- DBS terminology is used ---
    - file:    file in Rucio
    - block:   dataset in Rucio
    - dataset: container in Rucio

    Args:
        spark: an active SparkSession.

    Returns:
        tuple (df_contents_f_to_b, df_contents_b_to_d, df_replicas,
               df_dids_files, df_replicas_j_dids, df_files_complete)

    Calls ``sys.exit(1)`` if a replica has no file size in either table, or if
    the DIDS and REPLICAS sizes of a file disagree.
    """
    # STEP-1: Get file to block mappings from CONTENTS table as spark df
    df_contents_f_to_b = spark.read.format("avro").load(HDFS_RUCIO_CONTENTS) \
        .withColumnRenamed("NAME", "block") \
        .withColumnRenamed("CHILD_NAME", "file") \
        .select(["block", "file"]) \
        .cache()

    # STEP-2: Get block to dataset mappings from CONTENTS table as spark df
    df_contents_b_to_d = spark.read.format("avro").load(HDFS_RUCIO_CONTENTS) \
        .withColumnRenamed("NAME", "dataset") \
        .withColumnRenamed("CHILD_NAME", "block") \
        .select(["dataset", "block"]) \
        .cache()

    # STEP-3: Get ids of only Disk RSEs
    disk_rse_ids = get_disk_rse_ids()

    # STEP-4: Create spark dataframe for REPLICAS table by filtering only Disk RSEs of CMS. Importance:
    #           - provides files in Disk RSEs
    #           - provides file size in RSEs (not all of them, see Step-6)
    #   RSE_ID is converted to lower-case hex so it can be matched against disk_rse_ids.
    df_replicas = spark.read.format("avro") \
        .load(HDFS_RUCIO_REPLICAS) \
        .withColumn("rse_id", lower(_hex(col("RSE_ID")))) \
        .withColumn("fsize_replicas", col("BYTES").cast(LongType())) \
        .withColumnRenamed("NAME", "file") \
        .filter(col("rse_id").isin(disk_rse_ids)) \
        .filter(col("SCOPE") == "cms") \
        .select(["file", "rse_id", "fsize_replicas"]) \
        .cache()

    # STEP-5: Create spark dataframe for DIDS table by selecting only Files. Importance:
    #           - provides all files in CMS
    #           - provides file access times!
    #           - provides file size (compatible with replicas, tested), (not all of them, see step-6)
    df_dids_files = spark.read.format("avro") \
        .load(HDFS_RUCIO_DIDS) \
        .filter(col("SCOPE") == "cms") \
        .filter(col("DID_TYPE") == "F") \
        .withColumnRenamed("NAME", "file") \
        .withColumnRenamed("ACCESSED_AT", "accessed_at") \
        .withColumn("fsize_dids", col("BYTES").cast(LongType())) \
        .select(["file", "fsize_dids", "accessed_at"]) \
        .cache()

    # STEP-6: Left join df_replicas and df_dids_files to fill the fsize for all files. Importance:
    #           - fills fsize for all files by combining both REPLICAS values and DIDS values
    df_replicas_j_dids = df_replicas.join(df_dids_files, ["file"], how="left").cache()

    # STEP-7: Check that the REPLICAS and DIDS join gives a file size for every file.
    if df_replicas_j_dids.filter(col("fsize_dids").isNull() & col("fsize_replicas").isNull()).head():
        print("We have a problem! At least one of them should not be null !")
        sys.exit(1)
    # STEP-8: Where both sizes are known, they must be equal.
    #   Compare the sizes directly: a ratio-based check misses mismatches where either size is 0.
    elif df_replicas_j_dids.filter(
            col("fsize_dids").isNotNull()
            & col("fsize_replicas").isNotNull()
            & (col("fsize_dids") != col("fsize_replicas"))
    ).head():
        print("We have a problem, bytes are not equal in DIDS and REPLICAS!")
        sys.exit(1)

    # STEP-9: At least one of fsize_dids/fsize_replicas is not null (STEP-7); prefer the DIDS value.
    #   - Because the file sizes are filled in, this df is called "complete"
    df_files_complete = df_replicas_j_dids \
        .withColumn("fsize",
                    when(col("fsize_dids").isNotNull(), col("fsize_dids"))
                    .otherwise(col("fsize_replicas"))
                    ) \
        .select(['file', 'rse_id', 'accessed_at', 'fsize']) \
        .cache()

    return (df_contents_f_to_b,
            df_contents_b_to_d,
            df_replicas,
            df_dids_files,
            df_replicas_j_dids,
            df_files_complete
            )

In [105]:
# NOTE(review): `spark` is not defined in this cell; presumably it is created in an earlier
#   notebook cell (e.g. via get_spark_session()) — confirm before running this as a script.
(df_contents_f_to_b,
 df_contents_b_to_d,
 df_replicas,
 df_dids_files,
 df_replicas_j_dids,
 df_files_complete) = prepare_spark_dataframes(spark)

# ===============================================================================
# Continue with joins
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# -------------------------------------------------------------------------------

# --- STEP-10 / Tests to check dataframes are okay (counted before the block isNotNull filter) ---:
#         df_block_file_rse.select("file").distinct().count() = 29921156
#         df_block_file_rse.filter(col("file").isNull()).count() = 0
#         df_block_file_rse.filter(col("block").isNull()).count() = 57892
#         The line above means we cannot extract the block names of 57892 files from the CONTENTS table ..
#         .. which provides the F:D and D:C mappings (file, dataset, container in Rucio terms)
#         df_block_file_rse.filter(col("rse_id").isNull()).count() = 0
#         df_block_file_rse.filter(col("fsize").isNull()).count() = 0
#         We are all good, just drop null block names.

# STEP-10: Left join df_files_complete and df_contents_f_to_b to get block names of files.
#   - The block names of some files cannot be extracted from the CONTENTS table,
#   - so filter those files out.
df_block_file_rse = df_files_complete \
    .join(df_contents_f_to_b, ["file"], how="left") \
    .select(['block', 'file', 'rse_id', 'accessed_at', 'fsize', ]) \
    .filter(col("block").isNotNull()) \
    .cache()

# --- STEP-11 / Tests to check dataframes are okay (counted before the dataset isNotNull filter) ---:
#         df_all.filter(col("dataset").isNull()).count() = 280821

# STEP-11: Left join df_block_file_rse and df_contents_b_to_d to get dataset names of blocks&files.
#   - The dataset names of some blocks cannot be extracted from the CONTENTS table,
#   - so filter those blocks out.
df_all = df_block_file_rse \
    .join(df_contents_b_to_d, ["block"], how="left") \
    .select(['dataset', 'block', 'file', 'rse_id', 'accessed_at', 'fsize']) \
    .filter(col("dataset").isNotNull()) \
    .cache()

# STEP-12: Group by "dataset" and "rse_id" to calculate:
#       - dataset_size_in_rse: total size of the dataset in an RSE, summing up all of the dataset's files in that RSE.
#       - last_access_time_of_dataset_per_rse: last access time of the dataset in an RSE ...
#           ... i.e. the max of the "accessed_at" field over all of the dataset's files in that RSE.
#       - #files_null_access_time_per_rse: number of the dataset's files in that RSE whose ...
#           ... "accessed_at" field is NULL. This is needed to drop datasets that have any ...
#           ... file with a NULL "accessed_at" (see STEP-13).
#       - #files_per_rse: number of (joined) file rows of the dataset in that RSE
#       - #files_unique_per_rse: number of distinct files of the dataset in that RSE
#       Final result: one dataset can be in multiple RSEs and presumably ...
#           ... it may have different sizes, since a dataset may have lost one of its blocks or files in an RSE?
df_final_dataset_rse = df_all \
    .groupby(["dataset", "rse_id"]) \
    .agg(_sum(col("fsize")).alias("dataset_size_in_rse"),
         _max(col("accessed_at")).alias("last_access_time_of_dataset_per_rse"),
         _sum(when(col("accessed_at").isNull(), 1).otherwise(0)).alias("#files_null_access_time_per_rse"),
         _count(lit(1)).alias("#files_per_rse"),
         countDistinct(col("file")).alias("#files_unique_per_rse"),
         ) \
    .cache()

# STEP-13: Keep only (dataset, rse) rows whose "last_access_time_of_dataset_per_rse" is not null ...
#            ... and whose "#files_null_access_time_per_rse" == 0, i.e. no file has a NULL ...
#            ... "accessed_at" field, to be as accurate as possible.
#            (Selecting datasets last accessed at least 3, 6 or 12 months ago happens in STEP-15.)
df_final_dataset_rse = df_final_dataset_rse.filter(col("last_access_time_of_dataset_per_rse").isNotNull() &
                                                   (col("#files_null_access_time_per_rse") == 0)).cache()

# STEP-14: Get thresholds: epoch-millisecond timestamps of 3, 6 and 12 months ago from today.
ts_thresholds = get_ts_thresholds()

# STEP-15: Group by dataset to get the final result over all RSEs that hold the dataset.
#            - max_dataset_size(TB): max size of the dataset over all RSEs that contain it
#            - min_dataset_size(TB): min size of the dataset over all RSEs that contain it
#            - avg_dataset_size(TB): avg size of the dataset over all RSEs that contain it
#              (sizes are bytes / 10**12, rounded to 5 decimals)
#            - last_access_time_of_dataset: last access time of the dataset over all RSEs
#            - last_access_at_least_n_months_ago: 12, 6 or 3 when the last access is older than the
#              12-, 6- or 3-month threshold (checked in that order, so the largest match wins);
#              datasets accessed within the last 3 months get NULL and are filtered out.
df = df_final_dataset_rse \
    .groupby(["dataset"]) \
    .agg(_round(_max(col("dataset_size_in_rse")) / (10 ** 12), 5).alias("max_dataset_size(TB)"),
         _round(_min(col("dataset_size_in_rse")) / (10 ** 12), 5).alias("min_dataset_size(TB)"),
         _round(_avg(col("dataset_size_in_rse")) / (10 ** 12), 5).alias("avg_dataset_size(TB)"),
         _max(col("last_access_time_of_dataset_per_rse")).alias("last_access_time_of_dataset"),
         ) \
    .withColumn('last_access_at_least_n_months_ago',
                when(col('last_access_time_of_dataset') < ts_thresholds[12], 12)
                .when(col('last_access_time_of_dataset') < ts_thresholds[6], 6)
                .when(col('last_access_time_of_dataset') < ts_thresholds[3], 3)
                .otherwise(None)
                ) \
    .filter(col('last_access_at_least_n_months_ago').isNotNull()) \
    .cache()

In [106]:
# Grand totals (summed over all selected datasets) of the max/min/avg dataset sizes in TB.
df.groupBy().sum("max_dataset_size(TB)", "min_dataset_size(TB)", "avg_dataset_size(TB)").show()

+-------------------------+-------------------------+-------------------------+
|sum(max_dataset_size(TB))|sum(min_dataset_size(TB))|sum(avg_dataset_size(TB))|
+-------------------------+-------------------------+-------------------------+
|        7146.659590000003|       5149.6163499999975|       5845.4937700000055|
+-------------------------+-------------------------+-------------------------+



In [107]:
# Preview the first 10 rows of the result as a pandas DataFrame (last expression of the cell, so it is displayed).
df.limit(10).toPandas()

Unnamed: 0,dataset,max_dataset_size(TB),min_dataset_size(TB),avg_dataset_size(TB),last_access_time_of_dataset,last_access_at_least_n_months_ago
0,/Zprime_NonUniversalSSM_M1750_gl0p5_gh0p5_mg_pythia8_TuneCUETP8M1/RunIISummer16NanoAODv7-PUMoriond17_Nano02Apr2020_102X_mcRun2_asymptotic_v8-v1/NANOAODSIM,0.00019,1e-05,7e-05,1606164238000,6
1,/ggH_HToSSTo4l_MH-600_MS-10_ctauS-1000_TuneCP5_13TeV-powheg-pythia8/RunIIFall17NanoAODv7-PU2017_12Apr2018_Nano02Apr2020_102X_mc2017_realistic_v8-v1/NANOAODSIM,7e-05,7e-05,7e-05,1600314541000,6
2,/GJet_Pt-20toInf_DoubleEMEnriched_MGG-40to80_TuneCP5_13TeV_Pythia8/RunIISummer19UL17MiniAOD-106X_mc2017_realistic_v6-v2/MINIAODSIM,0.4324,0.20519,0.31879,1608028875000,6
3,/GluGluToHHTo2B2WToLNu2J_node_12_TuneCUETP8M1_PSWeights_13TeV-madgraph-pythia8/RunIISummer16NanoAODv7-PUMoriond17_Nano02Apr2020_102X_mcRun2_asymptotic_v8-v1/NANOAODSIM,0.00061,0.0,0.00013,1602198510000,6
4,/JJH0Mf05ph0ToTauTauPlusOneJets_M125_TuneCUETP8M1_13TeV-mcatnloFXFX-pythia8/RunIISummer16NanoAODv7-PUMoriond17_Nano02Apr2020_102X_mcRun2_asymptotic_v8-v1/NANOAODSIM,0.00065,0.00065,0.00065,1596579991000,12
5,/TprimeBToTZ_M-1000_Width-30p_LH_TuneCP5_13TeV-madgraph-pythia8/RunIIFall17NanoAODv7-PU2017_12Apr2018_Nano02Apr2020_102X_mc2017_realistic_v8_ext1-v1/NANOAODSIM,0.00097,0.00097,0.00097,1590225678000,12
6,/GluGluToRadionToHHTo2B2ZTo2L2J_M-2500_narrow_TuneCUETP8M1_PSWeights_13TeV-madgraph-pythia8/RunIISummer16NanoAODv7-PUMoriond17_Nano02Apr2020_102X_mcRun2_asymptotic_v8-v1/NANOAODSIM,0.00052,1e-05,0.00013,1606860971000,6
7,/VHToGG_M85_TuneCP5_13TeV-amcatnloFXFX-madspin-pythia8/RunIISummer19UL18NanoAOD-106X_upgrade2018_realistic_v11_L1v1-v1/NANOAODSIM,0.0004,1e-05,9e-05,1598419208000,12
8,/ZprimeToTTJet_M2000_TuneCP2_13TeV-madgraph-pythia8/RunIIFall17MiniAODv2-PU2017_12Apr2018_94X_mc2017_realistic_v14_ext1-v1/MINIAODSIM,0.01338,0.01338,0.01338,1590054905000,12
9,/SMS-T5qqqqZH_HToBB-mN2-1000to1800_TuneCP2_13TeV-madgraphMLM-pythia8/RunIIAutumn18NanoAODv7-PUFall18Fast_Nano02Apr2020_GridpackScan_102X_upgrade2018_realistic_v21-v1/NANOAODSIM,0.00147,2e-05,0.0004,1604287860000,6
