Skip to content

Commit

Permalink
Merge 8d1f9b7 into 8e223f2
Browse files Browse the repository at this point in the history
  • Loading branch information
a14n committed Mar 20, 2019
2 parents 8e223f2 + 8d1f9b7 commit 15efb8a
Show file tree
Hide file tree
Showing 81 changed files with 2,245 additions and 2,329 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Expand Up @@ -4,11 +4,12 @@ services:
jobs:
include:
- stage: run tests
dart: dev
dart: stable
script:
- export PATH=$PATH:"~/.pub-cache/bin"
- pub get
- pub global activate coverage
- dartfmt -n --set-exit-if-changed .
- dartanalyzer --fatal-warnings --fatal-hints --fatal-lints ./
- cd test
- bash run.sh
Expand Down
12 changes: 6 additions & 6 deletions example/hello/receive.dart
Expand Up @@ -2,7 +2,7 @@ import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main() {
Client client = new Client();
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
Expand All @@ -12,13 +12,13 @@ void main() {
});

client
.channel()
.then((Channel channel) => channel.queue("hello"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
.channel()
.then((Channel channel) => channel.queue("hello"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
print(" [*] Waiting for messages. To exit, press CTRL+C");
consumer.listen((AmqpMessage message) {
print(" [x] Received ${message.payloadAsString}");
});
});
}
}
18 changes: 9 additions & 9 deletions example/hello/send.dart
@@ -1,13 +1,13 @@
import "package:dart_amqp/dart_amqp.dart";

void main(){
Client client = new Client();
void main() {
Client client = Client();
client
.channel()
.then((Channel channel) => channel.queue("hello"))
.then((Queue queue){
queue.publish("Hello World!");
print(" [x] Sent 'Hello World!'");
return client.close();
.channel()
.then((Channel channel) => channel.queue("hello"))
.then((Queue queue) {
queue.publish("Hello World!");
print(" [x] Sent 'Hello World!'");
return client.close();
});
}
}
10 changes: 5 additions & 5 deletions example/pubsub/emit_log.dart
@@ -1,15 +1,15 @@
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> args) {
Client client = new Client();
Client client = Client();
client
.channel()
.then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT))
.then((Exchange exchange) {
.channel()
.then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT))
.then((Exchange exchange) {
String message = args.join(' ');
// We dont care about the routing key as our exchange type is FANOUT
exchange.publish(message, null);
print(" [x] Sent ${message}");
return client.close();
});
}
}
16 changes: 8 additions & 8 deletions example/pubsub/receive_logs.dart
Expand Up @@ -2,7 +2,7 @@ import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main() {
Client client = new Client();
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
Expand All @@ -12,14 +12,14 @@ void main() {
});

