path fixes
ryan-williams committed Apr 5, 2017
1 parent d76924b commit 2a7834f
Showing 4 changed files with 32 additions and 21 deletions.
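
The recurring change is to pass path.uri.toString instead of path.toString when handing file locations to Spark/Hadoop input APIs, alongside a new paths dependency in build.sbt. A minimal sketch of the before/after pattern, assuming a hammerlab-style Path wrapper that carries a java.net.URI — the Path definition here is a hypothetical stand-in for illustration, not the library's actual type:

import java.net.URI
import org.apache.spark.SparkContext

// Hypothetical stand-in for the Path wrapper this commit assumes; the real
// type presumably comes from the `paths` dependency added in build.sbt below.
case class Path(uri: URI) {
  // toString may render a "pretty" form that drops the scheme/authority,
  // which Hadoop then fails to resolve against the intended filesystem.
  override def toString: String = uri.getPath
}

// Before: whatever Path.toString happens to render.
def loadBefore(sc: SparkContext, path: Path) =
  sc.textFile(path.toString)

// After: the full URI string, so file://, hdfs://, s3a:// etc. survive.
def loadAfter(sc: SparkContext, path: Path) =
  sc.textFile(path.uri.toString)
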
1 change: 1 addition & 0 deletions build.sbt
@@ -27,6 +27,7 @@ deps ++= Seq(
libs.value('loci),
libs.value('log4j),
libs.value('parquet_avro),
+ libs.value('paths),
libs.value('spark_util),
"com.netflix.servo" % "servo-core" % "0.10.0",
"it.unimi.dsi" % "fastutil" % "6.6.5",
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1,2 +1,2 @@
resolvers += Resolver.sonatypeRepo("snapshots")
addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.3")
addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.4-SNAPSHOT")
42 changes: 26 additions & 16 deletions src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -385,7 +385,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
}

val records = sc.newAPIHadoopFile(
- path.toString,
+ path.uri.toString,
classOf[ParquetInputFormat[T]],
classOf[Void],
manifest[T].runtimeClass.asInstanceOf[Class[T]],
@@ -536,7 +536,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
} catch {
case e: Throwable ⇒
log.error(
s"Loading header failed for $fp:n${e.getMessage}\n\t${e.getStackTrace.take(25).map(_.toString).mkString("\n\t")}"
s"Loading header failed for $fp:n${e.getMessage}\n\t${e.getStackTrace.take(25).mkString("\n\t")}"
)
false
}
@@ -589,7 +589,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
} catch {
case e: Throwable =>
log.error(
s"Loading failed for $fp:n${e.getMessage}\n\t${e.getStackTrace.take(25).map(_.toString).mkString("\n\t")}"
s"Loading failed for $fp:n${e.getMessage}\n\t${e.getStackTrace.take(25).mkString("\n\t")}"
)
None
}
@@ -613,7 +613,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
filteredFiles.map(
p ⇒
sc.newAPIHadoopFile(
- p.toString,
+ p.uri.toString,
classOf[AnySAMInputFormat],
classOf[LongWritable],
classOf[SAMRecordWritable],
@@ -623,7 +623,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
)
else
sc.newAPIHadoopFile(
- path.toString,
+ path.uri.toString,
classOf[AnySAMInputFormat],
classOf[LongWritable],
classOf[SAMRecordWritable],
@@ -683,10 +683,19 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
val conf = getConfiguration(job)
BAMInputFormat.setIntervals(conf, loci.toHtsJDKIntervals)

- val records = sc.union(bamFiles.map(p => {
- sc.newAPIHadoopFile(p.toString, classOf[BAMInputFormat], classOf[LongWritable],
- classOf[SAMRecordWritable], conf)
- }))
+ val records =
+ sc.union(
+ bamFiles.map(
+ p ⇒
+ sc.newAPIHadoopFile(
+ p.uri.toString,
+ classOf[BAMInputFormat],
+ classOf[LongWritable],
+ classOf[SAMRecordWritable],
+ conf
+ )
+ )
+ )

if (Metrics.isRecording)
records.instrument()
@@ -695,7 +704,8 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
val samRecordConverter = new SAMRecordConverter
AlignmentRecordRDD(
records
- .map(p => samRecordConverter.convert(p._2.get))
+ .values
+ .map(r ⇒ samRecordConverter.convert(r.get))
.filter(r =>
ReferenceRegion
.opt(r)
@@ -854,7 +864,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)

val job = HadoopUtil.newJob(sc)
val records = sc.newAPIHadoopFile(
- path.toString,
+ path.uri.toString,
classOf[InterleavedFastqInputFormat],
classOf[Void],
classOf[Text],
@@ -970,7 +980,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)

val job = HadoopUtil.newJob(sc)
val records = sc.newAPIHadoopFile(
- path.toString,
+ path.uri.toString,
classOf[SingleFastqInputFormat],
classOf[Void],
classOf[Text],
@@ -1024,7 +1034,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
}

sc.newAPIHadoopFile(
- path.toString,
+ path.uri.toString,
classOf[VCFInputFormat],
classOf[LongWritable],
classOf[VariantContextWritable],
@@ -1163,7 +1173,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
fragmentLength: Long): NucleotideContigFragmentRDD = {
val fastaData: RDD[(LongWritable, Text)] =
sc.newAPIHadoopFile(
- path.toString,
+ path.uri.toString,
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text]
@@ -1196,7 +1206,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
val job = HadoopUtil.newJob(sc)
val records =
sc.newAPIHadoopFile(
- path.toString,
+ path.uri.toString,
classOf[InterleavedFastqInputFormat],
classOf[Void],
classOf[Text],
@@ -1301,7 +1311,7 @@ class ADAMContext(val sc: SparkContext)(implicit factory: Factory)
}

def textFile(path: Path, minPartitionsOpt: Option[Int]): RDD[String] =
- sc.textFile(path.toString(), minPartitionsOpt.getOrElse(sc.defaultParallelism))
+ sc.textFile(path.uri.toString(), minPartitionsOpt.getOrElse(sc.defaultParallelism))

/**
* Loads features stored in BED6/12 format.
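Beyond the uri.toString substitutions, the ADAMContext.scala diff above makes two stylistic changes: the per-file BAM loads are reformatted into a multi-line sc.union(...) call, and the .map over the (LongWritable, SAMRecordWritable) pair RDD is split into .values followed by a map. A small sketch of those two Spark idioms, using plain placeholder element types rather than the Hadoop/htsjdk writables:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// .values on a pair RDD is equivalent to .map(_._2); it just names the intent.
def secondElements(pairs: RDD[(Long, String)]): RDD[String] =
  pairs.values

// sc.union stitches one RDD per input file into a single RDD,
// the same shape as the loadBam refactor above.
def unionOfFiles(sc: SparkContext, paths: Seq[String]): RDD[String] =
  sc.union(paths.map(p => sc.textFile(p)))
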
@@ -290,20 +290,20 @@ class AlignmentRecordRDDSuite
test("convert malformed FASTQ (no quality scores) => SAM => well-formed FASTQ => SAM") {
val noqualPath = testFile("fastq_noqual.fq")

- //read FASTQ (malformed)
+ // read FASTQ (malformed)
val rddA = sc.loadFastq(noqualPath, None, None, LENIENT)

val noqualAPath = tmpLocation(".sam")

- //write SAM (fixed and now well-formed)
+ // write SAM (fixed and now well-formed)
rddA.saveAsSam(noqualAPath)

- //read SAM
+ // read SAM
val rddB = sc.loadAlignments(noqualAPath)

val noqualBPath = tmpLocation(".fastq")

- //write FASTQ (well-formed)
+ // write FASTQ (well-formed)
rddB.saveAsFastq(noqualBPath)

//read FASTQ (well-formed)
