Skip to content
Permalink
Browse files
GERONIMO-6701 basic zipkin http sender
  • Loading branch information
rmannibucau committed Feb 24, 2019
1 parent 701d7c3 commit 8753b83cdba3877c113862f11a293da484d3e3cd
Showing 4 changed files with 252 additions and 5 deletions.
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin;

import static java.util.Collections.singletonList;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static javax.ws.rs.client.Entity.entity;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Logger;
import java.util.stream.Stream;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;

import org.apache.geronimo.microprofile.opentracing.common.config.GeronimoOpenTracingConfig;
import org.apache.geronimo.microprofile.opentracing.common.spi.Listener;
import org.eclipse.microprofile.opentracing.ClientTracingRegistrar;

// experimental
public class ZipkinHttp implements Listener<ZipkinSpan> {

private GeronimoOpenTracingConfig config;

private Jsonb jsonb;

private BlockingQueue<ZipkinSpan> spans;
private Client client;
private String collector;
private ScheduledExecutorService executor;
private ScheduledFuture<?> scheduledTask;
private int maxSpansPerBulk;
private int maxSpansIteration;

public void setConfig(final GeronimoOpenTracingConfig config) {
this.config = config;
}

public void setJsonb(final Jsonb jsonb) {
this.jsonb = jsonb;
}

public void init() {
if (jsonb == null) {
jsonb = JsonbBuilder.create();
}
final int capacity = Integer.parseInt(
config.read("span.converter.zipkin.http.bufferSize", "1000000"));
maxSpansPerBulk = Integer.parseInt(
config.read("span.converter.zipkin.http.maxSpansPerBulk", "250"));
maxSpansIteration = Integer.parseInt(
config.read("span.converter.zipkin.http.maxSpansIteration", "-1"));
collector = config.read("span.converter.zipkin.http.collector", null);
if (collector == null) {
return;
}

final long delay = Long.parseLong(
config.read("span.converter.zipkin.http.bulkSendInterval", "60000"));
if (delay < 0) {
logger().severe("No span.converter.zipkin.http.bulkSendInterval configured, skipping");
collector = null; // to skip anything
return;
}
if (delay > 0) {
executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final Thread thread = new Thread(r, getClass().getName() + "-executor");
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(false);
return thread;
}
});
scheduledTask = executor.scheduleAtFixedRate(this::onEmit, delay, delay, MILLISECONDS);
spans = new ArrayBlockingQueue<>(capacity);
} else { // == 0 => immediate send
spans = null;
}
final ClientBuilder clientBuilder = ClientBuilder.newBuilder()
.connectTimeout(Long.parseLong(config.read("span.converter.zipkin.http.connectTimeout", "30000")), MILLISECONDS)
.readTimeout(Long.parseLong(config.read("span.converter.zipkin.http.readTimeout", "30000")), MILLISECONDS);
ofNullable(config.read("span.converter.zipkin.http.providers", null))
.ifPresent(providers -> Stream.of(providers.split(","))
.map(String::trim)
.map(it -> {
try {
return Thread.currentThread().getContextClassLoader().loadClass(it)
.getConstructor().newInstance();
} catch (final Exception e) {
throw new IllegalArgumentException(e);
}
})
.forEach(clientBuilder::register));
if (Boolean.parseBoolean(config.read("span.converter.zipkin.http.selfTrace", "false"))) {
ClientTracingRegistrar.configure(clientBuilder);
}
client = clientBuilder.build();
logger().severe("Zipkin http sender configured");
}

private Logger logger() {
return Logger.getLogger("org.apache.geronimo.opentracing.zipkin.http");
}

public Jsonb getJsonb() {
return jsonb;
}

