In [1]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

from datetime import datetime, timedelta

In [2]:
# Date string (YYYY-MM-DD) that selects the daily Rucio dump directory.
# The zero-day offset is a no-op as written; it is kept as the place to shift
# the snapshot date by a whole number of days.
_snapshot_day = datetime.today() - timedelta(days=0)
TODAY = _snapshot_day.strftime('%Y-%m-%d')

# Avro part files of the replicas and DIDs dumps for that date.
HDFS_RUCIO_REPLICAS = f'/project/awg/cms/rucio/{TODAY}/replicas/part*.avro'
HDFS_RUCIO_DIDS = f'/project/awg/cms/rucio/{TODAY}/dids/part*.avro'

# Sentinel compared against the accessed_at columns further down.
# Numerically this is the signed 32-bit maximum times 1000 (= 2147483647000).
# NOTE(review): presumably a 32-bit epoch-seconds maximum expressed in
# milliseconds -- confirm against the dump's timestamp unit.
INT_MAX = (2**31 - 1) * 1000

In [3]:
def get_df_replicas(spark):
    """Load the Rucio replicas dump and return the latest access time per file.

    Reads the Avro files matched by HDFS_RUCIO_REPLICAS, keeps the rows whose
    scope is 'cms', and collapses all replicas of a file into a single row.

    Args:
        spark: Spark session used to read the dump (only `spark.read` is used).

    Returns:
        DataFrame with two columns, `file_name` and `max_rep_accessed_at`,
        where the latter is the maximum ACCESSED_AT over the file's replicas.
    """
    return (
        spark.read.format('avro').load(HDFS_RUCIO_REPLICAS)
        # Upper-case column name, matching NAME/ACCESSED_AT below and the
        # SCOPE filter of the DIDs loader, so the lookup does not rely on
        # Spark's case-insensitive column resolution.
        .filter(F.col('SCOPE') == 'cms')
        .withColumnRenamed('NAME', 'file_name')
        .withColumnRenamed('ACCESSED_AT', 'rep_accessed_at')
        # groupby + agg already yields exactly (file_name, max_rep_accessed_at),
        # so no trailing select is needed.
        .groupby('file_name')
        .agg(F.max('rep_accessed_at').alias('max_rep_accessed_at'))
    )


def get_df_dids_files(spark):
    """Load the Rucio DIDs dump and return the access time of selected DIDs.

    Reads the Avro files matched by HDFS_RUCIO_DIDS and keeps only rows where
    DELETED_AT is null, HIDDEN equals '0', SCOPE equals 'cms' and DID_TYPE
    equals 'F' (presumably the file-level DIDs, per the function name).

    Args:
        spark: Spark session used to read the dump (only `spark.read` is used).

    Returns:
        DataFrame with two columns: `file_name` (renamed from NAME) and
        `dids_accessed_at` (renamed from ACCESSED_AT).
    """
    dids_raw = spark.read.format('avro').load(HDFS_RUCIO_DIDS)

    # Row selection: not deleted, not hidden, CMS scope, DID type 'F'.
    dids_selected = (
        dids_raw
        .filter(F.col('DELETED_AT').isNull())
        .filter(F.col('HIDDEN') == '0')
        .filter(F.col('SCOPE') == 'cms')
        .filter(F.col('DID_TYPE') == 'F')
    )

    # Rename so the columns do not clash with the replicas frame when joined.
    dids_renamed = (
        dids_selected
        .withColumnRenamed('NAME', 'file_name')
        .withColumnRenamed('ACCESSED_AT', 'dids_accessed_at')
    )

    return dids_renamed.select(['file_name', 'dids_accessed_at'])



In [4]:
# Build both input frames. `spark` is not defined in the visible cells; it is
# expected to be supplied by the notebook's Spark session.
df_replicas, df_dids = (
    get_df_replicas(spark),
    get_df_dids_files(spark),
)

In [5]:
# Count the rows whose access timestamp equals the INT_MAX sentinel, once for
# the DIDs frame and once for the per-file replicas frame.
def _count_intmax(df, column):
    """Return the number of rows of `df` whose `column` equals INT_MAX."""
    return df.filter(F.col(column) == INT_MAX).count()


dids_intmax = _count_intmax(df_dids, "dids_accessed_at")
replicas_intmax = _count_intmax(df_replicas, "max_rep_accessed_at")

print(f"Invalid dates (set to INTMAX) in DIDs table: {dids_intmax}")
print(f"Invalid dates (set to INTMAX) in Replicas table: {replicas_intmax}")

Invalid dates (set to INTMAX) in DIDs table: 186
Invalid dates (set to INTMAX) in Replicas table: 476


In [6]:
# Column handles for the two timestamps being compared.
_dids_ts = F.col("dids_accessed_at")
_rep_ts = F.col("max_rep_accessed_at")

# Label each file by how its two timestamps relate. The branches are checked
# in order, so the "Both None" test (sum equal to 0 after nulls were replaced
# by 0) takes precedence over the ordering comparisons.
_comparison_label = (
    F.when((_dids_ts + _rep_ts == 0), "Both None")
    .when(_dids_ts < _rep_ts, "dids_accessed_at SMALLER THAN max_rep_accessed_at")
    .when(_dids_ts > _rep_ts, "dids_accessed_at GREATER THAN max_rep_accessed_at")
    .otherwise("Both Equal")
)

# Inner join on file_name, then replace null numeric values with 0 so the
# comparisons above never see a null.
df_accessed_at = (
    df_replicas.join(df_dids, ["file_name"], "inner")
    .na.fill(0)
    .withColumn("accessed_at_comparison", _comparison_label)
)

In [7]:
# Number of files that fall into each comparison label.
df_summary = (
    df_accessed_at
    .groupBy("accessed_at_comparison")
    .count()
)

In [8]:
# Triggers the actual computation; prints up to 10 rows without truncating
# the label text.
df_summary.show(n=10, truncate=False)

23/02/12 10:54:28 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 12 for reason Container from a bad node: container_e159_1675935906795_4528_01_000013 on host: ithdp3104.cern.ch. Exit status: 137. Diagnostics: [2023-02-12 10:54:28.140]Container killed on request. Exit code is 137
[2023-02-12 10:54:28.194]Container exited with a non-zero exit code 137. 
[2023-02-12 10:54:28.195]Killed by external signal
.
23/02/12 10:54:28 ERROR YarnScheduler: Lost executor 12 on ithdp3104.cern.ch: Container from a bad node: container_e159_1675935906795_4528_01_000013 on host: ithdp3104.cern.ch. Exit status: 137. Diagnostics: [2023-02-12 10:54:28.140]Container killed on request. Exit code is 137
[2023-02-12 10:54:28.194]Container exited with a non-zero exit code 137. 
[2023-02-12 10:54:28.195]Killed by external signal
.
23/02/12 10:54:28 WARN TaskSetManager: Lost task 73.0 in stage 15.0 (TID 633) (ithdp3104.cern.ch executor 12): ExecutorLostFailure (executor 12 exit