Skip to content

Commit

Permalink
INTSAMPLES-137: Add WebSockets sample
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Sep 10, 2014
1 parent 8de0f77 commit a15aed1
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 1 deletion.
27 changes: 27 additions & 0 deletions basic/web-sockets/README.md
@@ -0,0 +1,27 @@
WebSockets Sample
==============

This example demonstrates Standard WebSocket protocol (without any sub-protocols) with Spring Integration Adapters.
It just sends current time from the server to all connected clients.

## Server

The server is presented only with single `org.springframework.integration.samples.websocket.standard.server.Application`
class, which is based on the Spring Boot AutoConfiguration and Spring Integration Java & Annotation configuration.
It is a `main` and starts embedded Tomcat on default `8080` port. The WebSocket endpoint is mapped to the `/time` path.

## Java Client

The `org.springframework.integration.samples.websocket.standard.client.Application` represents simple Java application,
which starts an integration flow (`client-context.xml`), connects to the WebSocket server and prints `Message`s to the
logs, which are received over WebSocket.

## Browser Client

The `index.html` in the `resource` directory of this project demonstrates a JavaScript `SockJS` client, which connects
to our server and just prints its messages in the middle of page.

## Test Case

The `org.springframework.integration.samples.websocket.standard.ApplicationTests` demonstrates the Spring Boot test
framework and starts Server & Client to check, that the last one receives correct data.
@@ -0,0 +1,34 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.samples.websocket.standard.client;

import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
* @author Artem Bilan
* @since 3.0
*/
public class Application {

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext("client-context.xml", Application.class);
System.in.read();
ctx.close();
}

}
@@ -0,0 +1,141 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.samples.websocket.standard.server;

import java.text.DateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.Executors;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.splitter.DefaultMessageSplitter;
import org.springframework.integration.transformer.AbstractPayloadTransformer;
import org.springframework.integration.transformer.HeaderEnricher;
import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.GenericMessage;

/**
* @author Artem Bilan
* @since 3.0
*/
@Configuration
@EnableAutoConfiguration
public class Application {

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class);
System.in.read();
ctx.close();
}

@Bean
public ServerWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/time").withSockJs();
}

@Bean
@InboundChannelAdapter(value = "splitChannel", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<?> webSocketSessionsMessageSource() {
return new MessageSource<Iterator<String>>() {

@Override
public Message<Iterator<String>> receive() {
return new GenericMessage<Iterator<String>>(serverWebSocketContainer().getSessions().keySet().iterator());
}

};
}

@Bean
public MessageChannel splitChannel() {
return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "splitChannel")
public MessageHandler splitter() {
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
splitter.setOutputChannelName("headerEnricherChannel");
return splitter;
}

@Bean
public MessageChannel headerEnricherChannel() {
return new ExecutorChannel(Executors.newCachedThreadPool());
}

@Bean
@Transformer(inputChannel = "headerEnricherChannel", outputChannel = "transformChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(SimpMessageHeaderAccessor.SESSION_ID_HEADER,
new ExpressionEvaluatingHeaderValueMessageProcessor<Object>("payload", null)));
}

@Bean
@Transformer(inputChannel = "transformChannel", outputChannel = "sendTimeChannel")
public AbstractPayloadTransformer<?, ?> transformer() {
return new AbstractPayloadTransformer<Object, Object>() {
@Override
protected Object transformPayload(Object payload) throws Exception {
return DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.DEFAULT).format(new Date());
}

};
}


@Bean
public MessageChannel sendTimeChannel() {
return new PublishSubscribeChannel();
}


@Bean
@ServiceActivator(inputChannel = "sendTimeChannel")
public MessageHandler webSocketOutboundAdapter() {
return new WebSocketOutboundMessageHandler(serverWebSocketContainer());
}

@Bean
@ServiceActivator(inputChannel = "sendTimeChannel")
public MessageHandler loggingChannelAdapter() {
LoggingHandler loggingHandler = new LoggingHandler("info");
loggingHandler.setExpression("'The time ' + payload + ' has been sent to the WebSocketSession ' + headers.simpSessionId");
return loggingHandler;
}

}
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
http://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">

