Skip to content

Commit

Permalink
Merge branch 'master' of github.com:ServiceStack/ServiceStack
Browse files Browse the repository at this point in the history
  • Loading branch information
mythz committed Aug 11, 2015
2 parents b76de32 + 91e204c commit e976f6a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 40 deletions.
6 changes: 3 additions & 3 deletions src/ServiceStack.RabbitMq/RabbitMqMessageFactory.cs
Expand Up @@ -63,7 +63,7 @@ public RabbitMqMessageFactory(ConnectionFactory connectionFactory)
ConnectionFactory = connectionFactory;
}

public IMessageQueueClient CreateMessageQueueClient()
public virtual IMessageQueueClient CreateMessageQueueClient()
{
return new RabbitMqQueueClient(this) {
RetryCount = RetryCount,
Expand All @@ -72,7 +72,7 @@ public IMessageQueueClient CreateMessageQueueClient()
};
}

public IMessageProducer CreateMessageProducer()
public virtual IMessageProducer CreateMessageProducer()
{
return new RabbitMqProducer(this) {
RetryCount = RetryCount,
Expand All @@ -81,7 +81,7 @@ public IMessageProducer CreateMessageProducer()
};
}

public void Dispose()
public virtual void Dispose()
{
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/ServiceStack.RabbitMq/RabbitMqProducer.cs
Expand Up @@ -50,7 +50,7 @@ public RabbitMqProducer(RabbitMqMessageFactory msgFactory)
this.msgFactory = msgFactory;
}

public void Publish<T>(T messageBody)
public virtual void Publish<T>(T messageBody)
{
var message = messageBody as IMessage;
if (message != null)
Expand All @@ -62,28 +62,28 @@ public void Publish<T>(T messageBody)
Publish(new Message<T>(messageBody));
}
}
public void Publish<T>(IMessage<T> message)

public virtual void Publish<T>(IMessage<T> message)
{
Publish(message.ToInQueueName(), message);
}

public void Publish(string queueName, IMessage message)
public virtual void Publish(string queueName, IMessage message)
{
Publish(queueName, message, QueueNames.Exchange);
}

public void SendOneWay(object requestDto)
public virtual void SendOneWay(object requestDto)
{
Publish(MessageFactory.Create(requestDto));
}

public void SendOneWay(string queueName, object requestDto)
public virtual void SendOneWay(string queueName, object requestDto)
{
Publish(queueName, MessageFactory.Create(requestDto));
}

public void SendAllOneWay(IEnumerable<object> requests)
public virtual void SendAllOneWay(IEnumerable<object> requests)
{
if (requests == null) return;
foreach (var request in requests)
Expand All @@ -92,7 +92,7 @@ public void SendAllOneWay(IEnumerable<object> requests)
}
}

