Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Making the ctor of the definition public
- Loading branch information
1 parent
e8b766a
commit 9e76275
Showing
5 changed files
with
118 additions
and
118 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,100 +1,32 @@ | ||
namespace NServiceBus.Features | ||
namespace NServiceBus | ||
{ | ||
using System; | ||
using System.Linq; | ||
using Pipeline; | ||
using Settings; | ||
using Support; | ||
using Configuration.AdvanceExtensibility; | ||
using Features; | ||
using Transports; | ||
using Transports.SQLServer; | ||
using System.Configuration; | ||
|
||
/// <summary> | ||
/// Configures NServiceBus to use SqlServer as the default transport | ||
/// SqlServer Transport | ||
/// </summary> | ||
class SqlServerTransport : ConfigureTransport | ||
public class SqlServerTransport : TransportDefinition | ||
{ | ||
public const string UseCallbackReceiverSettingKey = "SqlServer.UseCallbackReceiver"; | ||
public const string MaxConcurrencyForCallbackReceiverSettingKey = "SqlServer.MaxConcurrencyForCallbackReceiver"; | ||
|
||
/// <summary> | ||
/// Ctor | ||
/// </summary> | ||
public SqlServerTransport() | ||
{ | ||
Defaults(s => | ||
{ | ||
s.SetDefault(UseCallbackReceiverSettingKey, true); | ||
s.SetDefault(MaxConcurrencyForCallbackReceiverSettingKey, 1); | ||
}); | ||
} | ||
|
||
protected override string ExampleConnectionStringForErrorMessage | ||
{ | ||
get { return @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True"; } | ||
} | ||
|
||
protected override string GetLocalAddress(ReadOnlySettings settings) | ||
{ | ||
return settings.EndpointName(); | ||
RequireOutboxConsent = true; | ||
} | ||
|
||
protected override void Configure(FeatureConfigurationContext context, string connectionString) | ||
/// <summary> | ||
/// Gives implementations access to the <see cref="T:NServiceBus.BusConfiguration"/> instance at configuration time. | ||
/// </summary> | ||
protected override void Configure(BusConfiguration config) | ||
{ | ||
//Until we refactor the whole address system | ||
Address.IgnoreMachineName(); | ||
|
||
var useCallbackReceiver = context.Settings.Get<bool>(UseCallbackReceiverSettingKey); | ||
var maxConcurrencyForCallbackReceiver = context.Settings.Get<int>(MaxConcurrencyForCallbackReceiverSettingKey); | ||
|
||
var queueName = GetLocalAddress(context.Settings); | ||
var callbackQueue = string.Format("{0}.{1}", queueName, RuntimeEnvironment.MachineName); | ||
|
||
//Load all connectionstrings | ||
var collection = | ||
ConfigurationManager | ||
.ConnectionStrings | ||
.Cast<ConnectionStringSettings>() | ||
.Where(x => x.Name.StartsWith("NServiceBus/Transport/")) | ||
.ToDictionary(x => x.Name.Replace("NServiceBus/Transport/", String.Empty), y => y.ConnectionString); | ||
|
||
if (String.IsNullOrEmpty(connectionString)) | ||
{ | ||
throw new ArgumentException("Sql Transport connection string cannot be empty or null."); | ||
} | ||
|
||
var container = context.Container; | ||
|
||
container.ConfigureComponent<SqlServerQueueCreator>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.ConnectionString, connectionString); | ||
|
||
container.ConfigureComponent<SqlServerMessageSender>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.DefaultConnectionString, connectionString) | ||
.ConfigureProperty(p => p.ConnectionStringCollection, collection) | ||
.ConfigureProperty(p => p.CallbackQueue, callbackQueue); | ||
|
||
container.ConfigureComponent<SqlServerPollingDequeueStrategy>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.ConnectionString, connectionString); | ||
|
||
context.Container.ConfigureComponent(b => new SqlServerStorageContext(b.Build<PipelineExecutor>(), connectionString), DependencyLifecycle.InstancePerUnitOfWork); | ||
|
||
if (useCallbackReceiver) | ||
{ | ||
var callbackAddress = Address.Parse(callbackQueue); | ||
|
||
context.Container.ConfigureComponent<CallbackQueueCreator>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.Enabled, true) | ||
.ConfigureProperty(p => p.CallbackQueueAddress, callbackAddress); | ||
|
||
context.Pipeline.Register<PromoteCallbackQueueBehavior.Registration>(); | ||
} | ||
context.Container.RegisterSingleton(new SecondaryReceiveConfiguration(workQueue => | ||
{ | ||
//if this isn't the main queue we shouldn't use callback receiver | ||
if (!useCallbackReceiver || workQueue != queueName) | ||
{ | ||
return SecondaryReceiveSettings.Disabled(); | ||
} | ||
return SecondaryReceiveSettings.Enabled(callbackQueue, maxConcurrencyForCallbackReceiver); | ||
})); | ||
config.EnableFeature<SqlServerTransportFeature>(); | ||
config.EnableFeature<MessageDrivenSubscriptions>(); | ||
config.EnableFeature<TimeoutManagerBasedDeferral>(); | ||
config.GetSettings().EnableFeatureByDefault<StorageDrivenPublishing>(); | ||
config.GetSettings().EnableFeatureByDefault<TimeoutManager>(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
namespace NServiceBus.Features | ||
{ | ||
using System; | ||
using System.Linq; | ||
using Pipeline; | ||
using Settings; | ||
using Support; | ||
using Transports; | ||
using Transports.SQLServer; | ||
using System.Configuration; | ||
|
||
class SqlServerTransportFeature : ConfigureTransport | ||
{ | ||
public const string UseCallbackReceiverSettingKey = "SqlServer.UseCallbackReceiver"; | ||
public const string MaxConcurrencyForCallbackReceiverSettingKey = "SqlServer.MaxConcurrencyForCallbackReceiver"; | ||
|
||
public SqlServerTransportFeature() | ||
{ | ||
Defaults(s => | ||
{ | ||
s.SetDefault(UseCallbackReceiverSettingKey, true); | ||
s.SetDefault(MaxConcurrencyForCallbackReceiverSettingKey, 1); | ||
}); | ||
} | ||
|
||
protected override string ExampleConnectionStringForErrorMessage | ||
{ | ||
get { return @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True"; } | ||
} | ||
|
||
protected override string GetLocalAddress(ReadOnlySettings settings) | ||
{ | ||
return settings.EndpointName(); | ||
} | ||
|
||
protected override void Configure(FeatureConfigurationContext context, string connectionString) | ||
{ | ||
//Until we refactor the whole address system | ||
Address.IgnoreMachineName(); | ||
|
||
var useCallbackReceiver = context.Settings.Get<bool>(UseCallbackReceiverSettingKey); | ||
var maxConcurrencyForCallbackReceiver = context.Settings.Get<int>(MaxConcurrencyForCallbackReceiverSettingKey); | ||
|
||
var queueName = GetLocalAddress(context.Settings); | ||
var callbackQueue = string.Format("{0}.{1}", queueName, RuntimeEnvironment.MachineName); | ||
|
||
//Load all connectionstrings | ||
var collection = | ||
ConfigurationManager | ||
.ConnectionStrings | ||
.Cast<ConnectionStringSettings>() | ||
.Where(x => x.Name.StartsWith("NServiceBus/Transport/")) | ||
.ToDictionary(x => x.Name.Replace("NServiceBus/Transport/", String.Empty), y => y.ConnectionString); | ||
|
||
if (String.IsNullOrEmpty(connectionString)) | ||
{ | ||
throw new ArgumentException("Sql Transport connection string cannot be empty or null."); | ||
} | ||
|
||
var container = context.Container; | ||
|
||
container.ConfigureComponent<SqlServerQueueCreator>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.ConnectionString, connectionString); | ||
|
||
container.ConfigureComponent<SqlServerMessageSender>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.DefaultConnectionString, connectionString) | ||
.ConfigureProperty(p => p.ConnectionStringCollection, collection) | ||
.ConfigureProperty(p => p.CallbackQueue, callbackQueue); | ||
|
||
container.ConfigureComponent<SqlServerPollingDequeueStrategy>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.ConnectionString, connectionString); | ||
|
||
context.Container.ConfigureComponent(b => new SqlServerStorageContext(b.Build<PipelineExecutor>(), connectionString), DependencyLifecycle.InstancePerUnitOfWork); | ||
|
||
if (useCallbackReceiver) | ||
{ | ||
var callbackAddress = Address.Parse(callbackQueue); | ||
|
||
context.Container.ConfigureComponent<CallbackQueueCreator>(DependencyLifecycle.InstancePerCall) | ||
.ConfigureProperty(p => p.Enabled, true) | ||
.ConfigureProperty(p => p.CallbackQueueAddress, callbackAddress); | ||
|
||
context.Pipeline.Register<PromoteCallbackQueueBehavior.Registration>(); | ||
} | ||
context.Container.RegisterSingleton(new SecondaryReceiveConfiguration(workQueue => | ||
{ | ||
//if this isn't the main queue we shouldn't use callback receiver | ||
if (!useCallbackReceiver || workQueue != queueName) | ||
{ | ||
return SecondaryReceiveSettings.Disabled(); | ||
} | ||
return SecondaryReceiveSettings.Enabled(callbackQueue, maxConcurrencyForCallbackReceiver); | ||
})); | ||
} | ||
} | ||
} |