From 607bbb244a6686e5fa9654761d7b66d8b9e9b60b Mon Sep 17 00:00:00 2001 From: DO YUNG YOON Date: Sat, 28 May 2016 14:20:09 +0900 Subject: [PATCH] [S2GRAPH-68]: Refactor write-write conflict resolving logic. + more comments and simplified code. + remove excessive re-fetch on every retry from partial failure. + remove Thread.sleep between retry from partial failure. --- .../scala/org/apache/s2graph/core/Graph.scala | 4 +- .../apache/s2graph/core/storage/Storage.scala | 712 ++++++++++-------- s2rest_play/conf/test.conf | 2 +- 3 files changed, 405 insertions(+), 313 deletions(-) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala index ac1d4c12..f4ce7b44 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala @@ -55,12 +55,14 @@ object Graph { "max.retry.number" -> java.lang.Integer.valueOf(100), "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10), "max.back.off" -> java.lang.Integer.valueOf(100), + "back.off.timeout" -> java.lang.Integer.valueOf(1000), "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1), "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), "future.cache.max.size" -> java.lang.Integer.valueOf(100000), "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), - "s2graph.storage.backend" -> "hbase" + "s2graph.storage.backend" -> "hbase", + "query.hardlimit" -> java.lang.Integer.valueOf(100000) ) var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index fd968add..7ad4ff1e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -19,6 
+19,8 @@ package org.apache.s2graph.core.storage +import java.util.concurrent.{TimeUnit, Executors} + import com.typesafe.config.Config import org.apache.hadoop.hbase.util.Bytes import org.apache.kafka.clients.producer.ProducerRecord @@ -35,7 +37,7 @@ import org.apache.s2graph.core.utils.{Extensions, logger} import scala.annotation.tailrec import scala.collection.Seq import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{Promise, ExecutionContext, Future} import scala.util.{Random, Try} abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { @@ -44,12 +46,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** storage dependent configurations */ val MaxRetryNum = config.getInt("max.retry.number") val MaxBackOff = config.getInt("max.back.off") + val BackoffTimeout = config.getInt("back.off.timeout") val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") val FailProb = config.getDouble("hbase.fail.prob") - val LockExpireDuration = Math.max(MaxRetryNum * MaxBackOff * 2, 10000) - val maxSize = config.getInt("future.cache.max.size") - val expireAfterWrite = config.getInt("future.cache.expire.after.write") - val expireAfterAccess = config.getInt("future.cache.expire.after.access") + val LockExpireDuration = config.getInt("lock.expire.time") + val MaxSize = config.getInt("future.cache.max.size") + val ExpireAfterWrite = config.getInt("future.cache.expire.after.write") + val ExpireAfterAccess = config.getInt("future.cache.expire.after.access") + + /** retry scheduler */ + val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor() + + val failTopic = s"mutateFailed_${config.getString("phase")}" /** * Compatibility table @@ -348,7 +356,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { case head :: tail => // val strongConsistency = edges.head.label.consistencyLevel == "strong" // if 
(strongConsistency) { - val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , withWait)(Edge.buildOperation) + val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , withWait) //TODO: decide what we will do on failure on vertex put val puts = buildVertexPutsAsync(head) @@ -392,81 +400,416 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { def mutateEdgesInner(edges: Seq[Edge], checkConsistency: Boolean, - withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => (Edge, EdgeMutate)): Future[Boolean] = { + withWait: Boolean): Future[Boolean] = { + assert(edges.nonEmpty) if (!checkConsistency) { val zkQuorum = edges.head.label.hbaseZkAddr val futures = edges.map { edge => - val (_, edgeUpdate) = f(None, Seq(edge)) - val mutations = - indexedEdgeMutations(edgeUpdate) ++ - snapshotEdgeMutations(edgeUpdate) ++ - increments(edgeUpdate) + val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge)) + val mutations = indexedEdgeMutations(edgeUpdate) ++ snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) writeToStorage(zkQuorum, mutations, withWait) } Future.sequence(futures).map { rets => rets.forall(identity) } } else { - def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = { + fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + retry(1)(edges, 0, snapshotEdgeOpt) + } + } + } - fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + def exponentialBackOff(tryNum: Int) = { + // time slot is divided by 10 ms + val slot = 10 + Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt) + } - val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges) - logger.debug(s"${snapshotEdgeOpt}\n${edgeUpdate.toLogString}") - //shouldReplace false. 
- if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) { - logger.debug(s"${newEdge.toLogString} drop.") - Future.successful(true) - } else { - commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, edgeUpdate).map { ret => - if (ret) { - logger.info(s"[Success] commit: \n${_edges.map(_.toLogString).mkString("\n")}") + def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[Edge]): Future[Boolean] = { + if (tryNum >= MaxRetryNum) { + edges.foreach { edge => + logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") + + val kafkaMessage = ExceptionHandler.toKafkaMessage(failTopic, element = edge) + ExceptionHandler.enqueue(kafkaMessage) + } + + Future.successful(false) + } else { + val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt) + future.onSuccess { + case success => + logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n") + } + future recoverWith { + case FetchTimeoutException(retryEdge) => + logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") + /** fetch failed. re-fetch should be done */ + fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) + } + + + case PartialFailureException(retryEdge, failedStatusCode, faileReason) => + val status = failedStatusCode match { + case 0 => "AcquireLock failed." + case 1 => "Mutation failed." + case 2 => "Increment failed." + case 3 => "ReleaseLock failed." + case _ => "Unknown" /* any other code is only labelled; a MatchError here would fail the future instead of retrying */ + } + logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") + + /** retry logic */ + val promise = Promise[Boolean] + val backOff = exponentialBackOff(tryNum) + scheduledThreadPool.schedule(new Runnable { + override def run(): Unit = { + val future = if (failedStatusCode == 0) { + // acquire Lock failed. other is mutating so this thread needs to re-fetch snapshotEdge. + /** fetch failed. 
re-fetch should be done */ + fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) + } } else { - throw new PartialFailureException(newEdge, 3, "commit failed.") + // partial failure occur while self locked and mutating. + // assert(fetchedSnapshotEdgeOpt.nonEmpty) + retry(tryNum + 1)(edges, failedStatusCode, fetchedSnapshotEdgeOpt) } - true + promise.completeWith(future) } + + }, backOff, TimeUnit.MILLISECONDS) + promise.future + + case ex: Exception => + logger.error("Unknown exception", ex) + Future.successful(false) + } + } + } + + protected def commitUpdate(edges: Seq[Edge], + statusCode: Byte, + fetchedSnapshotEdgeOpt: Option[Edge]): Future[Boolean] = { +// Future.failed(new PartialFailureException(edges.head, 0, "ahahah")) + assert(edges.nonEmpty) +// assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined) + + statusCode match { + case 0 => + fetchedSnapshotEdgeOpt match { + case None => + /** + * no one has never mutated this SN. 
+ * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges) + * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1) + * lock = (squashedEdge, pendingE) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + val (squashedEdge, edgeMutate) = Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + + assert(edgeMutate.newSnapshotEdge.isDefined) + + val lockTs = Option(System.currentTimeMillis()) + val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1) + val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) + val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, + pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) + + commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + + case Some(snapshotEdge) => + snapshotEdge.pendingEdgeOpt match { + case None => + /** + * others finished commit on this SN. but there is no contention. + * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges) + * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ? 
+ * lock = (snapshotEdge, pendingE) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + val (squashedEdge, edgeMutate) = Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + if (edgeMutate.newSnapshotEdge.isEmpty) { + logger.debug(s"drop this requests: \n${edges.map(_.toLogString).mkString("\n")}") + Future.successful(true) + } else { + val lockTs = Option(System.currentTimeMillis()) + val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) + val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) + val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, + pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) + commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + } + case Some(pendingEdge) => + val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() + if (isLockExpired) { + /** + * if pendingEdge.ts == snapshotEdge.ts => + * (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge)) + * else => + * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, Seq(pendingEdge)) + * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) + * lock = (snapshotEdge, pendingE) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + logger.debug(s"${pendingEdge.toLogString} has been expired.") + val (squashedEdge, edgeMutate) = + if (pendingEdge.ts == snapshotEdge.ts) Edge.buildOperation(None, pendingEdge +: edges) + else Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges) + + val lockTs = Option(System.currentTimeMillis()) + val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) + val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge)) + val releaseLockSnapshotEdge = 
edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, + pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) + + commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + } else { + /** + * others finished commit on this SN and there is currently contention. + * this can't be proceed so retry from re-fetch. + * throw EX + */ + val (squashedEdge, _) = Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + Future.failed(new PartialFailureException(squashedEdge, 0, s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]")) + } + } + + } + case _ => + + /** + * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock + */ + + /** + * this succeed to lock this SN. keep doing on commit process. + * if SN.isEmpty => + * no one never succed to commit on this SN. + * this is first mutation try on this SN. + * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges) + * else => + * assert(SN.pengingEdgeOpt.isEmpty) no-fetch after acquire lock when self retrying. + * there has been success commit on this SN. + * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + val _edges = + if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges + else edges + val (squashedEdge, edgeMutate) = Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges) + val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2 + val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match { + case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) + case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) + } + // lockSnapshotEdge will be ignored. 
+ commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, releaseLockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + } + } + /** + * orchestrate commit process. + * we separate into 4 step to avoid duplicating each step over and over. + * @param statusCode: current statusCode of this thread to process edges. + * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. + * @param fetchedSnapshotEdgeOpt: fetched snapshotEdge from storage before commit process begin. + * @param lockSnapshotEdge: lockEdge that hold necessary data to lock this snapshotEdge for this thread. + * @param releaseLockSnapshotEdge: releaseLockEdge that will remove lock by storing new final merged states + * all from current request edges and fetched snapshotEdge. + * @param edgeMutate: mutations for indexEdge and snapshotEdge. + * @return + */ + protected def commitProcess(statusCode: Byte, + squashedEdge: Edge, + fetchedSnapshotEdgeOpt:Option[Edge], + lockSnapshotEdge: SnapshotEdge, + releaseLockSnapshotEdge: SnapshotEdge, + edgeMutate: EdgeMutate): Future[Boolean] = { + for { + locked <- acquireLock(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge) + mutated <- commitIndexEdgeMutations(locked, statusCode, squashedEdge, edgeMutate) + incremented <- commitIndexEdgeDegreeMutations(mutated, statusCode, squashedEdge, edgeMutate) + lockReleased <- releaseLock(incremented, statusCode, squashedEdge, releaseLockSnapshotEdge) + } yield lockReleased + } + + case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends Exception + + protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { + val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") + logger.debug(msg) + } + + protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { + val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}", + 
s"${edgeMutate.toLogString}").mkString("\n") + logger.debug(msg) + } + + /** + * try to acquire lock on storage for this given snapshotEdge(lockEdge). + * @param statusCode: current statusCode of this thread to process edges. + * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param fetchedSnapshotEdgeOpt: fetched snapshot edge from storage. + * @param lockEdge: lockEdge to build RPC request(compareAndSet) into Storage. + * @return + */ + protected def acquireLock(statusCode: Byte, + squashedEdge: Edge, + fetchedSnapshotEdgeOpt: Option[Edge], + lockEdge: SnapshotEdge): Future[Boolean] = { + if (statusCode >= 1) { + logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}") + Future.successful(true) + } else { + val p = Random.nextDouble() + if (p < FailProb) { + Future.failed(new PartialFailureException(squashedEdge, 0, s"$p")) + } else { + val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head + val oldPut = fetchedSnapshotEdgeOpt.map(e => snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head) + writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception => + logger.error(s"AcquireLock RPC Failed.") + throw new PartialFailureException(squashedEdge, 0, "AcquireLock RPC Failed") + }.map { ret => + if (ret) { + val log = Seq( + "\n", + "=" * 50, + s"[Success]: acquireLock", + s"[RequestEdge]: ${squashedEdge.toLogString}", + s"[LockEdge]: ${lockEdge.toLogString()}", + s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}", + "=" * 50, "\n").mkString("\n") + + logger.debug(log) + // debug(ret, "acquireLock", edge.toSnapshotEdge) + } else { + throw new PartialFailureException(squashedEdge, 0, "hbase fail.") } + true } } - def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: (Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = { - if (tryNum >= MaxRetryNum) { - edges.foreach { edge => - logger.error(s"commit failed after 
$MaxRetryNum\n${edge.toLogString}") - ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = edge)) + } + } + + + /** + * change this snapshot's state on storage from locked into committed by + * storing new merged states on storage. merge state come from releaseLockEdge. + * note that releaseLock return Future.failed on predicate failure. + * @param predicate: indicate if this releaseLock phase should be proceed or not. + * @param statusCode: releaseLock do not use statusCode, only for debug. + * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param releaseLockEdge: final merged states if all process goes well. + * @return + */ + protected def releaseLock(predicate: Boolean, + statusCode: Byte, + squashedEdge: Edge, + releaseLockEdge: SnapshotEdge): Future[Boolean] = { + if (!predicate) { + Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed.")) + } else { + val p = Random.nextDouble() + if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 3, s"$p")) + else { + val releaseLockEdgePuts = snapshotEdgeSerializer(releaseLockEdge).toKeyValues + writeToStorage(squashedEdge.label.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith { + case ex: Exception => + logger.error(s"ReleaseLock RPC Failed.") + throw new PartialFailureException(squashedEdge, 3, "ReleaseLock RPC Failed") + }.map { ret => + if (ret) { + debug(ret, "releaseLock", squashedEdge.toSnapshotEdge) + } else { + val msg = Seq("\nFATAL ERROR\n", + "=" * 50, + squashedEdge.toLogString, + releaseLockEdgePuts, + "=" * 50, + "\n" + ) + logger.error(msg.mkString("\n")) + // error(ret, "releaseLock", edge.toSnapshotEdge) + throw new PartialFailureException(squashedEdge, 3, "hbase fail.") } - Future.successful(false) - } else { - val future = fn(edges, statusCode) - future.onSuccess { - case success => - logger.debug(s"Finished. 
[$tryNum]\n${edges.head.toLogString}\n") + true + } + } + } + } + + /** + * + * @param predicate: indicate if this commitIndexEdgeMutations phase should be proceed or not. + * @param statusCode: current statusCode of this thread to process edges. + * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param edgeMutate: actual collection of mutations. note that edgeMutate contains snapshotEdge mutations, + * but in here, we only use indexEdge's mutations. + * @return + */ + protected def commitIndexEdgeMutations(predicate: Boolean, + statusCode: Byte, + squashedEdge: Edge, + edgeMutate: EdgeMutate): Future[Boolean] = { + if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed.")) + else { + if (statusCode >= 2) { + logger.debug(s"skip mutate: [$statusCode]\n${squashedEdge.toLogString}") + Future.successful(true) + } else { + val p = Random.nextDouble() + if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 1, s"$p")) + else + writeToStorage(squashedEdge.label.hbaseZkAddr, indexedEdgeMutations(edgeMutate), withWait = true).map { ret => + if (ret) { + debug(ret, "mutate", squashedEdge.toSnapshotEdge, edgeMutate) + } else { + throw new PartialFailureException(squashedEdge, 1, "hbase fail.") + } + true } - future recoverWith { - case FetchTimeoutException(retryEdge) => - logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") - retry(tryNum + 1)(edges, statusCode)(fn) - - case PartialFailureException(retryEdge, failedStatusCode, faileReason) => - val status = failedStatusCode match { - case 0 => "AcquireLock failed." - case 1 => "Mutation failed." - case 2 => "Increment failed." - case 3 => "ReleaseLock failed." 
- case 4 => "Unknown" - } + } + } + } - Thread.sleep(Random.nextInt(MaxBackOff)) - logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") - retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn) - case ex: Exception => - logger.error("Unknown exception", ex) - Future.successful(false) + /** + * write the indexEdge degree increments carried by edgeMutate to storage (the increment step, failure statusCode 2). + * @param predicate: indicate if this commitIndexEdgeDegreeMutations phase should proceed or not. + * @param statusCode: current statusCode of this thread to process edges. + * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param edgeMutate: actual collection of mutations. note that edgeMutate contains snapshotEdge mutations, + * but in here, we only use indexEdge's degree mutations. + * @return Future(true) when written or skipped (statusCode >= 3); failed with PartialFailureException(statusCode = 2) when predicate is false, the injected failure fires, or the write returns false. + */ + protected def commitIndexEdgeDegreeMutations(predicate: Boolean, + statusCode: Byte, + squashedEdge: Edge, + edgeMutate: EdgeMutate): Future[Boolean] = { + if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, "predicate failed.")) + else if (statusCode >= 3) { + logger.debug(s"skip increment: [$statusCode]\n${squashedEdge.toLogString}") + Future.successful(true) + } else { + val p = Random.nextDouble() + if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p")) + else + writeToStorage(squashedEdge.label.hbaseZkAddr, increments(edgeMutate), withWait = true).map { ret => + if (ret) { + debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate) + } else { + throw new PartialFailureException(squashedEdge, 2, "hbase fail.") } + true } - } - retry(1)(edges, 0)(commit) } } + + + + /** end of methods for consistency */ + def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge], newEdge: Edge, edgeMutate: EdgeMutate) = Seq("----------------------------------------------", @@ -733,259 +1076,6 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { // } // } - case class 
PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends Exception - - protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") - logger.debug(msg) - } - - protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}", - s"${edgeMutate.toLogString}").mkString("\n") - logger.debug(msg) - } - - protected def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: Option[SKeyValue]) = { - val currentTs = System.currentTimeMillis() - val lockTs = snapshotEdgeOpt match { - case None => Option(currentTs) - case Some(snapshotEdge) => - snapshotEdge.pendingEdgeOpt match { - case None => Option(currentTs) - case Some(pendingEdge) => pendingEdge.lockTs - } - } - val newVersion = kvOpt.map(_.timestamp).getOrElse(edge.ts) + 1 - // snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1 - val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = lockTs) - val base = snapshotEdgeOpt match { - case None => - // no one ever mutated on this snapshotEdge. - edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - case Some(snapshotEdge) => - // there is at least one mutation have been succeed. 
- snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - } - base.copy(version = newVersion, statusCode = 1, lockTs = None) - } - - protected def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge, - edgeMutate: EdgeMutate) = { - val newVersion = lockEdge.version + 1 - val base = edgeMutate.newSnapshotEdge match { - case None => - // shouldReplace false - assert(snapshotEdgeOpt.isDefined) - snapshotEdgeOpt.get.toSnapshotEdge - case Some(newSnapshotEdge) => newSnapshotEdge - } - base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None) - } - - protected def acquireLock(statusCode: Byte, - edge: Edge, - oldSnapshotEdgeOpt: Option[Edge], - lockEdge: SnapshotEdge, - oldBytes: Array[Byte]): Future[Boolean] = { - if (statusCode >= 1) { - logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p") - else { - val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head - val oldPut = oldSnapshotEdgeOpt.map(e => snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head) -// val lockEdgePut = buildPutAsync(lockEdge).head -// val oldPut = oldSnapshotEdgeOpt.map(e => buildPutAsync(e.toSnapshotEdge).head) - writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception => - logger.error(s"AcquireLock RPC Failed.") - throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed") - }.map { ret => - if (ret) { - val log = Seq( - "\n", - "=" * 50, - s"[Success]: acquireLock", - s"[RequestEdge]: ${edge.toLogString}", - s"[LockEdge]: ${lockEdge.toLogString()}", - s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}", - "=" * 50, "\n").mkString("\n") - - logger.debug(log) - // debug(ret, "acquireLock", edge.toSnapshotEdge) - } else { - throw new PartialFailureException(edge, 0, "hbase fail.") - } - true - } - } - } - } - - - - protected def 
releaseLock(predicate: Boolean, - edge: Edge, - lockEdge: SnapshotEdge, - releaseLockEdge: SnapshotEdge, - _edgeMutate: EdgeMutate, - oldBytes: Array[Byte]): Future[Boolean] = { - if (!predicate) { - throw new PartialFailureException(edge, 3, "predicate failed.") - } - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p") - else { - val releaseLockEdgePut = snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head - val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head - writeLock(releaseLockEdgePut, Option(lockEdgePut)).recoverWith { - case ex: Exception => - logger.error(s"ReleaseLock RPC Failed.") - throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed") - }.map { ret => - if (ret) { - debug(ret, "releaseLock", edge.toSnapshotEdge) - } else { - val msg = Seq("\nFATAL ERROR\n", - "=" * 50, - oldBytes.toList, - lockEdgePut, - releaseLockEdgePut, - // lockEdgePut.value.toList, - // releaseLockEdgePut.value().toList, - "=" * 50, - "\n" - ) - logger.error(msg.mkString("\n")) - // error(ret, "releaseLock", edge.toSnapshotEdge) - throw new PartialFailureException(edge, 3, "hbase fail.") - } - true - } - } - Future.successful(true) - } - - - protected def mutate(predicate: Boolean, - edge: Edge, - statusCode: Byte, - _edgeMutate: EdgeMutate): Future[Boolean] = { - if (!predicate) throw new PartialFailureException(edge, 1, "predicate failed.") - - if (statusCode >= 2) { - logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p") - else - writeToStorage(edge.label.hbaseZkAddr, indexedEdgeMutations(_edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate) - } else { - throw new PartialFailureException(edge, 1, "hbase fail.") - } - true - } - } - } - - protected def increment(predicate: Boolean, - edge: Edge, 
- statusCode: Byte, _edgeMutate: EdgeMutate): Future[Boolean] = { - if (!predicate) throw new PartialFailureException(edge, 2, "predicate failed.") - if (statusCode >= 3) { - logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p") - else - writeToStorage(edge.label.hbaseZkAddr, increments(_edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate) - } else { - throw new PartialFailureException(edge, 2, "hbase fail.") - } - true - } - } - } - - - /** this may be overrided by specific storage implementation */ - protected def commitProcess(edge: Edge, statusCode: Byte) - (snapshotEdgeOpt: Option[Edge], kvOpt: Option[SKeyValue]) - (lockEdge: SnapshotEdge, releaseLockEdge: SnapshotEdge, _edgeMutate: EdgeMutate): Future[Boolean] = { - val oldBytes = kvOpt.map(kv => kv.value).getOrElse(Array.empty[Byte]) - for { - locked <- acquireLock(statusCode, edge, snapshotEdgeOpt, lockEdge, oldBytes) - mutated <- mutate(locked, edge, statusCode, _edgeMutate) - incremented <- increment(mutated, edge, statusCode, _edgeMutate) - released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, _edgeMutate, oldBytes) - } yield { - released - } - } - - protected def commitUpdate(edge: Edge, - statusCode: Byte)(snapshotEdgeOpt: Option[Edge], - kvOpt: Option[SKeyValue], - edgeUpdate: EdgeMutate): Future[Boolean] = { - val label = edge.label - def oldBytes = kvOpt.map(_.value).getOrElse(Array.empty) - - val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt) - val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, edgeUpdate) - val _process = commitProcess(edge, statusCode)(snapshotEdgeOpt, kvOpt)_ - snapshotEdgeOpt match { - case None => - // no one ever did success on acquire lock. 
- _process(lockEdge, releaseLockEdge, edgeUpdate) - // process(lockEdge, releaseLockEdge, edgeUpdate, statusCode) - case Some(snapshotEdge) => - // someone did success on acquire lock at least one. - snapshotEdge.pendingEdgeOpt match { - case None => - // not locked - _process(lockEdge, releaseLockEdge, edgeUpdate) - // process(lockEdge, releaseLockEdge, edgeUpdate, statusCode) - case Some(pendingEdge) => - def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() - if (isLockExpired) { - val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) - val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(pendingEdge)) - val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, kvOpt) - val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate) - commitProcess(edge, statusCode = 0)(snapshotEdgeOpt, kvOpt)(newLockEdge, newReleaseLockEdge, newEdgeUpdate).flatMap { ret => - // process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode = 0).flatMap { ret => - val log = s"[Success]: Resolving expired pending edge.\n${pendingEdge.toLogString}" - throw new PartialFailureException(edge, 0, log) - } - } else { - // locked - if (pendingEdge.ts == edge.ts && statusCode > 0) { - // self locked - val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) - val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(edge)) - val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, newEdgeUpdate) - - /** lockEdge will be ignored */ - _process(lockEdge, newReleaseLockEdge, newEdgeUpdate) - // process(lockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode) - } else { - throw new PartialFailureException(edge, statusCode, s"others[${pendingEdge.ts}] is mutating. 
me[${edge.ts}]") - } - } - } - } - } - - /** end of methods for consistency */ - // def futureCache[T] = Cache[Long, (Long, T)] diff --git a/s2rest_play/conf/test.conf b/s2rest_play/conf/test.conf index 2325dd98..a16b9262 100644 --- a/s2rest_play/conf/test.conf +++ b/s2rest_play/conf/test.conf @@ -18,4 +18,4 @@ # max.retry.number=10000 -hbase.fail.prob=0.1 +hbase.fail.prob=0.05