Skip to content

Commit d312d5e

Browse files
Merge branch 'rockie-yang-master'
2 parents e12001f + 960a8fa commit d312d5e

17 files changed

+715
-0
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@ java*/.idea/encodings.xml
2323
.vscode/
2424
obj/
2525
bin/
26+
target/
27+
.DS_Store
28+
*.iml
29+
.idea/

scala/README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# RabbitMQ Tutorials in Scala
2+
3+
This is a minimalistic Scala port of the RabbitMQ tutorials in Java.
4+
The port is admittedly quite close to Java in terms of code style.
5+
This is primarily to the fact that RabbitMQ Java client still supports
6+
JDK 6 and doesn't have a lambda-friendly API.
7+
8+
9+
## Compiling the Code
10+
11+
mvn compile
12+
13+
14+
## Running Examples
15+
16+
### Hello World
17+
18+
Execute the following command to receive a hello world:
19+
20+
mvn exec:java -Dexec.mainClass="Recv"
21+
22+
Execute the following in a separate shell to send a hello world:
23+
24+
mvn exec:java -Dexec.mainClass="Send"
25+
26+
### Work Queues
27+
28+
Send a message which will be finshed immediately:
29+
30+
mvn exec:java -Dexec.mainClass="NewTask"
31+
32+
Send a message which need some second to execute each . is one second.
33+
34+
mvn exec:java -Dexec.mainClass="NewTask" -Dexec.args="rabbit1 ...."
35+
36+
To start a worker (run in a separate shell):
37+
38+
mvn exec:java -Dexec.mainClass="Worker"
39+
40+
Add more workers to the same queue, message will be distributed in the
41+
round robin manner.
42+
43+
### Publish and Subscriber
44+
45+
mvn exec:java -Dexec.mainClass="EmitLog" -Dexec.args="rabbit1 msg1"
46+
47+
mvn exec:java -Dexec.mainClass="ReceiveLogs"
48+
49+
### RPC
50+
51+
In one shell:
52+
53+
mvn exec:java -Dexec.mainClass="RPCServer"
54+
55+
In another shell:
56+
57+
mvn exec:java -Dexec.mainClass="RPCClient"

scala/pom.xml

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<groupId>explore</groupId>
4+
<artifactId>rabbitmq</artifactId>
5+
<version>0.1-SNAPSHOT</version>
6+
<properties>
7+
<scala.version>2.11.8</scala.version>
8+
</properties>
9+
10+
<repositories>
11+
<repository>
12+
<id>scala-tools.org</id>
13+
<name>Scala-Tools Maven2 Repository</name>
14+
<url>http://scala-tools.org/repo-releases</url>
15+
</repository>
16+
</repositories>
17+
18+
<pluginRepositories>
19+
<pluginRepository>
20+
<id>scala-tools.org</id>
21+
<name>Scala-Tools Maven2 Repository</name>
22+
<url>http://scala-tools.org/repo-releases</url>
23+
</pluginRepository>
24+
</pluginRepositories>
25+
26+
<dependencies>
27+
<dependency>
28+
<groupId>org.scala-lang</groupId>
29+
<artifactId>scala-library</artifactId>
30+
<version>${scala.version}</version>
31+
</dependency>
32+
<dependency>
33+
<groupId>com.rabbitmq</groupId>
34+
<artifactId>amqp-client</artifactId>
35+
<version>4.0.0</version>
36+
</dependency>
37+
</dependencies>
38+
39+
<build>
40+
<sourceDirectory>src/main/scala</sourceDirectory>
41+
<testSourceDirectory>src/test/scala</testSourceDirectory>
42+
<plugins>
43+
<plugin>
44+
<groupId>org.scala-tools</groupId>
45+
<artifactId>maven-scala-plugin</artifactId>
46+
<executions>
47+
<execution>
48+
<goals>
49+
<goal>compile</goal>
50+
<goal>testCompile</goal>
51+
</goals>
52+
</execution>
53+
</executions>
54+
<configuration>
55+
<scalaVersion>${scala.version}</scalaVersion>
56+
<args>
57+
<arg>-target:jvm-1.5</arg>
58+
</args>
59+
</configuration>
60+
</plugin>
61+
<plugin>
62+
<groupId>org.apache.maven.plugins</groupId>
63+
<artifactId>maven-eclipse-plugin</artifactId>
64+
<configuration>
65+
<downloadSources>true</downloadSources>
66+
<buildcommands>
67+
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
68+
</buildcommands>
69+
<additionalProjectnatures>
70+
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
71+
</additionalProjectnatures>
72+
<classpathContainers>
73+
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
74+
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
75+
</classpathContainers>
76+
</configuration>
77+
</plugin>
78+
</plugins>
79+
</build>
80+
<reporting>
81+
<plugins>
82+
<plugin>
83+
<groupId>org.scala-tools</groupId>
84+
<artifactId>maven-scala-plugin</artifactId>
85+
<configuration>
86+
<scalaVersion>${scala.version}</scalaVersion>
87+
</configuration>
88+
</plugin>
89+
</plugins>
90+
</reporting>
91+
</project>

