From 360daba9da38659cc1e1fb9b75ee667265b8700c Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Mon, 28 Mar 2016 22:10:42 +0900 Subject: [PATCH] [S2GRAPH-66]: Optimize toEdge, IndexEdgeDeserializable using mutable Map. use immutable.Map.newBuilder to build merged props. --- .../tall/IndexEdgeDeserializable.scala | 43 +++++++++------- .../wide/IndexEdgeDeserializable.scala | 51 ++++++++++--------- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index f2339402..da029efc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -26,6 +26,8 @@ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -107,7 +109,6 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) } else { // not degree edge - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version) pos = endAt @@ -117,32 +118,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version) } - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v - - val idxPropsMap = idxProps.toMap - - val tgtVertexId = - idxPropsMap.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else { - bytesToKeyValues(kv.value, 0, kv.value.length, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - - val _mergedProps = (idxProps ++ props).toMap + val _mergedProps = allProps.result() val mergedProps = if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + } + val ts = kv.timestamp IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index d8bef976..20770c0e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -25,6 +25,8 @@ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -89,42 +91,43 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) else parseQualifier(kv, version) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else if (kv.qualifier.isEmpty) { - parseDegreeValue(kv, version) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version)) } else { - parseValue(kv, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - - // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) - - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield { - if (k == LabelMeta.degreeSeq) k -> v - else seq -> v - } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - val idxPropsMap = idxProps.toMap val tgtVertexId = if (tgtVertexIdInQualifier) { - idxPropsMap.get(LabelMeta.toSeq) match { + mergedProps.get(LabelMeta.toSeq) match { case None => tgtVertexIdRaw case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) } } else tgtVertexIdRaw - - val _mergedProps = (idxProps ++ props).toMap - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - // logger.error(s"$mergedProps") // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong