In [1]:
from datetime import datetime

import click
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, collect_list, collect_set, concat_ws, first, format_number, from_unixtime, greatest, lit, lower, when,
    avg as _avg,
    count as _count,
    hex as _hex,
    max as _max,
    min as _min,
    round as _round,
    split as _split,
    sum as _sum,
)

from pyspark.sql.types import (
    LongType,
    DecimalType
)

In [2]:
import pandas as pd
import pyspark.sql.functions as func

In [3]:
# Snapshot date (local time, YYYY-MM-DD) used to locate the daily HDFS dumps.
TODAY = datetime.today().strftime('%Y-%m-%d')

# Rucio table dumps (Avro part files)
HDFS_RUCIO_CONTENTS = f"/project/awg/cms/rucio/{TODAY}/contents/part*.avro"
HDFS_RUCIO_DATASET_LOCKS = f'/project/awg/cms/rucio/{TODAY}/dataset_locks/part*.avro'
# Previously pinned to the literal date 2022-07-17 (an f-string without any placeholder),
# which silently joined a stale DIDS snapshot against today's other tables.
HDFS_RUCIO_DIDS = f"/project/awg/cms/rucio/{TODAY}/dids/part*.avro"
HDFS_RUCIO_LOCKS = f"/project/awg/cms/rucio/{TODAY}/locks/part*.avro"
HDFS_RUCIO_REPLICAS = f"/project/awg/cms/rucio/{TODAY}/replicas/part*.avro"
HDFS_RUCIO_RSES = f'/tmp/cmsmonit/rucio_daily_stats-{TODAY}/RSES/part*.avro'
HDFS_RUCIO_RULES = f"/project/awg/cms/rucio/{TODAY}/rules/part*.avro"

# DBS table dumps (Avro part files)
HDFS_DBS_DATASETS = f'/tmp/cmsmonit/rucio_daily_stats-{TODAY}/DATASETS/part*.avro'
HDFS_DBS_BLOCKS = f'/tmp/cmsmonit/rucio_daily_stats-{TODAY}/BLOCKS/part*.avro'
HDFS_DBS_FILES = f'/tmp/cmsmonit/rucio_daily_stats-{TODAY}/FILES/part*.avro'

# Bytes -> terabytes divisor and number of decimal digits kept for TB values.
TB_DENOMINATOR = 10 ** 12
FLT_N_TB_DGTS = 8

# Pandas display settings for results pulled back with .toPandas().
# Fully-qualified option names are used on purpose: abbreviated keys such as 'max_rows'
# are resolved by pattern matching and raise OptionError once more than one registered
# option matches (e.g. 'display.max_rows' and 'styler.render.max_rows').
pd.options.display.float_format = '{:,.2f}'.format
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)

In [4]:
def get_df_locks(spark):
    """Load the Rucio LOCKS dump, keeping only rows whose state is 'O'.

    Returns a Spark DataFrame with the columns:
        - f_name: the NAME column, renamed
        - rse_id: lower-cased hex encoding of RSE_ID
        - f_size: BYTES cast to long
    """
    locks_raw = spark.read.format('avro').load(HDFS_RUCIO_LOCKS)
    locks = (
        locks_raw
        .withColumn('rse_id', lower(_hex(col('RSE_ID'))))
        .withColumn('f_size', col('BYTES').cast(LongType()))
        .filter(col('state') == 'O')
        .withColumnRenamed('NAME', 'f_name')
    )
    return locks.select('f_name', 'rse_id', 'f_size')


#         .filter(col('state')=='A')\
# No state filter is applied below, so unavailable replicas are included as well.
def get_df_replicas(spark):
    """Load the Rucio REPLICAS dump with a per-replica lock flag.

    Returns a Spark DataFrame with the columns:
        - rse_id: lower-cased hex encoding of RSE_ID
        - f_size: BYTES cast to long
        - locked: 'y' when lock_cnt > 0, otherwise 'n'
        - lock_cnt: taken as-is from the dump

    Note: a later cell of this notebook defines another ``get_df_replicas`` with a
    different column set; once that cell has run, this version is shadowed.
    """
    is_locked = when(col("lock_cnt") > 0, 'y').otherwise('n')
    replicas = spark.read.format('avro').load(HDFS_RUCIO_REPLICAS)
    return (
        replicas
        .withColumn('rse_id', lower(_hex(col('RSE_ID'))))
        .withColumn('f_size', col('BYTES').cast(LongType()))
        .withColumn('locked', is_locked)
        .select('rse_id', 'f_size', 'locked', 'lock_cnt')
    )


