In [1]:
#!/usr/bin/env spark-submit
from __future__ import print_function
import argparse

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, coalesce, lit, collect_list
import pyspark.sql.functions as fn
import pyspark.sql.types as types
import dask.dataframe as dd

# stolen from CMSSpark
import schemas

  import pandas.util.testing as tm


In [2]:
schema = types.StructType(
        [
            types.StructField(
                "data",
                types.StructType(
                    [
                        types.StructField("file_lfn", types.StringType(), True),
                        types.StructField("end_time", types.LongType(), True),
                        types.StructField("app_info", types.StringType(), True),
                        types.StructField("site_name", types.StringType(), True),
                        types.StructField("server_domain", types.StringType(), True),
                        types.StructField("fallback", types.StringType(), True),
                    ]
                ),
                False,
            ),
        ]
    )


In [54]:
df = (
        spark.read.schema(schema)
        .json("/project/monitoring/archive/cmssw_pop/raw/metric/%s/*.json.gz" % "2021/*/*")
        .select("data.*")
    )

In [55]:
df = (
        df.filter(df.file_lfn.startswith("/store/"))
        .withColumn("is_crab", df.app_info.contains(":crab:"))
        .drop("app_info")
        .withColumnRenamed("end_time", "timestamp")
    )

In [5]:
df.show()

+--------------------+----------+----------------+---------------+--------+-------+
|            file_lfn| timestamp|       site_name|  server_domain|fallback|is_crab|
+--------------------+----------+----------------+---------------+--------+-------+
|/store/data/Run20...|1617242404|      T2_UA_KIPT|kipt.kharkov.ua|    null|   true|
|/store/data/Run20...|1617242404|      T2_DE_DESY|        unknown|    null|   true|
|/store/unmerged/R...|1617242404| T1_US_FNAL_Disk|       fnal.gov|    null|   null|
|/store/unmerged/H...|1617242404|T2_US_Vanderbilt|        unknown|    null|   null|
|/store/data/Run20...|1617242404| T2_US_Wisconsin|   hep.wisc.edu|    null|   true|
|/store/data/Run20...|1617242404|      T2_DE_DESY|        unknown|    null|   true|
|/store/data/Run20...|1617242405|    T2_US_Purdue|        unknown|    null|   true|
|/store/unmerged/H...|1617242405|T2_US_Vanderbilt|        unknown|    null|   null|
|/store/data/Run20...|1617242405| T2_UK_London_IC|        unknown|    null| 

In [6]:
for col in df.dtypes:
    print(col[0]+" , "+col[1])

file_lfn , string
timestamp , bigint
site_name , string
server_domain , string
fallback , string
is_crab , boolean


In [42]:
df_dom = (
        spark.read
        .json("/user/chmcgrad/domain_map/*.json")
)

In [56]:
df = (
        df.filter(df.file_lfn.startswith("/store/"))
        .join(df_dom, (df.server_domain == df_dom.domain), how = 'left')
        .withColumn("is_local", 
                    when((df.server_domain == 'unknown'), True)
                    .when((df.server_domain == "in2p3.fr") 
                          & df.site_name.isin("T1_FR_CCIN2P3_Disk", "T2_FR_IPHC", "T2_FR_GRIF_LLR", "T2_FR_CCIN2P3"), True)
                    .when((df.server_domain == "in2p3.fr") 
                          & ~df.site_name.isin("T1_FR_CCIN2P3_Disk", "T2_FR_IPHC", "T2_FR_GRIF_LLR", "T2_FR_CCIN2P3"), False)
                    .when(df.site_name == df_dom.site, True)
                    .when(df.fallback == 'true', False)
                    .otherwise(None))
        .drop("server_domain")
        .drop("fallback")
        .drop("domain")
        .drop("site")
    )

In [57]:
df.show()

+--------------------+----------+-----------------+-------+--------+
|            file_lfn| timestamp|        site_name|is_crab|is_local|
+--------------------+----------+-----------------+-------+--------+
|/store/mc/RunIIAu...|1612453895|    T2_US_Florida|   true|   false|
|/store/mc/RunIISu...|1612453896|       T2_DE_DESY|   true|    true|
|/store/mc/RunIIAu...|1612453896|  T2_US_Wisconsin|   true|    true|
|/store/unmerged/R...|1612453896|  T1_US_FNAL_Disk|   null|    true|
|/store/data/Run20...|1612453897|  T2_US_Wisconsin|   true|   false|
|/store/data/Run20...|1612453897|    T2_US_Caltech|   true|    null|
|/store/unmerged/R...|1612453897|        T2_US_MIT|   null|    true|
|/store/mc/RunIIAu...|1612453899| T2_US_Vanderbilt|   true|   false|
|/store/mc/RunIIFa...|1612453899|  T1_US_FNAL_Disk|   true|    true|
|/store/group/phys...|1612453899|       T2_CH_CERN|   true|    true|
|/store/mc/RunIIAu...|1612453900|   T2_US_Nebraska|   true|   false|
|/store/data/Run20...|1612453900| 

In [61]:
df.filter((df.is_local != True) & (df.is_local != False)).show()

+--------+---------+---------+-------+--------+
|file_lfn|timestamp|site_name|is_crab|is_local|
+--------+---------+---------+-------+--------+
+--------+---------+---------+-------+--------+



In [None]:
df.columns[:]

In [None]:
domains_test = df_dom.select(collect_list('server_domain')).first()[0] 
sites_test = df_dom.select(collect_list('site_name')).first()[0] 

domains_main = df.select("server_domain").rdd.flatMap(lambda x: x).collect()
sites_main = df.select("site_name").rdd.flatMap(lambda x: x).collect()


In [None]:
truth = []
for i, dom1 in enumerate(domains_main):
    if dom1 == 'unknown':
        truth.append(False)
    else:
        for j, dom2 in enumerate(domains_test):
            if dom1 == dom2:
#            print("index i = ", i)
#            print("index j = ", j)
                if sites_test[j] == sites_main[i]: 
#                print(sites_test[j] + " =?", sites_main[i])
                    truth.append(True)
                else:
                    truth.append(False)
        

In [None]:
print(len(sites_main))
print(len(truth))
truth

In [None]:
df.show()

In [43]:
df_dom.show()

+--------------------+-----------------+
|              domain|             site|
+--------------------+-----------------+
|         m45.ihep.su|       T2_RU_IHEP|
|       ncg.ingrid.pt| T2_PT_NCG_Lisbon|
|      physics.uoi.gr|   T2_GR_Ioannina|
|physik.rwth-aache...|       T2_DE_RWTH|
|          pi.infn.it|       T2_IT_Pisa|
|              pic.es|   T1_ES_PIC_Disk|
|         pp.rl.ac.uk|T2_UK_SGrid_RALPP|
|              psi.ch|        T3_CH_PSI|
|          rc.ufl.edu|    T2_US_Florida|
|     rcac.purdue.edu|     T2_US_Purdue|
|    recas.ba.infn.it|       T2_IT_Bari|
|       roma1.infn.it|       T2_IT_Rome|
|         rutgers.edu|    T3_US_Rutgers|
|           sdfarm.kr|      T2_KR_KISTI|
|       sprace.org.br|     T2_BR_SPRACE|
|         t2.ucsd.edu|       T2_US_UCSD|
|      ultralight.org|    T2_US_Caltech|
|             unl.edu|   T2_US_Nebraska|
|     datagrid.cea.fr|  T2_FR_GRIF_IRFU|
|             desy.de|       T2_DE_DESY|
+--------------------+-----------------+
only showing top

In [None]:
domains_main