Skip to content
This repository has been archived by the owner on Oct 4, 2023. It is now read-only.

Commit

Permalink
Input filename conflicts (#590)
Browse files Browse the repository at this point in the history
* add index to conflicting filenames

* Fix call to task.is_leaf

* correct TaskAttemptInputs import in taskmonitor

* correct spelling error

* fix taskmonitor inputs error
  • Loading branch information
nhammond committed Jun 12, 2019
1 parent 4e93d30 commit bc45bce
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 261 deletions.
108 changes: 7 additions & 101 deletions server/loomengine_server/api/models/__init__.py
Expand Up @@ -5,9 +5,11 @@
import uuid
import loomengine_utils.md5calc


def uuidstr():
    """Return a newly generated random (version 4) UUID as a string."""
    new_id = uuid.uuid4()
    return str(new_id)


def render_from_template(raw_text, context):
if not raw_text:
return ''
Expand All @@ -16,13 +18,15 @@ def render_from_template(raw_text, context):
template = env.get_template('template')
return template.render(**context)


def render_string_or_list(value, context):
    """Render a template string, or every template string in a list.

    A list input produces a list of rendered strings in the same order.
    Any other input is handed directly to render_from_template and that
    result is returned.
    """
    if not isinstance(value, list):
        return render_from_template(value, context)
    return [render_from_template(entry, context) for entry in value]


def calculate_contents_fingerprint(contents):
if isinstance(contents, dict):
# Sort keys
Expand All @@ -41,6 +45,7 @@ def calculate_contents_fingerprint(contents):
contents_string = str(contents)
return hashlib.md5(contents_string).hexdigest()


def flatten_nodes(node, children_fieldname, node_list=None):
# Converts a tree to a flat list of nodes
# Returns new list of nodes or appends to existing node_list
Expand All @@ -51,6 +56,7 @@ def flatten_nodes(node, children_fieldname, node_list=None):
flatten_nodes(child, children_fieldname, node_list)
return node_list


def copy_prefetch(
source_nodes, dest_nodes,
child_field=None, one_to_x_fields=None):
Expand Down Expand Up @@ -78,107 +84,6 @@ def copy_prefetch(
one_to_x_fields=one_to_x_fields)


class ArrayInputContext(object):
    """This class is used with jinja templates to make the
    default representation of an array a space-delimited list.

    For arrays of type 'file', filenames that occur more than once are
    given a counter suffix so that every entry in the array is distinct.
    """

    def __init__(self, items, type):
        # "type" shadows the builtin, but it is part of the signature
        # that callers may pass by keyword, so the name is kept.
        if type == 'file':
            self.items = self._rename_duplicates(items)
        else:
            self.items = items

    def _rename_duplicates(self, filenames):
        """Return a new list in which repeated filenames are numbered.

        Names that occur only once are returned unchanged. Each occurrence
        of a repeated name receives the next counter value for that name,
        starting at 0, in order of appearance.
        """
        # Identify filenames that occur more than once
        seen = set()
        duplicates = set()
        for filename in filenames:
            if filename in seen:
                duplicates.add(filename)
            seen.add(filename)

        new_filenames = []
        filename_counts = {}
        for filename in filenames:
            if filename in duplicates:
                counter = filename_counts.get(filename, 0)
                filename_counts[filename] = counter + 1
                filename = self._add_counter_suffix(filename, counter)
            new_filenames.append(filename)
        return new_filenames

    def _add_counter_suffix(self, filename, count):
        """Insert '__<count>__' before the file extension, if there is one."""
        # Add suffix while preserving file extension:
        #   myfile       --> myfile__1__
        #   myfile.txt   --> myfile__1__.txt
        #   my.file.txt  --> my.file__1__.txt
        #   .hidden      --> .hidden__1__
        # os.path.splitext keeps a leading dot as part of the name, so a
        # dotfile is not mistaken for an empty name plus an extension.
        import os.path
        root, extension = os.path.splitext(filename)
        return '%s__%s__%s' % (root, count, extension)

    def __iter__(self):
        return iter(self.items)

    def __getitem__(self, i):
        return self.items[i]

    def __str__(self):
        # Space-delimited so the array can be dropped into a command line.
        return ' '.join(str(item) for item in self.items)

class DummyContext(str):
    """This class is used to create dummy context values used to validate
    jinja templates during Template validation, before actual context values
    are known. It acts as both a string and a list and attempts to avoid
    raising any errors for usage that could be valid for some
    particular string or list.
    """

    def __init__(self, *args, **kwargs):
        # The string value is already set by str.__new__. The base
        # initializer must be called without extra arguments, because
        # object.__init__ rejects them on Python 3.
        super(DummyContext, self).__init__()
        # Build the list view from the instance's own string value.
        # str(self) gives a plain str, so iteration here does not go
        # through the overridden __iter__/__getitem__ below, and this
        # also works when the instance is constructed with no arguments.
        self.items = list(str(self))

    def __iter__(self, *args, **kwargs):
        return self.items.__iter__(*args, **kwargs)

    def __len__(self, *args, **kwargs):
        return self.items.__len__(*args, **kwargs)

    def __getitem__(self, i):
        # Any index or key yields a placeholder rather than raising.
        return 'x'

    # The remaining methods delegate to the list view so that list-style
    # usage in a template does not fail during validation.

    def append(self, *args, **kwargs):
        return self.items.append(*args, **kwargs)

    def count(self, *args, **kwargs):
        return self.items.count(*args, **kwargs)

    def extend(self, *args, **kwargs):
        return self.items.extend(*args, **kwargs)

    def index(self, *args, **kwargs):
        return self.items.index(*args, **kwargs)

    def insert(self, *args, **kwargs):
        return self.items.insert(*args, **kwargs)

    def pop(self, *args, **kwargs):
        return self.items.pop(*args, **kwargs)

    def remove(self, *args, **kwargs):
        return self.items.remove(*args, **kwargs)

    def reverse(self, *args, **kwargs):
        return self.items.reverse(*args, **kwargs)

    def sort(self, *args, **kwargs):
        return self.items.sort(*args, **kwargs)

class positiveIntegerDefaultDict(defaultdict):
def __getitem__(self, i):
if not int(i) == i:
Expand All @@ -190,6 +95,7 @@ def __getitem__(self, i):
'Invalid value "%s"' % i)
return super(positiveIntegerDefaultDict, self).__getitem__(i)


