Skip to content

Commit

Permalink
Queue limit and max length bytes (#123)
Browse files Browse the repository at this point in the history
* Добавлена возможность конфигурирования QueueLimit и QueueMaxLengthBytes.

Co-authored-by: Eugene Khramovich <ehramovich@gmail.com>
Co-authored-by: s.karol <s.karol@sdventures.com>
  • Loading branch information
3 people committed Apr 21, 2022
1 parent 8081806 commit c3df282
Show file tree
Hide file tree
Showing 25 changed files with 1,443 additions and 999 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.fake/
.axoCover/
.vs/
.idea/
bin/
obj/
paket-files/
Expand Down
59 changes: 2 additions & 57 deletions Contour.sln
Original file line number Diff line number Diff line change
@@ -1,36 +1,9 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27703.2000
# Visual Studio Version 16
VisualStudioVersion = 16.0.31105.61
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".paket", ".paket", "{164DE6AD-73A5-4F62-85AC-9090D2D720FF}"
ProjectSection(SolutionItems) = preProject
paket.dependencies = paket.dependencies
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Sources", "Sources", "{42F4CDA7-FF32-45B7-A516-30FC94845340}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{568D0EC3-36F1-40C3-9BBD-871AD5A1924A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contour.RabbitMq.Tests", "Tests\Contour.RabbitMq.Tests\Contour.RabbitMq.Tests.csproj", "{E47EE610-DAC2-4A18-B11A-E8CE32B86A20}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contour.Common.Tests", "Tests\Contour.Common.Tests\Contour.Common.Tests.csproj", "{0A47BD78-3FD2-45A9-A991-F7140CE6357B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contour.Configurator.Tests", "Tests\Contour.Configurator.Tests\Contour.Configurator.Tests.csproj", "{64CEECA7-25C5-4ADA-A977-CE57846E1472}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contour.Testing", "Tests\Contour.Testing\Contour.Testing.csproj", "{B3F3D491-5C15-4C21-A155-EA34896253A6}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{1AE4AE50-B6BC-42FC-B102-5B19849877B7}"
ProjectSection(SolutionItems) = preProject
.gitignore = .gitignore
appveyor.yml = appveyor.yml
build.fsx = build.fsx
fake.cmd = fake.cmd
LICENSE.md = LICENSE.md
paket.dependencies = paket.dependencies
README.md = README.md
RELEASE_NOTES.md = RELEASE_NOTES.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contour", "Sources\Contour\Contour.csproj", "{CA81CC00-B276-4DD4-95D9-598485766D8E}"
EndProject
Global
Expand All @@ -41,30 +14,6 @@ Global
Release|X64 = Release|X64
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{E47EE610-DAC2-4A18-B11A-E8CE32B86A20}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E47EE610-DAC2-4A18-B11A-E8CE32B86A20}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E47EE610-DAC2-4A18-B11A-E8CE32B86A20}.Debug|X64.ActiveCfg = Debug|Any CPU
{E47EE610-DAC2-4A18-B11A-E8CE32B86A20}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E47EE610-DAC2-4A18-B11A-E8CE32B86A20}.Release|Any CPU.Build.0 = Release|Any CPU
{E47EE610-DAC2-4A18-B11A-E8CE32B86A20}.Release|X64.ActiveCfg = Release|Any CPU
{0A47BD78-3FD2-45A9-A991-F7140CE6357B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0A47BD78-3FD2-45A9-A991-F7140CE6357B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0A47BD78-3FD2-45A9-A991-F7140CE6357B}.Debug|X64.ActiveCfg = Debug|Any CPU
{0A47BD78-3FD2-45A9-A991-F7140CE6357B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0A47BD78-3FD2-45A9-A991-F7140CE6357B}.Release|Any CPU.Build.0 = Release|Any CPU
{0A47BD78-3FD2-45A9-A991-F7140CE6357B}.Release|X64.ActiveCfg = Release|Any CPU
{64CEECA7-25C5-4ADA-A977-CE57846E1472}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64CEECA7-25C5-4ADA-A977-CE57846E1472}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64CEECA7-25C5-4ADA-A977-CE57846E1472}.Debug|X64.ActiveCfg = Debug|Any CPU
{64CEECA7-25C5-4ADA-A977-CE57846E1472}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64CEECA7-25C5-4ADA-A977-CE57846E1472}.Release|Any CPU.Build.0 = Release|Any CPU
{64CEECA7-25C5-4ADA-A977-CE57846E1472}.Release|X64.ActiveCfg = Release|Any CPU
{B3F3D491-5C15-4C21-A155-EA34896253A6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B3F3D491-5C15-4C21-A155-EA34896253A6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B3F3D491-5C15-4C21-A155-EA34896253A6}.Debug|X64.ActiveCfg = Debug|Any CPU
{B3F3D491-5C15-4C21-A155-EA34896253A6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B3F3D491-5C15-4C21-A155-EA34896253A6}.Release|Any CPU.Build.0 = Release|Any CPU
{B3F3D491-5C15-4C21-A155-EA34896253A6}.Release|X64.ActiveCfg = Release|Any CPU
{CA81CC00-B276-4DD4-95D9-598485766D8E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CA81CC00-B276-4DD4-95D9-598485766D8E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA81CC00-B276-4DD4-95D9-598485766D8E}.Debug|X64.ActiveCfg = Debug|Any CPU
Expand All @@ -78,10 +27,6 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{E47EE610-DAC2-4A18-B11A-E8CE32B86A20} = {568D0EC3-36F1-40C3-9BBD-871AD5A1924A}
{0A47BD78-3FD2-45A9-A991-F7140CE6357B} = {568D0EC3-36F1-40C3-9BBD-871AD5A1924A}
{64CEECA7-25C5-4ADA-A977-CE57846E1472} = {568D0EC3-36F1-40C3-9BBD-871AD5A1924A}
{B3F3D491-5C15-4C21-A155-EA34896253A6} = {568D0EC3-36F1-40C3-9BBD-871AD5A1924A}
{CA81CC00-B276-4DD4-95D9-598485766D8E} = {42F4CDA7-FF32-45B7-A516-30FC94845340}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
Expand Down
20 changes: 19 additions & 1 deletion Sources/Contour/Configuration/BusConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using Contour.Sending;
using Contour.Serialization;
using Contour.Validation;

