In [1]:
PUBLIC_PATH = "/eos/user/f/fgomezco/www/"

In [2]:

from datetime import datetime

import click
import os
import pandas as pd
from dateutil.relativedelta import relativedelta
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, collect_list, countDistinct, concat_ws, 
    greatest, lit, lower, udf, when, from_unixtime, split,
    avg as _avg,
    count as _count,
    hex as _hex,
    max as _max,
    min as _min,
    round as _round,
    sum as _sum,
    desc as _desc,
)

from pyspark.sql.types import (
    LongType,
    StringType,
)

TODAY = datetime.today().strftime('%Y-%m-%d')

print(TODAY)

# https://github.com/dmwm/CMSKubernetes/tree/master/docker/sqoop/scripts
HDFS_RUCIO_CONTENTS = "/project/awg/cms/rucio/{}/contents/part*.avro".format(TODAY)
HDFS_RUCIO_REPLICAS = "/project/awg/cms/rucio/{}/replicas/part*.avro".format(TODAY)
HDFS_RUCIO_DIDS     = "/project/awg/cms/rucio/{}/dids/part*.avro".format(TODAY)
HDFS_RUCIO_LOCKS    = "/project/awg/cms/rucio/{}/locks/part*.avro".format(TODAY)
HDFS_RUCIO_RSES     = "/project/awg/cms/rucio/{}/rses/part*.avro".format(TODAY)
HDFS_RUCIO_RULES     = "/project/awg/cms/rucio/{}/rules/part*.avro".format(TODAY)

pd.options.display.float_format = '{:,.2f}'.format
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)

def get_rse_ids(rse_name):
    df_target_rse = df_rses.filter(col("rse") == rse_name).toPandas()
    return df_target_rse['rse_id'].iloc[0]

# Get the Contents Table (dataset-file table)
df_contents_f = spark.read.format('avro').load(HDFS_RUCIO_CONTENTS) \
    .filter(col("CHILD_TYPE").contains('F')) \
    .withColumnRenamed("NAME", 'parent_did') \
    .withColumnRenamed("CHILD_NAME", 'child_name') \
    .withColumn('size_bytes', col('BYTES').cast(LongType())) \
    .select(["child_name", "parent_did", "size_bytes"]) \
#    .withColumnRenamed("BYTES", 'size_bytes') \




2022-09-28


In [3]:
df_replicas_main = spark.read.format('avro').load(HDFS_RUCIO_REPLICAS) \
    .filter(col("STATE") == 'A') \
    .withColumn('f_name', col('NAME')) \
    .withColumn('replica_rse_id', lower(_hex(col('RSE_ID')))) \
    .withColumn('f_size', col('BYTES').cast(LongType())) \
    .withColumn('created_at', from_unixtime(col('CREATED_AT')/1000)) \
    .select(['replica_rse_id','f_name','f_size','created_at'])
    

In [4]:
# Get the RSEs table
df_target_rses = spark.read.format('avro').load(HDFS_RUCIO_RSES) \
    .withColumnRenamed("RSE", "rse") \
    .withColumn('rse_id', lower(_hex(col('ID')))) \
    .select(['rse_id', 'rse']) \
    .filter( \
            ( col('rse').contains("_Tape") ) & \
            ( ~ col('rse').contains("T3_") ) & \
            ( ~ col('rse').contains("T2_") ) & \
            ( ~ col('rse').contains("Temp") ) & \
            ( ~ col('rse').contains("Test") ) ) \
    .toPandas()   

In [5]:
df_target_rses

Unnamed: 0,rse_id,rse
0,dbdaf50ee9d24f92ad24473539522204,T1_US_FNAL_Tape
1,f091e56ff0f440e28db115dada9fc97e,T1_ES_PIC_Tape
2,515d41d70dd843098f0029b9ae6b4914,T1_UK_RAL_Tape
3,89a267a997d74de6850af9442e21d2a8,T1_IT_CNAF_Tape
4,f44c866a264d4da9972969e9f3b5bb52,T0_CH_CERN_Tape
5,5017683aea934c46b94bac226f086e53,T1_FR_CCIN2P3_Tape
6,71b2e6c1a389439d9363dee61bf2aa04,T1_RU_JINR_Tape
7,3d355ce4f055410daf0ea660e77eb73d,T1_DE_KIT_Tape


