#!/usr/bin/env python
""" demultiplex raw sequence data given a barcode map."""
# py2/3 compatible imports
from __future__ import print_function
from builtins import range
try:
from itertools import izip, islice
except ImportError:
from itertools import islice
izip = zip
# external imports
import os
import io
import gzip
import glob
import time
import shutil
import pickle
import numpy as np
import subprocess as sps
from collections import Counter
# ipyrad imports
from ipyrad.core.sample import Sample
from ipyrad.assemble.utils import IPyradError, ambigcutters, BADCHARS
class Step1:
def __init__(self, data, force, ipyclient):
# store attrs
self.data = data
self.force = force
self.ipyclient = ipyclient
self.skip = False
# check input data files
self.sfiles = self.data.params.sorted_fastq_path
self.rfiles = self.data.params.raw_fastq_path
self.print_headers()
self.select_method()
self.setup_dirs()
def run(self):
if not self.skip:
if self.method == "link_fastqs":
FileLinker(self).run()
else:
Demultiplexer(self).run()
self.data.save()
def setup_dirs(self):
"create output directory, tmp directory."
# assign fastq dir
self.data.dirs.fastqs = os.path.join(
self.data.params.project_dir,
self.data.name + "_fastqs")
self.data.dirs.fastqs = os.path.realpath(self.data.dirs.fastqs)
# Do NOT delete any directory if you're just linking sorted fastqs
# This allows you to reuse _fastqs from previous assemblies.
if self.method == "link_fastqs":
pass
# remove existing if force flag.
elif self.force:
if os.path.exists(self.data.dirs.fastqs):
shutil.rmtree(self.data.dirs.fastqs)
# bail out if overwrite necessary but no force flag.
else:
if os.path.exists(self.data.dirs.fastqs):
raise IPyradError(
"Fastq dir {} already exists: use force to overwrite"
.format(self.data.dirs.fastqs))
self.skip = True
# ensure project dir exists
if not os.path.exists(self.data.params.project_dir):
os.mkdir(self.data.params.project_dir)
# ensure fastq dir exists, but don't make the directory if linking
# because it's empty and unused
if not os.path.exists(self.data.dirs.fastqs) and\
not self.method == "link_fastqs":
os.mkdir(self.data.dirs.fastqs)
def print_headers(self):
# print headers
if self.data._cli:
if self.sfiles:
self.data._print(
"\n Step 1: Loading sorted fastq data to Samples")
else:
self.data._print(
"\n Step 1: Demultiplexing fastq data to Samples")
def select_method(self):
"Checks file paths and for existing samples and returns function"
# do not allow both a sorted_fastq_path and a raw_fastq
if self.sfiles and self.rfiles:
raise IPyradError(NOT_TWO_PATHS)
# but also require that at least one exists
if not (self.sfiles or self.rfiles):
raise IPyradError(NO_SEQ_PATH_FOUND)
# if Samples already exist then bail out unless force
if self.data.samples:
if not self.force:
raise IPyradError(
SAMPLES_EXIST.format(
len(self.data.samples), self.data.name))
else:
if glob.glob(self.sfiles):
self.method = "link_fastqs"
else:
self.method = "demultiplex"
# Creating new Samples
else:
if glob.glob(self.sfiles):
self.method = "link_fastqs"
else:
self.method = "demultiplex"
class FileLinker:
"""
Loads Samples from file names and checks sample names for bad chars.
"""
def __init__(self, step):
self.data = step.data
self.input = step.sfiles
self.fastqs = glob.glob(self.input)
self.ftuples = []
# parallel distributor
self.ipyclient = step.ipyclient
self.lbview = self.ipyclient.load_balanced_view()
def run(self):
# checks for bad names and fills self.ftuples with file handles
self.check_files()
# distributes jobs to parallel
self.remote_run_linker()
def check_files(self):
# Assert files are not .bz2 format
if any([i.endswith(".bz2") for i in self.fastqs]):
raise IPyradError(NO_SUPPORT_FOR_BZ2.format(self.input))
# filter out any files without proper file endings. Raise if None
endings = ("gz", "fastq", "fq")
self.fastqs = [i for i in self.fastqs if i.split(".")[-1] in endings]
if not self.fastqs:
raise IPyradError(
NO_FILES_FOUND_PAIRS
.format(self.data.params.sorted_fastq_path))
# link pairs into tuples
if 'pair' in self.data.params.datatype:
# check that names fit the paired naming convention
r1s = [i for i in self.fastqs if "_R1_" in i]
r2s = [i for i in self.fastqs if "_R2_" in i]
# file checks
if not r1s:
raise IPyradError(
"No fastqs files found. File names must contain '_R1_' "
"(and '_R2_' for paired data). See Docs.")
if len(r1s) != len(r2s):
raise IPyradError(
R1_R2_name_error.format(len(r1s), len(r2s)))
# store tuples
self.ftuples = []
for r1file in r1s:
r2file = r1file.replace("_R1_", "_R2_")
if not os.path.exists(r2file):
raise IPyradError(
"Expected R2 file {} to match R1 file {}"
.format(r2file, r1file)
)
self.ftuples.append((r1file, r2file))
# data are not paired, create empty tuple pair
else:
# print warning if _R2_ is in names when not paired
if any(["_R2_" in i for i in self.fastqs]):
print(NAMES_LOOK_PAIRED_WARNING)
self.ftuples = [(i, "") for i in self.fastqs]
def remote_run_linker(self):
"read in fastq files and count nreads for stats and chunking in s2."
# local counters
createdinc = 0
# iterate over input files
for ftup in self.ftuples:
# remove file extension from name
sname = get_name_from_file(ftup[0], None, None)
# Create new Sample Class objects with names from files
if sname not in self.data.samples:
newsamp = Sample(sname)
newsamp.stats.state = 1
newsamp.barcode = None
newsamp.files.fastqs = [ftup]
self.data.samples[sname] = newsamp
createdinc += 1
# send jobs to engines for counting with cat/zcat | wc
rasyncs = {}
if createdinc:
for sample in self.data.samples.values():
# get zip var
gzipped = bool(sample.files.fastqs[0][0].endswith(".gz"))
# submit job to count lines and store async
rasyncs[sample.name] = self.lbview.apply(
zbufcountlines,
*(sample.files.fastqs[0][0], gzipped)
)
# wait for link jobs to finish if parallel
start = time.time()
printstr = ("loading reads ", "s1")
while 1:
fin = [i.ready() for i in rasyncs.values()]
self.data._progressbar(len(fin), sum(fin), start, printstr)
time.sleep(0.1)
if len(fin) == sum(fin):
self.data._print("")
break
# collect link job results
for sname in rasyncs:
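# zbufcountlines returns a line count; fastq records are 4 lines
# each, so divide by 4 to get the number of reads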
res = rasyncs[sname].get() / 4
self.data.samples[sname].stats.reads_raw = res
self.data.samples[sname].stats_dfs.s1["reads_raw"] = res
self.data.samples[sname].state = 1
# print if data were linked
if createdinc:
# double for paired data
if 'pair' in self.data.params.datatype:
createdinc = createdinc * 2
if self.data._cli:
self.data._print(
"{} fastq files loaded to {} Samples."
.format(
createdinc,
len(self.data.samples),
)
)
# save step-1 stats. We don't want to write this to the fastq dir, b/c
# it is not necessarily inside our project dir. Instead, we'll write
# this file into our project dir in the case of linked_fastqs.
self.data.stats_dfs.s1 = self.data._build_stat("s1")
self.data.stats_files.s1 = os.path.join(
self.data.params.project_dir,
self.data.name + '_s1_demultiplex_stats.txt')
with open(self.data.stats_files.s1, 'w') as outfile:
(self.data.stats_dfs.s1
.fillna(value=0)
.astype(np.int64)
.to_string(outfile))
class Demultiplexer:
def __init__(self, step):
self.data = step.data
self.input = step.rfiles
self.fastqs = glob.glob(self.input)
self.ipyclient = step.ipyclient
# single engine jobs
self.iview = self.ipyclient.load_balanced_view(targets=[0])
# limited multi-engine jobs
if len(self.ipyclient.ids) >= 12:
targets = self.ipyclient.ids[::4]
else:
targets = self.ipyclient.ids[:4]
self.lbview = self.ipyclient.load_balanced_view(targets=targets)
# re-parse the barcodes file in case hackersonly options changed,
# e.g., i7 demuxing or merging technical replicates
self.data._link_barcodes()
# attrs filled by check_files
self.ftuples = []
self.chunksdict = {}
self.longbar = None
self.check_files()
# attrs filled by get_barcode_dict
self.cutters = None
self.matchdict = {}
self.get_barcode_dict()
# store stats for each file handle (grouped results of chunks)
self.stats = Stats()
def run(self):
# Estimate size of files to plan parallelization.
self.setup_for_splitting()
# work load; i.e., is there one giant file or many small files?
self.splitfiles()
# process the files or chunked file bits
self.remote_run_barmatch()
# concatenate chunks
self.concatenate_chunks()
# store stats and create Sample objects in Assembly
self.store_stats()
# init functions -----------------------------------
def check_files(self):
"Check that data files are present and formatted correctly"
# check for data using glob for fuzzy matching
if not self.fastqs:
raise IPyradError(
NO_RAWS.format(self.data.params.raw_fastq_path))
# find longest barcode
if not os.path.exists(self.data.params.barcodes_path):
raise IPyradError(
"Barcodes file not found. You entered: '{}'".format(
self.data.params.barcodes_path))
# Handle 3rad multi-barcodes. Gets len of the first one.
blens = [len(i.split("+")[0]) for i in self.data.barcodes.values()]
if len(set(blens)) == 1:
self.longbar = (blens[0], 'same')
else:
self.longbar = (max(blens), 'diff')
# For i7 tags there will be only one barcode, so this overrides
# "datatype" so that using pair3rad doesn't cause problems.
# For pair3rad we need to add the length info for barcodes_R2.
if not self.data.hackersonly.demultiplex_on_i7_tags:
if "3rad" in self.data.params.datatype:
blens = [
len(i.split("+")[1]) for i in self.data.barcodes.values()
]
# Record if bar1 and bar2 are different lengths
if self.longbar[0] != max(blens):
self.longbar = (self.longbar[0], 'diff', max(blens))
else:
self.longbar = (self.longbar[0], 'same', max(blens))
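# Illustrative longbar values given the logic above:
#   all R1 barcodes 8bp                  -> (8, 'same')
#   R1 barcodes of mixed 6-8bp           -> (8, 'diff')
#   3rad with 8bp R1 and 4bp R2 barcodes -> (8, 'diff', 4)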
# gather raw sequence filenames (people want this to be flexible ...)
if 'pair' in self.data.params.datatype:
firsts = [i for i in self.fastqs if "_R1_" in i]
if not firsts:
raise IPyradError(
"First read files names must contain '_R1_'.")
seconds = [i.replace("_R1_", "_R2_") for i in firsts]
self.ftuples = list(zip(firsts, seconds))
else:
self.ftuples = list(zip(self.fastqs, iter(int, 1)))
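# note: iter(int, 1) yields 0 forever (int() returns 0, which never
# equals the sentinel 1), so each single-end R1 file is paired with
# a placeholder instead of an R2 file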
def get_barcode_dict(self):
"""
Checks sample names and replaces bad chars in dict with _
And returns a list of both resolutions of cut site 1 for ambigs.
# (TGCAG, ) ==> [TGCAG, ]
# (TWGC, ) ==> [TAGC, TTGC]
# (TWGC, AATT) ==> [TAGC, TTGC]
"""
# expand ambigs
self.cutters = [
ambigcutters(i) for i in self.data.params.restriction_overhang
]
assert self.cutters, "Must enter a restriction_overhang for demultiplexing."
# get matchdict
self.matchdict = inverse_barcodes(self.data)
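# A minimal sketch of matchdict, assuming inverse_barcodes maps
# observed (possibly off-by-N) barcodes to sample names, e.g.:
#   {'CATCAT': 'A', 'CATCAA': 'A', 'ACAGTG': 'B', ...}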
def setup_for_splitting(self, omin=int(8e6)):
"""
Decide whether to split files based on whether a 1/16th chunk
would be bigger than omin, which defaults to 8M reads.
"""
# create a tmpdir for chunked_files and a chunk optimizer
self.tmpdir = os.path.realpath(
os.path.join(self.data.dirs.fastqs, "tmpdir")
)
if os.path.exists(self.tmpdir):
shutil.rmtree(self.tmpdir)
os.makedirs(self.tmpdir)
# chunk into 16 pieces
self.nreads = estimate_nreads(self.data, self.ftuples[0][0])
self.optim = int(self.nreads / 16)
# split when there are fewer input files than engines (chunking adds
# parallelism) or when a 1/16th chunk would still exceed omin reads
self.do_file_split = 0
if (len(self.ftuples) < len(self.ipyclient)) or (self.optim > omin):
self.do_file_split = 1
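# Worked example: a 160M-read file gives optim = 10M > omin (8M), so
# it is split; sixteen 5M-read files on a 4-engine client give a tiny
# optim and more files than engines, so they are processed whole.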
def splitfiles(self):
"sends raws to be chunked"
# The dict chunksdict maps each file handle to a list of chunked
# file tuples; rasyncs stores asyncresults for the chunking jobs.
printstr = ('chunking large files', 's1')
start = time.time()
done = 0
njobs = (
32 * len(self.ftuples) if "pair" in self.data.params.datatype
else 16 * len(self.ftuples)
)
rasyncs = {}
chunksdict = {}
for fidx, ftup in enumerate(self.ftuples):
# get file handle w/o basename for stats output
handle = os.path.splitext(os.path.basename(ftup[0]))[0]
if not self.do_file_split:
chunksdict[handle] = [ftup]
# chunk file into 16 pieces using zcat_make_temps
else:
args = (self.data, ftup, fidx, self.tmpdir, self.optim, start)
rasyncs[handle] = self.iview.apply(zcat_make_temps, *args)
# track progress until finished
# for each file submitted we expect it to create 16 or 32 files.
if rasyncs:
while 1:
# break when all jobs are finished
if all([i.ready() for i in rasyncs.values()]):
break
# ntemp files written or being written
done = len(glob.glob(os.path.join(self.tmpdir, "chunk*_*_*")))
self.data._progressbar(njobs, done, start, printstr)
time.sleep(0.5)
# store results
for key, val in rasyncs.items():
chunksdict[key] = val.get()
# clean up
self.ipyclient.purge_everything()
self.data._progressbar(10, 10, start, printstr)
self.data._print("")
# return value
self.chunksdict = chunksdict
def remote_run_barmatch(self):
"Submit chunks to be sorted by barmatch() and collect stats"
# progress bar info
start = time.time()
printstr = ("sorting reads ", "s1")
# self.chunksdict is a dict of {handle: chunkslist, ...}. The func
# barmatch writes results to per-sample files tagged with the engine
# PID, and also writes a pickle of chunk-specific results with an
# fidx suffix, which it returns.
rasyncs = {}
ridx = 0
for handle, ftuplist in self.chunksdict.items():
for fidx, ftuple in enumerate(ftuplist):
args = (
self.data,
ftuple,
self.longbar,
self.cutters,
self.matchdict,
fidx,
)
rasync = self.lbview.apply(barmatch, args)
rasyncs[ridx] = (handle, rasync)
ridx += 1
# get ready to receive stats: 'total', 'cutfound', 'matched'
self.stats.perfile[handle] = np.zeros(3, dtype=np.int64)
# collect and store results as jobs finish
njobs = len(rasyncs)
done = 0
while 1:
# get list of ridx numbers for finished jobs
finished = [i for (i, j) in rasyncs.items() if j[1].ready()]
# cleanup finished ridx jobs and grab stats
for ridx in finished:
handle, rasync = rasyncs[ridx]
pkl = rasync.get()
self.stats.fill_from_pickle(pkl, handle)
del rasyncs[ridx]
done += 1
# print progress
self.data._progressbar(njobs, done, start, printstr)
time.sleep(0.1)
if njobs == done:
self.data._print("")
break
def concatenate_chunks(self):
"""
If multiple chunk files match to the same sample name but with
different barcodes (i.e., they are technical replicates) then this
will assign all the files to the same sample name file.
"""
# collate files progress bar
start = time.time()
printstr = ("writing/compressing ", "s1")
self.data._progressbar(10, 0, start, printstr)
# get all the files
ftmps = glob.glob(os.path.join(
self.data.dirs.fastqs,
"tmpdir",
"tmp_*.fastq"))
# a dict to assign tmp files to names/reads
r1dict = {}
r2dict = {}
for sname in self.data.barcodes:
if "-technical-replicate-" in sname:
sname = sname.rsplit("-technical-replicate", 1)[0]
r1dict[sname] = []
r2dict[sname] = []
# assign to name keys
for ftmp in ftmps:
base, orient, _ = ftmp.rsplit("_", 2)
sname = base.rsplit("/", 1)[-1].split("tmp_", 1)[1]
if orient == "R1":
r1dict[sname].append(ftmp)
else:
r2dict[sname].append(ftmp)
## concatenate files
snames = []
for sname in self.data.barcodes:
if "-technical-replicate-" in sname:
sname = sname.rsplit("-technical-replicate", 1)[0]
snames.append(sname)
writers = []
for sname in set(snames):
tmp1s = sorted(r1dict[sname])
tmp2s = sorted(r2dict[sname])
writers.append(
self.iview.apply(
collate_files,
*(self.data, sname, tmp1s, tmp2s))
)
total = len(writers)
while 1:
ready = [i.ready() for i in writers]
self.data._progressbar(total, sum(ready), start, printstr)
time.sleep(0.1)
if all(ready):
self.data._print("")
break
def store_stats(self):
"Write stats and stores to Assembly object."
# out file
self.data.stats_files.s1 = os.path.join(
self.data.dirs.fastqs, 's1_demultiplex_stats.txt')
outfile = open(self.data.stats_files.s1, 'w')
# write the header for file stats ------------------------------------
outfile.write(
"{:<35} {:>13}{:>13}{:>13}\n"
.format("raw_file", "total_reads", "cut_found", "bar_matched"))
# write the file stats
r1names = sorted(self.stats.perfile)
for fname in r1names:
dat = self.stats.perfile[fname]
outfile.write(
"{:<35} {:>13}{:>13}{:>13}\n"
.format(fname, dat[0], dat[1], dat[2])
)
# repeat for pairfile
if 'pair' in self.data.params.datatype:
fname = fname.replace("_R1_", "_R2_")
outfile.write(
"{:<35} {:>13}{:>13}{:>13}\n"
.format(fname, dat[0], dat[1], dat[2])
)
# spacer, how many records for each sample --------------------------
outfile.write(
"\n{:<35} {:>13}\n"
.format("sample_name", "total_reads"))
# names alphabetical. Write to file. Will save again below to Samples.
snames = set()
for sname in self.data.barcodes:
if "-technical-replicate-" in sname:
sname = sname.rsplit("-technical-replicate", 1)[0]
snames.add(sname)
for sname in sorted(list(snames)):
outfile.write("{:<35} {:>13}\n"
.format(sname, self.stats.fsamplehits[sname]))
## spacer, which barcodes were found -----------------------------------
outfile.write('\n{:<35} {:>13} {:>13} {:>13}\n'
.format("sample_name", "true_bar", "obs_bar", "N_records"))
## write sample results
for sname in sorted(self.data.barcodes):
if "-technical-replicate-" in sname:
fname = sname.rsplit("-technical-replicate", 1)[0]
else:
fname = sname
# write perfect hit
hit = self.data.barcodes[sname]
offhitstring = ""
# write off-n hits
# sort list of off-n hits
if fname in self.stats.fdbars:
offkeys = list(self.stats.fdbars.get(fname))
for offhit in offkeys[::-1]:
# exclude perfect hit
if offhit not in self.data.barcodes.values():
offhitstring += (
"{:<35} {:>13} {:>13} {:>13}\n"
.format(sname, hit, offhit,
int(self.stats.fbarhits[offhit] / 2))
)
#sumoffhits += fbarhits[offhit]
# write string to file
outfile.write("{:<35} {:>13} {:>13} {:>13}\n"
.format(sname, hit, hit,
int(self.stats.fbarhits[hit] / 2)))
outfile.write(offhitstring)
# write misses
misskeys = list(self.stats.fmisses.keys())
misskeys.sort(key=self.stats.fmisses.get)
for key in misskeys[::-1]:
outfile.write('{:<35} {:>13} {:>13} {:>13}\n'
.format("no_match", "_", key, self.stats.fmisses[key]))
outfile.close()
# Link Sample with this data file to the Assembly object
for sname in snames:
# make the sample
sample = Sample(sname)
# allow multiple barcodes if it's a replicate.
barcodes = []
for n in range(500):
fname = "{}-technical-replicate-{}".format(sname, n)
fbar = self.data.barcodes.get(fname)
if fbar:
barcodes.append(fbar)
if barcodes:
sample.barcode = barcodes
else:
sample.barcode = self.data.barcodes[sname]
# file names
if 'pair' in self.data.params.datatype:
sample.files.fastqs = [(
os.path.join(
self.data.dirs.fastqs, sname + "_R1_.fastq.gz"),
os.path.join(
self.data.dirs.fastqs, sname + "_R2_.fastq.gz"),
)]
else:
sample.files.fastqs = [
(os.path.join(
self.data.dirs.fastqs,
sname + "_R1_.fastq.gz",
),
""),
]
# fill in the summary stats
sample.stats["reads_raw"] = int(self.stats.fsamplehits[sname])
# fill in the full df stats value
sample.stats_dfs.s1["reads_raw"] = int(
self.stats.fsamplehits[sname])
# Only link Sample if it has data
if sample.stats["reads_raw"]:
sample.stats.state = 1
self.data.samples[sample.name] = sample
else:
print("Excluded sample: no data found for", sname)
# initiate s1 key for data object
self.data.stats_dfs.s1 = self.data._build_stat("s1")
# cleanup
shutil.rmtree(self.tmpdir)
# This class is created and run inside the barmatch() function, which
# is run on remote engines for parallelization.
class BarMatch:
def __init__(self, data, ftuple, longbar, cutters, matchdict, fidx):
"""
Sorts reads to samples based on barcodes and writes stats to a pickle.
"""
# store attrs
self.data = data
self.longbar = longbar
self.cutters = cutters
self.ftuple = ftuple
self.matchdict = matchdict
self.fidx = fidx
# when to write to disk
self.chunksize = int(1e6)
self.epid = os.getpid()
self.filestat = np.zeros(3, dtype=int)
# store reads per sample (group technical replicates)
self.samplehits = {}
for sname in self.data.barcodes:
if "-technical-replicate-" in sname:
sname = sname.rsplit("-technical-replicate", 1)[0]
self.samplehits[sname] = 0
# store all barcodes observed
self.barhits = {}
for barc in self.matchdict:
self.barhits[barc] = 0
# store reads and bars matched to samples
self.read1s = {}
self.read2s = {}
self.dbars = {}
for sname in self.data.barcodes:
if "-technical-replicate-" in sname:
sname = sname.rsplit("-technical-replicate", 1)[0]
self.read1s[sname] = []
self.read2s[sname] = []
self.dbars[sname] = set()
# store counts of what didn't match to samples
self.misses = {}
self.misses['_'] = 0
def run(self):
self.demux = self.get_matching_function()
self.open_read_generators()
pkl = self.sort_reads()
self.close_read_generators()
return pkl
def get_matching_function(self):
if self.longbar[1] == 'same':
if self.data.params.datatype == '2brad':
return getbarcode1
else:
return getbarcode2
else:
return getbarcode3
def open_read_generators(self):
"""
Gzip streams always yield bytes, so open unzipped files in 'rb'
mode too so both yield bytes.
"""
# get file type
if self.ftuple[0].endswith(".gz"):
self.ofile1 = gzip.open(self.ftuple[0], 'rb')
else:
self.ofile1 = open(self.ftuple[0], 'rb')
# create iterators
fr1 = iter(self.ofile1)
quart1 = izip(fr1, fr1, fr1, fr1)
# create second read iterator for paired data
if self.ftuple[1]:
if self.ftuple[0].endswith(".gz"):
self.ofile2 = gzip.open(self.ftuple[1], 'rb')
else:
self.ofile2 = open(self.ftuple[1], 'rb')
# create iterators
fr2 = iter(self.ofile2)
quart2 = izip(fr2, fr2, fr2, fr2)
self.quarts = izip(quart1, quart2)
else:
self.quarts = izip(quart1, iter(int, 1))
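# Each next() on self.quarts yields one 4-line fastq record per input
# file, e.g. for R1:
#   (b'@read1\n', b'TGCAGAACCT...\n', b'+\n', b'BBBFFFFF...\n')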
def close_read_generators(self):
self.ofile1.close()
if self.ftuple[1]:
self.ofile2.close()
def sort_reads(self):
while 1:
# read in four lines of data and increase counter
try:
read1, read2 = next(self.quarts)
read1 = list(read1)
self.filestat[0] += 1
except StopIteration:
break
# i7 barcodes (get from name string instead of sequence)
if self.data.hackersonly.demultiplex_on_i7_tags:
barcode = read1[0].decode().rsplit(":", 1)[-1].split("+")[0].strip()
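# e.g., an Illumina-style header (illustrative):
#   b'@A00234:8:FLOWCELL:1:101:2:3 1:N:0:CAGATC+ACTGAT\n'
# rsplit(":", 1)[-1] -> 'CAGATC+ACTGAT'; split("+")[0] -> 'CAGATC'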
else:
# COMBINATORIAL BARCODES (BCODE1+BCODE2)
if '3rad' in self.data.params.datatype:
barcode1 = find3radbcode(self.cutters, self.longbar[0], read1)
barcode2 = find3radbcode(self.cutters, self.longbar[2], read2)
barcode = barcode1 + "+" + barcode2
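# e.g., barcode1 'CATCAT' from R1 and barcode2 'ACAG' from R2 combine
# to 'CATCAT+ACAG', matching the "+"-joined barcodes file entries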
# USE BARCODE PARSER: length or splitting
else:
# Parse barcode. Uses the parsing function selected above.
barcode = self.demux(self.cutters, read1, self.longbar)
# ensure barcode is string
try:
barcode = barcode.decode()
except AttributeError:
pass
# find if it matches
sname_match = self.matchdict.get(barcode)
if sname_match:
# add to observed set of bars
self.dbars[sname_match].add(barcode)
self.filestat[1:] += 1
self.samplehits[sname_match] += 1
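# note: barhits is incremented twice per matched read (here and in
# the if/else below); store_stats() divides fbarhits by 2 when
# reporting, so the net count is one per read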
self.barhits[barcode] += 1
if barcode in self.barhits:
self.barhits[barcode] += 1
else:
self.barhits[barcode] = 1
# trim off barcode
lenbar1 = len(barcode)
if '3rad' in self.data.params.datatype:
## If 3rad, trim the length of the first barcode
lenbar1 = len(barcode1)
lenbar2 = len(barcode2)
# no trim on i7 demux
if self.data.hackersonly.demultiplex_on_i7_tags:
lenbar1 = lenbar2 = 0
# for 2brad we trim the barcode AND the synthetic overhang
# The `+1` is because it trims the newline
if self.data.params.datatype == '2brad':
overlen = len(self.cutters[0][0]) + lenbar1 + 1
read1[1] = read1[1][:-overlen] + b"\n"
read1[3] = read1[3][:-overlen] + b"\n"
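# e.g., a 5bp cutter and 4bp barcode give overlen = 10, slicing the
# last 9 bases plus the newline off the end before re-adding b"\n"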
else:
read1[1] = read1[1][lenbar1:]
read1[3] = read1[3][lenbar1:]
# Trim barcode off R2 and append. Only 3rad datatype
# pays the cpu cost of splitting R2
if '3rad' in self.data.params.datatype:
read2 = list(read2)
read2[1] = read2[1][lenbar2:]
read2[3] = read2[3][lenbar2:]
# append to sorted reads list
self.read1s[sname_match].append(b"".join(read1).decode())
if 'pair' in self.data.params.datatype:
self.read2s[sname_match].append(b"".join(read2).decode())
else:
self.misses["_"] += 1
if barcode:
self.filestat[1] += 1
# Write to each sample file (each PID gets its own file handles)
if not self.filestat[0] % int(1e6):
# write reads to file
write_to_file(self.data, self.read1s, 1, self.epid)
if 'pair' in self.data.params.datatype:
write_to_file(self.data, self.read2s, 2, self.epid)
# clear out lists of sorted reads
for sname in self.data.barcodes:
if "-technical-replicate-" in sname:
sname = sname.rsplit("-technical-replicate", 1)[0]
self.read1s[sname] = []
self.read2s[sname] = []
## write the remaining reads to file
write_to_file(self.data, self.read1s, 1, self.epid)
if 'pair' in self.data.params.datatype:
write_to_file(self.data, self.read2s, 2, self.epid)
## return stats in saved pickle b/c return_queue is too small
## and the size of the match dictionary can become quite large
samplestats = [self.samplehits, self.barhits, self.misses, self.dbars]
pklname = os.path.join(
self.data.dirs.fastqs,
"tmpdir",
"tmp_{}_{}.p".format(self.epid, self.fidx))
with open(pklname, 'wb') as wout:
pickle.dump([self.filestat, samplestats], wout)
return pklname
# used inside BarMatch to store stats nicely.
class Stats:
def __init__(self):
# stats for each raw input file
self.perfile = {}
# stats for each sample
self.fdbars = {}
self.fsamplehits = Counter()
self.fbarhits = Counter()
self.fmisses = Counter()
def fill_from_pickle(self, pkl, handle):
# load in stats pickle
with open(pkl, 'rb') as infile:
filestats, samplestats = pickle.load(infile)
## pull new stats
self.perfile[handle] += filestats
## update sample stats
samplehits, barhits, misses, dbars = samplestats
self.fsamplehits.update(samplehits)
self.fbarhits.update(barhits)
self.fmisses.update(misses)
self.fdbars.update(dbars)
# -------------------------------------
# EXTERNAL FUNCS
# -------------------------------------
def barmatch(args):
# run processor
bar = BarMatch(*args)
# writes reads to file and writes stats to pickle
pkl = bar.run()
return pkl
# CALLED BY FILELINKER
def get_name_from_file(fname, splitnames, fields):
"Grab Sample names from demultiplexed input fastq file names"
# allowed extensions
file_extensions = [".gz", ".fastq", ".fq", ".fasta", ".clustS", ".consens"]
base, _ = os.path.splitext(os.path.basename(fname))
# remove read number from name