namespace Contour.Configuration
{
/// <summary>
Expand Down Expand Up @@ -505,6 +505,24 @@ public void UseFaultQueueLimit(int queueLimit)
this.ReceiverDefaults.FaultQueueLimit = queueLimit;
}

/// <summary>
/// The message queue length limit.
/// </summary>
/// <param name="queueLimit">The message queue length limit.</param>
public void UseQueueLimit(int queueLimit)
{
this.ReceiverDefaults.QueueLimit = queueLimit;
}

/// <summary>
/// The message queue length limit in bytes.
/// </summary>
/// <param name="bytes">The message queue limit in bytes.</param>
public void UseQueueMaxLengthBytes(int bytes)
{
this.ReceiverDefaults.QueueMaxLengthBytes = bytes;
}

/// <summary>
/// The use payload converter.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions Sources/Contour/Configuration/IBusConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,18 @@ public interface IBusConfigurator
/// <param name="queueLimit">The fault message queue length limit.</param>
void UseFaultQueueLimit(int queueLimit);

/// <summary>
/// The message queue length limit.
/// </summary>
/// <param name="queueLimit">The message queue length limit.</param>
void UseQueueLimit(int queueLimit);

/// <summary>
/// The message queue length limit in bytes.
/// </summary>
/// <param name="bytes">The message queue limit in bytes.</param>
void UseQueueMaxLengthBytes(int bytes);

/// <summary>
/// Устанаваливает конвертер тела сообщений.
/// </summary>
Expand Down
34 changes: 33 additions & 1 deletion Sources/Contour/Configurator/AppConfigConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg)
cfg.UseFaultQueueLimit(endpointConfig.FaultQueueLimit.Value);
}

if (endpointConfig.QueueLimit.HasValue)
{
cfg.UseQueueLimit(endpointConfig.QueueLimit.Value);
}

if (endpointConfig.QueueMaxLengthBytes.HasValue)
{
cfg.UseQueueMaxLengthBytes(endpointConfig.QueueMaxLengthBytes.Value);
}