def get_df_rses(spark):
    """Load the non-deleted RSEs and derive tier, country and kind from the RSE name.

    Returns a Spark DataFrame with the columns:
        - id: lower-cased hex encoding of ID
        - RSE, RSE_TYPE: taken as-is from the dump
        - rse_tier: first '_'-separated token of RSE
        - rse_country: second '_'-separated token of RSE
        - rse_kind: 'temp' / 'test' when the RSE name ends with Temp/temp/TEMP or
          Test/test/TEST respectively, otherwise 'prod'
    """
    # Use the built-in 'avro' source like every other reader in this notebook, instead of
    # the legacy 'com.databricks.spark.avro' alias.
    df_rses = spark.read.format('avro').load(HDFS_RUCIO_RSES) \
        .filter(col('DELETED_AT').isNull()) \
        .withColumn('id', lower(_hex(col('ID')))) \
        .withColumn('rse_tier', _split(col('RSE'), '_').getItem(0)) \
        .withColumn('rse_country', _split(col('RSE'), '_').getItem(1)) \
        .withColumn('rse_kind',
                    when((col("rse").endswith('Temp') | col("rse").endswith('temp') | col("rse").endswith('TEMP')),
                         'temp')
                    .when((col("rse").endswith('Test') | col("rse").endswith('test') | col("rse").endswith('TEST')),
                          'test')
                    .otherwise('prod')
                    ) \
        .select(['id', 'RSE', 'RSE_TYPE', 'rse_tier', 'rse_country', 'rse_kind'])
    return df_rses


def get_df_rse_limits(spark):
    """Read ``rse_limits.csv`` from the working directory into a Spark DataFrame.

    Returns the columns:
        - id: lower-cased RSE_ID
        - value: taken as-is from the CSV
    """
    limits_pdf = pd.read_csv("rse_limits.csv")
    limits_sdf = spark.createDataFrame(limits_pdf)
    return limits_sdf.withColumn('id', lower(col('RSE_ID'))).select("id", "value")

In [5]:
# NOTE(review): `spark` is not created anywhere in the visible notebook (the
# get_spark_session helper further down is commented out), so it is presumably a
# SparkSession injected by the kernel/environment — TODO confirm.
df_replicas = get_df_replicas(spark)
df_rses = get_df_rses(spark)

In [6]:
# Per-RSE sum and count of replica sizes, pivoted on the 'locked' flag ('y' / 'n').
# Combinations that are missing for an RSE are filled with 0.
df_grouped_by_rse = (
    df_replicas
    .groupby("rse_id")
    .pivot("locked", values=['y', 'n'])
    .agg(func.sum("f_size"), func.count("f_size"))
    .na.fill(value=0)
)

In [13]:
# df_rse_summary = df_grouped_by_rse.alias("rep1").join(df_rses.alias("rse"), col("rep1.rse_id")==col("rse.id"))\
#     .select(["rep1.rse_id", "rse.rse", "rse.rse_type", "rep1.count_locked", "rep1.size_locked", "rep1.count_dynamic", "rep1.size_dynamic", "rep1.count_total", "rep1.size_total"])

In [14]:
# df_rse_summary.write.format("json").save("rse_info.json")

In [15]:
# df_rse_summary.write.format("com.mongodb.spark.sql.DefaultSource").option("database", "rchauhan").option("collection", "rse_info_connector").save()

In [16]:
# rse_summary_pandas = df_rse_summary.toPandas()


In [17]:
# df_rse_summary_v2 = df_rse_summary.alias("summary").join(df_rse_limits.alias("limits"), col("summary.rse_id")==col("limits.id"), 'inner')\
#     .withColumnRenamed("value", "limit")\
#     .select(["summary.rse_id", "rse", "rse_type", "rse_tier", "rse_country",
#             "locked_size", "locked_file_count", "size_not_locked", "file_not_locked_count", "limit"])

In [18]:
# rse_summary_pandas

In [19]:
# df_rse_summary.show()

# Container - File Map


In [20]:
def get_df_contents(spark):
    """Load the Rucio CONTENTS dump restricted to scope 'cms'.

    Returns a Spark DataFrame with the columns name, child_name, did_type and child_type.
    """
    contents = spark.read.format('avro').load(HDFS_RUCIO_CONTENTS)
    cms_only = contents.filter(col("scope") == "cms")
    return cms_only.select('name', 'child_name', 'did_type', 'child_type')

In [21]:
df_contents = get_df_contents(spark)
# Keep only parent -> child rows whose child is a file (child_type 'F').
df_contents_file = (
    df_contents
    .filter(col("child_type") == "F")
    .withColumnRenamed("child_name", "file")
)

In [22]:
# file -> dataset map:
#   dataset   = part of the parent name before the first '#'
#   data_tier = last '/'-separated token of the dataset name
# Rows whose data_tier equals "CONTAINER" are dropped.
df_dataset_file_map = (
    df_contents_file
    .withColumn("dataset", func.element_at(func.split("name", "#"), 1))
    .select("file", "dataset")
    .withColumn("data_tier", func.element_at(func.split("dataset", "/"), -1))
    .filter(col("data_tier") != "CONTAINER")
)

