/
write_outputs_vcf.py
358 lines (290 loc) · 12 KB
/
write_outputs_vcf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
#!/usr/bin/env python
"""
Utils for writing VCF with or without depths info.
"""
import os
import glob
import time
from loguru import logger
import h5py
import numpy as np
import pandas as pd
import ipyrad
from ipyrad.assemble.utils import BTS, GETCONS, DCONS, chroms2ints
class FillVCF:
    """
    Incorporate indels and trim amounts when grabbing depths from CATG arrays
    (depth arrays from step 5). Indels are only relevant to denovo data.

    Parameters
    ----------
    data : Assembly-like object
        Provides .tmpdir (chunk files), .snps_database (HDF5 with 'snpsmap'),
        and .isref.
    nsnps : int
        Total number of SNPs; sizes the per-sample depths array .vcfd.
    sample : Sample-like object
        Provides .name and .files.database (HDF5 with this sample's 'catg'
        depth array of shape (nconsens, maxlen, 4)).
    """
    def __init__(self, data, nsnps, sample):
        # input locus chunk files, sorted by chunk number
        # (filenames like "chunk-<N>.loci"; [:-5] strips ".loci")
        self.locbits = glob.glob(os.path.join(data.tmpdir, "chunk*.loci"))
        self.locbits = sorted(
            self.locbits, key=lambda x: int(x.rsplit("-", 1)[-1][:-5]))
        self.loclines = None

        # input indels arrays. These MUST be sorted with the same chunk
        # ordering as locbits/trimbits: a raw glob() result has arbitrary
        # filesystem order and could pair the wrong indels chunk with a
        # locus chunk. The key takes the numeric chunk id after the last
        # "-" and before the first "." (e.g. "chunk-3.indels.npy" -> 3).
        self.indbits = glob.glob(os.path.join(data.tmpdir, "chunk*.indels*"))
        self.indbits = sorted(
            self.indbits,
            key=lambda x: int(x.rsplit("-", 1)[-1].split(".")[0]))
        if not self.indbits:
            self.indbits = [None] * len(self.locbits)

        # input trim arrays, sorted by chunk number
        # (filenames like "chunk-<N>.npy"; [:-4] strips ".npy")
        self.trimbits = glob.glob(os.path.join(data.tmpdir, "chunk*.npy"))
        self.trimbits = sorted(
            self.trimbits, key=lambda x: int(x.rsplit("-", 1)[-1][:-4]))

        # array to accumulate vcf CATG depths for this taxon, one row per SNP
        self.vcfd = np.zeros((nsnps, 4), dtype=np.uint32)

        # the sample for this comp
        self.sname = sample.name
        self.isref = bool(data.isref)

        # snpsmap has locations of SNPs on trimmed loci, e.g.,
        # no SNPs are on loc 1 and 2, first is on 3 at post-trim pos 11
        # [ 3  0 11  1 41935]
        # [ 4  0 57  1 56150]
        # keep only (locus id, post-trim position) columns
        with h5py.File(data.snps_database, 'r') as io5:
            self.snpsmap = io5['snpsmap'][:, [0, 2]]

        # TODO: scaffs should be ordered (right?) so no need to load it all!
        # All catgs for this sample (this could be done more mem efficient...)
        with h5py.File(sample.files.database, 'r') as io5:
            self.catgs = io5['catg'][:]
            self.maxlen = self.catgs.shape[1]

        # Sample-level counters: global locus index and global SNP index
        self.locidx = 0
        self.snpidx = 0

    def run(self):
        "loops over chunked files streaming through all loci for this sample"
        for idx in range(len(self.locbits)):
            # localidx is the 1-indexed locus counter within this chunk
            self.localidx = 0
            self.locfill(idx)

    def locfill(self, idx):
        "iterates over loci in chunkfile to get and enter catgs for snps"
        # load the arrays for this bit
        edges = np.load(self.trimbits[idx])
        inds = self.indbits[idx]
        if inds:
            # NOTE(review): loaded but not used below; indel shifts are
            # instead counted from '-' chars in the aligned sequences.
            inds = np.load(inds)

        # read all lines for this chunk up front so the file handle is
        # closed here instead of leaking (the original iter(open(...))
        # was never closed).
        with open(self.locbits[idx], 'r') as infile:
            self.loclines = iter(infile.readlines())

        while 1:
            # yield increments locidx by 1
            try:
                self.yield_loc()
            except StopIteration:
                break

            # get snps for this locus (1-indexed locus idxs)
            self.locsnps = self.snpsmap[self.snpsmap[:, 0] == self.locidx]

            # get global trim for this locus (0-indexed edge arr)
            self.gtrim = edges[self.localidx - 1]

            # if SNPs and data for this sample enter catgs
            if (self.locsnps.size) and (self.sname in self.names):
                if self.isref:
                    self.ref_enter_catgs()
                else:
                    self.denovo_enter_catgs()
            else:
                # advance SNP counter even though this sample wasn't in SNP
                self.snpidx += self.locsnps.shape[0]

    def ref_enter_catgs(self):
        """
        Store catg depths for each SNP on this (reference-mapped) locus,
        mapping post-trim SNP positions back to pre-trim consens positions.
        """
        # map SNP position to pre-trim locus position
        nidx = self.names.index(self.sname)
        sidx = self.sidxs[nidx]
        # ref format: "-"-separated tuples of "cidx:coffset"
        tups = [[int(j) for j in i.split(":")] for i in sidx.split("-")]

        # SNP is in samples, so get and store catg data for locidx
        # [0] post-trim chrom:start-end of locus
        # [1:] how far ahead of start does this sample start
        # FOR DEBUGGING
        # seq = seqs[nidx]
        # seqarr = np.array(list(seq))

        # enter each SNP
        for snp in self.locsnps[:, 1]:
            # in case multiple consens were merged in step 6 of this sample
            for tup in tups:
                cidx, coffset = tup
                pos = snp + (self.gtrim - coffset)
                # only add depths for positions within this consens read
                if (pos >= 0) & (pos < self.maxlen):
                    self.vcfd[self.snpidx] += self.catgs[cidx, pos]
            self.snpidx += 1

    def denovo_enter_catgs(self):
        """
        Grab catg depths for each SNP position -- needs to take into account
        trim from left end, and imputation of indels.
        """
        nidx = self.names.index(self.sname)
        sidx = self.sidxs[nidx]
        # denovo format: ":"-separated tuples of "cidx-coffset"
        tups = [[int(j) for j in i.split("-")] for i in sidx.split(":")]

        # SNP is in samples, so get and store catg data for locidx
        # [0] post-trim chrom:start-end of locus
        # [1:] how far ahead of start does this sample start
        # FOR DEBUGGING
        seq = self.seqs[nidx]

        # enter each SNP
        for snp in self.locsnps[:, 1]:
            # indels before this SNP shift its position in the catg array
            ishift = seq[:snp].count("-")

            # in case multiple consens were merged in step 6 of this sample
            for tup in tups:
                cidx, coffset = tup
                # pos = snp + (self.gtrim - coffset) - ishift
                pos = snp + coffset - ishift
                # only add depths for positions within this consens read
                if (pos >= 0) & (pos < self.maxlen):
                    self.vcfd[self.snpidx] += self.catgs[cidx, pos]
            self.snpidx += 1

    def yield_loc(self):
        """
        Advance self.loclines to the next locus, filling self.names and
        self.seqs with its sample rows and self.sidxs from the "//" line.
        Raises StopIteration at end of chunk (caught by locfill).
        """
        self.names = []
        self.seqs = []
        while 1:
            line = next(self.loclines)
            if "|\n" not in line:
                # skip if .loci chunk is empty
                try:
                    name, seq = line.split()
                    self.names.append(name)
                    self.seqs.append(seq)
                except ValueError:
                    continue
            else:
                # separator line ends this locus; advance counters and
                # parse per-sample consens indices from between the pipes
                self.locidx += 1
                self.localidx += 1
                self.sidxs = [i for i in line.rsplit("|", 2)[1].split(',')]
                break
