From 360daba9da38659cc1e1fb9b75ee667265b8700c Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Mon, 28 Mar 2016 22:10:42 +0900 Subject: [PATCH 1/4] [S2GRAPH-66]: Optimize toEdge, IndexEdgeDeserializable using mutable Map. use immutable.Map.newBuilder to build merged props. --- .../tall/IndexEdgeDeserializable.scala | 43 +++++++++------- .../wide/IndexEdgeDeserializable.scala | 51 ++++++++++--------- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index f2339402..da029efc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -26,6 +26,8 @@ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -107,7 +109,6 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) } else { // not degree edge - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version) pos = endAt @@ -117,32 +118,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version) } - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v - - val idxPropsMap = idxProps.toMap - - val tgtVertexId = - idxPropsMap.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else { - bytesToKeyValues(kv.value, 0, kv.value.length, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - - val _mergedProps = (idxProps ++ props).toMap + val _mergedProps = allProps.result() val mergedProps = if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + } + val ts = kv.timestamp IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index d8bef976..20770c0e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -25,6 +25,8 @@ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -89,42 +91,43 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) else parseQualifier(kv, version) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else if (kv.qualifier.isEmpty) { - parseDegreeValue(kv, version) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version)) } else { - parseValue(kv, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - - // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) - - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield { - if (k == LabelMeta.degreeSeq) k -> v - else seq -> v - } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - val idxPropsMap = idxProps.toMap val tgtVertexId = if (tgtVertexIdInQualifier) { - idxPropsMap.get(LabelMeta.toSeq) match { + mergedProps.get(LabelMeta.toSeq) match { case None => tgtVertexIdRaw case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) } } else tgtVertexIdRaw - - val _mergedProps = (idxProps ++ props).toMap - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - // logger.error(s"$mergedProps") // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong From c3c3ecf12a51c930a52944ab4bc5143575c9349b Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Sat, 28 May 2016 10:18:37 +0900 Subject: [PATCH 2/4] [S2GRAPH-69]: Change IndexEdge's props data type. --- .../scala/org/apache/s2graph/core/Edge.scala | 43 ++-- .../apache/s2graph/core/storage/Storage.scala | 4 +- .../tall/IndexEdgeDeserializable.scala | 165 +++++++------- .../tall/IndexEdgeSerializable.scala | 4 +- .../wide/IndexEdgeDeserializable.scala | 214 +++++++++--------- .../wide/IndexEdgeSerializable.scala | 4 +- .../s2graph/core/types/InnerValLike.scala | 4 + .../core/storage/hbase/IndexEdgeTest.scala | 24 +- 8 files changed, 234 insertions(+), 228 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index 1169ba92..979268b0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -70,14 +70,14 @@ case class IndexEdge(srcVertex: Vertex, op: Byte, version: Long, labelIndexSeq: Byte, - props: Map[Byte, InnerValLike]) extends JSONParser { - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") + props: Map[Byte, InnerValLikeWithTs]) extends JSONParser { + // if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") // assert(props.containsKey(LabelMeta.timeStampSeq)) - val ts = props(LabelMeta.timeStampSeq).toString.toLong - val degreeEdge = props.contains(LabelMeta.degreeSeq) + lazy val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong + lazy val degreeEdge = props.contains(LabelMeta.degreeSeq) lazy val label = Label.findById(labelWithDir.labelId) - val schemaVer = label.schemaVersion + lazy val schemaVer = label.schemaVersion lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta => val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer) @@ -92,9 +92,9 @@ case class IndexEdge(srcVertex: Vertex, case None => /** - * TODO: agly hack - * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once - */ + * TODO: agly hack + * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once + */ val v = k match { case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer) case LabelMeta.toSeq => tgtVertex.innerId @@ -105,14 +105,14 @@ case class IndexEdge(srcVertex: Vertex, } k -> v - case Some(v) => k -> v + case Some(v) => k -> v.innerVal } } lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet - lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v + lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal - lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } + // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } //TODO: // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList @@ -122,11 +122,11 @@ case class IndexEdge(srcVertex: Vertex, def propsWithName = for { (seq, v) <- props meta <- label.metaPropsMap.get(seq) if seq >= 0 - jsValue <- innerValToJsValue(v, meta.dataType) + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue - def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs) + def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, props) // only for debug def toLogString() = { @@ -154,8 +154,9 @@ case class Edge(srcVertex: Vertex, def props = propsWithTs.mapValues(_.innerVal) def relatedEdges = { - if (labelWithDir.isDirected) List(this, duplicateEdge) - else { + if (labelWithDir.isDirected) { + List(this, duplicateEdge) + } else { val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) val base = copy(labelWithDir = outDir) List(base, base.reverseSrcTgtEdge) @@ -202,15 +203,15 @@ case class Edge(srcVertex: Vertex, def isDegree = propsWithTs.contains(LabelMeta.degreeSeq) - def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { - case Some(_) => props - case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) - } + // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { + // case Some(_) => props + // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) + // } - def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0) + def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0) def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs) + IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsWithTs) } def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index fd968add..48f36ff4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -1213,13 +1213,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** IndexEdge */ def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) + val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) val _indexedEdge = indexedEdge.copy(props = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) } def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) + val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) val _indexedEdge = indexedEdge.copy(props = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index da029efc..7ccea307 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -75,84 +75,87 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte (Array.empty[(Byte, InnerValLike)], 0) } - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - - // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) - pos += srcIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - pos += 1 - - val op = kv.row(pos) - pos += 1 - - if (pos == kv.row.length) { - // degree - // val degreeVal = Bytes.toLong(kv.value) - val degreeVal = bytesToLongFunc(kv.value, 0) - val ts = kv.timestamp - val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version), - LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version)) - val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) - } else { - // not degree edge - - val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version) - pos = endAt - val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version) - } - - val allProps = immutable.Map.newBuilder[Byte, InnerValLike] - /** process index props */ - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> v - else allProps += seq -> v - } - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - props.foreach { case (k, v) => - allProps += (k -> v) - } - } - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } - - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) - - } - } - } + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") + var pos = 0 + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + pos += 1 + + val op = kv.row(pos) + pos += 1 + + if (pos == kv.row.length) { + // degree + // val degreeVal = Bytes.toLong(kv.value) + val degreeVal = bytesToLongFunc(kv.value, 0) + val ts = kv.timestamp + val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) + val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) + IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) + } else { + // not degree edge + + + val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) + pos = endAt + val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) + } + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => + allProps += (k -> InnerValLikeWithTs(v, version)) + } + } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } + } +} diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index 116412ce..d00877ec 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge val value = if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 20770c0e..edb49b42 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -28,111 +28,109 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} import scala.collection.immutable class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - // val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, version)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) - else parseQualifier(kv, version) - - val allProps = immutable.Map.newBuilder[Byte, InnerValLike] - - /** process index props */ - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> v - else allProps += seq -> v - } - - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - } else if (kv.qualifier.isEmpty) { - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version)) - } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - props.foreach { case (k, v) => - allProps += (k -> v) - } - } - - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - - val tgtVertexId = if (tgtVertexIdInQualifier) { - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } - } else tgtVertexIdRaw - // logger.error(s"$mergedProps") - // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong - - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) - - } - } + + import StorageDeserializable._ + + type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(Byte, InnerValLike)], Int) + + private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { + // val degree = Bytes.toLong(kv.value) + val degree = bytesToLongFunc(kv.value, 0) + val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) + val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) + (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) + } + + private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { + var qualifierLen = 0 + var pos = 0 + val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { + val (props, endAt) = bytesToProps(kv.qualifier, pos, version) + pos = endAt + qualifierLen += endAt + val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) + } + qualifierLen += tgtVertexIdLen + (props, endAt, tgtVertexId, tgtVertexIdLen) + } + val (op, opLen) = + if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) + else (kv.qualifier(qualifierLen), 1) + + qualifierLen += opLen + + (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) + } + + private def parseValue(kv: SKeyValue, version: String): ValueRaw = { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + (props, endAt) + } + + private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { + (Array.empty[(Byte, InnerValLike)], 0) + } + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) + }.getOrElse(parseRow(kv, schemaVer)) + + val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) + else parseQualifier(kv, schemaVer) + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else if (kv.qualifier.isEmpty) { + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } + } + + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } +} diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 189de225..49e95b44 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge val value = if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala index 09d065f5..d60641f6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala @@ -248,6 +248,10 @@ object InnerValLikeWithTs extends HBaseDeserializable { InnerValLikeWithTs(InnerVal.withLong(l, version), ts) } + def withDouble(d: Double, ts: Long, version: String): InnerValLikeWithTs = { + InnerValLikeWithTs(InnerVal.withDouble(d, version), ts) + } + def withStr(s: String, ts: Long, version: String): InnerValLikeWithTs = { InnerValLikeWithTs(InnerVal.withStr(s, version), ts) } diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala index 5edd5b10..ea900616 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -35,7 +35,7 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { * @param to: to VertexId for edge. * @param props: expected props of edge. */ - def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLike]): Unit = { + def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLikeWithTs]): Unit = { val from = InnerVal.withLong(1, l.schemaVersion) val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) @@ -59,9 +59,9 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withLong(101, l.schemaVersion) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, - 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion)) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, + 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion)) check(l, ts, to, props) } @@ -73,10 +73,10 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withStr("0", l.schemaVersion) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) val props = Map( - LabelMeta.degreeSeq -> InnerVal.withLong(10, l.schemaVersion), - LabelMeta.timeStampSeq -> tsInnerVal) + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion), + LabelMeta.timeStampSeq -> tsInnerValWithTs) check(l, ts, to, props) } @@ -88,13 +88,13 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withLong(101, l.schemaVersion) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, + 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion), + LabelMeta.countSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion)) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, - 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion), - LabelMeta.countSeq -> InnerVal.withLong(10, l.schemaVersion)) check(l, ts, to, props) } } -} +} \ No newline at end of file From 96862e6c3d0c1ac93d6b675bc486bd0236aac897 Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Sat, 11 Jun 2016 23:21:10 +0900 Subject: [PATCH 3/4] update changes. --- CHANGES | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES b/CHANGES index a8e5f4ae..404d6b0b 100644 --- a/CHANGES +++ b/CHANGES @@ -108,6 +108,8 @@ Release 0.12.1 - unreleased S2GRAPH-7: Abstract common codes for rest project into s2core. (Committed by daewon). S2GRAPH-31: Remove playframework dependencies on s2core/build.sbt. (Committed by DOYUNG YOON). + + S2GRAPH-69: Change IndexEdge's props data type. (Committed by DOYUNG YOON). TEST From 25a8fcaf9a4ec7eaae18ab25fe1dd41be9c42443 Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Sat, 11 Jun 2016 23:34:37 +0900 Subject: [PATCH 4/4] reformat codes. --- .../tall/IndexEdgeDeserializable.scala | 169 +++++++------- .../wide/IndexEdgeDeserializable.scala | 212 +++++++++--------- 2 files changed, 189 insertions(+), 192 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index 7ccea307..fa8ca47e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -25,7 +25,6 @@ import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} - import scala.collection.immutable class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { @@ -75,87 +74,87 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte (Array.empty[(Byte, InnerValLike)], 0) } - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val version = kv.timestamp - // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) - pos += srcIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - pos += 1 - - val op = kv.row(pos) - pos += 1 - - if (pos == kv.row.length) { - // degree - // val degreeVal = Bytes.toLong(kv.value) - val degreeVal = bytesToLongFunc(kv.value, 0) - val ts = kv.timestamp - val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), - LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) - val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) - } else { - // not degree edge - - - val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) - pos = endAt - val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) - } - - val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - /** process indexProps */ - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) - else allProps += seq -> InnerValLikeWithTs(v, version) - } - - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) - props.foreach { case (k, v) => - allProps += (k -> InnerValLikeWithTs(v, version)) - } - } - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - - /** process tgtVertexId */ - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - - - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) - - } - } -} + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") + var pos = 0 + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + pos += 1 + + val op = kv.row(pos) + pos += 1 + + if (pos == kv.row.length) { + // degree + // val degreeVal = Bytes.toLong(kv.value) + val degreeVal = bytesToLongFunc(kv.value, 0) + val ts = kv.timestamp + val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) + val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) + IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) + } else { + // not degree edge + + + val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) + pos = endAt + val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) + } + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => + allProps += (k -> InnerValLikeWithTs(v, version)) + } + } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } + } + } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index edb49b42..46e4858d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -24,113 +24,111 @@ import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} - import scala.collection.immutable class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { - - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - // val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val version = kv.timestamp - - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, schemaVer)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) - else parseQualifier(kv, schemaVer) - - val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - /** process indexProps */ - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) - else allProps += seq -> InnerValLikeWithTs(v, version) - } - - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else if (kv.qualifier.isEmpty) { - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else { - val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) - props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } - } - - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - - /** process tgtVertexId */ - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) - - } -} + import StorageDeserializable._ + + type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(Byte, InnerValLike)], Int) + + private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { + // val degree = Bytes.toLong(kv.value) + val degree = bytesToLongFunc(kv.value, 0) + val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) + val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) + (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) + } + + private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { + var qualifierLen = 0 + var pos = 0 + val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { + val (props, endAt) = bytesToProps(kv.qualifier, pos, version) + pos = endAt + qualifierLen += endAt + val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) + } + qualifierLen += tgtVertexIdLen + (props, endAt, tgtVertexId, tgtVertexIdLen) + } + val (op, opLen) = + if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) + else (kv.qualifier(qualifierLen), 1) + + qualifierLen += opLen + + (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) + } + + private def parseValue(kv: SKeyValue, version: String): ValueRaw = { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + (props, endAt) + } + + private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { + (Array.empty[(Byte, InnerValLike)], 0) + } + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) + }.getOrElse(parseRow(kv, schemaVer)) + + val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) + else parseQualifier(kv, schemaVer) + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else if (kv.qualifier.isEmpty) { + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } + } + + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } + }