Skip to content

Commit

Permalink
Merge pull request #628 from NDCMS/fix-multiproductiondataset-for-andrew
Browse files Browse the repository at this point in the history
Attempt to Fix Bug Reported by Andrew
  • Loading branch information
Andrew42 committed Mar 13, 2018
2 parents 1b616f7 + 21b493e commit 7d9e955
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
24 changes: 10 additions & 14 deletions lobster/core/data/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def compare(stat, file):
return True
else:
errorMsg = 'size mismatch after transfer\n'
errorMsg += ' remote size: {0}\n'.format(match.groups()[0])
errorMsg += ' remote size: {0}\n'.format(match.groups()[0])
errorMsg += ' local size: {0}\n'.format(size)
raise RuntimeError(errorMsg)
else:
Expand Down Expand Up @@ -362,10 +362,10 @@ def compare(stat, file):
"hdfs",
"dfs",
"-fs",
'hdfs://'+server,
'hdfs://' + server,
"-stat",
'"Size: %b"',
os.path.join('/',path, remotename)
os.path.join('/', path, remotename)
]
p = run_subprocess(args, capture=True)
try:
Expand All @@ -375,7 +375,7 @@ def compare(stat, file):
elif output.startswith('srm://') or output.startswith('gsiftp://'):
if len(os.environ["LOBSTER_GFAL_COPY"]) > 0:
# FIXME gfal is very picky about its environment
prg = [os.environ["LOBSTER_GFAL_COPY"].replace('copy','stat')]
prg = [os.environ["LOBSTER_GFAL_COPY"].replace('copy', 'stat')]
args = prg + [
os.path.join(output, remotename),
]
Expand All @@ -390,8 +390,6 @@ def compare(stat, file):

else:
logger.info('Skipping gfal-based file check because no gfal executable defined in wrapper.')



# If we get here, we tried all of the other methods and never returned a True, so return false
return False
Expand All @@ -401,7 +399,7 @@ def compare(stat, file):
def check_outputs(data, config):
logger.info('Checking output files...')
for local, remote in config['output files']:
logger.info(' Checking {0} => {1}'.format(local,remote))
logger.info(' Checking {0} => {1}'.format(local, remote))
if not check_output(config, local, remote):
raise IOError("could not verify output file '{}'".format(remote))

Expand Down Expand Up @@ -597,8 +595,8 @@ def copy_inputs(data, config, env):
elif input.startswith("hdfs://"):
logger.info("Trying hdfs client access method")
server, path = re.match("hdfs://([a-zA-Z0-9:.\-]+)/(.*)", input).groups()
server = "hdfs://"+server
remotename = os.path.join('/',path, file)
server = "hdfs://" + server
remotename = os.path.join('/', path, file)

timeout = '300' # Just to be safe, have a timeout
args = [
Expand All @@ -610,8 +608,7 @@ def copy_inputs(data, config, env):
server,
"-get",
remotename,
os.path.basename(file)
]
os.path.basename(file)]
p = run_subprocess(args, env=env)
if p.returncode == 0:
logger.info('Successfully copied input with hdfs client')
Expand Down Expand Up @@ -769,7 +766,7 @@ def copy_outputs(data, config, env):
data['transfers']['chirp']['stageout failure'] += 1
elif output.startswith("hdfs://"):
server, path = re.match("hdfs://([a-zA-Z0-9:.\-]+)/(.*)", output).groups()
server = "hdfs://"+server
server = "hdfs://" + server

timeout = '300' # Just to be safe, have a timeout
args = [
Expand All @@ -781,8 +778,7 @@ def copy_outputs(data, config, env):
server,
"-put",
localname,
os.path.join('/',path, remotename),
]
os.path.join('/', path, remotename)]

p = run_subprocess(args, env=env)
logger.info('Checking output file transfer.')
Expand Down
6 changes: 4 additions & 2 deletions lobster/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def __init__(self, gridpacks, events_per_gridpack, events_per_lumi=500, lumis_pe
self.randomize_seeds = randomize_seeds

self.lumis_per_gridpack = int(math.ceil(float(events_per_gridpack) / events_per_lumi))
self.total_units = len(flatten(self.gridpacks)) * self.lumis_per_gridpack
self.total_units = 0

def validate(self):
return len(flatten(self.gridpacks)) > 0
Expand All @@ -235,9 +235,11 @@ def get_info(self):
dset = DatasetInfo()
dset.file_based = True

for run, fn in enumerate(flatten(self.gridpacks)):
files = flatten(self.gridpacks)
for run, fn in enumerate(files):
dset.files[fn].lumis = [(run, x) for x in range(1, self.lumis_per_gridpack + 1)]

self.total_units = len(files) * self.lumis_per_gridpack
dset.total_units = self.total_units
dset.tasksize = self.lumis_per_task
dset.stop_on_file_boundary = True # CMSSW can only handle one gridpack at a time
Expand Down

0 comments on commit 7d9e955

Please sign in to comment.