public void destroy() {
try {
jsonb.close();
} catch (final Exception e) {
// no-op
}
scheduledTask.cancel(true);
executor.shutdownNow();
try {
executor.awaitTermination(1, MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@Override
public void onEvent(final ZipkinSpan zipkinSpan) {
if (collector != null) {
if (spans == null) {
doSend(singletonList(zipkinSpan));
} else {
spans.add(zipkinSpan);
}
}
}

private void onEmit() {
final int size = this.spans.size();
final List<ZipkinSpan> copy = new ArrayList<>(size <= 0 ? maxSpansPerBulk : Math.min(size, maxSpansPerBulk));
int toSend = maxSpansIteration <= 0 ? size : Math.min(size, maxSpansIteration);
while (toSend > 0) {
this.spans.drainTo(copy, Math.min(toSend, maxSpansPerBulk));
if (copy.isEmpty()) {
break;
}
doSend(copy);
toSend -= copy.size();
copy.clear();

}
}

private void doSend(final List<ZipkinSpan> copy) {
final Response result = client.target(collector)
.request(APPLICATION_JSON_TYPE)
.post(entity(copy, APPLICATION_JSON_TYPE));
if (result.getStatus() >= 300) {
// todo: better handling but at least log them to not loose them completely or explode in memory
throw new IllegalStateException("Can't send to zipkin: " + copy);
}
}
}
@@ -41,6 +41,7 @@
import org.apache.geronimo.microprofile.opentracing.common.microprofile.client.OpenTracingClientResponseFilter;
import org.apache.geronimo.microprofile.opentracing.common.microprofile.thread.OpenTracingExecutorService;
import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinConverter;
import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinHttp;
import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinLogger;
import org.eclipse.microprofile.opentracing.Traced;

@@ -51,12 +52,12 @@ public class OpenTracingExtension implements Extension {
private GeronimoOpenTracingConfig config;

private boolean useZipkin;
private boolean useZipkinLogger;
private String zipkinSender;

void onStart(@Observes final BeforeBeanDiscovery beforeBeanDiscovery) {
config = GeronimoOpenTracingConfig.create();
useZipkin = Boolean.parseBoolean(config.read("span.converter.zipkin.active", "true"));
useZipkinLogger = useZipkin && Boolean.parseBoolean(config.read("span.converter.zipkin.logger.active", "true"));
zipkinSender = config.read("span.converter.zipkin.sender", "logger");
}

void vetoDefaultConfigIfScanned(@Observes final ProcessAnnotatedType<GeronimoOpenTracingConfig> config) {
@@ -101,11 +102,17 @@ void zipkinConverterToggle(@Observes final ProcessAnnotatedType<CdiZipkinConvert
}

void zipkinLoggerToggle(@Observes final ProcessAnnotatedType<CdiZipkinLogger> onZipkinLogger) {
if (!useZipkinLogger) {
if (!"logger".equalsIgnoreCase(zipkinSender) || !Boolean.parseBoolean(config.read("span.converter.zipkin.logger.active", "true"))) {
onZipkinLogger.veto();
}
}

void zipkinHttpToggle(@Observes final ProcessAnnotatedType<CdiZipkinHttp> onZipkinHttp) {
if (!"http".equalsIgnoreCase(zipkinSender) || !Boolean.parseBoolean(config.read("span.converter.zipkin.http.active", "true"))) {
onZipkinHttp.veto();
}
}

<T> void removeTracedFromJaxRsEndpoints(@Observes @WithAnnotations(Traced.class) final ProcessAnnotatedType<T> pat) {
if (isJaxRs(pat.getAnnotatedType())) { // we have filters with more accurate timing
final AnnotatedTypeConfigurator<T> configurator = pat.configureAnnotatedType();
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.microprofile.opentracing.microprofile.zipkin;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.apache.geronimo.microprofile.opentracing.common.config.GeronimoOpenTracingConfig;
import org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin.ZipkinHttp;
import org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin.ZipkinSpan;

@ApplicationScoped
public class CdiZipkinHttp extends ZipkinHttp {

@Inject
private GeronimoOpenTracingConfig config;

private Jsonb jsonb;

@PostConstruct
public void init() {
setConfig(config);
jsonb = JsonbBuilder.create();
super.init();
}

@PreDestroy
public void destroy() {
super.destroy();
}

public void onZipkinSpan(@Observes final ZipkinSpan zipkinSpan) {
super.onEvent(zipkinSpan);
}
}
@@ -37,8 +37,6 @@ public class CdiZipkinLogger extends ZipkinLogger {

private Jsonb jsonb;

private boolean wrapAsList;

@PostConstruct
public void init() {
setConfig(config);

0 comments on commit 8753b83

Please sign in to comment.