Skip to content

Commit

Permalink
Issue #2282 - removing EventQueue from websocket tests
Browse files Browse the repository at this point in the history
  • Loading branch information
joakime committed Mar 5, 2018
1 parent 076f3a8 commit 1373025
Show file tree
Hide file tree
Showing 59 changed files with 709 additions and 721 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.websocket.ClientEndpoint;
Expand All @@ -36,14 +37,14 @@
import javax.websocket.OnMessage;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -90,7 +91,7 @@ public EventId decode(Reader reader) throws DecodeException, IOException
@ClientEndpoint(decoders = { EventIdDecoder.class })
public static class EventIdSocket
{
public EventQueue<EventId> messageQueue = new EventQueue<>();
public LinkedBlockingQueue<EventId> messageQueue = new LinkedBlockingQueue<>();
private CountDownLatch closeLatch = new CountDownLatch(1);

@OnClose
Expand All @@ -102,7 +103,7 @@ public void onClose(CloseReason close)
@OnMessage
public void onMessage(EventId msg)
{
messageQueue.add(msg);
messageQueue.offer(msg);
}

public void awaitClose() throws InterruptedException
Expand Down Expand Up @@ -208,12 +209,12 @@ public void testManyIds() throws Exception
idserver.writeSequentialIds(from,to);
idserver.close();
int count = from - to;
ids.messageQueue.awaitEventCount(count,4,TimeUnit.SECONDS);
ids.awaitClose();
// collect seen ids
List<Integer> seen = new ArrayList<>();
for(EventId id: ids.messageQueue)
for(int i=0; i<count; i++)
{
EventId id = ids.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
// validate that ids don't repeat.
Assert.assertFalse("Already saw ID: " + id.eventId, seen.contains(id.eventId));
seen.add(id.eventId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.eclipse.jetty.websocket.jsr356;

import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;

import java.io.BufferedReader;
import java.io.File;
Expand All @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.websocket.ClientEndpoint;
Expand All @@ -40,7 +41,6 @@
import javax.websocket.OnMessage;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
Expand All @@ -52,6 +52,7 @@
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -126,7 +127,7 @@ public Quotes decode(Reader reader) throws DecodeException, IOException
@ClientEndpoint(decoders = { QuotesDecoder.class })
public static class QuotesSocket
{
public EventQueue<Quotes> messageQueue = new EventQueue<>();
public LinkedBlockingQueue<Quotes> messageQueue = new LinkedBlockingQueue<>();
private CountDownLatch closeLatch = new CountDownLatch(1);

@OnClose
Expand All @@ -139,7 +140,7 @@ public void onClose(CloseReason close)
public synchronized void onMessage(Quotes msg)
{
Integer h=hashCode();
messageQueue.add(msg);
messageQueue.offer(msg);
System.out.printf("%x: Quotes from: %s%n",h,msg.author);
for (String quote : msg.quotes)
{
Expand Down Expand Up @@ -259,7 +260,7 @@ public void stopServer() throws Exception
server.stop();
}

// TODO analyse and fix
// TODO analyse and fix
@Ignore
@Test
public void testSingleQuotes() throws Exception
Expand All @@ -270,15 +271,14 @@ public void testSingleQuotes() throws Exception
client.connectToServer(quoter,server.getWsUri());
qserver.awaitConnect();
qserver.writeQuotes("quotes-ben.txt");
quoter.messageQueue.awaitEventCount(1,1000,TimeUnit.MILLISECONDS);
qserver.close();
quoter.awaitClose();
Quotes quotes = quoter.messageQueue.poll();
Quotes quotes = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Quotes Author",quotes.author,is("Benjamin Franklin"));
Assert.assertThat("Quotes Count",quotes.quotes.size(),is(3));
}

// TODO analyse and fix
// TODO analyse and fix
@Test
@Ignore ("Quotes appear to be able to arrive in any order?")
public void testTwoQuotes() throws Exception
Expand All @@ -290,11 +290,12 @@ public void testTwoQuotes() throws Exception
qserver.awaitConnect();
qserver.writeQuotes("quotes-ben.txt");
qserver.writeQuotes("quotes-twain.txt");
quoter.messageQueue.awaitEventCount(2,1000,TimeUnit.MILLISECONDS);
qserver.close();
quoter.awaitClose();
Quotes quotes = quoter.messageQueue.poll();
Quotes quotes = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Quotes Author",quotes.author,is("Benjamin Franklin"));
Assert.assertThat("Quotes Count",quotes.quotes.size(),is(3));
quotes = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Quotes Author",quotes.author,is("Mark Twain"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
Expand All @@ -39,14 +40,14 @@
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -55,9 +56,8 @@

public class EncoderTest
{
private static class EchoServer implements Runnable
private static class EchoServer
{
private Thread thread;
private BlockheadServer server;
private IBlockheadServerConnection sconnection;
private CountDownLatch connectLatch = new CountDownLatch(1);
Expand All @@ -67,37 +67,32 @@ public EchoServer(BlockheadServer server)
this.server = server;
}

@Override
public void run()
{
try
{
sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
sconnection.startEcho();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
connectLatch.countDown();
}
}

public void start()
{
this.thread = new Thread(this,"EchoServer");
this.thread.start();
CompletableFuture.runAsync(() -> {
try
{
sconnection = server.accept();
sconnection.setSoTimeout(10000);
sconnection.upgrade();
sconnection.enableIncomingEcho(true);
sconnection.startReadThread();
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
connectLatch.countDown();
}
});
}

public void stop()
{
if (this.sconnection != null)
{
this.sconnection.stopEcho();
try
{
this.sconnection.close();
Expand Down Expand Up @@ -166,12 +161,12 @@ public void init(EndpointConfig config)
public static class QuotesSocket extends Endpoint implements MessageHandler.Whole<String>
{
private Session session;
private EventQueue<String> messageQueue = new EventQueue<>();
private LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();

@Override
public void onMessage(String message)
{
messageQueue.add(message);
messageQueue.offer(message);
}

@Override
Expand Down Expand Up @@ -236,6 +231,7 @@ private Quotes getQuotes(String filename) throws IOException
public void initClient()
{
client = ContainerProvider.getWebSocketContainer();
client.setDefaultMaxSessionIdleTimeout(10000);
}

@After
Expand All @@ -257,7 +253,7 @@ public void stopServer() throws Exception
server.stop();
}

@Test
@Test(timeout = 10000)
public void testSingleQuotes() throws Exception
{
EchoServer eserver = new EchoServer(server);
Expand All @@ -277,9 +273,7 @@ public void testSingleQuotes() throws Exception
Quotes ben = getQuotes("quotes-ben.txt");
quoter.write(ben);

quoter.messageQueue.awaitEventCount(1,1000,TimeUnit.MILLISECONDS);

String result = quoter.messageQueue.poll();
String result = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertReceivedQuotes(result,ben);
}
finally
Expand All @@ -288,7 +282,7 @@ public void testSingleQuotes() throws Exception
}
}

@Test
@Test(timeout = 10000)
public void testTwoQuotes() throws Exception
{
EchoServer eserver = new EchoServer(server);
Expand All @@ -309,11 +303,9 @@ public void testTwoQuotes() throws Exception
quoter.write(ben);
quoter.write(twain);

quoter.messageQueue.awaitEventCount(2,1000,TimeUnit.MILLISECONDS);

String result = quoter.messageQueue.poll();
String result = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertReceivedQuotes(result,ben);
result = quoter.messageQueue.poll();
result = quoter.messageQueue.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
assertReceivedQuotes(result,twain);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;

import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.annotations.AnnotatedEndpointScanner;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
Expand Down Expand Up @@ -118,9 +119,8 @@ public void testOnCloseCall() throws Exception
driver.onClose(new CloseInfo(StatusCode.NORMAL,"normal"));

// Test captured event
EventQueue<String> events = endpoint.eventQueue;
Assert.assertThat("Number of Events Captured",events.size(),is(1));
String closeEvent = events.poll();
LinkedBlockingQueue<String> events = endpoint.eventQueue;
String closeEvent = events.poll(Timeouts.POLL_EVENT, Timeouts.POLL_EVENT_UNIT);
Assert.assertThat("Close Event",closeEvent,is(testcase.expectedCloseEvent));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import static org.hamcrest.Matchers.notNullValue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCode;

import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
Expand All @@ -41,21 +41,21 @@ public abstract class TrackingSocket
private static final Logger LOG = Log.getLogger(TrackingSocket.class);

public CloseReason closeReason;
public EventQueue<String> eventQueue = new EventQueue<String>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);

protected void addError(Throwable t)
{
LOG.warn(t);
errorQueue.add(t);
errorQueue.offer(t);
}

protected void addEvent(String format, Object... args)
{
eventQueue.add(String.format(format,args));
eventQueue.offer(String.format(format,args));
}

public void assertClose(CloseCode expectedCode, String expectedReason) throws InterruptedException
Expand All @@ -76,12 +76,6 @@ private void assertCloseReason(String expectedReason)
Assert.assertThat("Close Reason",closeReason.getReasonPhrase(),is(expectedReason));
}

public void assertEvent(String expected)
{
String actual = eventQueue.poll();
Assert.assertEquals("Event",expected,actual);
}

public void assertIsOpen() throws InterruptedException
{
assertWasOpened();
Expand Down
Loading

0 comments on commit 1373025

Please sign in to comment.