Commit

Merge pull request #57 from osopardo1/47-stop-losing-records
Avoid missing cube files on reading
cugni committed Jan 11, 2022
2 parents 35c9f56 + 0a3ba6e commit bcea74f
Showing 15 changed files with 313 additions and 269 deletions.
61 changes: 59 additions & 2 deletions core/src/main/scala/io/qbeast/core/model/CubeId.scala
@@ -222,7 +222,7 @@ object CubeId {
} else if (symbol == '/') {
63
} else {
-      throw new IllegalArgumentException(s"Invalid symbol '${symbol}'.")
+      throw new IllegalArgumentException(s"Invalid symbol '$symbol'.")
}
for (i <- 0 until math.min(6, end - begin)) {
val mask = 1 << (5 - i)
@@ -243,9 +243,66 @@
* @param depth the cube depth
* @param bitMask the bitMask representing the cube z-index.
*/
-case class CubeId(dimensionCount: Int, depth: Int, bitMask: Array[Long]) extends Serializable {
+case class CubeId(dimensionCount: Int, depth: Int, bitMask: Array[Long])
+    extends Serializable
+    with Ordered[CubeId] {
private lazy val range = getRange

/**
* Compare two CubeIds.
* @param that the other CubeId
* @return a negative integer, zero, or a positive integer as this CubeId
* is less than, equal to, or greater than the other CubeId.
*/
override def compare(that: CubeId): Int = {
val thisBitset = BitSet.fromBitMaskNoCopy(bitMask)
val thatBitset = BitSet.fromBitMaskNoCopy(that.bitMask)
val commonDepth = math.min(depth, that.depth)
for (depthOffset <- 0.until(commonDepth * dimensionCount)) {
val firstBit = thisBitset.contains(depthOffset)
val secondBit = thatBitset.contains(depthOffset)
if (firstBit != secondBit) {
if (firstBit) {
return 1
} else {
return -1
}
}

}
    // We end up here if one of the two cubes is an ancestor of the other,
    // so the shallower cube sorts first:
    // positive => this is deeper, so that < this
    // negative => that is deeper, so this < that
    // zero     => both cubes are at the same level
depth.compare(that.depth)
}

/**
   * Returns true if this cube is an ancestor of the other cube.
   * A cube is considered an ancestor of itself.
   * @param other the cube to check
   * @return true if this cube is an ancestor of other, false otherwise
*/
def isAncestorOf(other: CubeId): Boolean = {
require(
other.dimensionCount == dimensionCount,
"The two cubes must have the same dimension count.")

if (depth > other.depth) {
false
} else {
val end = dimensionCount * depth
val possibleDescendantBits = BitSet.fromBitMaskNoCopy(other.bitMask).until(end).toBitMask

for (i <- possibleDescendantBits.indices) {
if (possibleDescendantBits(i) != bitMask(i)) {
return false
}
}
true
}
}

/**
* Returns whether the identifier represents the root cube.
*
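As an illustration, a minimal sketch of how the new ordering and ancestor check behave, written against the CubeId API above (the three-dimensional cubes are arbitrary; this sketch is not part of the change):

import io.qbeast.core.model.CubeId

import scala.util.Random

object CubeIdOrderingSketch extends App {
  val root = CubeId.root(3)
  val kids = root.children.toVector
  val grandKids = kids.head.children.toVector

  // Ordered[CubeId] provides the comparison operators:
  // a cube sorts after every one of its ancestors...
  assert(kids.forall(root < _))
  assert(grandKids.forall(kids.head < _))

  // ...so a shuffled family always sorts with the root first.
  assert(Random.shuffle(root +: kids).sorted.head == root)

  // isAncestorOf holds for any descendant, not only direct children.
  assert(root.isAncestorOf(grandKids.head))
  assert(!grandKids.head.isAncestorOf(root))
}

In this ordering a cube's entire subtree occupies a contiguous key range that starts at the cube itself; that property is what the reworked QueryExecutor below exploits through iteratorFrom.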
10 changes: 5 additions & 5 deletions core/src/main/scala/io/qbeast/core/model/CubeWeights.scala
@@ -22,10 +22,10 @@ object CubeWeightsBuilder {
desiredCubeSize: Int,
numPartitions: Int,
numElements: Long,
-      bufferCapacity: Long): Double = {
+      bufferCapacity: Long): Int = {
    val numGroups = Math.max(numPartitions, numElements / bufferCapacity)
-    val groupCubeSize = desiredCubeSize.toDouble / numGroups
-    groupCubeSize
+    val groupCubeSize = desiredCubeSize / numGroups
+    Math.max(1, groupCubeSize.toInt)
}

}
@@ -34,13 +34,13 @@
* Builder for creating cube weights.
*
* @param desiredCubeSize the desired cube size
- * @param groupSize the boost size
+ * @param groupSize the number of elements for each group
* @param bufferCapacity the buffer capacity to store the cube weights in memory
* @param replicatedOrAnnouncedSet the announced or replicated cubes' identifiers
*/
class CubeWeightsBuilder protected (
private val desiredCubeSize: Int,
-    private val groupSize: Double,
+    private val groupSize: Int,
private val bufferCapacity: Long,
private val replicatedOrAnnouncedSet: Set[CubeId] = Set.empty)
extends Serializable {
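For intuition, a worked example of the estimate above, as a standalone sketch that mirrors the formula (the numbers are arbitrary):

// Mirrors estimateGroupCubeSize above; not the library code itself.
def estimateGroupCubeSize(
    desiredCubeSize: Int,
    numPartitions: Int,
    numElements: Long,
    bufferCapacity: Long): Int = {
  val numGroups = Math.max(numPartitions, numElements / bufferCapacity)
  Math.max(1, (desiredCubeSize / numGroups).toInt)
}

// 10M elements with a 1M buffer capacity give 10 groups (more than the
// 8 partitions), so each group targets 100000 / 10 = 10000 elements.
assert(estimateGroupCubeSize(100000, 8, 10000000L, 1000000L) == 10000)

// The floor of 1 keeps a small desiredCubeSize from rounding down to 0.
assert(estimateGroupCubeSize(5, 8, 10000000L, 1000000L) == 1)

The floor of 1, new in this change, guarantees each group targets at least one element even when desiredCubeSize is smaller than the number of groups.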
@@ -6,6 +6,8 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize.Typing
import io.qbeast.IISeq
import io.qbeast.core.transform.{Transformation, Transformer}

+import scala.collection.immutable.SortedMap

object QTableID {

@JsonCreator
@@ -188,7 +190,7 @@ case class IndexStatus(
revision: Revision,
replicatedSet: ReplicatedSet = Set.empty,
announcedSet: Set[CubeId] = Set.empty,
-    cubesStatuses: Map[CubeId, CubeStatus] = Map.empty)
+    cubesStatuses: SortedMap[CubeId, CubeStatus] = SortedMap.empty)
extends Serializable {

def addAnnouncements(newAnnouncedSet: Set[CubeId]): IndexStatus =
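The switch from Map to SortedMap is what the new reading path builds on: since CubeId is now Ordered, an Ordering[CubeId] is available implicitly, and a SortedMap keyed by it supports iteratorFrom, which the reworked QueryExecutor below uses to resume iteration at any cube. A minimal sketch of iteratorFrom's semantics, with integer keys standing in for CubeIds:

import scala.collection.immutable.SortedMap

val statuses = SortedMap(1 -> "a", 3 -> "c", 7 -> "g")

// iteratorFrom starts at the first key >= its argument,
// whether or not that exact key is present.
assert(statuses.iteratorFrom(3).next() == (3 -> "c")) // exact hit
assert(statuses.iteratorFrom(4).next() == (7 -> "g")) // next larger key
assert(!statuses.iteratorFrom(8).hasNext)             // nothing at or after 8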
19 changes: 19 additions & 0 deletions core/src/test/scala/io/qbeast/core/model/CubeIdSpeedTest.scala
@@ -88,4 +88,23 @@ class CubeIdSpeedTest extends AnyFlatSpec with Matchers with TimeLimits {
// scalastyle:off println
println(s"3D completed in $time msseconds - ${count.toDouble / time * 1000} IOPS")
}

"The sorting of CubeId" should "be faster than comparing using strings " in {
val root = CubeId.root(10)
val oneMillion =
Random.shuffle(root.children.flatMap(child => Seq(child) ++ child.children)).toVector
val start2 = System.currentTimeMillis()
val s2 = oneMillion.sortBy(_.string)
val time2 = System.currentTimeMillis() - start2
println(s"Time taken to sort by string: ${time2}")

val start1 = System.currentTimeMillis()
val s1 = oneMillion.sorted
val time1 = System.currentTimeMillis() - start1
println(s"Time spent on oneMillion.sorted: ${time1}")

println(s"s1: ${s1.head}, s2: ${s2.head}")
time2 should be > time1

}
}
81 changes: 81 additions & 0 deletions core/src/test/scala/io/qbeast/core/model/CubeIdTest.scala
@@ -6,6 +6,8 @@ package io.qbeast.core.model
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.util.Random

/**
* Tests for CubeId.
*/
@@ -153,4 +155,83 @@ class CubeIdTest extends AnyFlatSpec with Matchers {
cubeId.contains(Point(1.0, 1.0)) shouldBe false
cubeId.contains(Point(0.5, 0.75)) shouldBe true
}

it should "compare parent and children correctly with 2 dimensions" in {

val root = CubeId.root(2)
val kids = root.children.toVector
val grandChildren = Random.shuffle(kids.flatMap(_.children)).sorted
for (kid <- kids) {
root should be < kid
}
for (kid <- grandChildren) {
root should be < kid
}
for (k <- kids) {
for (kk <- k.children) {
k should be < kk
}
}
for (group <- grandChildren.grouped(4)) {
group.map(_.parent).distinct.size shouldBe 1
}

val twoGens = Random.shuffle(kids ++ grandChildren).sorted
for (group <- twoGens.grouped(5)) {
group.takeRight(4).map(_.parent).distinct.head.get shouldBe group.head
}

}

it should "compare parent and children correctly with 8 dimensions" in {

val root = CubeId.root(8)
val kids = root.children.toVector
val grandChildren = Random.shuffle(kids.flatMap(_.children)).sorted
for (kid <- kids) {
root should be < kid
}
for (kid <- grandChildren) {
root should be < kid
}
for (k <- kids) {
for (kk <- k.children) {
k should be < kk
}
}
for (group <- grandChildren.grouped(256)) {
group.map(_.parent).distinct.size shouldBe 1
}

val twoGens = Random.shuffle(kids ++ grandChildren).sorted
for (group <- twoGens.grouped(257)) {
group.takeRight(4).map(_.parent).distinct.head.get shouldBe group.head
}

}

it should "check ancestors correctly" in {
val root = CubeId.root(2)
val kids = root.children.toVector

var otherKid: CubeId = null // A kid from the previous iteration, used as a non-ancestor reference
for (kid <- kids) {
root.isAncestorOf(kid) shouldBe true
kid.isAncestorOf(root) shouldBe false

for (gc <- kid.children) {
root.isAncestorOf(gc) shouldBe true
gc.isAncestorOf(root) shouldBe false

kid.isAncestorOf(gc) shouldBe true
gc.isAncestorOf(kid) shouldBe false

if (otherKid != null) {
otherKid.isAncestorOf(gc) shouldBe false
gc.isAncestorOf(otherKid) shouldBe false
}
}
otherKid = kid
}
}
}
@@ -118,7 +118,7 @@ case class DeltaQbeastSnapshot(snapshot: Snapshot) extends QbeastSnapshot {
*/
override def loadIndexStatus(revisionID: RevisionID): IndexStatus = {
val revision = getRevision(revisionID)
-    val replicatedSet = getReplicatedSet(lastRevisionID)
+    val replicatedSet = getReplicatedSet(revisionID)
new IndexStatusBuilder(this, revision, replicatedSet).build()
}

8 changes: 6 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala
@@ -8,6 +8,8 @@ import io.qbeast.spark.utils.TagUtils
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.{Dataset, SparkSession}

+import scala.collection.immutable.SortedMap

/**
* Builds the index status from a given snapshot and revision
* @param qbeastSnapshot the QbeastSnapshot
@@ -44,11 +46,12 @@ private[delta] class IndexStatusBuilder(
* Returns the index state for the given space revision
* @return Dataset containing cube information
*/
-  def buildCubesStatuses: Map[CubeId, CubeStatus] = {
+  def buildCubesStatuses: SortedMap[CubeId, CubeStatus] = {

val spark = SparkSession.active
import spark.implicits._
val rev = revision
+    val builder = SortedMap.newBuilder[CubeId, CubeStatus]
revisionFiles
.groupByKey(_.tags(TagUtils.cube))
.mapGroups((cube, f) => {
@@ -75,7 +78,8 @@
(rev.createCubeId(cube), cubeStatus)
})
.collect()
-      .toMap
+      .foreach(builder += _)
+    builder.result()
}

}
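A small sketch of the collection pattern above: the Spark job returns cube statuses in no particular order, and the SortedMap builder sorts them by key when result() is called (integer keys here for brevity):

import scala.collection.immutable.SortedMap

val builder = SortedMap.newBuilder[Int, String]
// Insertion order does not matter; result() is sorted by key.
Seq(7 -> "g", 1 -> "a", 3 -> "c").foreach(builder += _)
assert(builder.result() == SortedMap(1 -> "a", 3 -> "c", 7 -> "g"))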
2 changes: 0 additions & 2 deletions src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala
@@ -115,8 +115,6 @@ object DoublePassOTreeDataAnalyzer extends OTreeDataAnalyzer with Serializable {
val cols = revision.columnTransformers.map(_.columnName) ++ indexColumns

// Estimate the desiredSize of the cube at partition level
-    // If the user has specified a desiredSize too small
-    // set it to minCubeSize
val numPartitions: Int = weightedDataFrame.rdd.getNumPartitions
val numElements: Long = stats.head.count
val bufferCapacity: Long = CUBE_WEIGHTS_BUFFER_CAPACITY
73 changes: 70 additions & 3 deletions src/main/scala/io/qbeast/spark/index/query/QueryExecutor.scala
@@ -3,7 +3,11 @@
*/
package io.qbeast.spark.index.query

-import io.qbeast.core.model.{QbeastFile, QbeastSnapshot}
+import io.qbeast.IISeq
+import io.qbeast.core.model._
+import io.qbeast.spark.utils.State
+
+import scala.collection.mutable

/**
* Executes a query against a Qbeast snapshot
@@ -21,11 +25,74 @@ class QueryExecutor(querySpecBuilder: QuerySpecBuilder, qbeastSnapshot: QbeastSnapshot
qbeastSnapshot.loadAllRevisions.flatMap { revision =>
val querySpec = querySpecBuilder.build(revision)
val indexStatus = qbeastSnapshot.loadIndexStatus(revision.revisionID)
-      val indexStatusExecutor = new QueryIndexStatusExecutor(querySpec, indexStatus)
-
-      val matchingFiles = indexStatusExecutor.execute(previouslyMatchedFiles)
+      val matchingFiles = executeRevision(querySpec, indexStatus, previouslyMatchedFiles)
matchingFiles
}
}

private[query] def executeRevision(
querySpec: QuerySpec,
indexStatus: IndexStatus,
previouslyMatchedFiles: Seq[QbeastFile]): IISeq[QbeastFile] = {

val fileMap = previouslyMatchedFiles.map(a => (a.path, a)).toMap

val outputFiles = Vector.newBuilder[QbeastFile]
val stack = mutable.Stack(indexStatus.revision.createCubeIdRoot())
while (stack.nonEmpty) {
val currentCube = stack.pop()

val cubeIter = indexStatus.cubesStatuses.iteratorFrom(currentCube)
      // The next element from the iterator can be:
      // 1. currentCube itself
      // 2. a descendant of currentCube
      // 3. a sibling of currentCube, or a cube from a sibling's subtree
      // 4. nothing: no cube at or after currentCube remains in cubesStatuses
if (cubeIter.hasNext) { // cases 1 to 3
cubeIter.next() match {
case (cube, CubeStatus(maxWeight, _, files)) if cube == currentCube => // Case 1
val unfilteredFiles = if (querySpec.weightRange.to < maxWeight) {
            // The cube's maxWeight exceeds the upper bound of the weight range
            // (the sample fraction), so currentCube is the last cube to visit
            // on this branch: retrieve all its files and stop descending.
files.flatMap(fileMap.get)
} else {
            // Otherwise, the branch continues below this cube:
            // 1. if currentCube is REPLICATED, skip its files
            // 2. if it is ANNOUNCED, skip the files written after the
            //    announcement (those already in state ANNOUNCED)
            // 3. otherwise (FLOODED), retrieve all its files
val isReplicated = indexStatus.replicatedSet.contains(cube)
val isAnnounced = indexStatus.announcedSet.contains(cube)
val cubeFiles =
if (isReplicated) {
Vector.empty
} else if (isAnnounced) {
files.flatMap(fileMap.get).filterNot(_.state == State.ANNOUNCED)
} else {
files.flatMap(fileMap.get)
}
val nextLevel = cube.children
.filter(querySpec.querySpace.intersectsWith)
stack.pushAll(nextLevel)
cubeFiles
}

outputFiles ++= unfilteredFiles.filter(file =>
file.maxWeight > querySpec.weightRange.from)

          case (cube, _) if currentCube.isAncestorOf(cube) => // Case 2
            // cube is a descendant of currentCube, which has no entry of its
            // own in cubesStatuses. Descend into all of currentCube's children
            // so that no subtree is skipped.
val nextLevel = currentCube.children
.filter(querySpec.querySpace.intersectsWith)
stack.pushAll(nextLevel)

          case _ => // Case 3: a sibling subtree; nothing to visit here
}
}
}
outputFiles.result()
}

}
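To see why this traversal stops losing records, here is a self-contained sketch of the same stack-plus-iteratorFrom skeleton over a toy tree, where a cube is a string and its children append one digit. The helpers are hypothetical stand-ins for the CubeId API, and the weight and query-space pruning is omitted:

import scala.collection.immutable.SortedMap
import scala.collection.mutable

object TraversalSketch extends App {
  def children(cube: String): Seq[String] = Seq(cube + "0", cube + "1")
  def isAncestorOf(ancestor: String, cube: String): Boolean =
    cube.startsWith(ancestor)

  // "01" has no entry of its own, but its descendant "010" does.
  val statuses = SortedMap("" -> 10, "0" -> 5, "00" -> 2, "010" -> 1)

  val matched = Vector.newBuilder[String]
  val stack = mutable.Stack("")
  while (stack.nonEmpty) {
    val current = stack.pop()
    val iter = statuses.iteratorFrom(current)
    if (iter.hasNext) {
      iter.next() match {
        case (cube, _) if cube == current => // Case 1: the cube itself
          matched += cube
          stack.pushAll(children(cube))
        case (cube, _) if isAncestorOf(current, cube) => // Case 2: a descendant
          stack.pushAll(children(current))
        case _ => // Case 3: a sibling subtree; nothing to do here
      }
    } // Case 4: no cube at or after current; this branch is exhausted
  }
  assert(matched.result().sorted == Vector("", "0", "00", "010"))
}

Case 2 is the crucial one: "01" never appears in statuses, yet iteratorFrom("01") surfaces its descendant "010", so the traversal keeps descending instead of dropping the subtree. A lookup-based walk over a plain Map would stop at the missing "01" and silently skip "010" — which appears to be the kind of missed cube file this PR fixes.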
