
[ADAM-625] Enable globbing for BAM. #626

Merged 1 commit on Mar 24, 2015
22 changes: 21 additions & 1 deletion adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
@@ -85,6 +85,14 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
   var coalesce: Int = -1
   @Args4jOption(required = false, name = "-sort_fastq_output", usage = "Sets whether to sort the FASTQ output, if saving as FASTQ. False by default. Ignored if not saving as FASTQ.")
   var sortFastqOutput: Boolean = false
+  @Args4jOption(required = false, name = "-force_load_bam", usage = "Forces Transform to load from BAM/SAM.")
+  var forceLoadBam: Boolean = false
+  @Args4jOption(required = false, name = "-force_load_fastq", usage = "Forces Transform to load from unpaired FASTQ.")
+  var forceLoadFastq: Boolean = false
+  @Args4jOption(required = false, name = "-force_load_ifastq", usage = "Forces Transform to load from interleaved FASTQ.")
+  var forceLoadIFastq: Boolean = false
Contributor: did you mean fasta? or just one extra cmd+p?

Member Author: There was a missing i --> interleaved; I had fixed it locally but forgot to commit before pushing.

+  @Args4jOption(required = false, name = "-force_load_parquet", usage = "Forces Transform to load from Parquet.")
+  var forceLoadParquet: Boolean = false
 }

 class Transform(protected val args: TransformArgs) extends ADAMSparkCommand[TransformArgs] with Logging {
@@ -155,7 +163,19 @@ class Transform(protected val args: TransformArgs) extends ADAMSparkCommand[TransformArgs] with Logging {
   }

   def run(sc: SparkContext, job: Job) {
-    this.apply(sc.loadAlignments(args.inputPath)).adamSave(args)
+    this.apply({
+      if (args.forceLoadBam) {
+        sc.loadBam(args.inputPath)
+      } else if (args.forceLoadFastq) {
+        sc.loadUnpairedFastq(args.inputPath)
+      } else if (args.forceLoadIFastq) {
+        sc.loadInterleavedFastq(args.inputPath)
+      } else if (args.forceLoadParquet) {
+        sc.loadParquetAlignments(args.inputPath)
+      } else {
+        sc.loadAlignments(args.inputPath)
+      }
+    }).adamSave(args)
   }

   private def createKnownSnpsTable(sc: SparkContext): SnpTable = CreateKnownSnpsTable.time {
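The force flags matter for globbing because loadAlignments picks a loader from the path's extension, and a glob (say, a directory of part files) may not end in ".bam". A minimal sketch of the call that -force_load_bam routes to, assuming the usual ADAMContext implicit import; the path is illustrative:

import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.adam.rdd.ADAMContext._

val sc = new SparkContext(new SparkConf().setAppName("force-load-bam-sketch"))

// Hypothetical input: BAM shards whose names carry no ".bam"/".sam" suffix, so
// extension sniffing would not pick the BAM reader; loadBam reads them regardless.
val reads = sc.loadBam("hdfs:///data/NA12878/part-*")
println("loaded " + reads.count + " alignment records")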
adam-core/src/main/scala/org/bdgenomics/adam/models/RecordGroupDictionary.scala
@@ -71,6 +71,10 @@ class RecordGroupDictionary(val recordGroups: Seq[RecordGroup]) extends Serializable {
   assert(recordGroupMap.size == recordGroups.length,
     "Read group dictionary contains multiple samples with identical read group names.")

+  def ++(that: RecordGroupDictionary): RecordGroupDictionary = {
+    new RecordGroupDictionary(recordGroups ++ that.recordGroups)
+  }
+
   /**
    * Returns the numerical index for a given record group name.
    *
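The new operator is a plain concatenation; the uniqueness assert in the constructor is still what guards against the same read-group name arriving from two globbed files. A rough sketch of how the loadBam change below exercises it, written as if inside ADAMContext (header paths are illustrative):

// Read two headers the same way loadBam does below.
val headerA = SAMHeaderReader.readSAMHeaderFrom(new Path("hdfs:///data/part-0.bam"), sc.hadoopConfiguration)
val headerB = SAMHeaderReader.readSAMHeaderFrom(new Path("hdfs:///data/part-1.bam"), sc.hadoopConfiguration)

// One step of the reduce over matched files: concatenate the read groups, then let
// the RecordGroupDictionary constructor's assert reject duplicate group names.
val mergedGroups = adamBamLoadReadGroups(headerA) ++ adamBamLoadReadGroups(headerB)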
adam-core/src/main/scala/org/bdgenomics/adam/models/SequenceDictionary.scala
@@ -94,7 +94,6 @@ class SequenceDictionary(val records: Vector[SequenceRecord]) extends Serializable {

   def +(record: SequenceRecord): SequenceDictionary = this ++ SequenceDictionary(record)
   def ++(that: SequenceDictionary): SequenceDictionary = {
-    assert(this.isCompatibleWith(that))
     new SequenceDictionary(records ++ that.records.filter(r => !byName.contains(r.name)))
   }
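Dropping the isCompatibleWith assert is what lets dictionaries from independently written BAM headers be folded together; ++ now simply keeps the left-hand record whenever both sides declare a contig of the same name. A sketch of the resulting behaviour, assuming the (name, length) SequenceRecord factory; contigs and lengths are illustrative:

// Two per-file dictionaries that both declare chr1.
val dictA = SequenceDictionary(SequenceRecord("chr1", 249250621L))
val dictB = SequenceDictionary(SequenceRecord("chr1", 249250621L), SequenceRecord("chr2", 243199373L))

// chr1 is taken once (from dictA; duplicates are filtered by name) and chr2 is
// appended; no compatibility check runs any more.
val merged = dictA ++ dictB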
29 changes: 23 additions & 6 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -229,12 +229,29 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
   def loadBam(
     filePath: String): RDD[AlignmentRecord] = {

-    // We need to separately read the header, so that we can inject the sequence dictionary
-    // data into each individual Read (see the argument to samRecordConverter.convert,
-    // below).
-    val samHeader = SAMHeaderReader.readSAMHeaderFrom(new Path(filePath), sc.hadoopConfiguration)
-    val seqDict = adamBamDictionaryLoad(samHeader)
-    val readGroups = adamBamLoadReadGroups(samHeader)
+    val (seqDict, readGroups) = FileSystem.get(sc.hadoopConfiguration)
+      .globStatus(new Path(filePath))
+      .map(fs => fs.getPath)
+      .flatMap(fp => {
+
+        try {
+          // We need to separately read the header, so that we can inject the sequence dictionary
+          // data into each individual Read (see the argument to samRecordConverter.convert,
+          // below).
+          val samHeader = SAMHeaderReader.readSAMHeaderFrom(fp, sc.hadoopConfiguration)
+          log.info("Loaded header from " + fp)
+          val sd = adamBamDictionaryLoad(samHeader)
+          val rg = adamBamLoadReadGroups(samHeader)
+          Some((sd, rg))
+        } catch {
+          case _: Throwable => {
+            log.error("Loading failed for " + fp)
+            None
+          }
+        }
+      }).reduce((kv1, kv2) => {
+        (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
+      })

     val job = HadoopUtil.newJob(sc)
     val records = sc.newAPIHadoopFile(filePath, classOf[AnySAMInputFormat], classOf[LongWritable],
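The header scan relies on FileSystem.globStatus to expand the pattern eagerly, while the record reading still hands the original glob to newAPIHadoopFile, which already understands Hadoop glob patterns when listing input files. A standalone sketch of the expansion step; the path and configuration are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }

val conf = new Configuration()
val fs = FileSystem.get(conf)

// The same expansion loadBam performs before reading each matched file's header;
// Hadoop glob syntax (*, ?, [abc], {a,b}) is supported in the pattern.
val matched: Array[Path] = fs.globStatus(new Path("hdfs:///data/NA12878/*.bam"))
  .map(status => status.getPath)

matched.foreach(p => println("matched " + p))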