From 4423825f40993d4c5ec8460925e9f151cb8983da Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 19 Nov 2018 12:34:57 -0500 Subject: [PATCH] GH-234: Add TCP Broadcast Sample Resolves https://github.com/spring-projects/spring-integration-samples/issues/234 --- .gitignore | 2 + basic/tcp-broadcast/README.adoc | 24 ++ basic/tcp-broadcast/pom.xml | 205 ++++++++++++++++++ .../tcpbroadcast/TcpBroadcastApplication.java | 200 +++++++++++++++++ .../src/main/resources/application.properties | 1 + build.gradle | 23 ++ 6 files changed, 455 insertions(+) create mode 100644 basic/tcp-broadcast/README.adoc create mode 100644 basic/tcp-broadcast/pom.xml create mode 100644 basic/tcp-broadcast/src/main/java/org/springframework/integration/samples/tcpbroadcast/TcpBroadcastApplication.java create mode 100644 basic/tcp-broadcast/src/main/resources/application.properties diff --git a/.gitignore b/.gitignore index 60bc22573..bc4753daa 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ activemq-data /.gradle build/ /classes +.mvn +mvnw* diff --git a/basic/tcp-broadcast/README.adoc b/basic/tcp-broadcast/README.adoc new file mode 100644 index 000000000..77d525575 --- /dev/null +++ b/basic/tcp-broadcast/README.adoc @@ -0,0 +1,24 @@ +== TCP Sample + +This sample demonstrates broadcasting a message received by a web controller to all connected TCP clients. + +`curl -X POST http://localhost:8080/broadcast/foo` will send 'foo' to 5 connected clients. + +The TCP server listens on port 1234. + + +`2018-11-19 12:01:48.546 INFO 98411 --- [ main] com.example.TcpBroadcastApplication : Started TcpBroadcastApplication in 1.904 seconds (JVM running for 2.345)` +`connected! from client# 5` +`connected! from client# 4` +`connected! from client# 3` +`connected! from client# 2` +`connected! from client# 1` + +`curl -X POST http://localhost:8080/broadcast/foo` + +`foo from client# 4` +`foo from client# 5` +`foo from client# 2` +`foo from client# 1` +`foo from client# 3` + diff --git a/basic/tcp-broadcast/pom.xml b/basic/tcp-broadcast/pom.xml new file mode 100644 index 000000000..bb1eb5c89 --- /dev/null +++ b/basic/tcp-broadcast/pom.xml @@ -0,0 +1,205 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.1.0.RELEASE + + org.springframework.integration.samples + tcp-broadcast + 5.1.1.BUILD-SNAPSHOT + TCP Client Broadcast Sample + TCP Client Broadcast Sample + http://projects.spring.io/spring-integration + + SpringIO + https://spring.io + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + garyrussell + Gary Russell + grussell@pivotal.io + + project lead + + + + markfisher + Mark Fisher + mfisher@pivotal.io + + project founder and lead emeritus + + + + ghillert + Gunnar Hillert + ghillert@pivotal.io + + + abilan + Artem Bilan + abilan@pivotal.io + + + + scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git + scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git + https://github.com/spring-projects/spring-integration-samples + + + + org.springframework.boot + spring-boot-starter-web + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.boot + spring-boot-starter-integration + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-ip + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + junit + junit + 4.12 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + * + org.hamcrest + + + + + org.hamcrest + hamcrest-all + 1.3 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.mockito + mockito-core + 2.18.0 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + * + org.hamcrest + + + + + org.springframework + spring-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.boot + spring-boot-starter-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + + + repo.spring.io.milestone + Spring Framework Maven Milestone Repository + https://repo.spring.io/libs-milestone + + + repo.spring.io.snapshot + Spring Framework Maven Snapshot Repository + https://repo.spring.io/libs-snapshot + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + org.springframework.boot + spring-boot-dependencies + 2.1.0.RELEASE + import + pom + + + org.springframework + spring-framework-bom + 5.1.2.RELEASE + import + pom + + + org.springframework.integration + spring-integration-bom + 5.1.0.RELEASE + import + pom + + + + diff --git a/basic/tcp-broadcast/src/main/java/org/springframework/integration/samples/tcpbroadcast/TcpBroadcastApplication.java b/basic/tcp-broadcast/src/main/java/org/springframework/integration/samples/tcpbroadcast/TcpBroadcastApplication.java new file mode 100644 index 000000000..573269733 --- /dev/null +++ b/basic/tcp-broadcast/src/main/java/org/springframework/integration/samples/tcpbroadcast/TcpBroadcastApplication.java @@ -0,0 +1,200 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.tcpbroadcast; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.util.stream.IntStream; + +import javax.net.SocketFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.ip.IpHeaders; +import org.springframework.integration.ip.dsl.Tcp; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +@SpringBootApplication +public class TcpBroadcastApplication { + + private static final int PORT = 1234; + + @Configuration + public static class Config { + + /* + * Server connection factory. + */ + @Bean + public AbstractServerConnectionFactory serverFactory() { + return Tcp.netServer(PORT).get(); + } + + /* + * Inbound adapter - sends "connected!". + */ + @Bean + public IntegrationFlow tcpServer(AbstractServerConnectionFactory serverFactory) { + return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory)) + .transform(p -> "connected!") + .channel("toTcp.input") + .get(); + } + + /* + * Gateway flow for controller. + */ + @Bean + public IntegrationFlow gateway() { + return IntegrationFlows.from(Sender.class) + .channel("toTcp.input") + .get(); + } + + /* + * Outbound channel adapter flow. + */ + @Bean + public IntegrationFlow toTcp(AbstractServerConnectionFactory serverFactory) { + return f -> f.handle(Tcp.outboundAdapter(serverFactory)); + } + + /* + * Excutor for clients. + */ + @Bean + public ThreadPoolTaskExecutor exec() { + ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); + exec.setCorePoolSize(5); + return exec; + } + + /* + * Start 5 clients. + */ + @Bean + public ApplicationRunner runner(TaskExecutor exec, Broadcaster caster) { + return args -> { + IntStream.range(1, 6).forEach(i -> exec.execute(new Client())); + }; + } + + } + + /* + * Sender gateway sets the connection id header. + */ + public interface Sender { + + void send(String payload, @Header(IpHeaders.CONNECTION_ID) String connectionId); + + } + + @RestController + public static class Controller { + + @Autowired + private Broadcaster broadcaster; + + @Autowired + private ConfigurableApplicationContext applicationContext; + + @PostMapping("/broadcast/{what}") + public String broadcast(@PathVariable String what) { + this.broadcaster.send(what); + return "sent: " + what; + } + + @GetMapping("/shutdown") + public void shutDown() { + this.applicationContext.close(); + } + } + + @Component + @DependsOn("gateway") // Needed to ensure the gateway flow bean is created first + public static class Broadcaster { + + @Autowired + private Sender sender; + + @Autowired + private AbstractServerConnectionFactory server; + + public void send(String what) { + this.server.getOpenConnectionIds().forEach(cid -> sender.send(what, cid)); + } + + } + + public static class Client implements Runnable { + + private static final ByteArrayCrLfSerializer deserializer = new ByteArrayCrLfSerializer(); + + private static int next; + + private final int instance = ++next; + + @Override + public void run() { + Socket socket = null; + try { + socket = SocketFactory.getDefault().createSocket("localhost", PORT); + socket.getOutputStream().write("hello\r\n".getBytes()); + InputStream is = socket.getInputStream(); + while(true) { + System.out.println(new String(deserializer.deserialize(is)) + " from client# " + instance); + } + } + catch (IOException e) { + } + finally { + if (socket != null) { + try { + socket.close(); + } + catch (IOException e) { + } + } + } + } + + } + + public static void main(String[] args) { + SpringApplication.run(TcpBroadcastApplication.class, args); + } + +} diff --git a/basic/tcp-broadcast/src/main/resources/application.properties b/basic/tcp-broadcast/src/main/resources/application.properties new file mode 100644 index 000000000..6459d4a4f --- /dev/null +++ b/basic/tcp-broadcast/src/main/resources/application.properties @@ -0,0 +1 @@ +#logging.level.org.springframework.integration=debug diff --git a/build.gradle b/build.gradle index 929e5fe83..e082639e6 100644 --- a/build.gradle +++ b/build.gradle @@ -902,6 +902,29 @@ project('tcp-amqp') { } } +project('tcp-broadcast') { + description = 'TCP Client Broadcast Sample' + + apply plugin: 'org.springframework.boot' + + dependencies { + compile 'org.springframework.boot:spring-boot-starter-web' + compile 'org.springframework.boot:spring-boot-starter-integration' + compile "org.springframework.integration:spring-integration-ip" + + testCompile 'org.springframework.boot:spring-boot-starter-test' + } + + bootRun { + main = 'org.springframework.integration.samples.tcpbroadcast.Application' + } + + task run(type: JavaExec) { + main 'org.springframework.integration.samples.tcpbroadcast.Application' + classpath = sourceSets.main.runtimeClasspath + } +} + project('tcp-client-server') { description = 'TCP Client Server Sample'