from .data_objects import *
from .data_nodes import *
from .labels import *
Expand Down
2 changes: 1 addition & 1 deletion server/loomengine_server/api/models/data_nodes.py
Expand Up @@ -220,7 +220,7 @@ def _get_all_paths(self, seed_path, gather_depth):
last_paths = new_paths
# Convert to set to remove duplicates created by gathering
return paths

def is_ready(self, data_path=None):
# True if all data at or below the given index is ready.
if data_path is not None:
Expand Down
52 changes: 0 additions & 52 deletions server/loomengine_server/api/models/task_attempts.py
Expand Up @@ -357,58 +357,6 @@ class TaskMembership(BaseModel):
child_task_attempt = models.ForeignKey('TaskAttempt', on_delete=models.CASCADE)


class ArrayInputContext(object):
    """This class is used with jinja templates to make the
    default representation of an array a space-delimited list.

    For arrays of type 'file', filenames that occur more than once are
    given a counter suffix so that every entry in the array is distinct.
    """

    def __init__(self, items, type):
        # "type" shadows the builtin, but it is part of the signature
        # that callers may pass by keyword, so the name is kept.
        if type == 'file':
            self.items = self._rename_duplicates(items)
        else:
            self.items = items

    def _rename_duplicates(self, filenames):
        """Return a new list in which repeated filenames are numbered.

        Names that occur only once are returned unchanged. Each occurrence
        of a repeated name receives the next counter value for that name,
        starting at 0, in order of appearance.
        """
        # Identify filenames that occur more than once
        seen = set()
        duplicates = set()
        for filename in filenames:
            if filename in seen:
                duplicates.add(filename)
            seen.add(filename)

        new_filenames = []
        filename_counts = {}
        for filename in filenames:
            if filename in duplicates:
                counter = filename_counts.get(filename, 0)
                filename_counts[filename] = counter + 1
                filename = self._add_counter_suffix(filename, counter)
            new_filenames.append(filename)
        return new_filenames

    def _add_counter_suffix(self, filename, count):
        """Insert '__<count>__' before the file extension, if there is one."""
        # Add suffix while preserving file extension:
        #   myfile       --> myfile__1__
        #   myfile.txt   --> myfile__1__.txt
        #   my.file.txt  --> my.file__1__.txt
        #   .hidden      --> .hidden__1__
        # The same '__N__' marker is used with and without an extension.
        # os.path.splitext keeps a leading dot as part of the name, so a
        # dotfile is not mistaken for an empty name plus an extension.
        import os.path
        root, extension = os.path.splitext(filename)
        return '%s__%s__%s' % (root, count, extension)

    def __iter__(self):
        # Lists have no .iter() method; use the iterator protocol.
        return iter(self.items)

    def __getitem__(self, i):
        return self.items[i]

    def __str__(self):
        # Space-delimited so the array can be dropped into a command line.
        return ' '.join(str(item) for item in self.items)

# To run on new thread
def _run_execute_task_attempt_playbook(task_attempt):
from django.contrib.auth.models import User
Expand Down
108 changes: 94 additions & 14 deletions server/loomengine_server/api/models/tasks.py
Expand Up @@ -5,7 +5,7 @@
import jsonfield
import time

