Skip to content

Commit

Permalink
change correlation id to string
Browse files Browse the repository at this point in the history
  • Loading branch information
jijisv committed Jul 11, 2019
1 parent 89d6c31 commit 2e0a083
Show file tree
Hide file tree
Showing 22 changed files with 130 additions and 174 deletions.
3 changes: 1 addition & 2 deletions micro-client/README.md
Expand Up @@ -48,7 +48,6 @@ public class EndPoint {
private final EventBus bus;
private final NIORestClient restClient;
private final String URL;
private final AtomicLong correlationId = new AtomicLong(0);
@Autowired
public EndPoint(NIORestClient restClient,EventBus bus, @Value("${url:}") String URL){

Expand All @@ -67,7 +66,7 @@ public class EndPoint {
public void async(RequestType query,@Suspended AsyncResponse asyncResponse){


final long correlationId = this.correlationId.incrementAndGet();
final String correlationId = UUID.randomUUID().toString();
bus.post(RequestEvents.start(query, correlationId,"standard-query",HashMapBuilder.of("ip",QueryIPRetriever.getIpAddress())));
builder.from(this.restClient.postForEntity(URL, new HttpEntity(JacksonUtil.serializeToJson(convertList(query)),headers),String.class))
.sync()
Expand Down
Expand Up @@ -29,12 +29,12 @@ public EventStatusResource(EventBus bus ){
@Produces("text/plain")
@Path("/ping")
public String ping() {
bus.post(RequestEvents.start("get", 1l));
bus.post(RequestEvents.start("get", "1"));
try{
return "ok";
}finally{
bus.post(RequestEvents.finish("get",1l));
bus.post(RequestEvents.finish("get","1"));
}
}

}
}
Expand Up @@ -9,7 +9,7 @@

public class TimerManager {

private final Cache<Long, Context> contexts;
private final Cache<String, Context> contexts;

public TimerManager(long maxSize, int minutesUntilExpire) {
contexts = CacheBuilder.newBuilder()
Expand All @@ -18,13 +18,13 @@ public TimerManager(long maxSize, int minutesUntilExpire) {
.build();
}

public void complete(long id) {
public void complete(String id) {
Maybe.ofNullable(contexts.getIfPresent(id))
.forEach(c -> c.stop());
.forEach(Context::stop);
contexts.invalidate(id);
}

public void start(long id, Context context) {
public void start(String id, Context context) {
contexts.put(id, context);

}
Expand Down
Expand Up @@ -6,6 +6,8 @@
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import cyclops.reactive.collections.mutable.MapX;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -32,28 +34,28 @@ public EventStatusResource(EventBus bus, MetricRegistry metrics) {
@Produces("text/plain")
@Path("/ping")
public String ping() {
bus.post(RequestEvents.start("get", 1l));
bus.post(RequestEvents.start("get", "1"));
try {
return "ok";
} finally {
bus.post(RequestEvents.finish("get", 1l));
bus.post(RequestEvents.finish("get", "1"));
}
}

@GET
@Produces("application/json")
@Path("/counters")
public Map<String, Long> counters() {
return (MapX) MapX.fromMap(metrics.getCounters())
.bimap(k -> k, v -> v.getCount());
return MapX.fromMap(metrics.getCounters())
.bimap(k -> k, Counter::getCount);
}

@GET
@Produces("application/json")
@Path("/meters")
public Map<String, Long> meters() {
return (MapX) MapX.fromMap(metrics.getMeters())
.bimap(k -> k, v -> v.getCount());
return MapX.fromMap(metrics.getMeters())
.bimap(k -> k, Meter::getCount);
}

}
Expand Up @@ -35,9 +35,8 @@ public void setup() {
@Test
public void queriesStartMeterInc() {

catcher.requestStart(new AddQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestStart(new AddQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));
assertThat(registry.meter(this.config.getPrefix() + ".request-start-test")
Expand All @@ -48,9 +47,8 @@ public void queriesStartMeterInc() {
@Test
public void queriesEndMeterInc() {

catcher.requestComplete(new RemoveQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestComplete(new RemoveQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));
assertThat(registry.meter(this.config.getPrefix() + ".request-end-test")
Expand All @@ -61,9 +59,8 @@ public void queriesEndMeterInc() {
@Test
public void queriesCounterInc() {

catcher.requestStart(new AddQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestStart(new AddQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));
assertThat(registry.counter(this.config.getPrefix() + ".requests-active-test-count")
Expand All @@ -74,9 +71,8 @@ public void queriesCounterInc() {
@Test
public void queriesCounterDec() {

catcher.requestComplete(new RemoveQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestComplete(new RemoveQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));
assertThat(registry.counter(this.config.getPrefix() + ".requests-active-test-count")
Expand All @@ -87,8 +83,7 @@ public void queriesCounterDec() {
@Test
public void jobsCounterDec() {

catcher.jobComplete(new JobCompleteEvent(
10l, "test", 10l, 5l));
catcher.jobComplete(new JobCompleteEvent("10", "test", 10l, 5l));
assertThat(registry.counter(this.config.getPrefix() + ".jobs-active-test-count")
.getCount(),
equalTo(0l));
Expand All @@ -97,15 +92,13 @@ public void jobsCounterDec() {
@Test
public void queriesTimer() {

catcher.requestStart(new AddQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestStart(new AddQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));

catcher.requestComplete(new RemoveQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestComplete(new RemoveQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));
assertThat(registry.timer(this.config.getPrefix() + ".request-end-test-timer")
Expand All @@ -116,8 +109,7 @@ public void queriesTimer() {
@Test
public void jobsMeterInc() {

catcher.jobStarted(new JobStartEvent(
10l, "test"));
catcher.jobStarted(new JobStartEvent("10", "test"));
assertThat(registry.meter(this.config.getPrefix() + ".job-meter-test")
.getMeanRate(),
equalTo(0.0));
Expand All @@ -126,8 +118,7 @@ public void jobsMeterInc() {
@Test
public void jobsCounterInc() {

catcher.jobStarted(new JobStartEvent(
10l, "test"));
catcher.jobStarted(new JobStartEvent("10", "test"));
assertThat(registry.counter(this.config.getPrefix() + ".jobs-active-test-count")
.getCount(),
equalTo(0l));
Expand Down
Expand Up @@ -39,7 +39,7 @@ public void queriesStartMeterInc() {

catcher.requestStart(new AddQuery(
RequestData.builder()
.correlationId(10l)
.correlationId("10")
.type("test")
.build()));
assertThat(registry.meter(this.config.getPrefix() + ".request-start-test-meter")
Expand All @@ -52,7 +52,7 @@ public void queriesEndMeterInc() {

catcher.requestComplete(new RemoveQuery(
RequestData.builder()
.correlationId(10l)
.correlationId("10")
.type("test")
.build()));
assertThat(registry.meter(this.config.getPrefix() + ".request-end-test")
Expand All @@ -65,7 +65,7 @@ public void queriesCounterInc() {

catcher.requestStart(new AddQuery(
RequestData.builder()
.correlationId(10l)
.correlationId("10")
.type("test")
.build()));
assertThat(registry.counter(this.config.getPrefix() + ".requests-active-test-count")
Expand All @@ -78,7 +78,7 @@ public void queriesCounterDec() {

catcher.requestComplete(new RemoveQuery(
RequestData.builder()
.correlationId(10l)
.correlationId("10")
.type("test")
.build()));
assertThat(registry.counter(this.config.getPrefix() + ".requests-active-test-count")
Expand All @@ -91,7 +91,7 @@ public void queriesIntervalCounterInc() {

catcher.requestStart(new AddQuery(
RequestData.builder()
.correlationId(10l)
.correlationId("10")
.type("test")
.build()));
assertThat(registry.getGauges().size(), equalTo(2));
Expand All @@ -104,7 +104,7 @@ public void queriesIntervalCounterDec() {

catcher.requestComplete(new RemoveQuery(
RequestData.builder()
.correlationId(10l)
.correlationId("10")
.type("test")
.build()));
assertThat(registry.getGauges().size(), equalTo(2));
Expand All @@ -118,8 +118,7 @@ public void queriesIntervalCounterDec() {
@Test
public void jobsCounterDec() {

catcher.jobComplete(new JobCompleteEvent(
10l, "test", 10l, 5l));
catcher.jobComplete(new JobCompleteEvent("10", "test", 10l, 5l));
assertThat(registry.counter(this.config.getPrefix() + ".jobs-active-test-count")
.getCount(),
equalTo(-1l));
Expand All @@ -128,15 +127,13 @@ public void jobsCounterDec() {
@Test
public void queriesTimer() {

catcher.requestStart(new AddQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestStart(new AddQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));

catcher.requestComplete(new RemoveQuery(
RequestData.builder()
.correlationId(10l)
catcher.requestComplete(new RemoveQuery(RequestData.builder()
.correlationId("10")
.type("test")
.build()));
assertThat(registry.timer(this.config.getPrefix() + ".request-end-test-timer")
Expand All @@ -147,8 +144,7 @@ public void queriesTimer() {
@Test
public void jobsMeterInc() {

catcher.jobStarted(new JobStartEvent(
10l, "test"));
catcher.jobStarted(new JobStartEvent("10", "test"));
assertThat(registry.meter(this.config.getPrefix() + ".job-meter-test")
.getMeanRate(),
greaterThan(0.0));
Expand All @@ -157,8 +153,7 @@ public void jobsMeterInc() {
@Test
public void jobsCounterInc() {

catcher.jobStarted(new JobStartEvent(
10l, "test"));
catcher.jobStarted(new JobStartEvent("10", "test"));
assertThat(registry.counter(this.config.getPrefix() + ".jobs-active-test-count")
.getCount(),
equalTo(1l));
Expand Down
Expand Up @@ -18,14 +18,14 @@ public void setup() {

@Test
public void whenValueNotPresentNoError() {
manager.complete(1l);
manager.complete("1");
}

@Test
public void whenValueAddedAndRemovedStopCalled() {
Context c = Mockito.mock(Context.class);
manager.start(1l, c);
manager.complete(1l);
manager.start("1", c);
manager.complete("1");
Mockito.verify(c, Mockito.times(1))
.stop();

Expand Down
Expand Up @@ -10,7 +10,7 @@
public class JobCompleteEvent {

private final Date date = new Date();
private final long correlationId;
private final String correlationId;
private final String type;
private final long errors;
private final long dataSize;
Expand Down
Expand Up @@ -10,7 +10,7 @@
public class JobStartEvent {

private final Date date = new Date();
private final long correlationId;
private final String correlationId;
private final String type;

}
Expand Up @@ -79,9 +79,8 @@ private Object executeScheduledJob(final ProceedingJoinPoint pjp, final String t
events.active(id, data);

SystemData retVal = null;
long correlationId = r.nextLong();
eventBus.post(new JobStartEvent(
correlationId, type));
String correlationId = String.valueOf(r.nextLong());
eventBus.post(new JobStartEvent(correlationId, type));
try {

retVal = Optional.ofNullable(((SystemData) pjp.proceed()))
Expand All @@ -92,12 +91,11 @@ private Object executeScheduledJob(final ProceedingJoinPoint pjp, final String t
logSystemEvent(pjp, type, data, retVal);
retVal = Optional.ofNullable(retVal)
.orElse(SystemData.builder()
.correlationId("" + correlationId)
.correlationId(correlationId)
.errors(0l)
.processed(0l)
.build());
eventBus.post(new JobCompleteEvent(
correlationId, type, retVal.getErrors(), retVal.getProcessed()));
eventBus.post(new JobCompleteEvent(correlationId, type, retVal.getErrors(), retVal.getProcessed()));
}
}

Expand All @@ -108,12 +106,7 @@ private void logSystemEvent(final ProceedingJoinPoint pjp, final String type, Jo
.getClass());
loggingRateLimiter.capacityAvailable(pjp.getTarget()
.getClass(),
this.maxLoggingCapacity, new Runnable() {
@Override
public void run() {
postEvent(pjp, type, data, active);
}
});
this.maxLoggingCapacity, () -> postEvent(pjp, type, data, active));
}

private void postEvent(ProceedingJoinPoint pjp, String type, JobExecutingData data, SystemData retVal) {
Expand Down

0 comments on commit 2e0a083

Please sign in to comment.