if (endpointConfig.Dynamic != null)
{
if (endpointConfig.Dynamic.Outgoing.HasValue)
Expand Down Expand Up @@ -287,7 +297,29 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg)
{
configurator.WithParallelismLevel(incomingElement.ParallelismLevel.Value);
}


//Queue limit
if (endpointConfig.QueueLimit.HasValue)
{
configurator.WithQueueLimit(endpointConfig.QueueLimit.Value);
}

if (incomingElement.QueueLimit.HasValue)
{
configurator.WithQueueLimit(incomingElement.QueueLimit.Value);
}

//Queue max length bytes
if (endpointConfig.QueueMaxLengthBytes.HasValue)
{
configurator.WithQueueMaxLengthBytes(endpointConfig.QueueMaxLengthBytes.Value);
}

if (incomingElement.QueueMaxLengthBytes.HasValue)
{
configurator.WithQueueMaxLengthBytes(incomingElement.QueueMaxLengthBytes.Value);
}

// Accept
if (incomingElement.RequiresAccept)
{
Expand Down
4 changes: 4 additions & 0 deletions Sources/Contour/Configurator/Configuration/IEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public interface IEndpoint

int? FaultQueueLimit { get; }

int? QueueLimit { get; }

int? QueueMaxLengthBytes { get; }

string ConnectionStringProvider { get; }

IIncoming[] Incoming { get; }
Expand Down
4 changes: 4 additions & 0 deletions Sources/Contour/Configurator/Configuration/IIncoming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,9 @@ public interface IIncoming : IMessage
string ConnectionString { get; }

bool? ReuseConnection { get; }

int? QueueLimit { get; }

int? QueueMaxLengthBytes { get; }
}
}
19 changes: 19 additions & 0 deletions Sources/Contour/Configurator/EndpointElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@ public string ConnectionString
}
}

[ConfigurationProperty("queueLimit", IsRequired = false)]
public int? QueueLimit
{
get
{
return (int?)this["queueLimit"];
}
}

[ConfigurationProperty("queueMaxLengthBytes", IsRequired = false)]
public int? QueueMaxLengthBytes
{
get
{
return (int?)this["queueMaxLengthBytes"];
}
}


[ConfigurationProperty("connectionStringProvider", IsRequired = false, DefaultValue = null)]
public string ConnectionStringProvider => (string)this["connectionStringProvider"];

Expand Down
12 changes: 12 additions & 0 deletions Sources/Contour/Configurator/IncomingElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ public string ConnectionString
get { return (bool?) base["reuseConnection"]; }
}

[ConfigurationProperty("queueLimit", IsRequired = false)]
public int? QueueLimit
{
get { return (int?)base["queueLimit"]; }
}

[ConfigurationProperty("queueMaxLengthBytes", IsRequired = false)]
public int? QueueMaxLengthBytes
{
get { return (int?)base["queueMaxLengthBytes"]; }
}

IQos IIncoming.Qos => this.Qos;
}
}
4 changes: 2 additions & 2 deletions Sources/Contour/Contour.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
<TargetFramework>netstandard2.0</TargetFramework>
<PackageTags>contour servicebus bus</PackageTags>
<PackageReleaseNotes>Added easier access to headers</PackageReleaseNotes>
<Copyright>Social Discovery Ventures © 2020</Copyright>
<Copyright>Social Discovery Ventures © 2022</Copyright>
<Description>The package contains abstract interfaces of service bus and specific transport implementation for AMQP/RabbitMQ</Description>
<Company>Social Discovery Ventures</Company>
<Authors>SDV Team</Authors>
<Version>3.0.0-pr8</Version>
<Version>3.0.0-pr9</Version>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
</PropertyGroup>

Expand Down
5 changes: 5 additions & 0 deletions Sources/Contour/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public class Headers
private static readonly HashSet<string> NotImmutableHeaders =
new HashSet<string>(new[] { CorrelationId, OriginalMessageId, MessageLabel, ReplyRoute });

/// <summary>
/// Максимальное количество байт, которые занимают сообщения в очереди
/// </summary>
public static readonly string QueueMaxLengthBytes = "x-max-length-bytes";