In [24]:
# df_dataset_file_map.limit(5).toPandas()

In [25]:
# df_contents = df_contents.filter(col("did_type")=="C").withColumn("data_tier", func.element_at(func.split("name","/"),-1))
# df_with_containers = df_contents.select(["data_tier"]).distinct()

In [26]:
def get_df_replicas_with_name(spark, rse_ids=None):
    """Load named replicas restricted to a given set of RSE ids.

    Args:
        spark: SparkSession used to read the REPLICAS dump.
        rse_ids: iterable of lower-cased hex RSE ids to keep. When omitted, the
            module-level ``disk_rse_list`` is used, as the original code did; that
            global is not defined in the visible notebook and must exist before the
            call, otherwise a NameError is raised.

    Returns a Spark DataFrame with the columns name, f_size (BYTES cast to long),
    locked ('y' when lock_cnt > 0, otherwise 'n'), lock_cnt and state.

    Note: a later cell of this notebook redefines ``get_df_replicas_with_name``
    without the RSE filter and without the ``state`` column.
    """
    if rse_ids is None:
        # Backward-compatible fallback to the notebook-level global.
        rse_ids = disk_rse_list
    return spark.read.format('avro').load(HDFS_RUCIO_REPLICAS) \
        .withColumn('rse_id', lower(_hex(col('RSE_ID')))) \
        .withColumn('f_size', col('BYTES').cast(LongType())) \
        .withColumn('locked', when((col("lock_cnt") > 0), 'y').otherwise('n')) \
        .filter(col("rse_id").isin(rse_ids)) \
        .select(['name', 'f_size', 'locked', 'lock_cnt', 'state'])

In [28]:
# df_replicas_with_name = get_df_replicas_with_name(spark)

In [30]:
# df_joined = df_replicas_with_name.alias("repname1").join(df_dataset_file_map.alias("dfmap1"), col("repname1.name")==col("dfmap1.file"), "left")

In [31]:
# df_joined.limit(10).toPandas()

In [32]:
# #File that do not have a map to a dataset
# df_joined.filter(col("file").isNull()).filter(~col("name").contains("/store/user")).limit(10).toPandas()

In [33]:

# df_with_unknown = df_joined\
#     .withColumn('data_tier_new', when(col("file").isNull(), "UNKNOWN")\
#     .otherwise(col("data_tier")))

In [34]:
# df_with_unknown.filter(col("data_tier_new")=="UNKNOWN").show()

In [36]:
# df_temp_new = df_with_unknown.groupby("data_tier_new")\
#     .pivot("state")\
#     .agg(func.sum("f_size"))

In [37]:
# df_temp_new2 = df_with_unknown.groupby("data_tier_new")\
#     .pivot("locked")\
#     .agg(func.sum("f_size"))

In [38]:
# df_data_tier_state_pivot = df_temp_new.toPandas()
# df_data_tier_lock_pivot = df_temp_new2.toPandas()

In [39]:
# df_data_tier_lock_pivot_copy = df_data_tier_lock_pivot
# df_data_tier_state_pivot_copy = df_data_tier_state_pivot

In [40]:
# df_data_tier_state_pivot.fillna(0, inplace=True)
# df_data_tier_state_pivot["total"] = df_data_tier_state_pivot["A"] + df_data_tier_state_pivot["B"] + df_data_tier_state_pivot["C"] + df_data_tier_state_pivot["D"] + df_data_tier_state_pivot["U"]


# df_data_tier_state_pivot.sort_values(by="total", ascending=False)

In [41]:
# df_data_tier_lock_pivot.to_csv("disk_datatier_lock_pivot.csv")
# df_data_tier_state_pivot.to_csv("disk_datatier_state_pivot.csv")

In [42]:
# df_data_tier_lock_pivot.fillna(0, inplace=True)
# df_data_tier_lock_pivot["total"] = df_data_tier_lock_pivot["n"] + df_data_tier_lock_pivot["y"]
# df_data_tier_lock_pivot["n_tb"] = df_data_tier_lock_pivot["n"]/(10**12)
# df_data_tier_lock_pivot["y_tb"] = df_data_tier_lock_pivot["y"]/(10**12)
# df_data_tier_lock_pivot["total_tb"] = df_data_tier_lock_pivot["total"]/(10**12)


# df_data_tier_lock_pivot.sort_values(by="total", ascending=False)

In [43]:
# temp_df = pd.read_csv("data_tier_info_locks.csv")
# temp_df

In [44]:
#files without parent 
# difference = df_replicas_with_name.select(["name"]).distinct().subtract(df_dataset_file_map.select(["file"]))

In [45]:
# difference.limit(20).toPandas()