public void Publish(string queueName, IMessage message, string exchange)
public virtual void Publish(string queueName, IMessage message, string exchange)
{
using (__requestAccess())
{
Expand Down Expand Up @@ -120,7 +120,7 @@ public void Publish(string queueName, IMessage message, string exchange)

static HashSet<string> Queues = new HashSet<string>();

public void PublishMessage(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
public virtual void PublishMessage(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
{
try
{
Expand Down Expand Up @@ -162,7 +162,7 @@ public void PublishMessage(string exchange, string routingKey, IBasicProperties
}
}

public BasicGetResult GetMessage(string queueName, bool noAck)
public virtual BasicGetResult GetMessage(string queueName, bool noAck)
{
try
{
Expand Down
14 changes: 7 additions & 7 deletions src/ServiceStack.RabbitMq/RabbitMqQueueClient.cs
Expand Up @@ -10,7 +10,7 @@ public class RabbitMqQueueClient : RabbitMqProducer, IMessageQueueClient
public RabbitMqQueueClient(RabbitMqMessageFactory msgFactory)
: base(msgFactory) {}

public void Notify(string queueName, IMessage message)
public virtual void Notify(string queueName, IMessage message)
{
using (__requestAccess())
{
Expand All @@ -23,7 +23,7 @@ public void Notify(string queueName, IMessage message)
}
}

public IMessage<T> Get<T>(string queueName, TimeSpan? timeOut = null)
public virtual IMessage<T> Get<T>(string queueName, TimeSpan? timeOut = null)
{
using (__requestAccess())
{
Expand All @@ -43,7 +43,7 @@ public IMessage<T> Get<T>(string queueName, TimeSpan? timeOut = null)
}
}

public IMessage<T> GetAsync<T>(string queueName)
public virtual IMessage<T> GetAsync<T>(string queueName)
{
using (__requestAccess())
{
Expand All @@ -52,13 +52,13 @@ public IMessage<T> GetAsync<T>(string queueName)
}
}

public void Ack(IMessage message)
public virtual void Ack(IMessage message)
{
var deliveryTag = ulong.Parse(message.Tag);
Channel.BasicAck(deliveryTag, multiple:false);
}

public void Nak(IMessage message, bool requeue, Exception exception = null)
public virtual void Nak(IMessage message, bool requeue, Exception exception = null)
{
try
{
Expand All @@ -80,7 +80,7 @@ public void Nak(IMessage message, bool requeue, Exception exception = null)
}
}

public IMessage<T> CreateMessage<T>(object mqResponse)
public virtual IMessage<T> CreateMessage<T>(object mqResponse)
{
using (__requestAccess())
{
Expand All @@ -94,7 +94,7 @@ public IMessage<T> CreateMessage<T>(object mqResponse)
}
}

public string GetTempQueueName()
public virtual string GetTempQueueName()
{
var anonMq = Channel.QueueDeclare(
queue: QueueNames.GetTempQueueName(),
Expand Down
28 changes: 14 additions & 14 deletions src/ServiceStack.RabbitMq/RabbitMqServer.cs
Expand Up @@ -159,22 +159,22 @@ private void Init(RabbitMqMessageFactory messageFactory)
}


public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn)
public virtual void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn)
{
RegisterHandler(processMessageFn, null, noOfThreads: 1);
}

public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, int noOfThreads)
public virtual void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, int noOfThreads)
{
RegisterHandler(processMessageFn, null, noOfThreads);
}

public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx)
public virtual void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx)
{
RegisterHandler(processMessageFn, processExceptionEx, noOfThreads: 1);
}

public void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx, int noOfThreads)
public virtual void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx, int noOfThreads)
{
if (handlerMap.ContainsKey(typeof(T)))
{
Expand Down Expand Up @@ -214,7 +214,7 @@ public long BgThreadCount
get { return Interlocked.CompareExchange(ref bgThreadCount, 0, 0); }
}

public IMessageHandlerStats GetStats()
public virtual IMessageHandlerStats GetStats()
{
lock (workers)
{
Expand All @@ -224,7 +224,7 @@ public IMessageHandlerStats GetStats()
}
}

public string GetStatus()
public virtual string GetStatus()
{
switch (Interlocked.CompareExchange(ref status, 0, 0))
{
Expand All @@ -242,7 +242,7 @@ public string GetStatus()
return null;
}

public string GetStatsDescription()
public virtual string GetStatsDescription()
{
lock (workers)
{
Expand All @@ -264,7 +264,7 @@ public string GetStatsDescription()
}
}

public void Init()
public virtual void Init()
{
if (workers != null) return;

Expand Down Expand Up @@ -325,7 +325,7 @@ public void Init()
}
}

public void Start()
public virtual void Start()
{
if (Interlocked.CompareExchange(ref status, 0, 0) == WorkerStatus.Started)
{
Expand Down Expand Up @@ -448,7 +448,7 @@ private void RunLoop()
Log.Debug("Exiting RunLoop()...");
}

public void Stop()
public virtual void Stop()
{
if (Interlocked.CompareExchange(ref status, 0, 0) == WorkerStatus.Disposed)
throw new ObjectDisposedException("MQ Host has been disposed");
Expand All @@ -463,7 +463,7 @@ public void Stop()
}
}

public void Restart()
public virtual void Restart()
{
if (Interlocked.CompareExchange(ref status, 0, 0) == WorkerStatus.Disposed)
throw new ObjectDisposedException("MQ Host has been disposed");
Expand All @@ -478,7 +478,7 @@ public void Restart()
}
}

public void StartWorkerThreads()
public virtual void StartWorkerThreads()
{
Log.Debug("Starting all Rabbit MQ Server worker threads...");
foreach (var worker in workers)
Expand All @@ -495,7 +495,7 @@ public void StartWorkerThreads()
}
}

public void StopWorkerThreads()
public virtual void StopWorkerThreads()
{
Log.Debug("Stopping all Rabbit MQ Server worker threads...");
foreach (var worker in workers)
Expand Down Expand Up @@ -555,7 +555,7 @@ private void KillBgThreadIfExists()
}
}

public void Dispose()
public virtual void Dispose()
{
if (Interlocked.CompareExchange(ref status, 0, 0) == WorkerStatus.Disposed)
return;
Expand Down
12 changes: 6 additions & 6 deletions src/ServiceStack.RabbitMq/RabbitMqWorker.cs
Expand Up @@ -64,7 +64,7 @@ public int MsgNotificationsReceived
this.AutoReconnect = autoConnect;
}

public RabbitMqWorker Clone()
public virtual RabbitMqWorker Clone()
{
return new RabbitMqWorker(mqFactory, messageHandler, QueueName, errorHandler, AutoReconnect);
}
Expand All @@ -88,7 +88,7 @@ private IModel GetChannel()
return channel;
}

public void Start()
public virtual void Start()
{
if (Interlocked.CompareExchange(ref status, 0, 0) == WorkerStatus.Started)
return;
Expand All @@ -111,7 +111,7 @@ public void Start()
}
}

public void ForceRestart()
public virtual void ForceRestart()
{
KillBgThreadIfExists();
Start();
Expand Down Expand Up @@ -289,7 +289,7 @@ private RabbitMqBasicConsumer ConnectSubscription()
return consumer;
}

public void Stop()
public virtual void Stop()
{
if (Interlocked.CompareExchange(ref status, 0, 0) == WorkerStatus.Disposed)
return;
Expand Down Expand Up @@ -365,12 +365,12 @@ public virtual void Dispose()
}
}

public IMessageHandlerStats GetStats()
public virtual IMessageHandlerStats GetStats()
{
return messageHandler.GetStats();
}

public string GetStatus()
public virtual string GetStatus()
{
return "[Worker: {0}, Status: {1}, ThreadStatus: {2}, LastMsgAt: {3}]"
.Fmt(QueueName, WorkerStatus.ToString(status), bgThread.ThreadState, LastMsgProcessed);
Expand Down

0 comments on commit e976f6a

Please sign in to comment.