Permalink
Browse files

Started working on a fix for #855 - topics

  • Loading branch information...
einari committed Jul 2, 2017
1 parent 3e274a3 commit 4a0d6a473024e3e90d7ddabf537ddb93dbf68b0b
@@ -46,7 +46,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="0.0.5-preview" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="0.0.6-preview" />
</ItemGroup>
<ItemGroup>
@@ -22,7 +22,7 @@ public class CommittedEventStreamReceiver : ICanReceiveCommittedEventStream
readonly ISerializer _serializer;
readonly IApplicationResourceIdentifierConverter _applicationResourceIdentifierConverter;
readonly IApplicationResourceResolver _applicationResourceResolver;
readonly QueueClient _queueClient;
readonly ISubscriptionClient _subscriptionClient;
/// <inheritdoc/>
public event CommittedEventStreamReceived Received = (e) => { };
@@ -34,19 +34,22 @@ public class CommittedEventStreamReceiver : ICanReceiveCommittedEventStream
/// <param name="applicationResourceIdentifierConverter"><see cref="IApplicationResourceIdentifierConverter"/> used for converting resource identifiers</param>
/// <param name="applicationResourceResolver"><see cref="IApplicationResourceResolver"/> used for resolving types from <see cref="IApplicationResourceIdentifier"/></param>
/// <param name="connectionStringProvider"><see cref="ICanProvideConnectionStringToReceiver">Provider</see> of connection string</param>
/// <param name="subscriptionNameProvider"><see cref="ICanProvideSubscriptionNameToReceiver">Provider</see> of subscription name</param>
public CommittedEventStreamReceiver(
ISerializer serializer,
IApplicationResourceIdentifierConverter applicationResourceIdentifierConverter,
IApplicationResourceResolver applicationResourceResolver,
ICanProvideConnectionStringToReceiver connectionStringProvider)
ICanProvideConnectionStringToReceiver connectionStringProvider,
ICanProvideSubscriptionNameToReceiver subscriptionNameProvider)
{
_serializer = serializer;
_applicationResourceIdentifierConverter = applicationResourceIdentifierConverter;
_applicationResourceResolver = applicationResourceResolver;
var connectionString = connectionStringProvider();
_queueClient = new QueueClient(connectionString, Constants.QueueName, ReceiveMode.PeekLock);
_queueClient.RegisterMessageHandler(Receive);
_subscriptionClient = new SubscriptionClient(connectionString, Constants.TopicName, subscriptionNameProvider(), ReceiveMode.PeekLock);
_subscriptionClient.RegisterMessageHandler(Receive);
}
@@ -102,7 +105,7 @@ Task Receive(Message message, CancellationToken token)
var stream = new CommittedEventStream(eventsAndEnvelopes.First().Envelope.EventSourceId, eventsAndEnvelopes);
Received(stream);
_queueClient.CompleteAsync(message.SystemProperties.LockToken);
_subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
return Task.CompletedTask;
}
@@ -18,7 +18,7 @@ public class CommittedEventStreamSender : ICanSendCommittedEventStream
{
readonly ISerializer _serializer;
readonly string _connectionString;
readonly IQueueClient _queueClient;
readonly ITopicClient _topicClient;
/// <summary>
/// Initializes a new instance of <see cref="CommittedEventStreamSender"/>
@@ -30,7 +30,7 @@ public CommittedEventStreamSender(ICanProvideConnectionStringToSender connection
_serializer = serializer;
_connectionString = connectionStringProvider();
_queueClient = new QueueClient(_connectionString, Constants.QueueName, ReceiveMode.PeekLock, RetryPolicy.Default);
_topicClient = new TopicClient(_connectionString, Constants.TopicName, RetryPolicy.Default);
}
/// <inheritdoc/>
@@ -48,7 +48,7 @@ public void Send(CommittedEventStream committedEventStream)
var eventsToSendAsJson = _serializer.ToJson(eventsToSend);
var messageBodyBytes = Encoding.UTF8.GetBytes(eventsToSendAsJson);
var message = new Message(messageBodyBytes);
_queueClient.SendAsync(message);
_topicClient.SendAsync(message);
}
}
}
@@ -31,11 +31,13 @@ public static CommittedEventStreamSenderConfiguration UsingServiceBus(this Commi
/// </summary>
/// <param name="configuration"><see cref="CommittedEventStreamReceiverConfiguration"/> to configure</param>
/// <param name="connectionString">ConnectionString to connect with</param>
/// <param name="subscriptionName">Name of subscription used</param>
/// <returns>Chained <see cref="CommittedEventStreamReceiverConfiguration"/></returns>
public static CommittedEventStreamReceiverConfiguration UsingServiceBus(this CommittedEventStreamReceiverConfiguration configuration, string connectionString)
public static CommittedEventStreamReceiverConfiguration UsingServiceBus(this CommittedEventStreamReceiverConfiguration configuration, string connectionString, string subscriptionName)
{
configuration.CommittedEventStreamReceiver = typeof(CommittedEventStreamReceiver);
Configure.Instance.Container.Bind<ICanProvideConnectionStringToReceiver>(() => connectionString);
Configure.Instance.Container.Bind<ICanProvideSubscriptionNameToReceiver>(() => subscriptionName);
return configuration;
}
}
@@ -12,6 +12,6 @@ public class Constants
/// <summary>
/// The name of the queue being used
/// </summary>
public const string QueueName = "BifrostQueue";
public const string TopicName = "BifrostTopic";
}
}
@@ -0,0 +1,11 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2008-2017 Dolittle. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for license information.
*--------------------------------------------------------------------------------------------*/
namespace Bifrost.Events.Azure.ServiceBus
{
/// <summary>
/// Defines something that can provide a connection string for Redis
/// </summary>
public delegate string ICanProvideSubscriptionNameToReceiver();
}
@@ -40,10 +40,10 @@ public void Configure(IConfigure configure)
e.EventProcessorStates.UsingFiles(eventProcessorsStatePath);
e.EventSourceVersions.UsingFiles(eventSourceVersionsPath);
/*
e.CommittedEventStreamSender.UsingServiceBus(serviceBus);
e.CommittedEventStreamReceiver.UsingServiceBus(serviceBus);
*/
e.CommittedEventStreamReceiver.UsingServiceBus(serviceBus, "MyClient");
/*
var rabbitMQ = "amqp://guest:guest@localhost:5672/";

0 comments on commit 4a0d6a4

Please sign in to comment.