In [46]:
# df_temp = df_dataset_file_map.groupby("file").agg(collect_set("data_tier").alias("tiers"), collect_list("dataset").alias("same_parent"))

In [47]:
# df_temp.filter(func.size(col("tiers")) > 1).filter(~func.array_contains(col("tiers"), "CONTAINER")).limit(20).toPandas()

In [48]:
# df_joined = df_contents.alias("df1").join(df_contents.alias("df2"), col("df1.child_name")==col("df2.name"), "inner" ).select(["df1.name", "df2.child_name", "df1.did_type", "df2.child_type"])

In [49]:
# df_joined2 = df_joined.alias("df3").join(df_contents.alias("df4"), col("df3.child_name")==col("df4.name"), "inner" ).select(["df3.name", "df4.child_name", "df3.did_type", "df4.child_type"])

In [50]:
# df_joined.filter(col("child_type")=='C').show(2)

In [51]:
# df_file_container_map = df_joined.filter(col("child_type")=='F')\
#                         .withColumnRenamed("name", "cname")\
#                         .withColumnRenamed("child_name", "fname")

In [52]:
# df_file_container_map.filter(col("did_type") == "D").count()

In [53]:
# df_file_container_map_v2 = df_file_container_map.withColumn("data_tier", func.element_at(func.split("cname","/"),-1))

In [54]:
# df_file_container_map_v2.groupby("data_tier").count().toPandas()

In [55]:
def get_df_replicas_with_name(spark):
    """Load all replicas with their file name and a lock flag (no RSE filtering).

    Returns a Spark DataFrame with the columns name, f_size (BYTES cast to long),
    locked ('y' when lock_cnt > 0, otherwise 'n') and lock_cnt.
    """
    lock_flag = when(col("lock_cnt") > 0, 'y').otherwise('n')
    replicas = spark.read.format('avro').load(HDFS_RUCIO_REPLICAS)
    return (
        replicas
        .withColumn('f_size', col('BYTES').cast(LongType()))
        .withColumn('locked', lock_flag)
        .select('name', 'f_size', 'locked', 'lock_cnt')
    )

In [56]:
# df_replicas_with_name = get_df_replicas_with_name(spark)

In [57]:
# df_replicas_with_name.columns

In [58]:
# result_df = df_file_container_map_v2.alias("map1").join(df_replicas_with_name.alias("rep1"), col("map1.fname")==col("rep1.name"), "inner")

In [59]:
# df_replicas_with_name.count()

In [60]:
# result_df.distinct().count()

In [61]:
# result_df.limit(5).toPandas()

In [62]:
# df_replicas_with_name.distinct().count()

In [63]:
# df_file_container_map_v2.distinct().count()

In [64]:
# df_file_container_map.distinct().count()

In [65]:
# df_contents.filter(col("child_type")=="F").count()

In [66]:
# df_file_container_map.filter(col("child_type")=="F").count()

In [67]:
# df_diff = df_contents.filter(col("child_type")=="F").select(["child_name"]).subtract(df_file_container_map.select(["fname"]))

In [68]:
# df_diff.limit(20).toPandas()

In [69]:
def get_df_replicas(spark):
    """Create the main replicas dataframe from the Rucio REPLICAS dump.

    No RSE filtering is applied here (an earlier ``rse_id`` whitelist filter is disabled).

    Columns selected:
        - scope
        - f_name: file name (NAME renamed)
        - rse_id: lower-cased hex encoding of RSE_ID
        - f_size_replicas: BYTES cast to long, i.e. the file size as seen in REPLICAS
        - rep_accessed_at: ACCESSED_AT renamed
        - rep_created_at: CREATED_AT renamed
    """
    replicas = spark.read.format('avro').load(HDFS_RUCIO_REPLICAS)
    output_columns = ['scope', 'f_name', 'rse_id', 'f_size_replicas', 'rep_accessed_at', 'rep_created_at']
    return (
        replicas
        .withColumn('rse_id', lower(_hex(col('RSE_ID'))))
        .withColumn('f_size_replicas', col('BYTES').cast(LongType()))
        .withColumnRenamed('NAME', 'f_name')
        .withColumnRenamed('ACCESSED_AT', 'rep_accessed_at')
        .withColumnRenamed('CREATED_AT', 'rep_created_at')
        .select(output_columns)
    )

In [70]:
# # select count(c.name) from CMS_RUCIO_PROD.replicas r join CMS_RUCIO_PROD.contents c on r.name=c.child_name and r.rse_id = '69DC2E6F37F84650BF96F9CC1ECB1FBF';

# rseId='69DC2E6F37F84650BF96F9CC1ECB1FBF'

In [71]:
# file_replicas = get_df_replicas(spark)


In [72]:
#File replicas at given rse


