forked from jgurtowski/ectools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
seqio.py
79 lines (54 loc) · 1.87 KB
/
seqio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from collections import namedtuple
# Lightweight immutable record types used by the readers and writers below.
# FastaRecord fields: name (header text without ">") and seq.
FastaRecord = namedtuple("FastaRecord", "name seq")
# FastqRecord fields: name (header text without "@"), seq, desc (text of
# the "+" line) and qual.
FastqRecord = namedtuple("FastqRecord", "name seq desc qual")
def seqlen(record):
    '''
    Return the length of a seqio record's sequence

    record is any object exposing a "seq" attribute (for example a
    FastaRecord or FastqRecord); the result is len() of that attribute.
    '''
    sequence = record.seq
    return len(sequence)
def fastaIterator(fh):
    '''
    Iterate over the records of a Fasta file

    fh is a file-like object that is read with readline(). One
    FastaRecord(name, seq) is yielded per ">" header line: name is the
    stripped header without its first character, and seq is the
    concatenation of the stripped lines that follow, up to the next
    header or the end of the file.

    Raises Exception if the first line is missing or does not start
    with ">". Because this is a generator, the error surfaces when
    iteration begins rather than when the function is called.
    '''
    line = fh.readline()
    if not line or not line.startswith(">"):
        raise Exception("No \">\" at start of Fasta File")
    name = line.strip()[1:]
    # Collect the pieces and join once per record: repeated "+=" on a
    # string can degrade to quadratic time on long sequences.
    chunks = []
    while True:
        line = fh.readline()
        if not line or line.startswith(">"):
            # End of the current record: either EOF or the next header
            yield FastaRecord(name, "".join(chunks))
            if not line:
                break
            name = line.strip()[1:]
            chunks = []
        else:
            chunks.append(line.strip())
def recordToString(record):
    '''
    Serialize a record with the writer matching its type

    A record that has a "desc" attribute is formatted as Fastq;
    anything else is formatted as Fasta.
    '''
    if hasattr(record, "desc"):
        return fastqRecordToString(record)
    return fastaRecordToString(record)
def fastaRecordToString(record):
    '''
    Format a Fasta record as text

    The result is ">" + name on the first line and the sequence on the
    second, with no trailing newline.
    '''
    header = ">" + record.name
    sequence = record.seq
    return "\n".join((header, sequence))
def fastqIterator(fh):
    '''
    Iterate over the records of a Fastq file

    fh is a file-like object that is read with readline(). Every record
    is taken to be exactly four lines: header, sequence, "+" line and
    quality. Yields FastqRecord(name, seq, desc, qual), where name is
    the stripped header without its first character, desc is the third
    line without its first character, and all fields are stripped of
    surrounding whitespace.

    Raises Exception if the first line is missing or does not start
    with "@" (only the first header is checked), or if the file ends
    in the middle of a record.
    '''
    header = fh.readline()
    if not (header and header.startswith("@")):
        raise Exception("No \"@\" at start of Fastq File")
    while header:
        name = header.strip()[1:]
        # Always consume the three remaining lines before testing for EOF
        seq_line = fh.readline()
        desc_line = fh.readline()
        qual_line = fh.readline()
        if not (seq_line and desc_line and qual_line):
            raise Exception("Fastq is corrupted")
        yield FastqRecord(name, seq_line.strip(), desc_line[1:].strip(), qual_line.strip())
        # An empty read here means EOF and ends the loop
        header = fh.readline()
def fastqRecordToString(record):
    '''
    Format a Fastq record as its four-line text form

    The lines are "@" + name, the sequence, "+" + desc and the quality
    string, joined by newlines with no trailing newline.
    '''
    header = "@" + record.name
    sequence = record.seq
    separator = "+" + record.desc
    quality = record.qual
    return "\n".join((header, sequence, separator, quality))
def iteratorFromExtension(filename):
    '''
    Get a sequence file iterator
    Based on the file's extension

    The extension is the text after the last "." in filename and is
    matched case-insensitively. "fasta" and "fa" select fastaIterator;
    "fastq", "fq" and "txt" select fastqIterator. The iterator function
    itself is returned (it is not called here).

    Raises Exception for any other extension.
    '''
    ext = filename.split(".")[-1]
    # Compare in lower case so "reads.FASTQ" is treated like "reads.fastq"
    normalized = ext.lower()
    if normalized in ("fasta", "fa"):
        return fastaIterator
    if normalized in ("fastq", "fq", "txt"):
        return fastqIterator
    # Call-style raise works on both Python 2 and Python 3; the comma
    # form ("raise Exception, msg") is a SyntaxError on Python 3.
    raise Exception("Unknown file extension %s for file %s" % (ext, filename))