In [6]:
df_rses = spark.read.format('avro').load(HDFS_RUCIO_RSES)\
    .withColumnRenamed("RSE", "rse") \
    .withColumn('rse_id', lower(_hex(col('ID')))) \
    .select(['rse_id', 'rse']) \
    .filter( \
            ( col('rse').contains("_Tape") ) & \
            ( ~ col('rse').contains("T3_") ) & \
            ( ~ col('rse').contains("T2_") ) & \
            ( ~ col('rse').contains("Temp") ) & \
            ( ~ col('rse').contains("Test") ) ) 

In [7]:
df_replicas_tapes = df_replicas_main.join(df_rses.withColumnRenamed('rse_id', 'replica_rse_id'), ['replica_rse_id'], how='left').dropna().select(['rse', 'replica_rse_id', 'f_name', 'f_size', 'created_at'])

In [8]:
df_replicas_tapes

DataFrame[rse: string, replica_rse_id: string, f_name: string, f_size: bigint, created_at: string]

In [9]:
df_tape_contents = df_replicas_tapes.join( df_contents_f.withColumnRenamed('parent_did', 'dataset') \
                                          , df_replicas_tapes["f_name"] == df_contents_f["child_name"], how="left") \
    .select(['rse', 'f_name', 'f_size', 'created_at', 'dataset'])

In [10]:
aux = df_tape_contents.groupby(['rse', 'dataset']).agg(_sum('f_size').alias('size_bytes'), _max('created_at').alias('max_created_at'), _min('created_at').alias('min_created_at'))

In [11]:
aux = aux.withColumn("container", split(aux["dataset"], "#").getItem(0))

In [12]:
aux

DataFrame[rse: string, dataset: string, size_bytes: bigint, max_created_at: string, min_created_at: string, container: string]

In [13]:
aux = aux.withColumn("tier", split(aux["container"], "/").getItem(3))

In [16]:
DF = aux.groupBy(['rse', 'container', 'tier']) \
    .agg(_sum('size_bytes').alias('size_bytes'), \
         _max('max_created_at').alias('max_created_at'), \
         _min('min_created_at').alias('min_created_at'))

In [18]:
DF

DataFrame[rse: string, container: string, tier: string, size_bytes: bigint, max_created_at: string, min_created_at: string]

In [15]:
DF.limit(100).toPandas()

