Skip to content

Commit

Permalink
Minor updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Oct 30, 2014
1 parent b0a18b1 commit ed5fbf0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
Expand Up @@ -37,8 +37,8 @@ private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val segment: WriteAheadLogFileSegment
) extends Partition
val segment: WriteAheadLogFileSegment)
extends Partition


/**
Expand All @@ -59,11 +59,11 @@ private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient hadoopConfig: Configuration,
@transient override val blockIds: Array[BlockId],
@transient val segments: Array[WriteAheadLogFileSegment],
val storeInBlockManager: Boolean,
val storageLevel: StorageLevel
) extends BlockRDD[T](sc, blockIds) {
@transient blockIds: Array[BlockId],
@transient segments: Array[WriteAheadLogFileSegment],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
extends BlockRDD[T](sc, blockIds) {

require(
blockIds.length == segments.length,
Expand All @@ -76,7 +76,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
}
}

/**
Expand Down Expand Up @@ -116,8 +117,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
lazy val segmentLocations = HdfsUtils.getFileSegmentLocations(
def blockLocations = getBlockIdLocations().get(partition.blockId)
def segmentLocations = HdfsUtils.getFileSegmentLocations(
partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
}
Expand Down
Expand Up @@ -86,7 +86,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
testStoreInBM: Boolean = false
) {
val numBlocks = numPartitionssInBM + numPartitionsInWAL
val data = Seq.tabulate(numBlocks) { _ => Seq.fill(10) { scala.util.Random.nextString(50) } }
val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))

// Put the necessary blocks in the block manager
val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
Expand Down

0 comments on commit ed5fbf0

Please sign in to comment.