In [1]:
"""Retrieve WMArchive CRAB job-report information and match it to DBS datasets using Spark"""

from __future__ import print_function

from subprocess import Popen, PIPE, STDOUT
from pyspark.sql import Column
from pyspark.sql.functions import col
import pyspark.sql.functions as fn
import pyspark.sql.types as types
import schemas

In [2]:
# CSV reader shared by all DBS table loads: the literal string "null" is
# treated as a missing value and FAILFAST mode is requested for parsing.
csvreader = (
    spark.read
    .format("com.databricks.spark.csv")
    .option("nullValue", "null")
    .option("mode", "FAILFAST")
)

# Directory that contains the input DBS table dumps
basepath = "/project/awg/cms/CMS_DBS3_PROD_GLOBAL/current"

# DBS tables, each loaded with the schema supplied by the imported `schemas` module
dbs_files = csvreader.schema(schemas.schema_files()).load("{}/FILES".format(basepath))
dbs_blocks = csvreader.schema(schemas.schema_blocks()).load("{}/BLOCKS".format(basepath))
dbs_datasets = csvreader.schema(schemas.schema_datasets()).load("{}/DATASETS".format(basepath))

In [3]:
# Load the job reports: every Avro file matching the glob below
inputfile = "/cms/wmarchive/avro/crab/2019/06/02/*.avro"
jobreports = spark.read.load(inputfile, format="avro")

In [4]:
# jobreports.printSchema()

In [5]:
# The following regexps describe what is in the cache.
#
# They are applied with Column.rlike, which performs an unanchored regular
# expression search (not a glob match), so each pattern is written as a real
# regex over the three-part dataset name /<primary>/<processed>/<tier>:
#   * "^/[^/]+/" matches any primary dataset name (a glob-style "/*/" would
#     instead mean "zero or more slashes followed by a slash");
#   * "[^/]*" keeps wildcards inside a single path component;
#   * the trailing "$" stops "/MINIAOD" from also matching "/MINIAODSIM".
regexp1="^/[^/]+/Run2016[^/]*-03Feb2017[^/]*/MINIAOD$"
regexp2="^/[^/]+/RunIISummer16MiniAODv2-PUMoriond17_80X_[^/]*/MINIAODSIM$"
regexp3="^/[^/]+/[^/]*-31Mar2018[^/]*/MINIAOD$"
regexp4="^/[^/]+/[^/]*RunIIFall17MiniAODv2[^/]*/MINIAODSIM$"

# Get dataset: attach the DBS dataset name to every job report and keep only
# the reports whose dataset matches one of the cache regexps.

# One row per entry of LFNArray (exposed as "lfn"), alongside all original columns
reports_by_lfn = jobreports.select(fn.explode("LFNArray").alias("lfn"), "*")

# Resolve each LFN through the DBS tables: file -> block -> dataset
reports_with_dataset = (
    reports_by_lfn
    .join(dbs_files, fn.col("f_logical_file_name") == fn.col("lfn"))
    .join(dbs_blocks, fn.col("f_block_id") == fn.col("b_block_id"))
    .join(dbs_datasets, fn.col("d_dataset_id") == fn.col("b_dataset_id"))
)

# True when the dataset name matches at least one of the four patterns
dataset_name = fn.col("d_dataset")
dataset_in_cache = (
    dataset_name.rlike(regexp1)
    | dataset_name.rlike(regexp2)
    | dataset_name.rlike(regexp3)
    | dataset_name.rlike(regexp4)
)

# Columns to save
ds = reports_with_dataset.filter(dataset_in_cache).select(
    fn.col("task").alias("workflow_id"),
    fn.col("meta_data.crab_id").alias("crab_id"),
    dataset_name.alias("dataset"),
)

In [6]:
# ds.show(10, False)

In [7]:
# Save the selected columns as gzip-compressed Parquet, overwriting any
# existing output at the target path.
parquet_writer = ds.write.mode("overwrite").option("compression", "gzip")
parquet_writer.parquet("hdfs://analytix/user/jguiang/shared/WMStats_06-02-2019")

In [8]:
# Copy the Parquet output from hdfs to eos
hdfs_source = "/user/jguiang/shared/WMStats_06-02-2019"
eos_destination = "/eos/user/j/jguiang/data-access/parquet/"

# The command is passed as an argument list, so no shell is involved.
# stdin is not piped because the command takes no input; stderr is merged into
# stdout so error messages are captured too; universal_newlines=True makes the
# captured output text rather than bytes on Python 3.
p = Popen(["hdfs", "dfs", "-get", hdfs_source, eos_destination],
          stdout=PIPE, stderr=STDOUT, close_fds=True, universal_newlines=True)

# communicate() reads all output and waits for the process to exit, which
# also populates p.returncode.
copy_output = p.communicate()[0]
print(copy_output)

# Surface a failed copy instead of letting it pass silently.
if p.returncode != 0:
    raise RuntimeError(
        "hdfs dfs -get exited with status {}".format(p.returncode))


