Skip to content

Commit

Permalink
ARTEMIS-1948 dotnet example with high performant load
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jun 21, 2018
1 parent 3534b23 commit d805074
Show file tree
Hide file tree
Showing 7 changed files with 480 additions and 0 deletions.
73 changes: 73 additions & 0 deletions examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;

namespace Artemis.Perf
{
class App
{
static long ReceivedMessages = 0;

static void TheCallback(int id, Session session, ReceiverLink link, Message message)
{
Interlocked.Increment(ref ReceivedMessages);
link.Accept(message);
}

static void Main(string[] args) {

if (args.Length == 0) {
args = new string[1];
args[0] = "amqp://127.0.0.1:5672";
}

// it will start one client towards each server
for (int i = 0; i < args.Length; i++) {
string addr0 = args.Length >= 1 ? args[0] : "amqp://127.0.0.1:5672";
processOn(addr0, "orders", 100000000, 25000, "p1");
}

while (true) {
long previousRead = Interlocked.Read(ref ReceivedMessages);
long previousSent = Interlocked.Read(ref Producer.totalSent);
Thread.Sleep(1000);
long currentRead = Interlocked.Read(ref ReceivedMessages);
long currentSent = Interlocked.Read(ref Producer.totalSent);
Console.WriteLine("Received: " + currentRead + " TotalSent: " +
currentSent);
Console.WriteLine("Rate reading: " + (currentRead - previousRead) + ", Rate sending: " + (currentSent - previousSent));
}

}
static void processOn(string addr, string queue, int totalSend, int maxRateSend, String processName) {
Address address = new Address(addr);

Connection connection = new Connection(address);

ReceiverPool pool = new ReceiverPool(connection, 1, queue, 200, TheCallback);
pool.start();

Producer Producer = new Producer(processName, addr, queue, totalSend, maxRateSend);
Producer.produce(); // this will start an asynchronous producer

}
}
}
@@ -0,0 +1,29 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AMQPNetLite" Version="2.1.3" />
</ItemGroup>

</Project>
96 changes: 96 additions & 0 deletions examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;

namespace Artemis.Perf
{
public class Producer
{
string name;

string addr;
string queue;
int numberOfMessages;
int messagesPerSecond;
long messagesSent;

public static long totalSent;

public Producer(string name, string addr, string queue, int numberOfMessages, int messagesPerSecond) {
this.name = name;
this.addr = addr;
this.queue = queue;
this.numberOfMessages = numberOfMessages;
this.messagesPerSecond = messagesPerSecond;
}


public void produce() {
Address address = new Address(addr);

Connection connection = new Connection(address);


Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender", queue);

OutcomeCallback callback = (l, msg, o, s) => {
Interlocked.Increment(ref messagesSent);
Interlocked.Increment(ref totalSent);
};

// This is just to limit the number of messages per second we are sending
TokenBucketLimiterImpl tokens = new TokenBucketLimiterImpl(messagesPerSecond);

Task.Factory.StartNew(() => {
Console.WriteLine("Sending {0} messages...", numberOfMessages);
for (var i = 0; i < numberOfMessages; i++)
{
tokens.limit();
Message message = new Message("a message!" + i);
message.Header = new Header();
message.Header.Durable = true;
// The callback here is to make the sending to happen as fast as possible
sender.Send(message, callback, null);
}
Console.WriteLine(".... Done sending");
}, TaskCreationOptions.LongRunning);

// Trace.TraceLevel = TraceLevel.Verbose | TraceLevel.Error |
// TraceLevel.Frame | TraceLevel.Information | TraceLevel.Warning;
// Trace.TraceListener = (l, f, o) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, o));

// sender.Close();

// Task.Factory.StartNew(() => {
// while (true) {
// Console.WriteLine("Sent " + Interlocked.Read(ref messagesSent) + " on queue " + queue + " producer " + this.name);
// Thread.Sleep(1000);
// }
// }, TaskCreationOptions.LongRunning);



}
}
}
110 changes: 110 additions & 0 deletions examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;

namespace Artemis.Perf
{
/**
* This class will start many consumers underneath it to satisfy a pool of consumers
* While calling a single callback for when messages are received.
*/
public class ReceiverPool
{
public delegate void MessageReceived(int id, Session session, ReceiverLink link, Message msg);

public int MessagesReceived;

MessageReceived _callback;
int _Workers;
private Object receiverLock = new Object();
private Boolean running = true;

private ReceiverLink[] Receivers;
private Session[] Sessions;

private Connection _Connection;

private int Credits;

public ReceiverPool(Connection Connection, int Workers, String queue, int Credits, MessageReceived callback)
{
this._Connection = Connection;
this.Receivers = new ReceiverLink[Workers];
this.Sessions = new Session[Workers];


for (int i = 0; i < Workers; i++)
{

// I was playing with using a single session versus multiple sessions
if (i == 0) {
Sessions[i] = new Session(Connection);
}
else {
Sessions[i] = Sessions[0];
}
Receivers[i] = new ReceiverLink(Sessions[i], "receiver " + queue + " " + i, queue);
}
this._Workers = Workers;
this._callback = callback;
this.Credits = Credits;
}


public void stop() {
running = false;
for (int i = 0; i < _Workers; i++) {
Receivers[i].Close();
Sessions[i].Close();
}
}


public void start() {
for (int i = 0; i < _Workers; i++) {
{
// This variable exists otherwise we would get an olderValue of i
int value = i;
Task.Factory.StartNew(() => WorkerRun(value), TaskCreationOptions.LongRunning);
}
}
}

void WorkerRun(int i) {
try {
Receivers[i].SetCredit(Credits);
while (running)
{
Message theMessage = Receivers[i].Receive(TimeSpan.FromSeconds(1));

if (theMessage != null)
{
_callback(i, Sessions[i], Receivers[i], theMessage);
}
}
} catch (Exception e) {
Console.WriteLine(e);
}
}
}

}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;

namespace Artemis.Perf
{

// this has been copied from Artemis' TokenBucketLimiter with some modifications
public class TokenBucketLimiterImpl {

private int rate;

/**
* Even thought we don't use TokenBucket in multiThread
* the implementation should keep this volatile for correctness
*/
private long last;

/**
* Even thought we don't use TokenBucket in multiThread
* the implementation should keep this volatile for correctness
*/
private int tokens;

public TokenBucketLimiterImpl(int rate) {
this.rate = rate;
}
private bool checkRate() {

long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

if (last == 0) {
last = now;
}

long diff = now - last;

if (diff >= 1000) {
last = now;

tokens = rate;
}

if (tokens > 0) {
tokens--;

return true;
} else {
return false;
}
}

public void limit() {
if (!checkRate()) {
// Console.WriteLine("Limiting messages per max rate");
do {
Thread.Sleep(1);
} while (!checkRate());
}
}
}
}

0 comments on commit d805074

Please sign in to comment.