In [73]:
# def get_spark_session(yarn=True, verbose=False):
#     """Get or create the spark context and session.
#     """
#     sc = SparkContext(appName='cms-monitoring-rucio-datasets-for-mongo')
#     return SparkSession.builder.config(conf=sc._conf).getOrCreate()

In [74]:

def get_df_rses(spark):
    """Get a Spark dataframe of the non-deleted RSEs.

    Columns selected:
        - rse_id: lower-cased hex encoding of ID
        - RSE, RSE_TYPE: taken as-is from the dump
        - rse_tier: first '_'-separated token of RSE
        - rse_country: second '_'-separated token of RSE
        - rse_kind: 'temp' / 'test' when the RSE name ends with Temp/temp/TEMP or
          Test/test/TEST respectively, otherwise 'prod'
    """
    # Use the built-in 'avro' source like every other reader in this notebook, instead of
    # the legacy 'com.databricks.spark.avro' alias.
    df_rses = spark.read.format('avro').load(HDFS_RUCIO_RSES) \
        .filter(col('DELETED_AT').isNull()) \
        .withColumn('rse_id', lower(_hex(col('ID')))) \
        .withColumn('rse_tier', _split(col('RSE'), '_').getItem(0)) \
        .withColumn('rse_country', _split(col('RSE'), '_').getItem(1)) \
        .withColumn('rse_kind',
                    when((col("rse").endswith('Temp') | col("rse").endswith('temp') | col("rse").endswith('TEMP')),
                         'temp')
                    .when((col("rse").endswith('Test') | col("rse").endswith('test') | col("rse").endswith('TEST')),
                          'test')
                    .otherwise('prod')
                    ) \
        .select(['rse_id', 'RSE', 'RSE_TYPE', 'rse_tier', 'rse_country', 'rse_kind'])
    return df_rses


In [76]:
# df_rses = get_df_rses(spark)
# df_rses.show(2)

In [77]:
# df_rses.filter(col('rse_kind') =='prod').count()

In [78]:
# df_replicas = get_df_replicas(spark)
# df_replicas.show(2)

In [79]:

def get_df_dids_files(spark):
    """Create a Spark dataframe of file DIDs from the Rucio DIDS dump.

    Filters (all must hold):
        - DELETED_AT is null
        - HIDDEN == '0'
        - SCOPE == 'cms'
        - DID_TYPE == 'F'

    Columns selected:
        - f_name: file name (NAME renamed)
        - f_size_dids: BYTES cast to long, i.e. the file size as seen in DIDS
        - dids_accessed_at: ACCESSED_AT renamed
        - dids_created_at: CREATED_AT renamed
    """
    dids = spark.read.format('avro').load(HDFS_RUCIO_DIDS)
    file_dids = (
        dids
        .filter(col('DELETED_AT').isNull())
        .filter(col('HIDDEN') == '0')
        .filter(col('SCOPE') == 'cms')
        .filter(col('DID_TYPE') == 'F')
    )
    return (
        file_dids
        .withColumnRenamed('NAME', 'f_name')
        .withColumnRenamed('ACCESSED_AT', 'dids_accessed_at')
        .withColumnRenamed('CREATED_AT', 'dids_created_at')
        .withColumn('f_size_dids', col('BYTES').cast(LongType()))
        .select('f_name', 'f_size_dids', 'dids_accessed_at', 'dids_created_at')
    )

In [80]:
def get_df_ds_locks(spark):
    """Create the dataset-locks dataframe from the Rucio DATASET_LOCKS dump (scope 'cms').

    Columns selected:
        - b_name_ds_locks: NAME renamed
        - account_ds_locks: ACCOUNT renamed
        - rse_id: lower-cased hex encoding of RSE_ID
    """
    ds_locks = spark.read.format('avro').load(HDFS_RUCIO_DATASET_LOCKS)
    return (
        ds_locks
        .filter(col('SCOPE') == 'cms')
        .withColumn('rse_id', lower(_hex(col('RSE_ID'))))
        .withColumnRenamed('NAME', 'b_name_ds_locks')
        .withColumnRenamed('ACCOUNT', 'account_ds_locks')
        .select('b_name_ds_locks', 'account_ds_locks', 'rse_id')
    )

