This repository has been archived by the owner on Oct 4, 2018. It is now read-only.
/
TextConsumerTest.java
121 lines (95 loc) · 3.5 KB
/
TextConsumerTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package uk.co.fredemmott.jp.consumers;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import uk.co.fredemmott.jp.ClientFactory;
import uk.co.fredemmott.jp.Consumer;
import uk.co.fredemmott.jp.EmptyPool;
import uk.co.fredemmott.jp.Job;
import uk.co.fredemmott.jp.JobPool;
import uk.co.fredemmott.jp.NoSuchPool;
@RunWith(MockitoJUnitRunner.class)
public class TextConsumerTest {
private static final String TEST_STRING = "Test String";
private final static Charset utf8 = Charset.forName("UTF8");
private static final String ID_ONE = "id_one";
private static final String ID_TWO = "id_two";
private static final String MESSAGE_ONE = "string_one";
private static final String MESSAGE_TWO = "string_two";
protected static final String HOSTNAME = "localhost";
protected static final int PORT = 1234;
private static final String POOL = "test_pool";
@Mock private JobPool.Iface mockClient;
@Mock private ClientFactory mockFactory;
/**
* LoggingTextConsumer will log message it consumer and count them.
*
*/
private static class LoggingTextConsumer extends TextConsumer {
int count = 0;
List<String> messages = new ArrayList<String>();
public LoggingTextConsumer(String hostname, int port, String pool) throws TTransportException {
super(hostname, port, pool);
}
@Override
public boolean consume(String message) {
count++;
messages.add(message);
return true;
}
}
@Before
public void setUp() throws Exception {
ClientFactory.setInstance(mockFactory);
}
@After
public void tearDown() {
ClientFactory.setInstance(null);
}
@Test
public void testTextConsumer() throws TTransportException {
@SuppressWarnings("unused")
Consumer<String> consumer = new LoggingTextConsumer(HOSTNAME, PORT, POOL);
verify(mockFactory).createClient(HOSTNAME, PORT);
}
@Test
public void testDeserialise() throws TTransportException {
Consumer<String> consumer = new LoggingTextConsumer(HOSTNAME, PORT, POOL);
ByteBuffer bytes = utf8.encode(TEST_STRING);
String test = consumer.deserialise(bytes);
assertEquals(TEST_STRING, test);
}
@Test
public void testRun_OneItem() throws NoSuchPool, EmptyPool, TException, InterruptedException {
when(mockFactory.createClient(HOSTNAME, PORT)).thenReturn(mockClient);
// Set message on the queue
when(mockClient.acquire(POOL))
.thenReturn(new Job(utf8.encode(MESSAGE_ONE), utf8.encode(ID_ONE)))
.thenReturn(new Job(utf8.encode(MESSAGE_TWO), utf8.encode(ID_TWO)))
.thenThrow(new EmptyPool());
LoggingTextConsumer consumer = new LoggingTextConsumer(HOSTNAME, PORT, POOL);
// Run as background thread then tell to stop after processing documents.
Thread t = new Thread(consumer);
t.start();
Thread.sleep(10); // Could in case this if it starts to be a problem.
consumer.stop();
// Check it gets messages
assertEquals(2, consumer.count);
assertEquals(Arrays.asList(MESSAGE_ONE, MESSAGE_TWO), consumer.messages);
verify(mockClient).purge(POOL, utf8.encode(ID_ONE));
verify(mockClient).purge(POOL, utf8.encode(ID_TWO));
}
}