Skip to content

Commit

Permalink
Refactor BAM I/O.
Browse files Browse the repository at this point in the history
  • Loading branch information
alumi committed Aug 10, 2017
1 parent 4751237 commit 0916d3d
Show file tree
Hide file tree
Showing 20 changed files with 660 additions and 913 deletions.
4 changes: 1 addition & 3 deletions src/cljam/algo/bam_indexer.clj
Expand Up @@ -9,6 +9,4 @@
[in-bam out-bai & {:keys [n-threads] :or {n-threads 0}}]
(with-open [r (sam/bam-reader in-bam)]
(binding [*n-threads* n-threads]
(bai-core/create-index out-bai
(sam/read-blocks r {} {:mode :pointer})
(sam/read-refs r)))))
(bai-core/create-index out-bai (sam/read-blocks r) (sam/read-refs r)))))
41 changes: 19 additions & 22 deletions src/cljam/algo/convert.clj
Expand Up @@ -5,45 +5,42 @@
[cljam.io.sam.util :as sam-util]
[cljam.io.util :as io-util]
[com.climate.claypoole :as cp])
(:import [cljam.io.bam.writer BAMWriter]))
(:import [java.nio ByteBuffer]
[cljam.io.bam.writer BAMWriter]))

(def ^:private default-num-block 100000)
(def ^:private default-num-write-block 10000)

(defn- sam-write-alignments [rdr wtr hdr num-block num-write-block]
(defn- sam-write-alignments [rdr wtr hdr num-block]
(doseq [alns (partition-all num-block (sam/read-alignments rdr {}))]
(sam/write-alignments wtr alns hdr)))

(defn- bam-write-alignments [rdr wtr hdr num-block num-write-block]
(let [refs (sam-util/make-refs hdr)
w (.writer ^BAMWriter wtr)]
(defn- bam-write-alignments [rdr wtr hdr num-block]
(let [refs (sam-util/make-refs hdr)]
(cp/with-shutdown! [pool (cp/threadpool (cp/ncpus))]
(doseq [alns (partition-all num-block (sam/read-alignments rdr {}))]
(let [blocks (doall (cp/pmap pool
(fn [lalns]
(doall (map #(encoder/encode % refs)
lalns)))
(partition-all num-write-block alns)))]
(doseq [block blocks]
(doseq [e block]
(encoder/write-encoded-alignment w e))))))))
(doseq [blocks (cp/pmap pool
(fn [chunk]
(mapv #(let [bb (ByteBuffer/allocate (encoder/get-block-size %))]
(encoder/encode-alignment bb % refs)
{:data (.array bb)})
chunk))
(partition-all num-block (sam/read-alignments rdr {})))]
(sam/write-blocks wtr blocks)))))

(defn- _convert!
[rdr wtr num-block num-write-block write-alignments-fn]
[rdr wtr num-block write-alignments-fn]
(let [hdr (sam/read-header rdr)]
(sam/write-header wtr hdr)
(sam/write-refs wtr hdr)
(write-alignments-fn rdr wtr hdr num-block num-write-block)))
(write-alignments-fn rdr wtr hdr num-block)))

(defn convert
"Converts file format from input file to output file by the file extension."
[in out & {:keys [num-block num-write-block]
:or {num-block default-num-block
num-write-block default-num-write-block}}]
[in out & {:keys [num-block]
:or {num-block default-num-block}}]
(with-open [rdr (sam/reader in)
wtr (sam/writer out)]
(cond
(io-util/sam-writer? wtr) (_convert! rdr wtr num-block num-write-block sam-write-alignments)
(io-util/bam-writer? wtr) (_convert! rdr wtr num-block num-write-block bam-write-alignments)
(io-util/sam-writer? wtr) (_convert! rdr wtr num-block sam-write-alignments)
(io-util/bam-writer? wtr) (_convert! rdr wtr num-block bam-write-alignments)
:else (throw (ex-info (str "Unsupported output file format " out) {}))))
nil)
17 changes: 9 additions & 8 deletions src/cljam/algo/depth.clj
Expand Up @@ -4,7 +4,8 @@
[cljam.common :as common]
[cljam.util :as util]
[cljam.io.sam :as sam]
[cljam.io.sam.util :as sam-util]))
[cljam.io.sam.util :as sam-util])
(:import [cljam.io.protocols SAMRegionBlock]))