/// <summary>
/// Получает значение заголовка из сообщения и удаляет его из списка заголовков сообщения.
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions Sources/Contour/Receiving/IReceiverConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public interface IReceiverConfigurator
/// <param name="storage">Хранилище заголовков входящего сообщения.</param>
/// <returns>Конфигуратор получателя с установленным хранилищем заголовков.</returns>
IReceiverConfigurator WithIncomingMessageHeaderStorage(IIncomingMessageHeaderStorage storage);

IReceiverConfigurator WithQueueLimit(int queueLimit);
IReceiverConfigurator WithQueueMaxLengthBytes(int maxLengthBytes);

}

/// <summary>
Expand Down Expand Up @@ -156,5 +160,8 @@ public interface IReceiverConfigurator<T> : IReceiverConfigurator
/// <param name="storage">Хранилище заголовков входящего сообщения.</param>
/// <returns>Конфигуратор получателя с установленным хранилищем заголовков.</returns>
new IReceiverConfigurator WithIncomingMessageHeaderStorage(IIncomingMessageHeaderStorage storage);

new IReceiverConfigurator WithQueueLimit(int queueLimit);
new IReceiverConfigurator WithQueueMaxLengthBytes(int maxLengthBytes);
}
}
15 changes: 15 additions & 0 deletions Sources/Contour/Receiving/ReceiverConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,20 @@ public IReceiverConfigurator WithIncomingMessageHeaderStorage(IIncomingMessageHe

return this;
}


public IReceiverConfigurator WithQueueLimit(int queueLimit)
{
this.Options.QueueLimit = queueLimit;

return this;
}

public IReceiverConfigurator WithQueueMaxLengthBytes(int bytes)
{
this.Options.QueueMaxLengthBytes = bytes;

return this;
}
}
}
36 changes: 35 additions & 1 deletion Sources/Contour/Receiving/ReceiverOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ public ReceiverOptions(BusOptions parent)
/// </summary>
public Maybe<int> FaultQueueLimit { protected get; set; }

/// <summary>
/// Максимальное количество сообщений в потребляющей очереди.
/// </summary>
public Maybe<int> QueueLimit { protected get; set; }

/// <summary>
/// Максимальное количество байт, которые занимают сообщения в потребляющей очереди.
/// </summary>
public Maybe<int> QueueMaxLengthBytes { protected get; set; }


/// <summary>
/// Обработчик сообщений, для которых не найден потребитель.
/// </summary>
Expand Down Expand Up @@ -131,7 +142,7 @@ public Maybe<bool> IsAcceptRequired()

/// <summary>
/// Возвращает длительность хранения сообщений в Fault очереди.
/// </summary>
/// </summary>
/// <returns>
/// Возвращает длительность хранения сообщений в Fault очереди.
/// </returns>
Expand All @@ -150,5 +161,28 @@ public Maybe<int> GetFaultQueueLimit()
{
return this.Pick<ReceiverOptions, int>((o) => o.FaultQueueLimit);
}

/// <summary>
/// Возвращает максимальное количество сообщений в потребляющей очереди.
/// </summary>
/// <returns>
/// Возвращает максимальное количество сообщений в потребляющей очереди.
/// </returns>
public Maybe<int> GetQueueLimit()
{
return this.Pick<ReceiverOptions, int>((o) => o.QueueLimit);
}

/// <summary>
/// Возвращает максимальное количество байт, которые занимають сообщения в потребляющей очереди.
/// </summary>
/// <returns>
/// Возвращает максимальное количество байт, которые занимають сообщения в потребляющей очереди.
/// </returns>
public Maybe<int> GetQueueMaxLengthBytes()
{
return this.Pick<ReceiverOptions, int>((o) => o.QueueMaxLengthBytes);
}

}
}
10 changes: 10 additions & 0 deletions Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,5 +256,15 @@ IReceiverConfigurator<T> IReceiverConfigurator<T>.WithParallelismLevel(uint para

return this;
}

public IReceiverConfigurator WithQueueLimit(int queueLimit)
{
return this.configuration.WithQueueLimit(queueLimit);
}

public IReceiverConfigurator WithQueueMaxLengthBytes(int maxLengthBytes)
{
return this.configuration.WithQueueMaxLengthBytes(maxLengthBytes);
}
}
}

0 comments on commit c3df282

Please sign in to comment.