Unnamed: 0,rse,container,tier,sum(size_bytes),max(max_created_at),min(min_created_at)
0,T0_CH_CERN_Tape,/ADDGravToGG_NegInt-0_LambdaT-6500_M-4000To6500_TuneCP2_13TeV-pythia8/RunIIFall17GS-93X_mc2017_realistic_v3-v1/GEN-SIM,GEN-SIM,58827294771,2020-10-02 00:20:47,2020-09-16 09:02:53
1,T0_CH_CERN_Tape,/ADDGravToGG_NegInt-0_LambdaT-7000_M-1000To2000_TuneCP2_13TeV-pythia8/RunIIFall17MiniAODv2-PU2017_12Apr2018_94X_mc2017_realistic_v14-v1/MINIAODSIM,MINIAODSIM,4164135433,2020-10-02 02:57:07,2020-09-20 03:52:14
2,T0_CH_CERN_Tape,/ADDGravToLL_LambdaT-10000_M-2000ToInf_13TeV-pythia8/RunIIAutumn18DRPremix-102X_upgrade2018_realistic_v15-v1/AODSIM,AODSIM,57203254927,2020-10-01 22:02:40,2020-09-20 03:58:45
3,T0_CH_CERN_Tape,/ADDGravToLL_LambdaT-8000_M-800To1300_13TeV-pythia8/RunIIFall17MiniAODv2-PU2017_12Apr2018_94X_mc2017_realistic_v14_ext1-v1/MINIAODSIM,MINIAODSIM,6860260968,2020-10-01 23:44:18,2020-09-16 08:46:28
4,T0_CH_CERN_Tape,/ADDGravToLL_LambdaT-9000_M-1300To2000_13TeV-pythia8/RunIIFall17NanoAODv5-PU2017_12Apr2018_Nano1June2019_102X_mc2017_realistic_v7-v1/NANOAODSIM,NANOAODSIM,238363706,2020-06-26 07:54:10,2020-06-26 04:01:55
5,T0_CH_CERN_Tape,/ADDmonoPhoton_MD-1_d-3_TuneCP5_13TeV-pythia8/RunIISummer20UL18RECO-106X_upgrade2018_realistic_v11_L1v1-v2/AODSIM,AODSIM,16255518411,2021-05-04 03:13:18,2021-05-04 03:13:18
6,T0_CH_CERN_Tape,/ADDmonoPhoton_MD-1_d-3_TuneCUETP8M1_13TeV-pythia8/RunIIFall15DR76-25nsFlat10to50ZsecalNzshcalRaw_76X_mcRun2_asymptotic_2016EcalTune_30fb_v1-v1/AODSIM,AODSIM,14825920139,2020-10-02 02:53:28,2020-09-23 02:44:28
7,T0_CH_CERN_Tape,/AlCaLumiPixels2/Run2017G-v1/RAW,RAW,327370464,2020-09-23 03:30:28,2020-09-23 03:30:28
8,T0_CH_CERN_Tape,/AlCaLumiPixels3/Run2018C-v1/RAW,RAW,790132439205,2020-09-28 22:46:04,2020-09-16 09:47:12
9,T0_CH_CERN_Tape,/AxialMonoW_Mphi-300_Mchi-300_gSM-0p25_gDM-1p0_v2_13TeV-madgraph/RunIISummer16NanoAODv4-PUMoriond17_Nano14Dec2018_102X_mcRun2_asymptotic_v6-v1/NANOAODSIM,NANOAODSIM,355609653,2020-06-26 04:08:59,2020-06-26 04:08:59


In [18]:
DF.count()

528364

In [19]:
test = DF.groupBy(['Tier']).agg(_count('container'), _sum('size_bytes').alias('size_bytes'))

In [20]:
test.toPandas()

Unnamed: 0,Tier,count(container),size_bytes
0,DQMIO,1184,54634135177809
1,GEN,401,428592794458064
2,RAW-RECO,1616,5004307796117657
3,GEN-SIM,5658,10452274631232937
4,AODSIM,105746,67681527941871765
5,GEN-SIM-DIGI-RAW,897,14129363579283180
6,USER,4516,4171245529173783
7,GEN-SIM-DIGI-RAW-MINIAOD,301,904421173225773
8,LHE,7845,934697495392633
9,NANOAODSIM,207970,448281422100372


In [28]:
filename = "TapeDeletionCampaignFall2022_AllContainersAtTapeSites.csv"

In [23]:
DF_pandas = DF.toPandas()

In [26]:
! mkdir /eos/user/f/fgomezco/www/TapeDeletionCampaignFall2022/

In [29]:
DF_pandas.to_csv(path_or_buf=PUBLIC_PATH+filename)

In [28]:
spark.conf.set("spark.sql.debug.maxToStringFields", 200)


In [31]:
test = DF.groupBy(['Tier']).agg(_count('container'), _sum('sum(size_bytes)').alias('size_bytes'))

In [32]:
test.toPandas()

Unnamed: 0,Tier,count(container),size_bytes
0,GEN,394,406459499015495
1,DQMIO,1166,54609544501644
2,RAW-RECO,1595,5003250155498196
3,GEN-SIM,5647,10358942352110224
4,AODSIM,101978,67021459653274068
5,GEN-SIM-DIGI-RAW,897,14129363579283180
6,USER,4498,4169489085168408
7,GEN-SIM-DIGI-RAW-MINIAOD,298,903496839030728
8,LHE,7837,931755317460014
9,GEN-SIM-RAW,222,1084039061945533


In [None]:
DF.limit(100).toPandas()

In [30]:
48870000000 / 10**12

0.04887