In [81]:
def get_df_dbs_f_d(spark):
    """Create a dataframe for the FILE-DATASET membership/ownership map from DBS dumps.

    Every file is kept (left join); files whose dataset id has no match in the DATASETS
    dump get a null ``dataset``.

    Columns selected: dataset_id, f_name, dataset
    """
    dbs_files = spark.read.format('avro').load(HDFS_DBS_FILES) \
        .withColumnRenamed('LOGICAL_FILE_NAME', 'f_name') \
        .withColumnRenamed('DATASET_ID', 'f_dataset_id') \
        .select(['f_name', 'f_dataset_id'])
    # The join and rename below refer to d_dataset_id / d_dataset, so the raw DATASETS
    # columns must be renamed first (same renames as in get_df_dbs_main_ds_size). Without
    # them `dbs_datasets.d_dataset_id` does not resolve.
    dbs_datasets = spark.read.format('avro').load(HDFS_DBS_DATASETS) \
        .withColumnRenamed('DATASET_ID', 'd_dataset_id') \
        .withColumnRenamed('DATASET', 'd_dataset') \
        .select(['d_dataset_id', 'd_dataset'])

    df_dbs_f_d = dbs_files.join(dbs_datasets, dbs_files.f_dataset_id == dbs_datasets.d_dataset_id, how='left') \
        .withColumnRenamed('f_dataset_id', 'dataset_id') \
        .withColumnRenamed('d_dataset', 'dataset') \
        .select(['dataset_id', 'f_name', 'dataset'])
    return df_dbs_f_d

In [82]:
# dbs_datasets = spark.read.format('avro').load(HDFS_DBS_DATASETS)
# dbs_datasets.limit(1).toPandas()

In [84]:
# dbs_files = spark.read.format('avro').load(HDFS_DBS_FILES)
# dbs_files.limit(1).toPandas()

In [85]:

def get_df_replicas_j_dids(df_replicas, df_dids_files):
    """Left-join replicas with file DIDs on ``f_name``.

    Every replica row is kept; DIDS columns are null when the file has no DIDS entry.
    Size, access time and creation time therefore exist twice after the join (once per
    source table); they are merged later in get_df_file_rse_ts_size.

    Columns:
        from REPLICAS: f_name, rse_id, f_size_replicas, rep_accessed_at, rep_created_at
        from DIDS:     f_name, f_size_dids, dids_accessed_at, dids_created_at
    """
    join_keys = ['f_name']
    return df_replicas.join(df_dids_files, on=join_keys, how='left')

In [86]:
def get_df_file_rse_ts_size(df_replicas_j_dids):
    """Merge the duplicated REPLICAS/DIDS columns into single size and timestamp columns.

    - f_size: the REPLICAS size when it is not null, else the DIDS size when that is not
      null, else null.
    - accessed_at / created_at: the greater of the DIDS and REPLICAS values.

    Columns: f_name, rse_id, accessed_at, f_size, created_at
    """
    # Order matters: REPLICAS is the preferred source, DIDS is only the fallback.
    size_expr = (
        when(col('f_size_replicas').isNotNull(), col('f_size_replicas'))
        .when(col('f_size_dids').isNotNull(), col('f_size_dids'))
    )
    accessed_expr = greatest(col('dids_accessed_at'), col('rep_accessed_at'))
    created_expr = greatest(col('dids_created_at'), col('rep_created_at'))

    return (
        df_replicas_j_dids
        .withColumn('f_size', size_expr)
        .withColumn('accessed_at', accessed_expr)
        .withColumn('created_at', created_expr)
        .select('f_name', 'rse_id', 'accessed_at', 'f_size', 'created_at')
    )

In [87]:

def get_df_dataset_file_rse_ts_size(df_file_rse_ts_size, df_dbs_f_d):
    """Attach DBS dataset id and name to every file row via a left join on ``f_name``.

    Files without a DBS match keep a null ``dataset_id`` and get the placeholder
    dataset name "UnknownDatasetNameOfFiles_MonitoringTag".

    Columns: dataset_id, dataset, f_name, rse_id, accessed_at, created_at, f_size
    """
    output_columns = ['dataset_id', 'dataset', 'f_name', 'rse_id', 'accessed_at', 'created_at', 'f_size']
    with_dataset = df_file_rse_ts_size.join(df_dbs_f_d, on=['f_name'], how='left')
    with_placeholder = with_dataset.fillna("UnknownDatasetNameOfFiles_MonitoringTag", subset=['dataset'])
    return with_placeholder.select(output_columns)


def get_df_enr_with_rse_info(df_dataset_file_rse_ts_size, df_rses):
    """Enrich file rows with RSE name, type, tier, country and kind (left join on ``rse_id``)."""
    file_columns = ['dataset_id', 'dataset', 'f_name', 'rse_id', 'accessed_at', 'created_at', 'f_size']
    rse_columns = ['RSE', 'RSE_TYPE', 'rse_tier', 'rse_country', 'rse_kind']
    enriched = df_dataset_file_rse_ts_size.join(df_rses, on=['rse_id'], how='left')
    return enriched.select(file_columns + rse_columns)

In [88]:
# --------------------------------------------------------------------------------
# Main dataset functions
# --------------------------------------------------------------------------------

