Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 187 lines (167 sloc) 5.882 kB
57699f2 @chapmanb Use generalized map framework in automated workflow; allow missing pi…
authored
1 """Helpful utilities for building analysis pipelines.
d2ba583 @chapmanb Move general utilities out of Picard directory; add in helper functio…
authored
2 """
3 import os
4 import tempfile
5 import shutil
6 import contextlib
7 import itertools
8 import functools
54a4ef0 @chapmanb Initial framework for supporting distributed tasks though Celery mess…
authored
9 import ConfigParser
d2ba583 @chapmanb Move general utilities out of Picard directory; add in helper functio…
authored
10 try:
11 import multiprocessing
12 from multiprocessing.pool import IMapIterator
13 except ImportError:
14 multiprocessing = None
15
fb4378e @chapmanb Utility function for providing more specific configurations on top of…
authored
16 import yaml
17
d2ba583 @chapmanb Move general utilities out of Picard directory; add in helper functio…
authored
def _patch_imap_interrupts():
    """Wrap ``IMapIterator.next`` so waits use a huge timeout, exactly once.

    This is the keyboard-interrupt fix from https://gist.github.com/626518:
    calls to ``next`` that do not pass a timeout get ``1e100`` instead, which
    keeps the wait interruptible. The patch is marked on the class so repeated
    ``cpmap`` calls do not stack one wrapper on top of another.
    """
    if getattr(IMapIterator, "_cpmap_interrupt_patched", False):
        return
    orig_next = IMapIterator.next

    def wrap(self, timeout=None):
        return orig_next(self, timeout=timeout if timeout is not None else 1e100)

    IMapIterator.next = wrap
    IMapIterator._cpmap_interrupt_patched = True


@contextlib.contextmanager
def cpmap(cores=1):
    """Configurable parallel map context manager.

    Yields a map-compatible function chosen by ``cores``:

    - ``1`` (the default): lazy single-core map (``itertools.imap`` on
      Python 2, the builtin ``map`` where ``imap`` does not exist).
    - more than ``1``: ``imap_unordered`` of a local multiprocessing pool, so
      results come back in completion order, not input order.

    :param cores: number of local processes; any value ``int()`` accepts.
    :raises ImportError: if more than one core is requested but the
        multiprocessing module could not be imported.

    The pool is terminated when the ``with`` block exits, including when it
    exits because of an exception.
    """
    n_cores = int(cores)
    if n_cores == 1:
        yield getattr(itertools, "imap", map)
        return
    if multiprocessing is None:
        raise ImportError("multiprocessing not available")
    _patch_imap_interrupts()
    # recycle worker processes on Python 2.7; Python 2.6's Pool has no
    # maxtasksperchild argument and raises TypeError, so fall back.
    try:
        pool = multiprocessing.Pool(n_cores, maxtasksperchild=5)
    except TypeError:
        pool = multiprocessing.Pool(n_cores)
    try:
        yield pool.imap_unordered
    finally:
        # An exception in the caller's with-body is re-raised at the yield;
        # the finally guarantees the worker processes are still torn down.
        pool.terminate()