client
.channel()
.then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(null))
.then((Consumer consumer) {
print(" [*] Waiting for logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
.channel()
.then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(null))
.then((Consumer consumer) {
print(
" [*] Waiting for logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
consumer.listen((AmqpMessage message) {
print(" [x] ${message.payloadAsString}");
});
});

}
}
11 changes: 6 additions & 5 deletions example/routing/emit_log_direct.dart
Expand Up @@ -17,15 +17,16 @@ void main(List<String> args) {

String severity = args.first;

Client client = new Client();
Client client = Client();
client
.channel()
.then((Channel channel) => channel.exchange("direct_logs", ExchangeType.DIRECT))
.then((Exchange exchange) {
.channel()
.then((Channel channel) =>
channel.exchange("direct_logs", ExchangeType.DIRECT))
.then((Exchange exchange) {
String message = args.sublist(1).join(' ');
// Use 'severity' as our routing key
exchange.publish(message, severity);
print(" [x] Sent [${severity}] ${message}");
return client.close();
});
}
}
23 changes: 13 additions & 10 deletions example/routing/receive_logs_direct.dart
Expand Up @@ -3,7 +3,8 @@ import "package:dart_amqp/dart_amqp.dart";

void main(List<String> args) {
if (args.isEmpty ||
!args.every((String arg) => ["info", "warning", "error"].indexOf(arg) != -1)) {
!args.every(
(String arg) => ["info", "warning", "error"].indexOf(arg) != -1)) {
print("""
Error: invalid arguments. Please invoke as:
Expand All @@ -19,7 +20,7 @@ void main(List<String> args) {
exit(1);
}

Client client = new Client();
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
Expand All @@ -29,14 +30,16 @@ void main(List<String> args) {
});

client
.channel()
.then((Channel channel) => channel.exchange("direct_logs", ExchangeType.DIRECT))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(args))
.then((Consumer consumer) {
print(" [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
.channel()
.then((Channel channel) =>
channel.exchange("direct_logs", ExchangeType.DIRECT))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(args))
.then((Consumer consumer) {
print(
" [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
consumer.listen((AmqpMessage message) {
print(" [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
print(
" [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
});
});

}
}
71 changes: 35 additions & 36 deletions example/rpc/rpc_client.dart
Expand Up @@ -4,28 +4,28 @@ import "package:dart_amqp/dart_amqp.dart";

class FibonacciRpcClient {
int _nextCorrelationId = 1;
final Completer connected = new Completer();
final Completer connected = Completer();
final Client client;
final Map<String, Completer> _pendingOperations = new Map<String, Completer>();
final Map<String, Completer> _pendingOperations = Map<String, Completer>();
Queue _serverQueue;
String _replyQueueName;

FibonacciRpcClient() : client = new Client() {
FibonacciRpcClient() : client = Client() {
client
.channel()
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue rpcQueue) {
_serverQueue = rpcQueue;
.channel()
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue rpcQueue) {
_serverQueue = rpcQueue;

// Allocate a private queue for server responses
return rpcQueue.channel.privateQueue();
})
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
_replyQueueName = consumer.queue.name;
consumer.listen(handleResponse);
connected.complete();
});
// Allocate a private queue for server responses
return rpcQueue.channel.privateQueue();
})
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
_replyQueueName = consumer.queue.name;
consumer.listen(handleResponse);
connected.complete();
});
}

void handleResponse(AmqpMessage message) {
Expand All @@ -35,50 +35,49 @@ class FibonacciRpcClient {
}

_pendingOperations
.remove(message.properties.corellationId)
.complete(int.parse(message.payloadAsString));
.remove(message.properties.corellationId)
.complete(int.parse(message.payloadAsString));
}

Future<int> call(int n) {
// Make sure we are connected before sending the request
return connected.future
.then((_) {
return connected.future.then((_) {
String uuid = "${_nextCorrelationId++}";
Completer<int> completer = new Completer<int>();
Completer<int> completer = Completer<int>();

MessageProperties properties = new MessageProperties()
MessageProperties properties = MessageProperties()
..replyTo = _replyQueueName
..corellationId = uuid;

_pendingOperations[ uuid ] = completer;
_pendingOperations[uuid] = completer;

_serverQueue.publish({"n" : n}, properties : properties);
_serverQueue.publish({"n": n}, properties: properties);

return completer.future;
});
}

Future close() {
// Kill any pending responses
_pendingOperations.forEach((_, Completer completer) => completer.completeError("RPC client shutting down"));
_pendingOperations.forEach((_, Completer completer) =>
completer.completeError("RPC client shutting down"));
_pendingOperations.clear();

return client.close();
}
}

main(List<String> args) {
FibonacciRpcClient client = new FibonacciRpcClient();
FibonacciRpcClient client = FibonacciRpcClient();

int n = args.isEmpty
? 30
: num.parse(args[0]);
int n = args.isEmpty ? 30 : num.parse(args[0]);

// Make 10 parallel calls and get fib(1) to fib(10)
client.call(n)
.then((int res) {
print(" [x] fib(${n}) = ${res}");
})
.then((_) => client.close())
.then((_) => exit(0));
}
client
.call(n)
.then((int res) {
print(" [x] fib(${n}) = ${res}");
})
.then((_) => client.close())
.then((_) => exit(0));
}
15 changes: 7 additions & 8 deletions example/rpc/rpc_server.dart
Expand Up @@ -12,8 +12,7 @@ int fib(int n) {
}

void main(List<String> args) {

Client client = new Client();
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
Expand All @@ -23,16 +22,16 @@ void main(List<String> args) {
});

client
.channel()
.then((Channel channel) => channel.qos(0, 1))
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
.channel()
.then((Channel channel) => channel.qos(0, 1))
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
print(" [x] Awaiting RPC request");
consumer.listen((AmqpMessage message) {
int n = message.payloadAsJson["n"];
print(" [.] fib(${n})");
message.reply(fib(n).toString());
});
});
}
}
11 changes: 6 additions & 5 deletions example/topics/emit_log_topic.dart
Expand Up @@ -17,15 +17,16 @@ void main(List<String> args) {

String routingKey = args.first;

Client client = new Client();
Client client = Client();
client
.channel()
.then((Channel channel) => channel.exchange("topic_logs", ExchangeType.TOPIC))
.then((Exchange exchange) {
.channel()
.then((Channel channel) =>
channel.exchange("topic_logs", ExchangeType.TOPIC))
.then((Exchange exchange) {
String message = args.sublist(1).join(' ');
// Use 'severity' as our routing key
exchange.publish(message, routingKey);
print(" [x] Sent [${routingKey}] ${message}");
return client.close();
});
}
}
20 changes: 11 additions & 9 deletions example/topics/receive_logs_topic.dart
Expand Up @@ -18,7 +18,7 @@ void main(List<String> args) {
exit(1);
}

Client client = new Client();
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
Expand All @@ -28,14 +28,16 @@ void main(List<String> args) {
});

client
.channel()
.then((Channel channel) => channel.exchange("topic_logs", ExchangeType.TOPIC))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(args))
.then((Consumer consumer) {
print(" [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
.channel()
.then((Channel channel) =>
channel.exchange("topic_logs", ExchangeType.TOPIC))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(args))
.then((Consumer consumer) {
print(
" [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
consumer.listen((AmqpMessage message) {
print(" [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
print(
" [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
});
});

}
}

0 comments on commit 15efb8a

Please sign in to comment.