Skip to content

Commit

Permalink
Adding simple support for backend telemetry. (#19257)
Browse files Browse the repository at this point in the history
* Adding posthog java library, setting up client.

* Adding capture method, adding cluster id.

* Capture only if telemetry is enabled.

* Establishing periodical and bindings for metrics suppliers.

* Make generated `TelemetryEvent` optional.

* Adding convenience methods.

* Adding license headers.

* Construct Posthog client for each submission, shutdown after.

* Replacing posthog client with simple homebrewn client.

* Adding path to Retrofit interface method.

* Handle errors while submitting events.

* Do not run periodial if telemetry is disabled.

* Do not run `capture` if metrics are empty.

* Including cluster id in groups and separate field.
  • Loading branch information
dennisoelkers committed May 14, 2024
1 parent 70b318c commit b569b1a
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.inject.Module;
import com.google.inject.spi.Message;
import com.mongodb.MongoException;
import jakarta.inject.Inject;
import org.graylog.enterprise.EnterpriseModule;
import org.graylog.events.EventsModule;
import org.graylog.events.processor.EventDefinitionConfiguration;
Expand Down Expand Up @@ -109,11 +110,10 @@
import org.graylog2.streams.StreamsModule;
import org.graylog2.system.processing.ProcessingStatusConfig;
import org.graylog2.system.shutdown.GracefulShutdown;
import org.graylog2.telemetry.TelemetryModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
Expand Down Expand Up @@ -212,7 +212,8 @@ protected List<Module> getCommandBindings(FeatureFlags featureFlags) {
new TracingModule(),
new DataTieringModule(),
new DatanodeMigrationBindings(),
new CaModule()
new CaModule(),
new TelemetryModule()
);

modules.add(new FieldTypeManagementModule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.graylog2.shared.messageq.MessageQueueReader;
import org.graylog2.shared.messageq.MessageQueueWriter;
import org.graylog2.telemetry.scheduler.TelemetryMetricSupplier;
import org.graylog2.web.PluginUISettingsProvider;

import java.util.Collections;
Expand Down Expand Up @@ -440,4 +441,16 @@ protected void addEntityScope(Class<? extends EntityScope> entityScopeType) {
Multibinder<EntityScope> scopeBinder = Multibinder.newSetBinder(binder(), EntityScope.class);
scopeBinder.addBinding().to(entityScopeType);
}

protected MapBinder<String, TelemetryMetricSupplier> telemetryMetricSupplierBinder() {
return MapBinder.newMapBinder(binder(), String.class, TelemetryMetricSupplier.class);
}

protected void addTelemetryMetricProvider(String eventId, Class<? extends TelemetryMetricSupplier> eventSupplier) {
telemetryMetricSupplierBinder().addBinding(eventId).to(eventSupplier);
}

protected void addTelemetryMetricProvider(String eventId, TelemetryMetricSupplier eventSupplier) {
telemetryMetricSupplierBinder().addBinding(eventId).toInstance(eventSupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.telemetry;

import org.graylog2.plugin.PluginModule;
import org.graylog2.telemetry.scheduler.TelemetrySubmissionPeriodical;

public class TelemetryModule extends PluginModule {
@Override
protected void configure() {
// Initializing binder so it can be injected with no actual bindings
telemetryMetricSupplierBinder();

addPeriodical(TelemetrySubmissionPeriodical.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.telemetry.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.POST;

import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;

public interface PosthogAPI {
record BatchRequest(@JsonProperty("api_key") String apiKey, @JsonProperty("batch") Collection<Event> batch) {}

record Event(@JsonProperty("uuid") String uuid,
@JsonProperty("timestamp") String timestamp,
@JsonProperty("distinct_id") String distinctId,
@JsonProperty("event") String event,
@JsonProperty("properties") Map<String, Object> properties) {
public static Event create(String clusterId, String event, Map<String, Object> properties) {
final var groups = Map.of("cluster", clusterId);
final var propertiesWithGroups = ImmutableMap.<String, Object>builder()
.putAll(properties)
.put("$groups", groups)
.put("cluster_id", clusterId)
.build();
return new Event(UUID.randomUUID().toString(), Instant.now().toString(), clusterId, event, propertiesWithGroups);
}
}

@POST("/batch")
Call<Void> batchSend(@Body BatchRequest batchRequest);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.telemetry.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import okhttp3.OkHttpClient;
import org.graylog2.configuration.TelemetryConfiguration;
import org.graylog2.telemetry.cluster.TelemetryClusterService;
import org.graylog2.telemetry.scheduler.TelemetryEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import retrofit2.Retrofit;
import retrofit2.converter.jackson.JacksonConverterFactory;

import java.io.IOException;
import java.util.Map;

@Singleton
public class TelemetryClient {
private static final Logger LOG = LoggerFactory.getLogger(TelemetryClient.class);
private final PosthogAPI posthog;
private final String clusterId;
private final boolean isEnabled;
private final String apiKey;

@Inject
public TelemetryClient(TelemetryConfiguration telemetryConfiguration, TelemetryClusterService telemetryClusterService,
OkHttpClient okHttpClient, ObjectMapper objectMapper) {
this.isEnabled = telemetryConfiguration.isTelemetryEnabled();
this.apiKey = telemetryConfiguration.getTelemetryApiKey();
this.posthog = new Retrofit.Builder()
.baseUrl(telemetryConfiguration.getTelemetryApiHost())
.addConverterFactory(JacksonConverterFactory.create(objectMapper))
.client(okHttpClient)
.build()
.create(PosthogAPI.class);
this.clusterId = telemetryClusterService.getClusterId();
}

public void capture(Map<String, TelemetryEvent> events) throws IOException {
if (isEnabled) {
final var batch = events.entrySet()
.stream()
.map(entry -> PosthogAPI.Event.create(clusterId, entry.getKey(), entry.getValue().metrics()))
.toList();
final var request = new PosthogAPI.BatchRequest(apiKey, batch);
final var response = posthog.batchSend(request).execute();
if (!response.isSuccessful()) {
throw new RuntimeException("Submitting telemetry failed with status " + response.code() + " - message: " + response.message());
}
}
}

public boolean isEnabled() {
return isEnabled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.telemetry.scheduler;

import java.util.Map;

public record TelemetryEvent(Map<String, Object> metrics) {
public static TelemetryEvent of(Map<String, Object> metrics) {
return new TelemetryEvent(metrics);
}

public static TelemetryEvent of(String key, Object value) {
return TelemetryEvent.of(Map.of(key, value));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.telemetry.scheduler;

import java.util.Optional;
import java.util.function.Supplier;

public interface TelemetryMetricSupplier extends Supplier<Optional<TelemetryEvent>> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.telemetry.scheduler;

import com.github.joschi.jadconfig.util.Duration;
import jakarta.inject.Inject;
import org.graylog2.configuration.TelemetryConfiguration;
import org.graylog2.plugin.periodical.Periodical;
import org.graylog2.telemetry.client.TelemetryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.util.AbstractMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TelemetrySubmissionPeriodical extends Periodical {

private static final Logger LOG = LoggerFactory.getLogger(TelemetrySubmissionPeriodical.class);
private final TelemetryClient telemetryClient;
private final Map<String, TelemetryMetricSupplier> metricsProviders;
private static final Duration runPeriod = Duration.days(1);
private final boolean isEnabled;

@Inject
public TelemetrySubmissionPeriodical(TelemetryClient telemetryClient,
TelemetryConfiguration telemetryConfiguration,
Map<String, TelemetryMetricSupplier> metricsProviders) {
this.telemetryClient = telemetryClient;
this.metricsProviders = metricsProviders;
this.isEnabled = telemetryConfiguration.isTelemetryEnabled();
}

@Override
public void doRun() {
final var telemetryMetrics = metricsProviders.entrySet()
.stream()
.map(entry -> entry(entry.getKey(), entry.getValue().get()))
.flatMap(entry -> entry.getValue().map(metrics -> entry(entry.getKey(), metrics)).stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
try {
if (!telemetryMetrics.isEmpty()) {
telemetryClient.capture(telemetryMetrics);
}
} catch (Exception e) {
LOG.warn("Error while submitting telemetry: ", e);
}
}

private <K, V> Map.Entry<K, V> entry(K key, V value) {
return new AbstractMap.SimpleEntry<>(key, value);
}

@Override
public boolean runsForever() {
return false;
}

@Override
public boolean stopOnGracefulShutdown() {
return true;
}

@Override
public boolean startOnThisNode() {
return isEnabled;
}

@Override
public boolean isDaemon() {
return true;
}

@Override
public int getInitialDelaySeconds() {
return Math.toIntExact(runPeriod.toSeconds());
}

@Override
public int getPeriodSeconds() {
return Math.toIntExact(runPeriod.toSeconds());
}

@Nonnull
@Override
protected Logger getLogger() {
return LOG;
}
}

0 comments on commit b569b1a

Please sign in to comment.