d2ba583 @chapmanb Move general utilities out of Picard directory; add in helper functio…
authored
44
def map_wrap(f):
    """Wrap a function so it can be driven by ``map`` over argument tuples.

    ``map`` passes each item to the wrapper as a single argument; the wrapper
    unpacks that item as the positional arguments of ``f``. This follows the
    old ``apply(f, args[, kwargs])`` contract: an optional second positional
    argument is a mapping of keyword arguments for ``f``. Keyword arguments
    given directly to the wrapper are forwarded to ``f`` too (overriding the
    mapping) -- the previous ``apply``-based version raised TypeError for them.

    >>> add = map_wrap(lambda a, b: a + b)
    >>> list(map(add, [(1, 2), (3, 4)]))
    [3, 7]
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if len(args) > 2:
            raise TypeError("map_wrap expects an argument tuple and an "
                            "optional keyword mapping")
        call_args = tuple(args[0]) if args else ()
        call_kwargs = dict(args[1]) if len(args) > 1 else {}
        call_kwargs.update(kwargs)
        return f(*call_args, **call_kwargs)
    return wrapper
52
7e54fb7 @chapmanb Decorator to handle checking for presence of output file
authored
def memoize_outfile(ext):
    """Decorator factory that skips work when the derived output file exists.

    The decorated function must take its input file as the first positional
    argument or as the ``in_file`` keyword. The output path is the input path
    with its extension replaced by ``ext`` (``"sample.bam"`` with
    ``ext=".sorted.bam"`` gives ``"sample.sorted.bam"``). If that file is
    missing or empty, the function is called with ``out_file=<path>`` added to
    its keyword arguments; otherwise it is not called. The wrapper always
    returns the output path, never the function's own return value.
    """
    def decor(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            in_file = args[0] if args else kwargs['in_file']
            stem = os.path.splitext(in_file)[0]
            out_file = "%s%s" % (stem, ext)
            already_done = os.path.exists(out_file) and os.path.getsize(out_file) > 0
            if not already_done:
                kwargs['out_file'] = out_file
                f(*args, **kwargs)
            return out_file
        return wrapper
    return decor
75
98fee6e @chapmanb Avoid race conditions when creating directories concurrently
authored
def safe_makedir(dname):
    """Create ``dname`` (and any missing parents) unless the path exists.

    Tolerates another process creating the same directory concurrently: an
    ``OSError`` from the creation is re-raised only if, afterwards, the path
    still is not a directory. Returns ``dname`` so calls can be chained.
    """
    if os.path.exists(dname):
        return dname
    try:
        os.makedirs(dname)
    except OSError:
        # Losing the creation race to another process is fine; anything that
        # leaves us without a directory is a real error.
        if not os.path.isdir(dname):
            raise
    return dname
98fee6e @chapmanb Avoid race conditions when creating directories concurrently
authored
88
@contextlib.contextmanager
def curdir_tmpdir(remove=True):
    """Yield a fresh temporary directory created under ``<cwd>/tmp``.

    The ``tmp`` parent is created in the current working directory if needed.
    On exit the temporary directory is deleted recursively, unless ``remove``
    is false.
    """
    parent = os.path.join(os.getcwd(), "tmp")
    safe_makedir(parent)
    work_dir = tempfile.mkdtemp(dir=parent)
    safe_makedir(work_dir)
    try:
        yield work_dir
    finally:
        if remove:
            shutil.rmtree(work_dir)
d2ba583 @chapmanb Move general utilities out of Picard directory; add in helper functio…
authored
102
@contextlib.contextmanager
def chdir(new_dir):
    """Temporarily make ``new_dir`` the working directory, creating it if needed.

    The previous working directory is restored on exit, even on error.

    http://lucentbeing.com/blog/context-managers-and-the-with-statement-in-python/
    """
    previous = os.getcwd()
    safe_makedir(new_dir)
    os.chdir(new_dir)
    try:
        yield
    finally:
        os.chdir(previous)
116
@contextlib.contextmanager
def tmpfile(*args, **kwargs):
    """Yield the path of a new temporary file, cleaning up on exit.

    Arguments are passed straight to ``tempfile.mkstemp``. On exit the OS-level
    descriptor opened by ``mkstemp`` is closed and the file is removed if it
    still exists (the caller may already have deleted it).
    """
    handle, path = tempfile.mkstemp(*args, **kwargs)
    try:
        yield path
    finally:
        os.close(handle)
        if os.path.exists(path):
            os.remove(path)
b4e36eb @chapmanb Utility to create working directorie for project
authored
128
37062c4 @chapmanb Provide general framework for parallelizing long running single tasks…
authored
def file_exists(fname):
    """Return True only when ``fname`` exists and has a non-zero size."""
    if not os.path.exists(fname):
        return False
    return os.path.getsize(fname) > 0
133
b4e36eb @chapmanb Utility to create working directorie for project
authored
def create_dirs(config, names=None):
    """Create directories listed in the ``config["dir"]`` mapping.

    :param config: configuration mapping whose ``"dir"`` entry maps names to
        directory paths.
    :param names: which entries of ``config["dir"]`` to create; all of them
        when omitted.
    """
    if names is None:
        names = config["dir"].keys()
    for path in (config["dir"][name] for name in names):
        safe_makedir(path)
4bdd5c9 @chapmanb Save diskspace during analysis processing by cleaning up intermediate…
authored
140
def save_diskspace(fname, reason, config):
    """Overwrite a file in place with a short message to save disk.

    Runs only when ``config["algorithm"]["save_diskspace"]`` is truthy. The
    file is kept, so later steps can still see it was produced, but its
    contents become a one-line note recording ``reason``.
    """
    wanted = config["algorithm"].get("save_diskspace", False)
    if not wanted:
        return
    with open(fname, "w") as out_handle:
        out_handle.write("File removed to save disk space: %s" % reason)
150
ea7d52e @chapmanb Provide map-compatible functionality for running distributed celery t…
authored
def read_galaxy_amqp_config(galaxy_config, base_dir):
    """Return the RabbitMQ connection settings from a Galaxy config file.

    ``galaxy_config`` is resolved against ``base_dir`` when it is relative.
    The result maps every option of the ``[galaxy_amqp]`` section to its
    (interpolated) string value. ConfigParser raises NoSectionError if the
    section is absent -- presumably also the symptom of a missing file, since
    ``ConfigParser.read`` skips files it cannot open.
    """
    parser = ConfigParser.ConfigParser()
    parser.read(add_full_path(galaxy_config, base_dir))
    section = "galaxy_amqp"
    return dict((opt, parser.get(section, opt))
                for opt in parser.options(section))
2ad2484 @chapmanb Generalize alignment step of pipeline; centralize specifications for …
authored
161
def add_full_path(dirname, basedir=None):
    """Return ``dirname`` as an absolute path.

    Absolute paths are returned unchanged; relative ones are joined onto
    ``basedir``, which defaults to the current working directory. Uses
    ``os.path.isabs`` rather than a leading-"/" test so drive-letter paths on
    Windows are recognised as absolute (identical behaviour on POSIX).
    """
    if basedir is None:
        basedir = os.getcwd()
    if not os.path.isabs(dirname):
        dirname = os.path.join(basedir, dirname)
    return dirname
fb4378e @chapmanb Utility function for providing more specific configurations on top of…
authored
168
169 # ## Dealing with configuration files
170
171 def merge_config_files(fnames):
172 """Merge configuration files, preferring definitions in latter files.
173 """
174 def _load_yaml(fname):
175 with open(fname) as in_handle:
176 config = yaml.load(in_handle)
177 return config
178 out = _load_yaml(fnames[0])
179 for fname in fnames[1:]:
180 cur = _load_yaml(fname)
181 for k, v in cur.iteritems():
182 if out.has_key(k) and isinstance(out[k], dict):
183 out[k].update(v)
184 else:
185 out[k] = v
186 return out
Something went wrong with that request. Please try again.