Commit

Fixed bug
tdas committed Feb 7, 2018
1 parent 1204755 commit f0ce5df
Showing 1 changed file with 5 additions and 11 deletions.
@@ -140,7 +140,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       batches.slice(sliceStart, sliceEnd)
     }
 
-    logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal))
+    logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
     newBlocks.map { block =>
       new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
@@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   private def generateDebugString(
-      blocks: Iterable[Array[UnsafeRow]],
+      blocks: Seq[UnsafeRow],
       startOrdinal: Int,
       endOrdinal: Int): String = {
-    val originalUnsupportedCheck =
-      sqlContext.getConf("spark.sql.streaming.unsupportedOperationCheck")
-    try {
-      sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", "false")
-      s"MemoryBatch [$startOrdinal, $endOrdinal]: " +
-        s"${blocks.flatten.map(row => encoder.fromRow(row)).mkString(", ")}"
-    } finally {
-      sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", originalUnsupportedCheck)
-    }
+    val fromRow = encoder.resolveAndBind().fromRow _
+    s"MemoryBatch [$startOrdinal, $endOrdinal]: " +
+      s"${blocks.map(row => fromRow(row)).mkString(", ")}"
   }
 
   override def commit(end: OffsetV2): Unit = synchronized {
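Context for the fix: the old generateDebugString temporarily disabled spark.sql.streaming.unsupportedOperationCheck around the decoding call, apparently papering over a runtime failure that actually came from calling fromRow on an unresolved encoder. In the Spark 2.3-era API, an ExpressionEncoder's deserializer must be resolved and bound to its schema before fromRow can turn an InternalRow back into an object. Below is a minimal standalone sketch of that pattern, not code from this commit; the Event class and EncoderRoundTrip object are hypothetical names, and toRow/fromRow reflect the 2.3-era ExpressionEncoder API (renamed in later Spark versions).

// A minimal sketch, not part of this commit: round-tripping a case class
// through an ExpressionEncoder with the Spark 2.3-era toRow/fromRow API.
// Event and EncoderRoundTrip are hypothetical names for illustration.
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Event(id: Int, value: String)

object EncoderRoundTrip {
  def main(args: Array[String]): Unit = {
    val encoder = ExpressionEncoder[Event]()

    // Serialization works on the freshly created encoder.
    val row = encoder.toRow(Event(1, "a"))

    // Deserialization needs a resolved-and-bound copy: calling fromRow
    // directly on the unresolved encoder fails at runtime, which is the
    // failure the old conf-toggling workaround was masking.
    val fromRow = encoder.resolveAndBind().fromRow _
    println(fromRow(row)) // prints: Event(1,a)
  }
}

Note the same design choice in the new generateDebugString: hoisting val fromRow = encoder.resolveAndBind().fromRow _ out of the per-row map resolves the encoder once per debug string rather than once per row.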
