Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite pileup module #140

Merged
merged 19 commits into from Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -27,7 +27,7 @@ jobs:
- stage: coverage
jdk: oraclejdk8
script:
- CLOVERAGE_VERSION=1.0.9 lein cloverage -e '^cljam.main' --codecov
- lein cloverage --codecov
- bash <(curl -s https://codecov.io/bash) -f target/coverage/codecov.json
- stage: deploy
jdk: oraclejdk8
Expand Down
18 changes: 18 additions & 0 deletions bench/cljam/algo/pileup_bench.clj
@@ -0,0 +1,18 @@
(ns cljam.algo.pileup-bench
(:require [libra.bench :refer :all]
[libra.criterium :as c]
[criterium.core :as criterium]
[cljam.test-common :as tcommon]
[cljam.algo.pileup :as pileup]
[cljam.io.sam :as sam]))

(defbench mpileup-large-bench
(tcommon/with-before-after {:before (tcommon/prepare-cavia!)}
(let [region {:chr "chr1"}]
(with-open [r (sam/reader tcommon/large-bam-file)]
(dorun (sam/read-alignments r region)) ;; warm up
(criterium/with-progress-reporting
(is
(c/quick-bench
(dorun (pileup/pileup r region)))))))))

8 changes: 4 additions & 4 deletions project.clj
Expand Up @@ -21,7 +21,7 @@
:plugins [[lein-binplus "0.6.4" :exclusions [org.clojure/clojure]]
[lein-codox "0.10.4"]
[lein-marginalia "0.9.1" :exclusions [org.clojure/clojure]]
[lein-cloverage "1.0.10" :exclusions [org.clojure/clojure org.tcrawley/dynapath]]
[lein-cloverage "1.0.11" :exclusions [org.clojure/clojure org.tcrawley/dynapath]]
[net.totakke/lein-libra "0.1.2"]]
:test-selectors {:default #(not-any? % [:slow :remote])
:slow :slow ; Slow tests with local resources
Expand All @@ -32,10 +32,10 @@
:1.7 {:dependencies [[org.clojure/clojure "1.7.0"]]}
:1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}
:1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]}
:1.10 {:dependencies [[org.clojure/clojure "1.10.0-alpha4"]]}
:1.10 {:dependencies [[org.clojure/clojure "1.10.0-alpha6"]]}
:uberjar {:dependencies [[org.clojure/clojure "1.9.0"]
[org.apache.logging.log4j/log4j-api "2.11.0"]
[org.apache.logging.log4j/log4j-core "2.11.0"]]
[org.apache.logging.log4j/log4j-api "2.11.1"]
[org.apache.logging.log4j/log4j-core "2.11.1"]]
:resource-paths ["bin-resources"]
:main cljam.tools.main
:jvm-opts ["-Dclojure.compiler.direct-linking=true"]
Expand Down
209 changes: 200 additions & 9 deletions src/cljam/algo/pileup.clj
@@ -1,17 +1,208 @@
(ns cljam.algo.pileup
"Functions to calculate pileup from the BAM."
(:require [cljam.io.sam :as sam]
[cljam.algo.pileup.mpileup :as mplp]))
[cljam.io.sam.util.cigar :as cigar]
[cljam.io.sam.util.flag :as flag]
[cljam.io.sam.util.quality :as qual]
[cljam.io.sam.util.refs :as refs]
[cljam.io.pileup :as plpio])
(:import [cljam.io.protocols SAMAlignment]
[cljam.io.pileup PileupBase LocusPile]))

(defn- cover-locus? [^long pos ^SAMAlignment aln]
(<= (.pos aln) pos))

(defn pileup-seq
"Returns a lazy sequence that each element contains [position (alignments
piled-up at the locus)]."
[^long start ^long end alns]
(letfn [(step [^long pos prev-buf rest-alns]
(lazy-seq
(when (< (dec pos) end)
(let [[i rests] (split-with (partial cover-locus? pos) rest-alns)
p (some-> rests ^SAMAlignment first .pos)
[b :as buf] (into (filterv #(<= pos (.end ^SAMAlignment %))
prev-buf) i)
next-pos (if (and p (not b)) p (inc pos))]
(if b
(cons [pos buf] (step next-pos buf rests))
(when (and p (<= p end))
(step next-pos buf rests)))))))]
(let [[x :as xs] (drop-while #(< (.end ^SAMAlignment %) start) alns)]
(when x
(step (max start (.pos ^SAMAlignment x)) [] xs)))))

(defn- quals-at-ref
[idx ^String qual]
(let [empty-qual? (and (= (.length qual) 1)
(= (.charAt qual 0) \*))]
(if empty-qual?
(vec (repeat (count idx) 93)) ;; \~
(mapv (fn [[_ x]]
(if (number? x)
(qual/fastq-char->phred-byte (.charAt qual x))
93))
idx))))

(defn- seqs-at-ref
[idx ^String s]
(mapv (fn [[op x xs]]
(let [c (if (number? x) (.charAt s x) x)]
(case op
:m [c]
:d [c xs]
:i [c (subs s (first xs) (last xs))]))) idx))

(defn index-cigar
"Align bases and base quality scores with the reference coordinate."
[^SAMAlignment aln]
(let [idx (cigar/to-index (.cigar aln))]
(assoc aln
:seqs-at-ref (seqs-at-ref idx (:seq aln))
:quals-at-ref (quals-at-ref idx (.qual aln)))))

(defn basic-mpileup-pred
"Basic predicate function for filtering alignments for mpileup."
[^long min-mapq]
(fn [^SAMAlignment aln]
(let [flag (.flag aln)]
(and flag
(zero? (bit-and (flag/encoded #{:unmapped
:filtered-out
:duplicated
:supplementary
:secondary}) flag))
(or (not (flag/multiple? flag)) (flag/properly-aligned? flag))
(not-empty (.cigar aln))
(<= min-mapq (.mapq aln))))))

