Skip to content
Permalink
Browse files
  • Loading branch information
Timothy A. Bish committed Nov 16, 2011
1 parent c223650 commit dc4c097111e0dae3ab6c685e58b3b33c3d9a9cea
Showing 3 changed files with 174 additions and 0 deletions.
@@ -82,6 +82,31 @@ public Session(Connection connection, SessionInfo info, AcknowledgementMode ackn
Dispose(false);
}

#region Session Transaction Events

// We delegate the events to the TransactionContext since it knows
// what the state is at all times.

public event SessionTxEventDelegate TransactionStartedListener
{
add { this.transactionContext.TransactionStartedListener += value; }
remove { this.transactionContext.TransactionStartedListener += value; }
}

public event SessionTxEventDelegate TransactionCommittedListener
{
add { this.transactionContext.TransactionCommittedListener += value; }
remove { this.transactionContext.TransactionCommittedListener += value; }
}

public event SessionTxEventDelegate TransactionRolledBackListener
{
add { this.transactionContext.TransactionRolledBackListener += value; }
remove { this.transactionContext.TransactionRolledBackListener += value; }
}

#endregion

#region Property Accessors

/// <summary>
@@ -71,6 +71,14 @@ public void ResetTransactionInProgress()
}
}

#region Transaction State Events

public event SessionTxEventDelegate TransactionStartedListener;
public event SessionTxEventDelegate TransactionCommittedListener;
public event SessionTxEventDelegate TransactionRolledBackListener;

#endregion

public void Begin()
{
if(!InTransaction)
@@ -83,6 +91,11 @@ public void Begin()
info.Type = (int) TransactionType.Begin;

this.session.Connection.Oneway(info);

if(this.TransactionStartedListener != null)
{
this.TransactionStartedListener(this.session);
}
}
}

@@ -147,6 +160,11 @@ internal void AfterCommit()
{
synchronization.AfterCommit();
}

if(this.TransactionCommittedListener != null)
{
this.TransactionCommittedListener(this.session);
}
}
}

@@ -158,6 +176,11 @@ internal void AfterRollback()
{
synchronization.AfterRollback();
}

if(this.TransactionRolledBackListener != null)
{
this.TransactionRolledBackListener(this.session);
}
}
}
}
@@ -627,5 +627,131 @@ private void WaitReceiveAck()
Assert.IsFalse(ackMessages.Count < MESSAGE_COUNT);
}

[Test]
public void TestTransactionEventsFired()
{
IMessage[] outbound = new IMessage[]
{session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")};

session.TransactionStartedListener += TransactionStarted;
session.TransactionCommittedListener += TransactionCommitted;
session.TransactionRolledBackListener += TransactionRolledBack;

// sends a message
BeginTx();
producer.Send(outbound[0]);
Assert.IsTrue(this.transactionStarted);
CommitTx();
Assert.IsFalse(this.transactionStarted);
Assert.IsTrue(this.transactionCommitted);

// sends a message that gets rollbacked
BeginTx();
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
Assert.IsTrue(this.transactionStarted);
RollbackTx();
Assert.IsFalse(this.transactionStarted);
Assert.IsTrue(this.transactionRolledBack);

// sends a message
BeginTx();
producer.Send(outbound[1]);
Assert.IsTrue(this.transactionStarted);
CommitTx();
Assert.IsFalse(this.transactionStarted);
Assert.IsTrue(this.transactionCommitted);

// receives the first message
BeginTx();
LinkedList<IMessage> messages = new LinkedList<IMessage>();
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
messages.AddLast(message);

// receives the second message
message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
Assert.IsTrue(this.transactionStarted);
messages.AddLast(message);

// validates that the rollbacked was not consumed
CommitTx();
Assert.IsFalse(this.transactionStarted);
Assert.IsTrue(this.transactionCommitted);

IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound, 0);
AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
}

[Test]
public void TestMessageListenerGeneratesTxEvents()
{
messageReceived = false;

session.TransactionStartedListener += TransactionStarted;
session.TransactionCommittedListener += TransactionCommitted;
session.TransactionRolledBackListener += TransactionRolledBack;

// Send messages
for(int i = 0; i < MESSAGE_COUNT; i++)
{
producer.Send(session.CreateTextMessage(MESSAGE_TEXT + i));
}

Assert.IsTrue(this.transactionStarted);
CommitTx();
Assert.IsFalse(this.transactionStarted);
Assert.IsTrue(this.transactionCommitted);

consumer.Listener += new MessageListener(OnAsyncTxMessage);

// wait receive
WaitForMessageToBeReceived();
Assert.IsTrue(this.transactionStarted);

CommitTx();
Assert.IsFalse(this.transactionStarted);
Assert.IsTrue(this.transactionCommitted);
}

private bool transactionStarted = false;
private bool transactionCommitted = false;
private bool transactionRolledBack = false;
private bool messageReceived = false;

public void OnAsyncTxMessage(IMessage message)
{
messageReceived = true;
}

private void WaitForMessageToBeReceived()
{
for(int i = 0; i < 100 && !messageReceived; i++)
{
Thread.Sleep(100);
}

Assert.IsTrue(messageReceived);
}

private void TransactionStarted(ISession session)
{
transactionStarted = true;
transactionCommitted = false;
transactionRolledBack = false;
}

private void TransactionCommitted(ISession session)
{
transactionStarted = false;
transactionCommitted = true;
transactionRolledBack = false;
}

private void TransactionRolledBack(ISession session)
{
transactionStarted = false;
transactionCommitted = false;
transactionRolledBack = true;
}
}
}

0 comments on commit dc4c097

Please sign in to comment.