scala/src/main/scala/EmitLog.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import com.rabbitmq.client.ConnectionFactory
2+
3+
object EmitLog {
4+
5+
private val EXCHANGE_NAME = "logs"
6+
7+
def main(argv: Array[String]) {
8+
val factory = new ConnectionFactory()
9+
factory.setHost("localhost")
10+
val connection = factory.newConnection()
11+
val channel = connection.createChannel()
12+
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
13+
val message = getMessage(argv)
14+
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"))
15+
println(" [x] Sent '" + message + "'")
16+
channel.close()
17+
connection.close()
18+
}
19+
20+
private def getMessage(strings: Array[String]): String = {
21+
if (strings.length < 1) return "info: Hello World!"
22+
joinStrings(strings, " ")
23+
}
24+
25+
private def joinStrings(strings: Array[String], delimiter: String): String = {
26+
val length = strings.length
27+
if (length == 0) return ""
28+
val words = new StringBuilder(strings(0))
29+
for (i <- 1 until length) {
30+
words.append(delimiter).append(strings(i))
31+
}
32+
words.toString
33+
}
34+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import com.rabbitmq.client.ConnectionFactory
2+
3+
4+
object EmitLogDirect {
5+
6+
private val EXCHANGE_NAME = "direct_logs"
7+
8+
def main(argv: Array[String]) {
9+
val factory = new ConnectionFactory()
10+
factory.setHost("localhost")
11+
val connection = factory.newConnection()
12+
val channel = connection.createChannel()
13+
channel.exchangeDeclare(EXCHANGE_NAME, "direct")
14+
val severity = getSeverity(argv)
15+
val message = getMessage(argv)
16+
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"))
17+
println(" [x] Sent '" + severity + "':'" + message + "'")
18+
channel.close()
19+
connection.close()
20+
}
21+
22+
private def getSeverity(strings: Array[String]): String = {
23+
if (strings.length < 1) return "info"
24+
strings(0)
25+
}
26+
27+
private def getMessage(strings: Array[String]): String = {
28+
if (strings.length < 2) return "Hello World!"
29+
joinStrings(strings, " ", 1)
30+
}
31+
32+
private def joinStrings(strings: Array[String], delimiter: String, startIndex: Int): String = {
33+
val length = strings.length
34+
if (length == 0) return ""
35+
if (length < startIndex) return ""
36+
val words = new StringBuilder(strings(startIndex))
37+
for (i <- startIndex + 1 until length) {
38+
words.append(delimiter).append(strings(i))
39+
}
40+
words.toString
41+
}
42+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import java.util.HashMap
2+
3+
import com.rabbitmq.client._
4+
//remove if not needed
5+
6+
object EmitLogHeader {
7+
8+
private val EXCHANGE_NAME = "header_test"
9+
10+
def main(argv: Array[String]) {
11+
if (argv.length < 1) {
12+
System.err.println("Usage: EmitLogHeader message queueName [headers]...")
13+
System.exit(1)
14+
}
15+
val routingKey = "ourTestRoutingKey"
16+
val message = argv(0)
17+
val headers = new HashMap[String, Object]()
18+
for (i <- 1 until argv.length by 2) {
19+
println("Adding header " + argv(i) + " with value " + argv(i + 1) +
20+
" to Map")
21+
headers.put(argv(i), argv(i + 1))
22+
}
23+
val factory = new ConnectionFactory()
24+
factory.setHost("localhost")
25+
26+
val connection = factory.newConnection()
27+
val channel = connection.createChannel()
28+
channel.exchangeDeclare(EXCHANGE_NAME, "headers")
29+
val builder = new AMQP.BasicProperties.Builder()
30+
builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode)
31+
builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority)
32+
builder.headers(headers)
33+
val theProps = builder.build()
34+
channel.basicPublish(EXCHANGE_NAME, routingKey, theProps, message.getBytes("UTF-8"))
35+
println(" [x] Sent message: '" + message + "'")
36+
}
37+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
2+
3+
object EmitLogTopic {
4+
5+
private val EXCHANGE_NAME = "topic_logs"
6+
7+
def main(argv: Array[String]) {
8+
var connection: Connection = null
9+
var channel: Channel = null
10+
try {
11+
val factory = new ConnectionFactory()
12+
factory.setHost("localhost")
13+
connection = factory.newConnection()
14+
channel = connection.createChannel()
15+
channel.exchangeDeclare(EXCHANGE_NAME, "topic")
16+
val routingKey = getRouting(argv)
17+
val message = getMessage(argv)
18+
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"))
19+
println(" [x] Sent '" + routingKey + "':'" + message + "'")
20+
} catch {
21+
case e: Exception => e.printStackTrace()
22+
} finally {
23+
if (connection != null) {
24+
try {
25+
connection.close()
26+
} catch {
27+
case ignore: Exception =>
28+
}
29+
}
30+
}
31+
}
32+
33+
private def getRouting(strings: Array[String]): String = {
34+
if (strings.length < 1) return "anonymous.info"
35+
strings(0)
36+
}
37+
38+
private def getMessage(strings: Array[String]): String = {
39+
if (strings.length < 2) return "Hello World!"
40+
joinStrings(strings, " ", 1)
41+
}
42+
43+
private def joinStrings(strings: Array[String], delimiter: String, startIndex: Int): String = {
44+
val length = strings.length
45+
if (length == 0) return ""
46+
if (length < startIndex) return ""
47+
val words = new StringBuilder(strings(startIndex))
48+
for (i <- startIndex + 1 until length) {
49+
words.append(delimiter).append(strings(i))
50+
}
51+
words.toString
52+
}
53+
}

scala/src/main/scala/NewTask.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
2+
import com.rabbitmq.client.{ConnectionFactory, MessageProperties}
3+
4+
object NewTask {
5+
6+
private val TASK_QUEUE_NAME = "task_queue"
7+
8+
def main(argv: Array[String]) {
9+
val factory = new ConnectionFactory()
10+
factory.setHost("localhost")
11+
val connection = factory.newConnection()
12+
val channel = connection.createChannel()
13+
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null)
14+
val message = getMessage(argv)
15+
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
16+
println(" [x] Sent '" + message + "'")
17+
channel.close()
18+
connection.close()
19+
}
20+
21+
private def getMessage(strings: Array[String]): String = {
22+
if (strings.length < 1) return "Hello World!"
23+
joinStrings(strings, " ")
24+
}
25+
26+
private def joinStrings(strings: Array[String], delimiter: String): String = {
27+
val length = strings.length
28+
if (length == 0) return ""
29+
val words = new StringBuilder(strings(0))
30+
for (i <- 1 until length) {
31+
words.append(delimiter).append(strings(i))
32+
}
33+
words.toString
34+
}
35+
}

0 commit comments

Comments
 (0)