(def ^:const default-step 1000000)

Expand All @@ -17,7 +18,7 @@
(let [pile (long-array (inc (- end beg)))]
(doseq [aln alns]
(let [left (max (:pos aln) beg)
right (min (sam-util/get-end aln) end)
right (min (:end aln) end)
left-index (- left beg)]
(dotimes [i (inc (- right left))]
(aset-long pile (+ i left-index) (inc (aget pile (+ i left-index)))))))
Expand All @@ -28,7 +29,7 @@
[rdr rname start end step]
(let [n-threads (common/get-exec-n-threads)
read-fn (fn [r start end]
(sam/read-alignments r {:chr rname :start start :end end :depth :shallow}))
(sam/read-blocks r {:chr rname :start start :end end} {:mode :region}))
count-fn (fn [xs]
(if (= n-threads 1)
(map (fn [[start end]]
Expand Down Expand Up @@ -62,9 +63,9 @@
(let [beg (int beg)
end (int end)
offset (int offset)]
(doseq [aln alns]
(let [left (Math/max ^int (:pos aln) (unchecked-int beg))
right (unchecked-inc-int (or (:end aln) (sam-util/get-end aln)))
(doseq [^SAMRegionBlock aln alns]
(let [left (Math/max ^int (.pos aln) beg)
right (unchecked-inc-int (.end aln))
left-index (unchecked-add-int (unchecked-subtract-int left beg) offset)
right-index (unchecked-add-int (unchecked-subtract-int right beg) offset)]
(aset-int pile left-index (unchecked-inc-int (aget pile left-index)))
Expand Down Expand Up @@ -103,12 +104,12 @@
(let [pile (int-array (inc (- end start)))
f (if unchecked? unchecked-aset-depth-in-region! aset-depth-in-region!)]
(if (= n-threads 1)
(f (sam/read-alignments rdr region {:depth :shallow}) start end 0 pile)
(f (sam/read-blocks rdr region {:mode :region}) start end 0 pile)
(cp/pdoseq
n-threads
[[s e] (util/divide-region start end step)]
(with-open [r (sam/clone-bam-reader rdr)]
(-> (sam/read-alignments r {:chr chr, :start s, :end e} {:depth :shallow})
(-> (sam/read-blocks r {:chr chr, :start s, :end e} {:mode :region})
(f s e (- s start) pile)))))
pile))

Expand Down
2 changes: 1 addition & 1 deletion src/cljam/algo/pileup/mpileup.clj
Expand Up @@ -137,7 +137,7 @@
refseq (if ref-reader
(cseq/read-sequence ref-reader {:chr chr :start s :end e})
(repeat \N))]
(->> (sam/read-alignments bam-reader {:chr chr :start s :end e :depth :deep})
(->> (sam/read-alignments bam-reader {:chr chr :start s :end e})
(sequence
(comp
(filter basic-mpileup-pred)
Expand Down
4 changes: 2 additions & 2 deletions src/cljam/io/bam/core.clj
Expand Up @@ -2,7 +2,7 @@
"The core of BAM features."
(:require [clojure.java.io :as cio]
[clojure.string :as cstr]
[cljam.io.bam [common :refer [bam-magic]]
[cljam.io.bam [common :as common]
[reader :as reader]
[writer :as writer]]
[cljam.io.bam-index :as bai]
Expand Down Expand Up @@ -33,7 +33,7 @@
[f]
(let [rdr (BGZFInputStream. (cio/file f))
data-rdr (DataInputStream. rdr)]
(when-not (Arrays/equals ^bytes (lsb/read-bytes data-rdr 4) (.getBytes ^String bam-magic))
(when-not (Arrays/equals ^bytes (lsb/read-bytes data-rdr 4) (.getBytes ^String common/bam-magic))
(throw (IOException. "Invalid BAM file")))
(let [{:keys [header refs]} (reader/load-headers data-rdr)
index-delay (delay (bam-index f))]
Expand Down

0 comments on commit 0916d3d

Please sign in to comment.