(defn resolve-base
"Find a piled-up base and an indel from a alignment."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"an alignment" or "the alignment"

[^long ref-pos ^SAMAlignment aln]
(let [relative-pos (- ref-pos (.pos aln))
qual ((:quals-at-ref aln) relative-pos)
[base indel] ((:seqs-at-ref aln) relative-pos)]
(PileupBase.
(zero? relative-pos)
(when (zero? relative-pos) (.mapq aln))
base
qual
(flag/reversed? (.flag aln))
(= ref-pos (.end aln))
(when-not (number? indel) indel)
(when (number? indel) indel)
aln)))

(defn- resolve-bases
[[ref-pos alns]]
[ref-pos (mapv (partial resolve-base ref-pos) alns)])

(defn ->locus-pile
"Convert a pile into `cljam.io.pileup.LocusPile`."
[chr [pos pile]]
(let [c (count pile)]
(when (pos? c)
(LocusPile. chr pos \N c pile))))

(defn- correct-qual
"Correct quality of two overlapped mate reads by setting zero quality for one
of the base."
[^PileupBase r1 ^PileupBase r2]
(let [b1 (.base r1)
q1 (.qual r1)
b2 (.base r2)
q2 (.qual r2)]
(if (= b1 b2)
[(assoc r1 :qual (min 200 (+ q1 q2))) (assoc r2 :qual 0)]
(if (<= q2 q1)
[(assoc r1 :qual (int (* 0.8 q1))) (assoc r2 :qual 0)]
[(assoc r1 :qual 0) (assoc r2 :qual (int (* 0.8 q2)))]))))

(defn correct-overlapped-bases
"Find out overlapped bases and tweak their base quality scores."
[xs]
(if (<= (count xs) 1)
xs
(->> xs
(group-by (fn [x] (.qname ^SAMAlignment (.alignment ^PileupBase x))))
(into [] (mapcat
(fn [[_ xs]]
(if (<= (count xs) 1) xs (apply correct-qual xs))))))))

(defn- correct-overlaps
[pile]
(update pile 1 correct-overlapped-bases))

(defn filter-by-base-quality
"Returns a predicate for filtering piled-up reads by base quality at its
position."
[min-base-quality]
(fn [p]
(->> #(<= min-base-quality (.qual ^PileupBase %))
(partial filterv)
(update p 1))))

(defn pileup
"Piles up alignments in given region and returns a lazy sequence of
`cljam.io.pileup.LocusPile`s."
([sam-reader region]
(pileup sam-reader region {}))
([sam-reader
{:keys [chr start end] :or {start 1 end Integer/MAX_VALUE}}
{:keys [min-base-quality min-map-quality]
:or {min-base-quality 13 min-map-quality 0}}]
(when-let [len (:len (refs/ref-by-name (sam/read-refs sam-reader) chr))]
(let [s (max 1 start)
e (min len end)]
(->> {:chr chr :start s :end e}
(sam/read-alignments sam-reader)
(sequence
(comp
(filter (basic-mpileup-pred min-map-quality))
(map index-cigar)))
(pileup-seq s e)
(sequence
(comp (map resolve-bases)
(map correct-overlaps)
(map (filter-by-base-quality min-base-quality))
(keep (partial ->locus-pile chr)))))))))

(defn align-pileup-seqs
"Align multiple pileed-up seqs."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/pileed-up/piled-up/

[& xs]
(if (<= (count xs) 1)
(map (fn [{:keys [pos] :as m}] [pos [m]]) (first xs))
(letfn [(step [xs]
(lazy-seq
(when (some seq xs)
(let [min-pos (apply min (keep (comp :pos first) xs))]
(cons [min-pos (mapv
(fn [[{:keys [pos] :as m}]]
(when (= pos min-pos) m)) xs)]
(step (mapv
(fn [[{:keys [pos]} :as ys]]
(if (= pos min-pos) (next ys) ys)) xs)))))))]
(step xs))))

(defn mpileup
"Returns a lazy sequence of cljam.algo.pileup.mpileup.MPileupElement
calculated from FASTA and BAM. If start and end are not supplied, piles whole
range up."
([bam-reader region] (mpileup nil bam-reader region))
([ref-reader bam-reader region] (mplp/pileup ref-reader bam-reader region)))
"Pile up alignments from multiple sources."
[region & sam-readers]
(apply align-pileup-seqs (map #(pileup % region) sam-readers)))

(defn create-mpileup
"Creates a mpileup file from the BAM file."
[in-bam out-mplp]
(with-open [r (sam/bam-reader in-bam)]
(mplp/create-mpileup out-mplp nil r)))
([in-sam out-mplp]
(create-mpileup in-sam nil out-mplp))
([in-sam in-ref out-mplp]
(create-mpileup in-sam in-ref out-mplp nil))
([in-sam in-ref out-mplp region]
(with-open [s (sam/reader in-sam)
w (plpio/writer out-mplp in-ref)]
(let [regs (if region
[region]
(map
(fn [{:keys [name len]}]
{:chr name :start 1 :end len})
(sam/read-refs s)))]
(doseq [reg regs]
(plpio/write-piles w (pileup s reg)))))))
4 changes: 0 additions & 4 deletions src/cljam/algo/pileup/common.clj

This file was deleted.