def get_df_sub_rse_details(df_enr_with_rse_info):
    """Aggregate file-level rows into one row per (rse_id, dataset).

    No rows are filtered out here; files with a null ``accessed_at`` are still counted
    in ``FileCnt`` and only excluded from ``AccessedFileCnt``.

    Output columns:
        - dataset_id: first dataset_id seen in the group
        - RseType: first RSE_TYPE seen in the group
        - RSE: first RSE name seen in the group
        - Dataset: dataset name (group key, renamed)
        - SizeInRseBytes: sum of f_size over the dataset's files in that RSE
        - LastAccessInRse: max accessed_at over the dataset's files in that RSE
        - FileCnt: number of file rows in the group
        - AccessedFileCnt: number of file rows whose accessed_at is not null

    rse_tier, rse_country and rse_kind are aggregated as well but are not part of the
    final select.
    """
    was_accessed = when(col('accessed_at').isNull(), 0).otherwise(1)
    aggregations = [
        _sum(col('f_size')).alias('SizeInRseBytes'),
        _max(col('accessed_at')).alias('LastAccessInRse'),
        _count(lit(1)).alias('FileCnt'),
        _sum(was_accessed).alias('AccessedFileCnt'),
        first(col('dataset_id')).alias('dataset_id'),
        first(col('RSE_TYPE')).alias('RseType'),
        first(col('RSE')).alias('RSE'),
        first(col('rse_tier')).alias('rse_tier'),
        first(col('rse_country')).alias('rse_country'),
        first(col('rse_kind')).alias('rse_kind'),
    ]
    output_columns = ['dataset_id', 'RseType', 'RSE', 'Dataset', 'SizeInRseBytes',
                      'LastAccessInRse', 'FileCnt', 'AccessedFileCnt']
    return (
        df_enr_with_rse_info
        .groupby(['rse_id', 'dataset'])
        .agg(*aggregations)
        .withColumnRenamed('dataset', 'Dataset')
        .select(output_columns)
    )




