From 360daba9da38659cc1e1fb9b75ee667265b8700c Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Mon, 28 Mar 2016 22:10:42 +0900 Subject: [PATCH 1/3] [S2GRAPH-66]: Optimize toEdge, IndexEdgeDeserializable using mutable Map. use immutable.Map.newBuilder to build merged props. --- .../tall/IndexEdgeDeserializable.scala | 43 +++++++++------- .../wide/IndexEdgeDeserializable.scala | 51 ++++++++++--------- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index f2339402..da029efc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -26,6 +26,8 @@ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -107,7 +109,6 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) } else { // not degree edge - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version) pos = endAt @@ -117,32 +118,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version) } - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v - - val idxPropsMap = idxProps.toMap - - val tgtVertexId = - idxPropsMap.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else { - bytesToKeyValues(kv.value, 0, kv.value.length, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - - val _mergedProps = (idxProps ++ props).toMap + val _mergedProps = allProps.result() val mergedProps = if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + } + val ts = kv.timestamp IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index d8bef976..20770c0e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -25,6 +25,8 @@ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -89,42 +91,43 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) else parseQualifier(kv, version) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else if (kv.qualifier.isEmpty) { - parseDegreeValue(kv, version) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version)) } else { - parseValue(kv, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - - // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) - - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield { - if (k == LabelMeta.degreeSeq) k -> v - else seq -> v - } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - val idxPropsMap = idxProps.toMap val tgtVertexId = if (tgtVertexIdInQualifier) { - idxPropsMap.get(LabelMeta.toSeq) match { + mergedProps.get(LabelMeta.toSeq) match { case None => tgtVertexIdRaw case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) } } else tgtVertexIdRaw - - val _mergedProps = (idxProps ++ props).toMap - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - // logger.error(s"$mergedProps") // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong From c3c3ecf12a51c930a52944ab4bc5143575c9349b Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Sat, 28 May 2016 10:18:37 +0900 Subject: [PATCH 2/3] [S2GRAPH-69]: Change IndexEdge's props data type. --- .../scala/org/apache/s2graph/core/Edge.scala | 43 ++-- .../apache/s2graph/core/storage/Storage.scala | 4 +- .../tall/IndexEdgeDeserializable.scala | 165 +++++++------- .../tall/IndexEdgeSerializable.scala | 4 +- .../wide/IndexEdgeDeserializable.scala | 214 +++++++++--------- .../wide/IndexEdgeSerializable.scala | 4 +- .../s2graph/core/types/InnerValLike.scala | 4 + .../core/storage/hbase/IndexEdgeTest.scala | 24 +- 8 files changed, 234 insertions(+), 228 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index 1169ba92..979268b0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -70,14 +70,14 @@ case class IndexEdge(srcVertex: Vertex, op: Byte, version: Long, labelIndexSeq: Byte, - props: Map[Byte, InnerValLike]) extends JSONParser { - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") + props: Map[Byte, InnerValLikeWithTs]) extends JSONParser { + // if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") // assert(props.containsKey(LabelMeta.timeStampSeq)) - val ts = props(LabelMeta.timeStampSeq).toString.toLong - val degreeEdge = props.contains(LabelMeta.degreeSeq) + lazy val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong + lazy val degreeEdge = props.contains(LabelMeta.degreeSeq) lazy val label = Label.findById(labelWithDir.labelId) - val schemaVer = label.schemaVersion + lazy val schemaVer = label.schemaVersion lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta => val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer) @@ -92,9 +92,9 @@ case class IndexEdge(srcVertex: Vertex, case None => /** - * TODO: agly hack - * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once - */ + * TODO: agly hack + * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once + */ val v = k match { case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer) case LabelMeta.toSeq => tgtVertex.innerId @@ -105,14 +105,14 @@ case class IndexEdge(srcVertex: Vertex, } k -> v - case Some(v) => k -> v + case Some(v) => k -> v.innerVal } } lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet - lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v + lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal - lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } + // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } //TODO: // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList @@ -122,11 +122,11 @@ case class IndexEdge(srcVertex: Vertex, def propsWithName = for { (seq, v) <- props meta <- label.metaPropsMap.get(seq) if seq >= 0 - jsValue <- innerValToJsValue(v, meta.dataType) + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue - def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs) + def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, props) // only for debug def toLogString() = { @@ -154,8 +154,9 @@ case class Edge(srcVertex: Vertex, def props = propsWithTs.mapValues(_.innerVal) def relatedEdges = { - if (labelWithDir.isDirected) List(this, duplicateEdge) - else { + if (labelWithDir.isDirected) { + List(this, duplicateEdge) + } else { val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) val base = copy(labelWithDir = outDir) List(base, base.reverseSrcTgtEdge) @@ -202,15 +203,15 @@ case class Edge(srcVertex: Vertex, def isDegree = propsWithTs.contains(LabelMeta.degreeSeq) - def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { - case Some(_) => props - case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) - } + // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { + // case Some(_) => props + // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) + // } - def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0) + def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0) def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs) + IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsWithTs) } def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index fd968add..48f36ff4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -1213,13 +1213,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** IndexEdge */ def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) + val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) val _indexedEdge = indexedEdge.copy(props = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) } def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) + val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) val _indexedEdge = indexedEdge.copy(props = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index da029efc..7ccea307 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -75,84 +75,87 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte (Array.empty[(Byte, InnerValLike)], 0) } - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - - // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) - pos += srcIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - pos += 1 - - val op = kv.row(pos) - pos += 1 - - if (pos == kv.row.length) { - // degree - // val degreeVal = Bytes.toLong(kv.value) - val degreeVal = bytesToLongFunc(kv.value, 0) - val ts = kv.timestamp - val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version), - LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version)) - val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) - } else { - // not degree edge - - val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version) - pos = endAt - val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version) - } - - val allProps = immutable.Map.newBuilder[Byte, InnerValLike] - /** process index props */ - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> v - else allProps += seq -> v - } - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - props.foreach { case (k, v) => - allProps += (k -> v) - } - } - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } - - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) - - } - } - } + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") + var pos = 0 + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + pos += 1 + + val op = kv.row(pos) + pos += 1 + + if (pos == kv.row.length) { + // degree + // val degreeVal = Bytes.toLong(kv.value) + val degreeVal = bytesToLongFunc(kv.value, 0) + val ts = kv.timestamp + val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) + val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) + IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) + } else { + // not degree edge + + + val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) + pos = endAt + val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) + } + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => + allProps += (k -> InnerValLikeWithTs(v, version)) + } + } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } + } +} diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index 116412ce..d00877ec 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge val value = if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 20770c0e..edb49b42 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -28,111 +28,109 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} import scala.collection.immutable class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - // val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, version)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) - else parseQualifier(kv, version) - - val allProps = immutable.Map.newBuilder[Byte, InnerValLike] - - /** process index props */ - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> v - else allProps += seq -> v - } - - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - } else if (kv.qualifier.isEmpty) { - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version)) - } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - props.foreach { case (k, v) => - allProps += (k -> v) - } - } - - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - - val tgtVertexId = if (tgtVertexIdInQualifier) { - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } - } else tgtVertexIdRaw - // logger.error(s"$mergedProps") - // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong - - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) - - } - } + + import StorageDeserializable._ + + type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(Byte, InnerValLike)], Int) + + private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { + // val degree = Bytes.toLong(kv.value) + val degree = bytesToLongFunc(kv.value, 0) + val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) + val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) + (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) + } + + private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { + var qualifierLen = 0 + var pos = 0 + val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { + val (props, endAt) = bytesToProps(kv.qualifier, pos, version) + pos = endAt + qualifierLen += endAt + val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) + } + qualifierLen += tgtVertexIdLen + (props, endAt, tgtVertexId, tgtVertexIdLen) + } + val (op, opLen) = + if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) + else (kv.qualifier(qualifierLen), 1) + + qualifierLen += opLen + + (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) + } + + private def parseValue(kv: SKeyValue, version: String): ValueRaw = { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + (props, endAt) + } + + private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { + (Array.empty[(Byte, InnerValLike)], 0) + } + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) + }.getOrElse(parseRow(kv, schemaVer)) + + val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) + else parseQualifier(kv, schemaVer) + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else if (kv.qualifier.isEmpty) { + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } + } + + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } +} diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 189de225..49e95b44 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge val value = if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala index 09d065f5..d60641f6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala @@ -248,6 +248,10 @@ object InnerValLikeWithTs extends HBaseDeserializable { InnerValLikeWithTs(InnerVal.withLong(l, version), ts) } + def withDouble(d: Double, ts: Long, version: String): InnerValLikeWithTs = { + InnerValLikeWithTs(InnerVal.withDouble(d, version), ts) + } + def withStr(s: String, ts: Long, version: String): InnerValLikeWithTs = { InnerValLikeWithTs(InnerVal.withStr(s, version), ts) } diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala index 5edd5b10..ea900616 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -35,7 +35,7 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { * @param to: to VertexId for edge. * @param props: expected props of edge. */ - def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLike]): Unit = { + def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLikeWithTs]): Unit = { val from = InnerVal.withLong(1, l.schemaVersion) val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) @@ -59,9 +59,9 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withLong(101, l.schemaVersion) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, - 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion)) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, + 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion)) check(l, ts, to, props) } @@ -73,10 +73,10 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withStr("0", l.schemaVersion) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) val props = Map( - LabelMeta.degreeSeq -> InnerVal.withLong(10, l.schemaVersion), - LabelMeta.timeStampSeq -> tsInnerVal) + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion), + LabelMeta.timeStampSeq -> tsInnerValWithTs) check(l, ts, to, props) } @@ -88,13 +88,13 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withLong(101, l.schemaVersion) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, + 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion), + LabelMeta.countSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion)) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, - 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion), - LabelMeta.countSeq -> InnerVal.withLong(10, l.schemaVersion)) check(l, ts, to, props) } } -} +} \ No newline at end of file From 0d5ed4b6cb5c5c1528116a77be86d4fa319e1a27 Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Tue, 29 Mar 2016 17:12:35 +0900 Subject: [PATCH 3/3] [S2GRAPH-81]: Separate Serializable's toKeyValues into 3, toRowKey, toQualifier, toValue. - split toKeyValues into toRowKey, toQualifier, toValue, so buildRequest only use toRowKey, toQualifier. --- .../core/storage/StorageSerializable.scala | 18 ++++++- .../storage/hbase/AsynchbaseStorage.scala | 19 ++++--- .../tall/IndexEdgeSerializable.scala | 33 +++++------- .../wide/IndexEdgeSerializable.scala | 54 +++++++++---------- .../tall/SnapshotEdgeSerializable.scala | 18 +++---- .../wide/SnapshotEdgeSerializable.scala | 22 ++++---- .../serde/vertex/VertexSerializable.scala | 12 ++++- 7 files changed, 94 insertions(+), 82 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala index b6435e46..b7326f52 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala @@ -56,5 +56,21 @@ object StorageSerializable { } trait StorageSerializable[E] { - def toKeyValues: Seq[SKeyValue] + val cf = Serializable.edgeCf + + val table: Array[Byte] + val ts: Long + + def toRowKey: Array[Byte] + def toQualifier: Array[Byte] + def toValue: Array[Byte] + + def toKeyValues: Seq[SKeyValue] = { + val row = toRowKey + val qualifier = toQualifier + val value = toValue + val kv = SKeyValue(table, row, cf, qualifier, value, ts) + + Seq(kv) + } } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 66a1be48..4bd222fa 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -181,18 +181,17 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte val label = queryParam.label val edge = toRequestEdge(queryRequest) - val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) { + val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) { val snapshotEdge = edge.toSnapshotEdge - snapshotEdgeSerializer(snapshotEdge).toKeyValues.head - // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) + snapshotEdgeSerializer(snapshotEdge) } else { - val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq) - assert(indexedEdgeOpt.isDefined) - - val indexedEdge = indexedEdgeOpt.get - indexEdgeSerializer(indexedEdge).toKeyValues.head + val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.labelWithDir, + edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs) + indexEdgeSerializer(indexEdge) } + val (rowKey, qualifier) = (serializer.toRowKey, serializer.toQualifier) + val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue)) label.schemaVersion match { @@ -246,8 +245,8 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte scanner case _ => val get = - if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) - else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf) + if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier) + else new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf) get.maxVersions(1) get.setFailfast(true) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index d00877ec..f17e41ce 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -29,14 +29,13 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge} class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { import StorageSerializable._ - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = indexEdge.version + override val table = indexEdge.label.hbaseTableName.getBytes() - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) + def idxPropsMap = indexEdge.orders.toMap + def idxPropsBytes = propsToBytes(indexEdge.orders) - override def toKeyValues: Seq[SKeyValue] = { + override def toRowKey: Array[Byte] = { val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes val labelWithDirBytes = indexEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) @@ -53,20 +52,16 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge } /** TODO search usage of op byte. if there is no, then remove opByte */ - val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier) - // val qualifierBytes = Array.fill(1)(indexEdge.op) - val qualifierBytes = Array.empty[Byte] + Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier) + } - val value = - if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) - else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) - else propsToKeyValues(indexEdge.metas.toSeq) + override def toQualifier: Array[Byte] = Array.empty[Byte] - val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version) + override def toValue: Array[Byte] = + if (indexEdge.degreeEdge) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) + else if (indexEdge.op == GraphUtil.operations("incrementCount")) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) + else propsToKeyValues(indexEdge.metas.toSeq) - // logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - Seq(kv) - } } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 49e95b44..c700e534 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -28,44 +28,40 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge} class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { import StorageSerializable._ - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = indexEdge.version + override val table = indexEdge.label.hbaseTableName.getBytes() - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) + def idxPropsMap = indexEdge.orders.toMap + def idxPropsBytes = propsToBytes(indexEdge.orders) - override def toKeyValues: Seq[SKeyValue] = { + override def toRowKey: Array[Byte] = { val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes val labelWithDirBytes = indexEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") + Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + } + + override def toQualifier: Array[Byte] = { val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes - val qualifier = - if (indexEdge.degreeEdge) Array.empty[Byte] - else { - if (indexEdge.op == GraphUtil.operations("incrementCount")) { - Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) - } else { - idxPropsMap.get(LabelMeta.toSeq) match { - case None => Bytes.add(idxPropsBytes, tgtIdBytes) - case Some(vId) => idxPropsBytes - } + if (indexEdge.degreeEdge) Array.empty[Byte] + else { + if (indexEdge.op == GraphUtil.operations("incrementCount")) { + Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) + } else { + idxPropsMap.get(LabelMeta.toSeq) match { + case None => Bytes.add(idxPropsBytes, tgtIdBytes) + case Some(vId) => idxPropsBytes } } + } + } + override def toValue: Array[Byte] = + if (indexEdge.degreeEdge) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) + else if (indexEdge.op == GraphUtil.operations("incrementCount")) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) + else propsToKeyValues(indexEdge.metas.toSeq) - val value = - if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) - else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) - else propsToKeyValues(indexEdge.metas.toSeq) - - val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) - - Seq(kv) - } } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index 716a6b9a..4f7c17b1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -29,9 +29,8 @@ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { import StorageSerializable._ - val label = snapshotEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = snapshotEdge.version + override val table = snapshotEdge.label.hbaseTableName.getBytes() def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { val byte = (((statusCode << 4) | op).toByte) @@ -40,16 +39,18 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), propsToKeyValuesWithTs(snapshotEdge.props.toList)) - override def toKeyValues: Seq[SKeyValue] = { + override def toRowKey: Array[Byte] = { val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes val labelWithDirBytes = snapshotEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + } - val qualifier = Array.empty[Byte] + override def toQualifier: Array[Byte] = Array.empty[Byte] - val value = snapshotEdge.pendingEdgeOpt match { + override def toValue: Array[Byte] = + snapshotEdge.pendingEdgeOpt match { case None => valueBytes() case Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) @@ -60,7 +61,4 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) } - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala index 2eb2b1bd..757ef1b5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -34,9 +34,8 @@ import org.apache.s2graph.core.types.VertexId class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { import StorageSerializable._ - val label = snapshotEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = snapshotEdge.version + override val table = snapshotEdge.label.hbaseTableName.getBytes() def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { val byte = (((statusCode << 4) | op).toByte) @@ -45,17 +44,20 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), propsToKeyValuesWithTs(snapshotEdge.props.toList)) - override def toKeyValues: Seq[SKeyValue] = { + + override def toRowKey: Array[Byte] = { val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes val labelWithDirBytes = snapshotEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes + Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + } - val qualifier = tgtIdBytes + override def toQualifier: Array[Byte] = + VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes - val value = snapshotEdge.pendingEdgeOpt match { + override def toValue: Array[Byte] = + snapshotEdge.pendingEdgeOpt match { case None => valueBytes() case Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) @@ -64,7 +66,5 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) } - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } + } diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala index a74031ac..6bb162c2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala @@ -25,10 +25,18 @@ import org.apache.s2graph.core.storage.{SKeyValue, Serializable} case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] { - val cf = Serializable.vertexCf + override val table = vertex.hbaseTableName.getBytes + override val ts = vertex.ts + override val cf = Serializable.vertexCf + override def toRowKey: Array[Byte] = vertex.id.bytes + + override def toQualifier: Array[Byte] = Array.empty[Byte] + override def toValue: Array[Byte] = Array.empty[Byte] + + /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ override def toKeyValues: Seq[SKeyValue] = { - val row = vertex.id.bytes + val row = toRowKey val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] } (base ++ belongsTo).map { case (qualifier, value) =>