-
Notifications
You must be signed in to change notification settings - Fork 304
/
ADAMSAMOutputFormat.scala
108 lines (92 loc) · 3.63 KB
/
ADAMSAMOutputFormat.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.read
import htsjdk.samtools.SAMFileHeader
import hbparquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.seqdoop.hadoop_bam.{
KeyIgnoringAnySAMOutputFormat,
KeyIgnoringSAMRecordWriter,
SAMFormat,
SAMRecordWritable
}
object ADAMSAMOutputFormat extends Serializable {
private[read] var header: Option[SAMFileHeader] = None
/**
* Attaches a header to the ADAMSAMOutputFormat Hadoop writer. If a header has previously
* been attached, the header must be cleared first.
*
* @throws Exception Exception thrown if a SAM header has previously been attached, and not cleared.
*
* @param samHeader Header to attach.
*
* @see clearHeader
*/
def addHeader(samHeader: SAMFileHeader) {
assert(header.isEmpty, "Cannot attach a new SAM header without first clearing the header.")
header = Some(samHeader)
}
/**
* Clears the attached header.
*
* @see addHeader
*/
def clearHeader() {
header = None
}
/**
* Returns the current header.
*
* @return Current SAM header.
*/
private[read] def getHeader: SAMFileHeader = {
assert(header.isDefined, "Cannot return header if not attached.")
header.get
}
}
class ADAMSAMOutputFormat[K]
extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
setSAMHeader(ADAMSAMOutputFormat.getHeader)
}
class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteSAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMSAMOutputFormat[K]]
}
class ADAMSAMOutputFormatHeaderLess[K]
extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
setWriteHeader(false)
override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = ContextUtil.getConfiguration(context)
// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))
// read the header file
readSAMHeaderFrom(path, conf)
// now that we have the header set, we need to make a record reader
return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
header,
false,
context)
}
}
class InstrumentedADAMSAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteSAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMSAMOutputFormatHeaderLess[K]]
}