-
Notifications
You must be signed in to change notification settings - Fork 0
/
deblur_result_io.py
157 lines (145 loc) · 4.82 KB
/
deblur_result_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python
import os
import sys
import collections
import numpy as np
def ensure_dir(f):
d = os.path.dirname(f)
if not os.path.exists(d):
os.makedirs(d)
def write_vblur(b, ofname):
tf = open(ofname, 'w')
text = [ "{0}: {1}\n".format(rlen, "\t".join(map(str,vblur)))
for rlen, vblur in b.items() ]
tf.writelines(text)
tf.close()
def read_vblur(fname):
tf = open(fname)
b = collections.OrderedDict()
for line in tf:
read_len, pc_str = line.rstrip("\n").split(": ")
read_len = int(read_len)
prof = np.array(list(map(float, pc_str.split("\t"))))
b[read_len] = prof
tf.close()
return b
def write_essentials(ptrue, eps, ofname):
if len(ptrue)==0:
return
tf = open(ofname, 'w')
i = 0
for tid, prof in ptrue.items():
text = [ "tid: {0}\n".format(tid),
"0: {0}\n".format("\t".join(map(str, prof))) ] + \
[ "{0}: {1}\n".format(rlen, "\t".join(map(str, eps_rlen)))
for rlen, eps_rlen in eps[tid].items() ]
tf.writelines(text)
i += 1
sys.stdout.write("processed transcripts {0}.\t\r".format(i))
sys.stdout.flush()
sys.stdout.write("\n")
tf.close()
def read_essentials(fname):
tf = open(fname)
ptrue = {}
eps = {}
transcript = {}
line = tf.readline()
i = 0
while line:
if line.startswith("tid: "):
if transcript:
eps[tid] = transcript.copy()
i += 1
sys.stdout.write("processed transcripts {0}.\t\r".format(i))
sys.stdout.flush()
transcript.clear()
tid = line.lstrip("tid: ").rstrip("\n")
else:
read_len, pc_str = line.rstrip("\n").split(": ")
read_len = int(read_len)
prof = np.array(list(map(float, pc_str.split("\t"))))
if read_len == 0:
ptrue[tid] = prof
else:
transcript[read_len] = prof
line = tf.readline()
if transcript:
eps[tid] = transcript.copy()
sys.stdout.write("\n")
tf.close()
return ptrue, eps
def write_cds_profile(base_prof, ofname):
if len(base_prof) == 0:
return
tf = open(ofname, 'w')
i = 0
for tid, prof in base_prof.items():
text = [ "tid: {0}\n".format(tid),
"{0}\n".format("\t".join(map(str, prof))) ]
tf.writelines(text)
i += 1
sys.stdout.write("processed transcripts {0}.\t\r".format(i))
sys.stdout.flush()
sys.stdout.write("\n")
tf.close()
def read_cds_profile(fname):
tf = open(fname)
base_prof = {}
line = tf.readline()
i = 0
while line:
if line.startswith("tid: "):
i += 1
tid = line.lstrip("tid: ").rstrip("\n")
line = tf.readline()
prof = map(float, line.rstrip("\n").split("\t"))
base_prof[tid] = np.array(prof)
sys.stdout.write("processed transcripts {0}.\t\r".format(i))
sys.stdout.flush()
line = tf.readline()
sys.stdout.write("\n")
tf.close()
return base_prof
#=============================
# format conversion
#=============================
def build_cobs_per_rlen_with_shifts(pos_list, start, end, offset):
""" shift left with offset"""
profile = np.zeros(end-start+1)
for pos, cnt in pos_list:
idx_adj = pos-start-offset
if idx_adj < 0 or idx_adj >= len(profile): continue
profile[idx_adj] = cnt
return profile
def build_cobs_per_transcript_with_shifts(clist, start, end, rlen_min, rlen_max, klist):
cobst = {}
for rlen, pos_list in clist.items():
if rlen < rlen_min or rlen > rlen_max: continue
cobst[rlen] = build_cobs_per_rlen_with_shifts(pos_list, start, end, klist[rlen])
return cobst
def build_cobs_with_shifts(tprofile, cds_range, utr5_offset, utr3_offset, rlen_min, rlen_max, klist):
print("build profiles with shifts")
cobs = {}
i = 0
for tid, prof in tprofile.items():
start, end = cds_range[tid]
cobs_current = build_cobs_per_transcript_with_shifts(prof, utr5_offset, (end-start)+utr3_offset, rlen_min, rlen_max, klist)
if len(cobs_current) != 0:
cobs[tid] = cobs_current
i += 1
sys.stdout.write("processed transcript {0}.\t\r".format(i))
sys.stdout.flush()
sys.stdout.write("\n")
return cobs
def build_profile_from_list(pos_list, start, end):
profile = np.zeros(end-start+1)
for pos, cnt in pos_list:
profile[pos-start] = cnt
return profile
def build_cobs_for_deblur(clist, start, end, rlen_min, rlen_max):
cobs = {}
for rlen, pos_list in clist.items():
if rlen < rlen_min or rlen > rlen_max: continue
cobs[rlen] = build_profile_from_list(pos_list, start, end)
return cobs