In [89]:
def get_df_main_datasets(df_sub_rse_details):
    """Roll the per-RSE dataset rows up to one row per (RseType, Dataset).

    Output columns (in this order):
        - Id: first dataset_id in the group, cast to long
        - RseType, Dataset: group keys
        - LastAccess: LastAccessMs / 1000 formatted with from_unixtime
          (LastAccessMs is presumably epoch milliseconds — TODO confirm)
        - LastAccessMs: max LastAccessInRse over the group's RSEs
        - MaxTB, MinTB, AvgTB, SumTB: max / min / avg / sum of SizeInRseBytes divided by
          TB_DENOMINATOR, as DecimalType(20, FLT_N_TB_DGTS)
        - RSEs: ', '-joined list of the RSE names in the group
    """

    def _as_tb(bytes_agg, name):
        # Convert an aggregated byte count to terabytes with fixed decimal precision.
        return (bytes_agg / TB_DENOMINATOR).cast(DecimalType(20, FLT_N_TB_DGTS)).alias(name)

    size_bytes = col('SizeInRseBytes')
    per_dataset = df_sub_rse_details.groupby(['RseType', 'Dataset']).agg(
        _as_tb(_max(size_bytes), 'MaxTB'),
        _as_tb(_min(size_bytes), 'MinTB'),
        _as_tb(_avg(size_bytes), 'AvgTB'),
        _as_tb(_sum(size_bytes), 'SumTB'),
        _max(col('LastAccessInRse')).alias('LastAccessMs'),
        concat_ws(', ', collect_list('RSE')).alias('RSEs'),
        first(col('dataset_id')).cast(LongType()).alias('Id'),
    )
    last_access_iso = from_unixtime(col("LastAccessMs") / 1000, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    # The order of the select is important
    return per_dataset.withColumn('LastAccess', last_access_iso).select(
        'Id', 'RseType', 'Dataset', 'LastAccess', 'LastAccessMs',
        'MaxTB', 'MinTB', 'AvgTB', 'SumTB',
        'RSEs',
    )


In [90]:

def get_df_dbs_agg_blocks(spark, df_replicas):
    """Aggregate replica sizes per DBS block.

    Replicas are left-joined to DBS FILES on the file name to obtain each file's block id,
    then left-joined to DBS BLOCKS and grouped by block id.

    Columns:
        - b_block_id: DBS block id (group key)
        - b_name: first BLOCK_NAME in the group
        - b_dataset_id: first DATASET_ID (from BLOCKS) in the group
        - b_size: sum of f_size_replicas over the group's replica rows
        - b_file_cnt: first FILE_COUNT (from BLOCKS) in the group
    """
    files_dbs = (
        spark.read.format('avro').load(HDFS_DBS_FILES)
        .withColumnRenamed('LOGICAL_FILE_NAME', 'f_dbs_name')
        .withColumnRenamed('DATASET_ID', 'f_dataset_id')
        .withColumnRenamed('BLOCK_ID', 'f_block_id')
        .withColumnRenamed('FILE_SIZE', 'f_dbs_size')
        .select('f_dbs_name', 'f_dataset_id', 'f_block_id', 'f_dbs_size')
    )
    blocks_dbs = (
        spark.read.format('avro').load(HDFS_DBS_BLOCKS)
        .withColumnRenamed('DATASET_ID', 'b_dataset_id')
        .withColumnRenamed('BLOCK_ID', 'b_block_id')
        .withColumnRenamed('BLOCK_NAME', 'b_name')
        .withColumnRenamed('FILE_COUNT', 'b_file_cnt')
        .select('b_name', 'b_block_id', 'b_dataset_id', 'b_file_cnt')
    )

    # Replica rows annotated with the DBS dataset/block ids of their file.
    replica_files = (
        df_replicas
        .join(files_dbs, df_replicas.f_name == files_dbs.f_dbs_name, how='left')
        .select('f_name', 'f_size_replicas', 'f_dataset_id', 'f_block_id')
    )

    return (
        replica_files
        .join(blocks_dbs, replica_files.f_block_id == blocks_dbs.b_block_id, how='left')
        .groupby('b_block_id')
        .agg(
            _sum('f_size_replicas').alias('b_size'),
            first('b_file_cnt').alias('b_file_cnt'),
            first('b_name').alias('b_name'),
            first('b_dataset_id').alias('b_dataset_id'),
        )
        .select('b_block_id', 'b_name', 'b_dataset_id', 'b_size', 'b_file_cnt')
    )

In [91]:

def get_df_dbs_main_ds_size(spark):
    """Compute per-dataset total size from the DBS FILES and DATASETS dumps.

    Columns:
        - dataset_id: DATASET_ID taken from FILES (group key)
        - d_name: first matching DATASET name (null when the id has no DATASETS row)
        - d_dbs_size: sum of FILE_SIZE over the dataset's files
    """
    files_dbs = (
        spark.read.format('avro').load(HDFS_DBS_FILES)
        .withColumnRenamed('LOGICAL_FILE_NAME', 'f_name')
        .withColumnRenamed('DATASET_ID', 'f_dataset_id')
        .withColumnRenamed('BLOCK_ID', 'f_block_id')
        .withColumnRenamed('FILE_SIZE', 'f_dbs_size')
        .select('f_name', 'f_dataset_id', 'f_block_id', 'f_dbs_size')
    )
    datasets_dbs = (
        spark.read.format('avro').load(HDFS_DBS_DATASETS)
        .withColumnRenamed('DATASET_ID', 'd_dataset_id')
        .withColumnRenamed('DATASET', 'd_dataset')
        .select('d_dataset_id', 'd_dataset')
    )

    files_with_dataset = files_dbs.join(
        datasets_dbs, files_dbs.f_dataset_id == datasets_dbs.d_dataset_id, how='left'
    )
    return (
        files_with_dataset
        .withColumnRenamed('f_dataset_id', 'dataset_id')
        .groupby('dataset_id')
        .agg(
            _sum('f_dbs_size').alias('d_dbs_size'),
            first('d_dataset').alias('d_name'),
        )
        .select('dataset_id', 'd_name', 'd_dbs_size')
    )


In [92]:
# # def main(hdfs_out_dir):
# #     """Main function that run Spark dataframe creations and save results to HDFS directory as JSON lines
# #     """

# #     # HDFS output file format. If you change, please modify bin/cron4rucio_ds_mongo.sh accordingly.
# #     write_format = 'json'
# #     write_mode = 'overwrite'

# #     spark = get_spark_session()
# #     # Set TZ as UTC. Also set in the spark-submit confs.
# #     spark.conf.set("spark.sql.session.timeZone", "UTC")

#     # The reason that we have lots of functions that returns PySpark dataframes is mainly for code readability.
# df_rses = get_df_rses(spark)
# df_dbs_f_d = get_df_dbs_f_d(spark)
# df_replicas = get_df_replicas(spark)
# df_dids_files = get_df_dids_files(spark)
# df_replicas_j_dids = get_df_replicas_j_dids(df_replicas, df_dids_files)
# df_file_rse_ts_size = get_df_file_rse_ts_size(df_replicas_j_dids)
# df_dataset_file_rse_ts_size = get_df_dataset_file_rse_ts_size(df_file_rse_ts_size, df_dbs_f_d)
# df_enr_with_rse_info = get_df_enr_with_rse_info(df_dataset_file_rse_ts_size, df_rses)
# df_sub_rse_details = get_df_sub_rse_details(df_enr_with_rse_info)
# df_main_datasets = get_df_main_datasets(df_sub_rse_details)

# #     df_main_datasets.write.save(path=hdfs_out_dir, format=write_format, mode=write_mode)


In [93]:
# df_rses.head()

In [94]:
# df_main_datasets.limit(5).toPandas()

In [95]:
# df_sub_rse_details.columns

In [96]:
# df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "rchauhan").option("collection", "new_collection").load()

In [97]:
# df.show()