Skip to content

Commit

Permalink
AsyncSuroClient fixed with event Listener
Browse files Browse the repository at this point in the history
  • Loading branch information
metacret committed Nov 11, 2014
1 parent 0ad5bdd commit 8e593d1
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.suro.client;

import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
Expand All @@ -24,6 +25,7 @@
import com.netflix.servo.monitor.Monitors;
import com.netflix.suro.ClientConfig;
import com.netflix.suro.TagKey;
import com.netflix.suro.client.async.AsyncSuroClient;
import com.netflix.suro.connection.ConnectionPool;
import com.netflix.suro.message.Compression;
import com.netflix.suro.message.Message;
Expand All @@ -35,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -58,13 +61,12 @@ public SyncSuroClient(ClientConfig config, ConnectionPool connectionPool) {
Monitors.registerObject(this);
}

@Monitor(name = TagKey.SENT_COUNT, type = DataSourceType.COUNTER)
private AtomicLong sentMessageCount = new AtomicLong(0);
@Override
public long getSentMessageCount() {
return sentMessageCount.get();
}
@Monitor(name = TagKey.LOST_COUNT, type = DataSourceType.COUNTER)

private AtomicLong lostMessageCount = new AtomicLong(0);
@Override
public long getLostMessageCount() {
Expand All @@ -76,11 +78,11 @@ public long getNumOfPendingMessages() {
return 0;
}

@Monitor(name = TagKey.RETRIED_COUNT, type = DataSourceType.COUNTER)
private AtomicLong retriedCount = new AtomicLong(0);
public long getRetriedCount() {
return retriedCount.get();
}

@Monitor(name = "senderExceptionCount", type = DataSourceType.COUNTER)
private AtomicLong senderExceptionCount = new AtomicLong(0);

Expand All @@ -91,6 +93,8 @@ public void send(Message message) {
.withMessage(message.getRoutingKey(), message.getPayload()).build());
}

private List<AsyncSuroClient.Listener> emptyList = Lists.newArrayList();

