Skip to content

Commit

Permalink
refined transaction types / allow reads without transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Feb 9, 2014
1 parent d6f8b60 commit 34b9027
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 68 deletions.
24 changes: 13 additions & 11 deletions src/main/scala/net/fwbrasil/radon/ref/Ref.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ trait Sink[-T] {
object notifyingFlag

class Ref[T](pValueOption: Option[T], initialize: Boolean)(implicit val context: TransactionContext)
extends Source[T] with Sink[T] with Lockable with java.io.Serializable {
extends Source[T] with Sink[T] with Lockable with java.io.Serializable {

import context.transactionManager._

Expand Down Expand Up @@ -91,17 +91,19 @@ class Ref[T](pValueOption: Option[T], initialize: Boolean)(implicit val context:
!creationTransactionIsTransient

def get: Option[T] =
nestTransactionIfHasListeners {
val result = getRequiredTransaction.get(this)
if (_weakListenersMap != null)
for (listener <- _weakListenersMap.keys)
listener.notifyGet(this)
result
}

def getOriginalValue: Option[T] =
getTransaction.map { transaction =>
nestTransactionIfHasListeners {
val result = transaction.get(this)
if (_weakListenersMap != null)
for (listener <- _weakListenersMap.keys)
listener.notifyGet(this)
result
}
}.getOrElse(refContent.value)

def getOriginalValue: Option[T] =
getRequiredTransaction.getOriginalValue(this)

def put(pValue: Option[T], pTransaction: => Transaction): Unit =
nestTransactionIfHasListeners {
val value = if (pValue == null) None else pValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import net.fwbrasil.radon.ref.Ref
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

final class NestedTransaction(val parent: Transaction, transactionType: TransactionType = readWrite)(override implicit val context: TransactionContext)
final class NestedTransaction(val parent: Transaction, transactionType: TransactionType = ReadWrite())(override implicit val context: TransactionContext)
extends Transaction(false, transactionType)(context) {

startTimestamp = parent.startTimestamp
Expand Down
80 changes: 42 additions & 38 deletions src/main/scala/net/fwbrasil/radon/transaction/Transaction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import net.fwbrasil.radon.util.ExclusiveThreadLocalItem
import net.fwbrasil.radon.util.Lockable.lockall
import java.util.concurrent.atomic.AtomicLong

class Transaction private[fwbrasil] (val transient: Boolean = false, transactionType: TransactionType = readWrite, val shadow: Boolean = false)(implicit val context: TransactionContext)
extends TransactionValidator
with ExclusiveThreadLocalItem {
class Transaction private[fwbrasil] (val transient: Boolean = false, transactionType: TransactionType = ReadWrite(), val shadow: Boolean = false)(implicit val context: TransactionContext)
extends TransactionValidator
with ExclusiveThreadLocalItem {

def this()(implicit context: TransactionContext) = this(false, readWrite)
def this()(implicit context: TransactionContext) = this(false, ReadWrite())

import context._

Expand All @@ -41,12 +41,12 @@ class Transaction private[fwbrasil] (val transient: Boolean = false, transaction
List()

private[radon] def put[T](ref: Ref[T], value: Option[T]) = {
if(transactionType == readOnly) throw new IllegalStateException("Trying to write on a read only transaction. Ref: " + ref + " Value: " + value)
if (transactionType == readOnly) throw new IllegalStateException("Trying to write on a read only transaction. Ref: " + ref + " Value: " + value)
val anyRef = ref.asInstanceOf[Ref[Any]]
snapshotWrite(anyRef, value)
}

private[radon] def get[T](ref: Ref[T]): Option[T] =
private[radon] def get[T](ref: Ref[T]): Option[T] =
snapshotRead(ref.asInstanceOf[Ref[Any]]).asInstanceOf[Option[T]]

private[radon] def getOriginalValue[T](ref: Ref[T]): Option[T] =
Expand Down Expand Up @@ -112,29 +112,31 @@ class Transaction private[fwbrasil] (val transient: Boolean = false, transaction
}

private def commit(rollback: Boolean): Unit = {
if (!rollback) beforeCommit(this)
if (!rollback) transactionManager.runInTransaction(this)(beforeCommit(this))
updateReadsAndWrites
startIfNotStarted
try {
acquireLocks
validateTransaction
if (!transient && !rollback)
context.makeDurable(this)
flushTransaction
if (!rollback) afterCommit(this)
attachments.clear
} catch {
case e: Throwable =>
prepareRollback
if (transactionType.validateCommit || !refsWrite.isEmpty) {
startIfNotStarted
try {
acquireLocks
if (!shadow)
validateTransaction
if (!transient && !rollback)
context.makeDurable(this)
flushTransaction
if (!rollback) afterCommit(this)
attachments.clear
throw e
} catch {
case e: Throwable =>
prepareRollback
flushTransaction
attachments.clear
throw e
}
}

}

private def asyncCommit(rollback: Boolean)(implicit ectx: ExecutionContext): Future[Unit] = {
if (!rollback) beforeCommit(this)
if (!rollback) transactionManager.runInTransaction(this)(beforeCommit(this))
updateReadsAndWrites
startIfNotStarted
Future {
Expand Down Expand Up @@ -174,25 +176,27 @@ class Transaction private[fwbrasil] (val transient: Boolean = false, transaction
snapshotsIterator.foreach(setRefContent)
}

private def validateTransaction = {
refsReadOnly.foreach(e => {
validateContext(e)
validateConcurrentRefCreation(e)
})
private def validateTransaction =
if (transactionType.validateCommit) {
refsReadOnly.foreach(e => {
validateContext(e)
validateConcurrentRefCreation(e)
})

refsRead.foreach { e =>
validateRead(e)
validateDestroyed(e)
if (transactionType.validateReads)
refsRead.foreach { e =>
validateRead(e)
validateDestroyed(e)
}

refsWrite.foreach(e => {
validateContext(e)
validateConcurrentRefCreation(e)
validateWrite(e)
validateDestroyed(e)
})
}

refsWrite.foreach(e => {
validateContext(e)
validateConcurrentRefCreation(e)
validateWrite(e)
validateDestroyed(e)
})
}

private def setRefContent(snapshot: RefSnapshot) = {
val ref = snapshot.ref
val refContent = ref.refContent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import net.fwbrasil.radon.RetryLimitTransactionException

sealed trait TransactionType
case object readOnly extends TransactionType
case object readWrite extends TransactionType

trait TransactionContext extends PropagationContext {

protected[fwbrasil] val transactionManager =
Expand All @@ -21,9 +17,12 @@ trait TransactionContext extends PropagationContext {
val milisToWaitBeforeRetry = 1

def executionContext = ExecutionContext.Implicits.global

val readOnly = net.fwbrasil.radon.transaction.readOnly
val readWrite = net.fwbrasil.radon.transaction.readWrite

type ReadOnly = net.fwbrasil.radon.transaction.ReadOnly
type ReadWrite = net.fwbrasil.radon.transaction.ReadWrite

val readOnly = net.fwbrasil.radon.transaction.ReadOnly()
val readWrite = net.fwbrasil.radon.transaction.ReadWrite()

private[fwbrasil] implicit val ectx = executionContext

Expand Down Expand Up @@ -61,10 +60,16 @@ trait TransactionContext extends PropagationContext {
}

def asyncTransactional[A](f: => A): Future[A] =
asyncTransactionalChain(Future(f)(_))
asyncTransactionalChain(readWrite)(Future(f)(_))

def asyncTransactional[A](typ: TransactionType)(f: => A): Future[A] =
asyncTransactionalChain(typ)(Future(f)(_))

def asyncTransactionalChain[A](fFuture: TransactionalExecutionContext => Future[A]) = {
val ctx = new TransactionalExecutionContext()(this)
def asyncTransactionalChain[A](fFuture: TransactionalExecutionContext => Future[A]): Future[A] =
asyncTransactionalChain(readWrite)(fFuture)

def asyncTransactionalChain[A](typ: TransactionType)(fFuture: TransactionalExecutionContext => Future[A]) = {
val ctx = new TransactionalExecutionContext(typ)(this)
transactionManager.runInTransactionWithRetryAsync(fFuture(ctx), ctx)
}

Expand All @@ -86,16 +91,16 @@ trait TransactionContext extends PropagationContext {

def makeDurableAsync(transaction: Transaction)(implicit ectx: ExecutionContext): Future[Unit] =
Future()

def beforeCommit(transaction: Transaction) = {}
def afterCommit(transaction: Transaction) = {}

def makeDurable(transaction: Transaction) = {}

}

class TransactionalExecutionContext(implicit val ctx: TransactionContext) extends ExecutionContext {
val transaction = new Transaction
class TransactionalExecutionContext(typ: TransactionType = ReadWrite())(implicit val ctx: TransactionContext) extends ExecutionContext {
val transaction = new Transaction(transactionType = typ)
override def execute(runnable: Runnable): Unit =
ctx.ectx.execute {
new Runnable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package net.fwbrasil.radon.transaction

sealed trait TransactionType {
val validateCommit: Boolean
val validateReads: Boolean
}
case class ReadOnly(validateCommit: Boolean = true, validateReads: Boolean = true) extends TransactionType
case class ReadWrite(validateCommit: Boolean = true, validateReads: Boolean = true) extends TransactionType
16 changes: 12 additions & 4 deletions src/test/scala/net/fwbrasil/radon/STMSpecification.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,21 @@ class STMSpecification extends Specification {
ref must not be null
(ref := null.asInstanceOf[Long]) must throwA[RequiredTransactionException]
}
"not be read" in {
"be read" in {
val ref = transactional {
new Ref[Long](100)
}
!ref must throwA[RequiredTransactionException]
ref.get must throwA[RequiredTransactionException]
(ref + 1) must throwA[RequiredTransactionException]
!ref === 100
ref.get === Some(100)
(ref + 1) === 101
}
"read None if it is creating" in {
var ref: Ref[Int] = null
val transaction = new Transaction
transactional(transaction) {
ref = new Ref(100)
}
ref.get === None
}
}

Expand Down

0 comments on commit 34b9027

Please sign in to comment.