/
dedupe.clj
66 lines (62 loc) · 2.61 KB
/
dedupe.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
(ns cljam.algo.dedupe
"Functions to remove duplications."
(:refer-clojure :exclude [dedupe])
(:require [com.climate.claypoole :as cp]
[cljam.io.sam :as sam]
[cljam.io.sam.util.flag :as flag]))
(defn- refs->regions [refs]
(for [{:keys [name len]} refs]
{:chr name :start 1 :end len}))
(defn- sum-quals [a]
(when-let [q ^String (:qual a)]
(when-not (and (= (.length q) 1) (= \* (.charAt q 0)))
(reduce (fn [r x] (+ r (- (int x) 33))) 0 q))))
(defn dedupe-xform
"Returns a transducer which removes PCR duplications."
[& {:keys [remove-dups] :or {remove-dups true}}]
(let [removed (atom #{})
cp (juxt sum-quals :mapq)]
(comp
(partition-by (juxt :rname :pos))
(mapcat
(fn [alns]
(loop [[{:keys [pos tlen qname flag] :as x} :as xs] alns heads {} tails [] dups []]
(if x
(cond
(not (and (flag/multiple? flag)
(not (flag/both-unmapped? flag))
(= (:rnext x) "="))) (recur (next xs) heads (conj tails x) dups)
(pos? tlen) (let [k [pos tlen]]
(if-let [v (get heads k)]
(let [[good bad] (if (pos? (compare (cp v) (cp x))) [v x] [x v])]
(swap! removed conj (:qname bad))
(recur (next xs) (assoc heads k good) tails (conj dups bad)))
(recur (next xs) (assoc heads k x) tails dups)))
(@removed qname) (do (swap! removed disj qname) (recur (next xs) heads tails (conj dups x)))
:else (recur (next xs) heads (conj tails x) dups))
(concat
(vals heads)
tails
(when-not remove-dups
(map (fn [a] (update a :flag #(bit-or % 1024))) dups))))))))))
(defn dedupe
"Removes PCR duplications from paired-end alignments."
[in out & {:keys [remove-dups] :or {remove-dups true}}]
(cp/with-shutdown! [pool (cp/ncpus)]
(let [[header refs] (with-open [r (sam/bam-reader in)] [(sam/read-header r) (sam/read-refs r)])]
(with-open [w (sam/bam-writer out)]
(sam/write-header w header)
(sam/write-refs w header)
(sam/write-alignments
w
(->> refs
refs->regions
(cp/pmap
pool
(fn [region]
(with-open [r (sam/bam-reader in)]
(->> (sam/read-alignments r region)
(sequence (dedupe-xform :remove-dups remove-dups))
doall))))
(sequence cat))
header)))))