public boolean send(TMessageSet messageSet) {
if (messageSet == null) {
return false;
Expand Down Expand Up @@ -124,19 +128,19 @@ public boolean send(TMessageSet messageSet) {

MessageSetReader reader = new MessageSetReader(messageSet);
if (sent) {
sentMessageCount.addAndGet(incrementMessageCount(TagKey.SENT_COUNT, config.getApp(), reader));
sentMessageCount.addAndGet(incrementMessageCount(TagKey.SENT_COUNT, config.getApp(), reader, emptyList));
if (retried) {
retriedCount.incrementAndGet();
}

} else {
lostMessageCount.addAndGet(incrementMessageCount(TagKey.LOST_COUNT, config.getApp(), reader));
lostMessageCount.addAndGet(incrementMessageCount(TagKey.LOST_COUNT, config.getApp(), reader, emptyList));
}

return sent;
}

public static int incrementMessageCount(String counterName, String app, Iterable<Message> messages) {
public static int incrementMessageCount(String counterName, String app, Iterable<Message> messages, List<AsyncSuroClient.Listener> listeners) {
int count = 0;
for (Message message : messages) {
DynamicCounter.increment(
Expand All @@ -147,6 +151,10 @@ public static int incrementMessageCount(String counterName, String app, Iterable
++count;
}

for (AsyncSuroClient.Listener listener : listeners) {
listener.sentCallback(count);
}

return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -70,7 +71,6 @@ public long getNumOfPendingMessages() {
return messageQueue.size();
}

@Monitor(name = TagKey.SENT_COUNT, type = DataSourceType.COUNTER)
private AtomicLong sentMessages = new AtomicLong(0);
@Override
public long getSentMessageCount() {
Expand Down Expand Up @@ -154,6 +154,9 @@ public void send(Message message) {
.withTag(TagKey.APP, config.getApp())
.withTag(TagKey.DATA_SOURCE, message.getRoutingKey())
.build());
for (Listener listener : listeners) {
listener.lostCallback(1);
}
}
}

Expand All @@ -164,6 +167,9 @@ public void restore(Message message) {
.withTag(TagKey.APP, config.getApp())
.withTag(TagKey.DATA_SOURCE, message.getRoutingKey())
.build());
for (Listener listener : listeners) {
listener.restoredCallback();
}
send(message);
}

Expand Down Expand Up @@ -250,9 +256,13 @@ public void updateSentDataStats(TMessageSet messageSet, boolean retried) {
SyncSuroClient.incrementMessageCount(
TagKey.SENT_COUNT,
config.getApp(),
new MessageSetReader(messageSet)));
new MessageSetReader(messageSet),
listeners));
if (retried) {
retriedCount.incrementAndGet();
for (Listener listener : listeners) {
listener.retriedCallback();
}
}
}

Expand All @@ -265,4 +275,16 @@ public ConnectionPool getConnectionPool() {
public void updateSenderException() {
senderExceptionCount.incrementAndGet();
}

public static interface Listener {
void sentCallback(int count);
void restoredCallback();
void lostCallback(int count);
void retriedCallback();
}

private List<Listener> listeners = new CopyOnWriteArrayList<>();
public void addListener(Listener listener) {
listeners.add(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestAsyncSuroClient {
@Rule
Expand Down Expand Up @@ -97,14 +99,15 @@ public void testMemory() throws Exception {

AsyncSuroClient client = injector.getInstance(AsyncSuroClient.class);

for (int i = 0; i < 3000; ++i) {
int messageCount = 10;
for (int i = 0; i < messageCount; ++i) {
client.send(new Message("routingKey", "testMessage".getBytes()));
}

client.shutdown();
TestConnectionPool.checkMessageCount(servers, 3000);
TestConnectionPool.checkMessageCount(servers, messageCount);

assertEquals(client.getSentMessageCount(), 3000);
assertEquals(client.getSentMessageCount(), messageCount);
}

@Test
Expand All @@ -113,79 +116,78 @@ public void testFile() throws Exception {

AsyncSuroClient client = injector.getInstance(AsyncSuroClient.class);

for (int i = 0; i < 3000; ++i) {
int messageCount = 10;
for (int i = 0; i < messageCount; ++i) {
client.send(new Message("routingKey", "testMessage".getBytes()));
}

client.shutdown();
TestConnectionPool.checkMessageCount(servers, 3000);
TestConnectionPool.checkMessageCount(servers, messageCount);

assertEquals(client.getSentMessageCount(), 3000);
assertEquals(client.getSentMessageCount(), messageCount);
}

@Test
public void testRestore() throws Exception {
Properties props = new Properties();
props.setProperty(ClientConfig.ASYNC_JOBQUEUE_CAPACITY, "3");
props.setProperty(ClientConfig.RETRY_COUNT, "1");
props.setProperty(ClientConfig.ASYNC_TIMEOUT, "1");
setupFile(props);

int messageCount = 30;
AsyncSuroClient client = injector.getInstance(AsyncSuroClient.class);

final CountDownLatch restoreLatch = new CountDownLatch(messageCount / 10);
final CountDownLatch sentLatch = new CountDownLatch(messageCount);
client.addListener(new AsyncSuroClient.Listener() {
@Override
public void sentCallback(int count) {
for (int i = 0; i < count; ++i) {
sentLatch.countDown();
}
}

@Override
public void restoredCallback() {
restoreLatch.countDown();
}

@Override
public void lostCallback(int count) {
fail("should not be lost");
}

@Override
public void retriedCallback() {

}
});

for (SuroServer4Test c : servers) {
c.setTryLater();
}

int messageCount = 300;
for (int i = 0; i < messageCount; ++i) {
client.send(new Message("routingKey", "testMessage".getBytes()));
}

// wait until some messages are restored
while (client.getRestoredMessageCount() < messageCount / 3) {
System.out.println("restored: " + client.getRestoredMessageCount());
Thread.sleep(1000);
}
restoreLatch.await(10, TimeUnit.SECONDS);
assertEquals(restoreLatch.getCount(), 0);

for (SuroServer4Test c : servers) {
c.cancelTryLater();
}
injector.getInstance(ConnectionPool.class).populateClients();

// wait until alll messages are sent
while (client.getSentMessageCount() < messageCount) {
System.out.println("sent: " + client.getSentMessageCount());
Thread.sleep(1000);
}
sentLatch.await(60, TimeUnit.SECONDS);
assertEquals(client.getSentMessageCount(), messageCount);
assertEquals(client.getLostMessageCount(), 0);

client.shutdown();
assertEquals(client.getLostMessageCount(), 0);
assertEquals(client.getSentMessageCount(), messageCount);

TestConnectionPool.checkMessageCount(servers, messageCount);
}

@Test
public void testRateLimit() throws Exception {
Properties props = new Properties();
props.put(AsyncSuroClient.asyncRateLimitConfig, "10");

setupFile(props);

AsyncSuroClient client = injector.getInstance(AsyncSuroClient.class);

long start = System.currentTimeMillis();
for (int i = 0; i < 100; ++i) {
client.send(new Message("routingKey", "testMessage".getBytes()));
}

while (client.getSentMessageCount() < 100) {
Thread.sleep(100);
}

long duration = System.currentTimeMillis() - start;
assertTrue(duration >= 5000);
}

@Test
public void shouldBeBlockedOnJobQueueFull() throws Exception {
for (SuroServer4Test c : servers) {
Expand Down

0 comments on commit 8e593d1

Please sign in to comment.