<bean id="webSocketClient" class="org.springframework.web.socket.client.standard.StandardWebSocketClient"/>

<int-websocket:client-container id="clientWebSocketContainer"
client="webSocketClient"
uri="ws://localhost:8080/time/websocket"/>

<int-websocket:inbound-channel-adapter container="clientWebSocketContainer"
channel="webSocketInputChannel"/>

<int:logging-channel-adapter id="webSocketInputChannel"/>

</beans>
34 changes: 34 additions & 0 deletions basic/web-sockets/src/main/resources/static/index.html
@@ -0,0 +1,34 @@
<!DOCTYPE html>
<html>
<head>
<title>Time over WebSocket</title>
<script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>
<script type="text/javascript">

var sock = new SockJS('http://localhost:8080/time');
sock.onopen = function () {
document.getElementById('time').innerHTML = 'Connecting...';
};
sock.onmessage = function (e) {
document.getElementById('time').innerHTML = e.data;
};
sock.onclose = function () {
document.getElementById('time').innerHTML = "Server closed connection or hasn't been started";
};
</script>
</head>

<body style="margin: 0">
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript!
WebSocket relies on Javascript being enabled. Please enable Javascript and reload this page!</h2></noscript>
<div id="time"
style="position: absolute;
bottom: 0;
font-size: 800%;
height: 200px;
margin: auto;
text-align: center;
top: 0;
width: 100%;">Starting...</div>
</body>
</html>
@@ -0,0 +1,85 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.samples.websocket.standard;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.text.DateFormat;
import java.text.ParseException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.boot.test.IntegrationTest;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.samples.websocket.standard.server.Application;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;

/**
* @author Artem Bilan
* @since 3.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
@WebAppConfiguration
@IntegrationTest
public class ApplicationTests {

@Test
public void testWebSockets() throws InterruptedException {
ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext("client-context.xml",
org.springframework.integration.samples.websocket.standard.client.Application.class);
DirectChannel webSocketInputChannel = ctx.getBean("webSocketInputChannel", DirectChannel.class);

final CountDownLatch stopLatch = new CountDownLatch(2);

webSocketInputChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
Object payload = message.getPayload();
assertThat(payload, instanceOf(String.class));
Date date = null;
try {
date = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.DEFAULT).parse((String) payload);
}
catch (ParseException e) {
fail("fail to parse date");
}
assertThat(new Date().compareTo(date), greaterThanOrEqualTo(0));
stopLatch.countDown();
}

});
assertTrue(stopLatch.await(10, TimeUnit.SECONDS));
ctx.close();
}

}
18 changes: 17 additions & 1 deletion build.gradle
Expand Up @@ -139,6 +139,7 @@ subprojects { subproject ->

repositories {
//TODO on release
// mavenLocal()
maven { url 'http://repo.spring.io/libs-snapshot' }
maven { url 'http://repo.spring.io/libs-milestone' }
// maven { url 'http://repo.spring.io/libs-staging-local' }
Expand Down Expand Up @@ -192,7 +193,7 @@ subprojects { subproject ->
subethasmtpVersion = '1.2'
slf4jVersion = '1.7.6'
springIntegrationVersion = '4.0.4.RELEASE'
springIntegration41Version = '4.1.0.M1'
springIntegration41Version = '4.1.0.BUILD-SNAPSHOT'
springIntegrationDslVersion = '1.0.0.M3'
springVersion = '4.0.7.RELEASE'
springSecurityVersion = '3.2.4.RELEASE'
Expand Down Expand Up @@ -1085,6 +1086,21 @@ project('tx-synch') {
}
}

project('web-sockets') {
description = 'Web Sockets Basic Sample'

apply plugin: 'spring-boot'

dependencies {
compile 'org.springframework.boot:spring-boot-starter-websocket'
compile "org.springframework.integration:spring-integration-websocket:$springIntegration41Version"

testCompile 'org.springframework.boot:spring-boot-starter-test'
}

mainClassName = 'org.springframework.integration.samples.websocket.standard.server.Application'
}

task wrapper(type: Wrapper) {
description = 'Generates gradlew[.bat] scripts'
gradleVersion = '1.12'
Expand Down

0 comments on commit a15aed1

Please sign in to comment.