Commit d219c8e: review fixes

heuermh committed Jun 1, 2019 (parent a785087)
Showing 5 changed files with 50 additions and 92 deletions.
@@ -42,24 +42,24 @@ private[adam] object FastaSequenceConverter {
     seqId: Int = 0,
     descriptionLine: Option[String] = None) {
     /**
-     * The contig name and description that was parsed out of this description line.
+     * The reference name and description that was parsed out of this description line.
      */
-    val (contigName, contigDescription) = parseDescriptionLine(descriptionLine, fileIndex)
+    val (referenceName, referenceDescription) = parseDescriptionLine(descriptionLine, fileIndex)
 
     /**
      * Parses the text of a given line.
      *
-     * Assumes that the line contains the contig name followed by an optional
-     * description of the contig, with the two separated by a space.
+     * Assumes that the line contains the reference name followed by an optional
+     * description of the reference, with the two separated by a space.
      *
      * @throws IllegalArgumentException if there is no name in the line and the
      *   line is not the only record in a file (i.e., the file contains multiple
-     *   contigs).
-     * @param descriptionLine The optional string describing the contig. If this
+     *   references).
+     * @param descriptionLine The optional string describing the reference. If this
      *   is not set and this isn't the only line in the file, we throw.
-     * @param id The index of this contig in the file.
-     * @return Returns a tuple containing (the optional contig name, and the
-     *   optional contig description).
+     * @param id The index of this reference in the file.
+     * @return Returns a tuple containing (the optional reference name, and the
+     *   optional reference description).
      */
     private def parseDescriptionLine(descriptionLine: Option[String],
                                      id: Long): (Option[String], Option[String]) = {
@@ -76,10 +76,10 @@ private[adam] object FastaSequenceConverter {
       if (split._1.contains('|')) {
         (None, Some(dL.stripPrefix(">").trim))
       } else {
-        val contigName: String = split._1.stripPrefix(">").trim
-        val contigDescription: String = split._2.trim
+        val referenceName: String = split._1.stripPrefix(">").trim
+        val referenceDescription: String = split._2.trim
 
-        (Some(contigName), Some(contigDescription))
+        (Some(referenceName), Some(referenceDescription))
       }
     } else {
       (Some(dL.stripPrefix(">").trim), None)
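
Note: the rename above does not change parsing behavior. A minimal standalone sketch of what that logic does (an illustrative re-implementation; parseDescriptionLine itself is private to the converter):

// Illustrative re-implementation of the description-line parsing above.
def parseSketch(dL: String): (Option[String], Option[String]) = {
  if (dL.contains(' ')) {
    val split = dL.splitAt(dL.indexOf(' '))
    if (split._1.contains('|')) {
      // Pipe-delimited names (e.g. ">gi|1234|...") keep the whole line as the description.
      (None, Some(dL.stripPrefix(">").trim))
    } else {
      (Some(split._1.stripPrefix(">").trim), Some(split._2.trim))
    }
  } else {
    (Some(dL.stripPrefix(">").trim), None)
  }
}

parseSketch(">chr1 Homo sapiens chromosome 1")  // (Some(chr1), Some(Homo sapiens chromosome 1))
parseSketch(">chr1")                            // (Some(chr1), None)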
@@ -106,33 +106,33 @@ private[adam] object FastaSequenceConverter {
       .filter((kv: (Long, String)) => !kv._2.startsWith(";"))
 
     val descriptionLines: Map[Long, FastaDescriptionLine] = getDescriptionLines(filtered)
-    val indexToContigDescription = rdd.context.broadcast(descriptionLines)
+    val indexToReferenceDescription = rdd.context.broadcast(descriptionLines)
 
     val sequenceLines = filtered.filter(kv => !isDescriptionLine(kv._2))
 
     val keyedSequences =
-      if (indexToContigDescription.value.isEmpty) {
+      if (indexToReferenceDescription.value.isEmpty) {
         sequenceLines.keyBy(kv => -1L)
       } else {
-        sequenceLines.keyBy(row => findContigIndex(row._1, indexToContigDescription.value.keys.toList))
+        sequenceLines.keyBy(row => findReferenceIndex(row._1, indexToReferenceDescription.value.keys.toList))
       }
 
-    val groupedContigs = keyedSequences.groupByKey()
+    val groupedReferences = keyedSequences.groupByKey()
 
     val converter = new FastaSequenceConverter(alphabet)
 
-    groupedContigs.flatMap {
+    groupedReferences.flatMap {
       case (id, lines) =>
 
-        val descriptionLine = indexToContigDescription.value.getOrElse(id, FastaDescriptionLine())
+        val descriptionLine = indexToReferenceDescription.value.getOrElse(id, FastaDescriptionLine())
         assert(lines.nonEmpty, s"Sequence ${descriptionLine.seqId} has no sequence data.")
 
         val sequence: Seq[String] = lines.toSeq.sortBy(_._1).map(kv => cleanSequence(kv._2))
         converter.convert(
-          descriptionLine.contigName,
+          descriptionLine.referenceName,
           descriptionLine.seqId,
           sequence,
-          descriptionLine.contigDescription
+          descriptionLine.referenceDescription
         )
     }
   }
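
Note: the flow above keys each sequence line by the row number of its owning description line, then groups. A plain-Scala analogue of that keyBy/groupByKey step on hypothetical toy rows (outside Spark, for illustration only):

// (row, line) pairs after comment lines have been filtered out.
val rows = Seq(0L -> ">chr1", 1L -> "ACGT", 2L -> "TTAA", 3L -> ">chr2", 4L -> "GGCC")
val descRows = rows.collect { case (i, l) if l.startsWith(">") => i }.toList
val grouped = rows.filterNot(_._2.startsWith(">"))
  .groupBy { case (i, _) => descRows.filter(_ <= i).max }
// grouped == Map(0 -> Seq((1, "ACGT"), (2, "TTAA")), 3 -> Seq((4, "GGCC")))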
@@ -179,18 +179,18 @@ private[adam] object FastaSequenceConverter {
   }
 
   /**
-   * Finds the index of a contig.
+   * Finds the index of a reference.
    *
-   * The index of a contig is the highest index below the index of our row.
+   * The index of a reference is the highest index below the index of our row.
    * Here, we define the index as the row number of the description line that
-   * describes this contig.
+   * describes this reference.
    *
-   * @param rowIdx The row number of the contig row to check.
+   * @param rowIdx The row number of the reference row to check.
    * @param indices A list containing the row numbers of all description lines.
    * @return Returns the row index of the description line that describes this
    *   sequence line.
    */
-  private[converters] def findContigIndex(rowIdx: Long, indices: List[Long]): Long = {
+  private[converters] def findReferenceIndex(rowIdx: Long, indices: List[Long]): Long = {
     val idx = indices.filter(_ <= rowIdx)
     idx.max
   }
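The scaladoc above defines a sequence line's reference as the description line with the highest row number at or below it. A quick worked example of the renamed helper (the body is copied verbatim from the diff):

def findReferenceIndex(rowIdx: Long, indices: List[Long]): Long = {
  val idx = indices.filter(_ <= rowIdx)
  idx.max
}

// Description lines at rows 0, 5, and 10; the sequence line at row 7
// belongs to the reference whose description line is at row 5.
findReferenceIndex(7L, List(0L, 5L, 10L))  // returns 5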
@@ -42,24 +42,24 @@ private[adam] object FastaSliceConverter {
     seqId: Int = 0,
     descriptionLine: Option[String] = None) {
     /**
-     * The contig name and description that was parsed out of this description line.
+     * The reference name and description that was parsed out of this description line.
      */
-    val (contigName, contigDescription) = parseDescriptionLine(descriptionLine, fileIndex)
+    val (referenceName, referenceDescription) = parseDescriptionLine(descriptionLine, fileIndex)
 
     /**
      * Parses the text of a given line.
      *
-     * Assumes that the line contains the contig name followed by an optional
-     * description of the contig, with the two separated by a space.
+     * Assumes that the line contains the reference name followed by an optional
+     * description of the reference, with the two separated by a space.
      *
      * @throws IllegalArgumentException if there is no name in the line and the
      *   line is not the only record in a file (i.e., the file contains multiple
-     *   contigs).
-     * @param descriptionLine The optional string describing the contig. If this
+     *   references).
+     * @param descriptionLine The optional string describing the reference. If this
      *   is not set and this isn't the only line in the file, we throw.
-     * @param id The index of this contig in the file.
-     * @return Returns a tuple containing (the optional contig name, and the
-     *   optional contig description).
+     * @param id The index of this reference in the file.
+     * @return Returns a tuple containing (the optional reference name, and the
+     *   optional reference description).
      */
     private def parseDescriptionLine(descriptionLine: Option[String],
                                      id: Long): (Option[String], Option[String]) = {
@@ -76,10 +76,10 @@ private[adam] object FastaSliceConverter {
       if (split._1.contains('|')) {
         (None, Some(dL.stripPrefix(">").trim))
       } else {
-        val contigName: String = split._1.stripPrefix(">").trim
-        val contigDescription: String = split._2.trim
+        val referenceName: String = split._1.stripPrefix(">").trim
+        val referenceDescription: String = split._2.trim
 
-        (Some(contigName), Some(contigDescription))
+        (Some(referenceName), Some(referenceDescription))
       }
     } else {
       (Some(dL.stripPrefix(">").trim), None)
@@ -109,33 +109,33 @@ private[adam] object FastaSliceConverter {
       .filter((kv: (Long, String)) => !kv._2.startsWith(";"))
 
     val descriptionLines: Map[Long, FastaDescriptionLine] = getDescriptionLines(filtered)
-    val indexToContigDescription = rdd.context.broadcast(descriptionLines)
+    val indexToReferenceDescription = rdd.context.broadcast(descriptionLines)
 
     val sequenceLines = filtered.filter(kv => !isDescriptionLine(kv._2))
 
     val keyedSequences =
-      if (indexToContigDescription.value.isEmpty) {
+      if (indexToReferenceDescription.value.isEmpty) {
         sequenceLines.keyBy(kv => -1L)
       } else {
-        sequenceLines.keyBy(row => findContigIndex(row._1, indexToContigDescription.value.keys.toList))
+        sequenceLines.keyBy(row => findReferenceIndex(row._1, indexToReferenceDescription.value.keys.toList))
       }
 
-    val groupedContigs = keyedSequences.groupByKey()
+    val groupedReferences = keyedSequences.groupByKey()
 
     val converter = new FastaSliceConverter(maximumLength)
 
-    groupedContigs.flatMap {
+    groupedReferences.flatMap {
       case (id, lines) =>
 
-        val descriptionLine = indexToContigDescription.value.getOrElse(id, FastaDescriptionLine())
+        val descriptionLine = indexToReferenceDescription.value.getOrElse(id, FastaDescriptionLine())
         assert(lines.nonEmpty, s"Sequence ${descriptionLine.seqId} has no sequence data.")
 
         val sequence: Seq[String] = lines.toSeq.sortBy(_._1).map(kv => cleanSequence(kv._2))
         converter.convert(
-          descriptionLine.contigName,
+          descriptionLine.referenceName,
           descriptionLine.seqId,
           sequence,
-          descriptionLine.contigDescription
+          descriptionLine.referenceDescription
        )
     }
   }
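
Note: FastaSliceConverter follows the same grouping flow as FastaSequenceConverter but is constructed with a maximumLength, which appears to cap the size of each emitted slice. A rough sketch of that slicing idea, under that assumption (illustrative only; the real converter emits ADAM Slice records with coordinates, not bare strings):

def sliceSequence(sequence: String, maximumLength: Int): Seq[String] =
  sequence.grouped(maximumLength).toSeq

sliceSequence("ACGTACGTAC", 4)  // Seq("ACGT", "ACGT", "AC")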
@@ -182,18 +182,18 @@ private[adam] object FastaSliceConverter {
   }
 
   /**
-   * Finds the index of a contig.
+   * Finds the index of a reference.
    *
-   * The index of a contig is the highest index below the index of our row.
+   * The index of a reference is the highest index below the index of our row.
    * Here, we define the index as the row number of the description line that
-   * describes this contig.
+   * describes this reference.
    *
-   * @param rowIdx The row number of the contig row to check.
+   * @param rowIdx The row number of the reference row to check.
    * @param indices A list containing the row numbers of all description lines.
    * @return Returns the row index of the description line that describes this
    *   sequence line.
    */
-  private[converters] def findContigIndex(rowIdx: Long, indices: List[Long]): Long = {
+  private[converters] def findReferenceIndex(rowIdx: Long, indices: List[Long]): Long = {
     val idx = indices.filter(_ <= rowIdx)
     idx.max
   }
@@ -128,20 +128,6 @@ case class ParquetUnboundReadDataset private[rdd] (
     sqlContext.read.parquet(parquetFilename).as[ReadProduct]
   }
 
-  override def saveAsParquet(filePath: String,
-                             blockSize: Int = 128 * 1024 * 1024,
-                             pageSize: Int = 1 * 1024 * 1024,
-                             compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
-                             disableDictionaryEncoding: Boolean = false) {
-    info("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
-    dataset.toDF()
-      .write
-      .format("parquet")
-      .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
-      .save(filePath)
-    saveMetadata(filePath)
-  }
-
   def replaceSequences(newSequences: SequenceDictionary): ReadDataset = {
     copy(sequences = newSequences)
   }
@@ -133,20 +133,6 @@ case class ParquetUnboundSequenceDataset private[rdd] (
     sqlContext.read.parquet(parquetFilename).as[SequenceProduct]
   }
 
-  override def saveAsParquet(filePath: String,
-                             blockSize: Int = 128 * 1024 * 1024,
-                             pageSize: Int = 1 * 1024 * 1024,
-                             compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
-                             disableDictionaryEncoding: Boolean = false) {
-    info("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
-    dataset.toDF()
-      .write
-      .format("parquet")
-      .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
-      .save(filePath)
-    saveMetadata(filePath)
-  }
-
   def replaceSequences(newSequences: SequenceDictionary): SequenceDataset = {
     copy(sequences = newSequences)
   }
@@ -138,20 +138,6 @@ case class ParquetUnboundSliceDataset private[rdd] (
     sqlContext.read.parquet(parquetFilename).as[SliceProduct]
   }
 
-  override def saveAsParquet(filePath: String,
-                             blockSize: Int = 128 * 1024 * 1024,
-                             pageSize: Int = 1 * 1024 * 1024,
-                             compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
-                             disableDictionaryEncoding: Boolean = false) {
-    info("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
-    dataset.toDF()
-      .write
-      .format("parquet")
-      .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
-      .save(filePath)
-    saveMetadata(filePath)
-  }
-
   def replaceSequences(newSequences: SequenceDictionary): SliceDataset = {
     copy(sequences = newSequences)
   }
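
Note: the three deleted saveAsParquet overrides were identical; each wrote the Dataset through Spark SQL while honoring only the compression codec, as the deleted info log states, then called saveMetadata. A sketch of that write path in plain Spark terms (df and filePath stand in for the real arguments):

import org.apache.spark.sql.DataFrame

def writeCompressedParquet(df: DataFrame, filePath: String, codec: String): Unit = {
  df.write
    .format("parquet")
    .option("spark.sql.parquet.compression.codec", codec)  // e.g. "gzip"
    .save(filePath)
  // The deleted overrides then called saveMetadata(filePath) to write
  // ADAM's metadata (e.g. the sequence dictionary) alongside the Parquet.
}

After this commit these classes presumably fall back to an inherited saveAsParquet; the deleted log line suggests the motivation, since the SQL path ignored the block size, page size, and dictionary-encoding arguments.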