This repository has been archived by the owner on May 5, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
_load.py
56 lines (43 loc) · 1.78 KB
/
_load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import contextlib
import os.path
import shutil
from pkg_resources import resource_stream, resource_exists
from .. import util
# Dotted name of the ``data`` subpackage next to this module; passed as the
# package argument to the pkg_resources lookups below.
_PKG = __package__ + '.data'
class InvalidLocusException(ValueError):
    """
    Raised when a locus name is not one of the supported loci
    """
def _check_locus(locus):
    """
    Return the lower-cased form of ``locus`` after validating it

    The comparison is case-insensitive; the accepted loci are ``igh``,
    ``igk`` and ``igl``. Any other value raises InvalidLocusException
    carrying the value exactly as it was passed in.
    """
    normalized = locus.lower()
    if normalized in {'igh', 'igk', 'igl'}:
        return normalized
    raise InvalidLocusException(locus)
def _get_file_name(locus, segment, collection=None, extension='.fasta'):
    """
    Build the data file name for a locus/segment pair

    The result has the form ``<locus><segment>[-<collection>]<extension>``,
    where the locus is validated and lower-cased by ``_check_locus`` and the
    ``-<collection>`` part is only present when ``collection`` is truthy.
    """
    name = '{0}{1}'.format(_check_locus(locus), segment)
    if collection:
        name = '{0}-{1}'.format(name, collection)
    return '{0}{1}'.format(name, extension)
def _handle(locus, segment, collection, extension):
    """
    Open the packaged data file for the given locus/segment as a stream

    The file name comes from ``_get_file_name`` and is looked up inside the
    ``_PKG`` package with ``resource_stream``.
    """
    resource_name = _get_file_name(locus, segment, collection, extension)
    return resource_stream(_PKG, resource_name)
def fasta_handle(locus, segment, collection=None):
    """
    Open the packaged ``.fasta`` file for a locus/segment as a stream
    """
    stream = _handle(locus, segment, collection, '.fasta')
    return stream
def gff3_handle(locus, segment, collection=None):
    """
    Open the packaged ``.gff3`` file for a locus/segment as a stream
    """
    stream = _handle(locus, segment, collection, '.gff3')
    return stream
def has_gff3(locus, segment, collection=None):
    """
    Report whether a ``.gff3`` file is packaged for the given locus/segment

    Returns the result of ``resource_exists`` for the corresponding file
    name inside the ``_PKG`` package.
    """
    gff3_name = _get_file_name(locus, segment, collection, '.gff3')
    return resource_exists(_PKG, gff3_name)
@contextlib.contextmanager
def temp_fasta(locus, segment, collection=None):
    """
    Copy a packaged segment FASTA file to a temporary file

    Context manager that yields the path of the copy. The copy is named
    after the packaged file and is written inside a directory obtained from
    ``util.tempdir``; that context stays open while the caller's ``with``
    body runs and is exited afterwards (presumably removing the directory --
    confirm against ``util.tempdir``).

    Raises InvalidLocusException (via ``_get_file_name``) when ``locus`` is
    not a supported locus.
    """
    base = os.path.splitext(_get_file_name(locus, segment, collection))[0]
    with fasta_handle(locus, segment, collection=collection) as ifp, \
            util.tempdir(prefix=base) as td:
        # resource_stream returns a binary stream, so the destination has to
        # be opened in binary mode: a text-mode file rejects bytes on
        # Python 3 and may rewrite line endings elsewhere.
        with open(td(base + '.fasta'), 'wb') as ofp:
            shutil.copyfileobj(ifp, ofp)
        # The copy is closed (and so fully flushed) before its path is
        # handed to the caller.
        yield ofp.name