def build_vcf(data, chunksize=1000):
    """
    Write the final VCF file by streaming chunks of the snps database.

    Reads genotype calls, pseudoref alleles, and the snpsmap from
    data.snps_database in chunks of `chunksize` SNPs, joins per-sample
    GT:DP:CATG strings from each sample's <name>.depths.hdf5 file in
    data.tmpdir, and appends formatted rows to data.outfiles.vcf. The
    VCFHEADER template is written once before the first chunk.

    Parameters
    ----------
    data : Assembly-like object
        Provides .outfiles.vcf, .snps_database, .tmpdir, .snames, .isref,
        and (for reference assemblies) .params.reference_sequence.
    chunksize : int
        Number of SNP rows processed per iteration (default 1000).
    """
    # removed at init of Step function anyway.
    if os.path.exists(data.outfiles.vcf):
        os.remove(data.outfiles.vcf)

    # dictionary to translate locus numbers to chroms
    if data.isref:
        revdict = chroms2ints(data, True)

    # pull locus numbers and positions from snps database
    with h5py.File(data.snps_database, 'r') as io5:

        # iterate over chunks
        for chunk in range(0, io5['genos'].shape[0], chunksize):

            # if reference then pseudo-ref is already ordered with REF first.
            pref = io5['pseudoref'][chunk:chunk + chunksize]
            snpmap = io5['snpsmap'][chunk:chunk + chunksize]

            # load array chunks
            if data.isref:
                # drop the reference sample (index 0) from genotype calls
                genos = io5['genos'][chunk:chunk + chunksize, 1:, :]
                snames = data.snames[1:]

                # 1-indexed to 0-indexed (1/9/2019)
                chroms = [revdict[i - 1] for i in snpmap[:, 3]]
                ids = [
                    "loc{}_pos{}".format(i - 1, j) for (i, j)
                    in snpmap[:, [0, 2]]
                ]

                # reference based positions: pos on scaffold: 4, yes. tested.
                pos = snpmap[:, 4]
                #offset = 1

            else:
                genos = io5['genos'][chunk:chunk + chunksize, :, :]
                snames = data.snames
                chroms = ["RAD_{}".format(i - 1) for i in snpmap[:, 0]]
                ids = [
                    "loc{}_pos{}".format(i - 1, j) for (i, j)
                    in snpmap[:, [0, 2]]
                ]
                # denovo based positions: pos on locus. tested. works. right.
                # almost. POS is 1 indexed.
                pos = snpmap[:, 2] + 1
                # offset = 0

            # get alt genotype calls as comma-joined bases, trailing blanks
            # stripped (pseudoref pads unused alt slots)
            alts = [
                b",".join(i).decode().strip(",")
                for i in pref[:, 1:].view("S1")
            ]

            # build df label cols
            df_pos = pd.DataFrame({
                '#CHROM': chroms,
                'POS': pos,  # 1-indexed
                'ID': ids,  # 0-indexed
                'REF': [i.decode() for i in pref[:, 0].view("S1")],
                'ALT': alts,
                'QUAL': [13] * genos.shape[0],  # fixed placeholder QUAL
                'FILTER': ['PASS'] * genos.shape[0],
            })

            # get sample coverage at each site (9 encodes missing allele)
            nsamps = (
                genos.shape[1] - np.any(genos == 9, axis=2).sum(axis=1)
            )

            # store sum of coverage at each site
            asums = []

            # build depth columns for each sample
            df_depth = pd.DataFrame({})
            for sname in snames:

                # build geno strings, e.g. "0/1"
                genostrs = [
                    "{}/{}".format(*k) for k in [
                        i for i in [
                            list(j) for j in genos[:, snames.index(sname)]
                        ]
                    ]
                ]

                # change 9's into missing
                genostrs = ["./." if i == "9/9" else i for i in genostrs]

                # genostrs = [
                # b"/".join(i).replace(b"9", b".").decode()
                # for i in genos[:, snames.index(sname)]
                # .astype(bytes)
                # ]

                # build depth and depthsum strings from this sample's
                # per-site CATG depths written in earlier steps
                dpth = os.path.join(data.tmpdir, sname + ".depths.hdf5")
                with h5py.File(dpth, 'r') as s5:
                    dpt = s5['depths'][chunk:chunk + chunksize]
                    sums = [sum(i) for i in dpt]
                    strs = [
                        ",".join([str(k) for k in i.tolist()])
                        for i in dpt
                    ]

                # save concat string to name (GT:DP:CATG per VCF FORMAT)
                df_depth[sname] = [
                    "{}:{}:{}".format(i, j, k) for (i, j, k) in
                    zip(genostrs, sums, strs)]

                # add sums to global list
                asums.append(np.array(sums))

            # make final columns (total depth summed across samples)
            nsums = sum(asums)
            colinfo = pd.Series(
                name="INFO",
                data=[
                    "NS={};DP={}".format(i, j) for (i, j) in zip(nsamps, nsums)
                ])
            colform = pd.Series(
                name="FORMAT",
                data=["GT:DP:CATG"] * genos.shape[0],
            )

            # concat and order columns correctly
            infocols = pd.concat([df_pos, colinfo, colform], axis=1)
            infocols = infocols[
                ["#CHROM", "POS", "ID", "REF", "ALT",
                 "QUAL", "FILTER", "INFO", "FORMAT"]]
            arr = pd.concat([infocols, df_depth], axis=1)

            # debugging
            #print(arr.head())

            ## PRINTING VCF TO FILE
            ## choose reference string
            if data.isref:
                reference = data.params.reference_sequence
            else:
                reference = "pseudo-reference (most common base at site)"

            header = VCFHEADER.format(
                date=time.strftime("%Y/%m/%d"),
                version=ipyrad.__version__,
                reference=os.path.basename(reference)
            )

            # append mode: header + column names on the first chunk only
            with open(data.outfiles.vcf, 'a') as out:
                if chunk == 0:
                    out.write(header)
                    arr.to_csv(out, sep='\t', index=False)
                else:
                    arr.to_csv(out, sep='\t', index=False, header=False)
# Template for the VCF meta-information header; build_vcf() fills {date},
# {version}, and {reference} with str.format(). The trailing newline is
# part of the template so the column-header row starts on its own line.
VCFHEADER = """\
##fileformat=VCFv4.0
##fileDate={date}
##source=ipyrad_v.{version}
##reference={reference}
##phasing=unphased
##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=CATG,Number=1,Type=String,Description="Base Counts (CATG)">
"""