from . import render_from_template, render_string_or_list, ArrayInputContext, \
from . import render_from_template, render_string_or_list, \
calculate_contents_fingerprint, positiveIntegerDefaultDict
from .base import BaseModel
from .data_channels import DataChannel
Expand Down Expand Up @@ -345,11 +345,16 @@ def add_to_all_task_attempts(self, task_attempt):
from api.models.task_attempts import TaskMembership
tm = TaskMembership(parent_task=self, child_task_attempt=task_attempt)
tm.save()

def get_input_context(self, inputs=None, data_path=None):
context = {}
if inputs is None:
inputs = self.inputs.all()
# sort inputs by channel name (or as_channel)
inputs = [i for i in inputs]
inputs.sort(key=lambda i: i.get_internal_channel())
duplicate_filename_counters = \
self._get_duplicate_filename_counters(inputs)
if data_path is None:
data_path = self.data_path
# For valid dimensions (integer > 0) where path is not set,
Expand All @@ -363,20 +368,32 @@ def get_input_context(self, inputs=None, data_path=None):
context['index'] = index
context['size'] = size
for input in inputs:
if input.as_channel:
channel = input.as_channel
else:
channel = input.channel
if input.data_node.is_leaf:
context[channel] = input.data_node\
.substitution_value
else:
context[channel] = ArrayInputContext(
input.data_node.substitution_value,
input.type
)
context[input.get_internal_channel()] \
= InputContext(input, duplicate_filename_counters)
return context

def _get_duplicate_filename_counters(self, inputs):
counters = {}
all_filenames = []
for input in inputs:
if input.type == 'file':
if input.data_node.is_leaf:
all_filenames.append(input.data_node.substitution_value)
else:
all_filenames.extend(input.data_node.substitution_value)
for duplicate_filename in self._get_duplicates(all_filenames):
counters[duplicate_filename] = 0
return counters

def _get_duplicates(self, array):
seen = set()
duplicates = set()
for member in array:
if member in seen:
duplicates.add(member)
seen.add(member)
return duplicates

def get_output_context(self, input_context, outputs=None):
# This returns a value only for Files, where the filename
# is known beforehand and may be used in the command.
Expand Down Expand Up @@ -489,6 +506,12 @@ class TaskInput(DataChannel):
mode = models.CharField(max_length=255)
as_channel = models.CharField(max_length=255, null=True, blank=True)

def get_internal_channel(self):
    """Return as_channel when it is set (truthy), otherwise channel."""
    return self.as_channel or self.channel

def get_fingerprintable_contents(self):
return {
'mode': self.mode,
Expand Down Expand Up @@ -533,6 +556,7 @@ class TaskEvent(BaseModel):
detail = models.TextField(blank=True)
is_error = models.BooleanField(default=False)


class TaskFingerprint(BaseModel):
value = models.CharField(max_length=255, unique=True)
active_task_attempt = models.OneToOneField(
Expand Down Expand Up @@ -567,3 +591,59 @@ def update_task_attempt_maybe(self, task_attempt):
self.setattrs_and_save_with_retries(
{'active_task_attempt': task_attempt}
)

def _rename_duplicate(filename, counter):
parts = filename.split('.')
assert len(parts) > 0, 'missing filename'
if len(parts) == 1:
return parts[0] + '__%s__' % counter
else:
return '.'.join(
parts[0:len(parts)-1]) + '__%s__.' % counter + parts[-1]

def _index_duplicate_filenames(filename, duplicate_filename_counters):
# Increment filenames if there are duplicates,
# e.g. file__0__.txt, file__1__.txt, file__2__.txt
if filename in duplicate_filename_counters:
counter = duplicate_filename_counters[filename]
duplicate_filename_counters[filename] += 1
filename = _rename_duplicate(filename, counter)
return filename


def InputContext(input, duplicate_filename_counters):
    """Build the template context value for a single input.

    For scalar (leaf) data the substitution value itself is returned,
    passed through _index_duplicate_filenames first when the input type
    is 'file'. For array data an ArrayInputContext wrapping the values
    is returned.
    """
    if not input.data_node.is_leaf:
        return ArrayInputContext(input.data_node.substitution_value,
                                 input.type, duplicate_filename_counters)
    if input.type != 'file':
        return input.data_node.substitution_value
    return _index_duplicate_filenames(
        input.data_node.substitution_value,
        duplicate_filename_counters)


class ArrayInputContext(object):
    """Array wrapper for jinja templates whose default string form is
    the items joined by single spaces.
    """

    def __init__(self, items, type, duplicate_filename_counters):
        if type != 'file':
            # Non-file arrays are stored as given.
            self.items = items
            return
        # File arrays: run each name through the duplicate counters.
        renamed = []
        for filename in items:
            renamed.append(_index_duplicate_filenames(
                filename, duplicate_filename_counters))
        self.items = renamed

    def __iter__(self):
        return iter(self.items)

    def __getitem__(self, i):
        return self.items[i]

    def __str__(self):
        return ' '.join(str(entry) for entry in self.items)

0 comments on commit bc45bce

Please sign in to comment.