Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics, java: add support for Micrometer metrics #2496

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ext {
junit5Version = '5.10.2'
lombokVersion = '1.18.32'
mockitoVersion = '5.2.0'
micrometerVersion = '1.12.4'
isReleaseVersion = !version.endsWith('SNAPSHOT')
}

Expand All @@ -63,10 +64,13 @@ dependencies {
implementation 'org.apache.httpcomponents:httpclient:4.5.14'
implementation 'commons-logging:commons-logging:1.3.0'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation "io.micrometer:micrometer-core:${micrometerVersion}"

compileOnly 'com.google.code.findbugs:jsr305:3.0.2'
compileOnly 'org.apache.kafka:kafka-clients:3.7.0'
compileOnly 'com.amazonaws:amazon-kinesis-producer:0.15.10'
compileOnly "org.projectlombok:lombok:${lombokVersion}"
compileOnly "io.micrometer:micrometer-registry-statsd:${micrometerVersion}"
annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

codeGenerator project(':generator')
Expand All @@ -77,6 +81,7 @@ dependencies {
testImplementation "org.mockito:mockito-inline:${mockitoVersion}"
testImplementation "org.mockito:mockito-junit-jupiter:${mockitoVersion}"
testImplementation "org.projectlombok:lombok:${lombokVersion}"
testImplementation "io.micrometer:micrometer-registry-statsd:${micrometerVersion}"
testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
}

Expand Down
21 changes: 18 additions & 3 deletions client/java/src/main/java/io/openlineage/client/Clients.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.openlineage.client;

import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.client.transports.NoopTransport;
import io.openlineage.client.transports.Transport;
import io.openlineage.client.transports.TransportFactory;
Expand All @@ -21,12 +22,18 @@ public static OpenLineageClient newClient() {
}

public static OpenLineageClient newClient(ConfigPathProvider configPathProvider) {
String isDisabled = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED");
if (Boolean.parseBoolean(isDisabled)) {
if (isDisabled()) {
return OpenLineageClient.builder().transport(new NoopTransport()).build();
}
final OpenLineageYaml openLineageYaml =
OpenLineageClientUtils.loadOpenLineageYaml(configPathProvider);
return newClient(openLineageYaml);
}

public static OpenLineageClient newClient(OpenLineageYaml openLineageYaml) {
if (isDisabled()) {
return OpenLineageClient.builder().transport(new NoopTransport()).build();
}
final TransportFactory factory = new TransportFactory(openLineageYaml.getTransportConfig());
final Transport transport = factory.build();
// ...
Expand All @@ -37,9 +44,17 @@ public static OpenLineageClient newClient(ConfigPathProvider configPathProvider)
}

Optional.ofNullable(openLineageYaml.getCircuitBreaker())
.map(config -> new CircuitBreakerFactory(config))
.map(CircuitBreakerFactory::new)
.ifPresent(f -> builder.circuitBreaker(f.build()));

Optional.ofNullable(openLineageYaml.getMetricsConfig())
.map(MicrometerProvider::addMeterRegistryFromConfig)
.ifPresent(builder::meterRegistry);
return builder.transport(transport).build();
}

private static boolean isDisabled() {
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
String disabled = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED");
return (Boolean.parseBoolean(disabled));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

package io.openlineage.client;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.client.transports.ConsoleTransport;
import io.openlineage.client.transports.Transport;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -18,8 +25,14 @@
public final class OpenLineageClient {
final Transport transport;
final Optional<CircuitBreaker> circuitBreaker;
final MeterRegistry meterRegistry;
final String[] disabledFacets;

Counter emitStart;
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
Counter emitComplete;
AtomicInteger engagedCircuitBreaker;
Timer emitTime;

/** Creates a new {@code OpenLineageClient} object. */
public OpenLineageClient() {
this(new ConsoleTransport());
Expand All @@ -30,15 +43,23 @@ public OpenLineageClient(@NonNull final Transport transport) {
}

public OpenLineageClient(@NonNull final Transport transport, String... disabledFacets) {
this(transport, null, disabledFacets);
this(transport, null, null, disabledFacets);
}

public OpenLineageClient(
@NonNull final Transport transport, CircuitBreaker circuitBreaker, String... disabledFacets) {
@NonNull final Transport transport,
CircuitBreaker circuitBreaker,
MeterRegistry meterRegistry,
String... disabledFacets) {
this.transport = transport;
this.disabledFacets = Arrays.copyOf(disabledFacets, disabledFacets.length);
this.circuitBreaker = Optional.ofNullable(circuitBreaker);
if (meterRegistry == null) {
meterRegistry = MicrometerProvider.getMeterRegistry();
}
this.meterRegistry = meterRegistry;

initializeMetrics();
OpenLineageClientUtils.configureObjectMapper(disabledFacets);
}

Expand All @@ -54,10 +75,15 @@ public void emit(@NonNull OpenLineage.RunEvent runEvent) {
"OpenLineageClient will emit lineage event: {}", OpenLineageClientUtils.toJson(runEvent));
}
if (circuitBreaker.isPresent() && circuitBreaker.get().currentState().isClosed()) {
engagedCircuitBreaker.set(1);
log.warn("OpenLineageClient disabled with circuit breaker");
return;
} else {
engagedCircuitBreaker.set(0);
}
transport.emit(runEvent);
emitStart.increment();
emitTime.record(() -> transport.emit(runEvent));
emitComplete.increment();
}

/**
Expand All @@ -73,10 +99,15 @@ public void emit(@NonNull OpenLineage.DatasetEvent datasetEvent) {
OpenLineageClientUtils.toJson(datasetEvent));
}
if (circuitBreaker.isPresent() && circuitBreaker.get().currentState().isClosed()) {
engagedCircuitBreaker.set(1);
log.warn("OpenLineageClient disabled with circuit breaker");
return;
} else {
engagedCircuitBreaker.set(0);
}
transport.emit(OpenLineageClientUtils.toJson(datasetEvent));
emitStart.increment();
emitTime.record(() -> transport.emit(OpenLineageClientUtils.toJson(datasetEvent)));
emitComplete.increment();
}

/**
Expand All @@ -91,10 +122,33 @@ public void emit(@NonNull OpenLineage.JobEvent jobEvent) {
"OpenLineageClient will emit lineage event: {}", OpenLineageClientUtils.toJson(jobEvent));
}
if (circuitBreaker.isPresent() && circuitBreaker.get().currentState().isClosed()) {
engagedCircuitBreaker.set(1);
log.warn("OpenLineageClient disabled with circuit breaker");
return;
} else {
engagedCircuitBreaker.set(0);
}
transport.emit(OpenLineageClientUtils.toJson(jobEvent));
emitStart.increment();
emitTime.record(() -> transport.emit(OpenLineageClientUtils.toJson(jobEvent)));
emitComplete.increment();
}

public void initializeMetrics() {
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
emitStart =
this.meterRegistry.counter(
"openlineage.emit.start", "openlineage.transport", transport.getClass().getName());
emitComplete =
this.meterRegistry.counter(
"openlineage.emit.complete", "openlineage.transport", transport.getClass().getName());
engagedCircuitBreaker =
this.meterRegistry.gauge(
"openlineage.circuitbreaker.engaged",
Collections.singletonList(
Tag.of("openlineage.circuitbreaker", circuitBreaker.getClass().getName())),
new AtomicInteger(0));
emitTime =
this.meterRegistry.timer(
"openlineage.emit.time", "openlineage.transport", transport.getClass().getName());
}

/**
Expand All @@ -121,6 +175,7 @@ public static final class Builder {
private Transport transport;
private String[] disabledFacets;
private CircuitBreaker circuitBreaker;
private MeterRegistry meterRegistry;

private Builder() {
this.transport = DEFAULT_TRANSPORT;
Expand All @@ -137,6 +192,11 @@ public Builder circuitBreaker(@NonNull CircuitBreaker circuitBreaker) {
return this;
}

public Builder meterRegistry(@NonNull MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
return this;
}

public Builder disableFacets(@NonNull String... disabledFacets) {
this.disabledFacets = Arrays.copyOf(disabledFacets, disabledFacets.length);
return this;
Expand All @@ -147,7 +207,7 @@ public Builder disableFacets(@NonNull String... disabledFacets) {
* OpenLineageClient.Builder}.
*/
public OpenLineageClient build() {
return new OpenLineageClient(transport, circuitBreaker, disabledFacets);
return new OpenLineageClient(transport, circuitBreaker, meterRegistry, disabledFacets);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ private OpenLineageClientUtils() {}

private static final ObjectMapper MAPPER = newObjectMapper();

private static final ObjectMapper YML = new ObjectMapper(new YAMLFactory());
private static final ObjectMapper JSON = new ObjectMapper(new JsonFactory());
private static final ObjectMapper YML = newObjectMapper(new YAMLFactory());
private static final ObjectMapper JSON = newObjectMapper();

@JsonFilter("disabledFacets")
public class DisabledFacetsMixin {}
Expand All @@ -58,7 +58,17 @@ public class DisabledFacetsMixin {}
* @return A configured {@link ObjectMapper} instance.
*/
public static ObjectMapper newObjectMapper() {
final ObjectMapper mapper = new ObjectMapper();
return newObjectMapper(new JsonFactory());
}

/**
* Creates a new {@link ObjectMapper} instance configured with modules for JDK8 and JavaTime,
* including settings to ignore unknown properties and to not write dates as timestamps.
*
* @return A configured {@link ObjectMapper} instance.
*/
public static ObjectMapper newObjectMapper(JsonFactory jsonFactory) {
final ObjectMapper mapper = new ObjectMapper(jsonFactory);
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new JavaTimeModule());
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@

package io.openlineage.client;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.openlineage.client.circuitBreaker.CircuitBreakerConfig;
import io.openlineage.client.transports.FacetsConfig;
import io.openlineage.client.transports.TransportConfig;
import java.util.Map;
import lombok.Getter;

/** Configuration for {@link OpenLineageClient}. */
@JsonIgnoreProperties
@Getter
public class OpenLineageYaml {
@Getter
@JsonProperty("transport")
private TransportConfig transportConfig;

@Getter
@JsonProperty("facets")
private FacetsConfig facetsConfig;

@Getter
@JsonProperty("circuitBreaker")
private CircuitBreakerConfig circuitBreaker;

@JsonProperty("metrics")
private Map<String, Object> metricsConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

package io.openlineage.client.circuitBreaker;

import static io.openlineage.client.circuitBreaker.RuntimeUtils.freeMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.getGarbageCollectorMXBeans;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.maxMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.totalMemory;
import static io.openlineage.client.utils.RuntimeUtils.freeMemory;
import static io.openlineage.client.utils.RuntimeUtils.getGarbageCollectorMXBeans;
import static io.openlineage.client.utils.RuntimeUtils.maxMemory;
import static io.openlineage.client.utils.RuntimeUtils.totalMemory;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package io.openlineage.client.circuitBreaker;

import static io.openlineage.client.circuitBreaker.RuntimeUtils.freeMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.maxMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.totalMemory;
import static io.openlineage.client.utils.RuntimeUtils.freeMemory;
import static io.openlineage.client.utils.RuntimeUtils.maxMemory;
import static io.openlineage.client.utils.RuntimeUtils.totalMemory;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.metrics;

import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.util.List;
import java.util.Map;

/**
* A builder class that provides implementations to build composite meter registries. This class
* implements the MetricsBuilder interface with CompositeMeterRegistry as its type.
*
* <p>CompositeMeterRegistry is a type of MeterRegistry, that encapsulates two or more meter
* registries into one, and manages unified functionalities across all registries.
*/
public class CompositeMeterRegistryFactory implements MeterRegistryFactory<CompositeMeterRegistry> {

/**
* Constructs a CompositeMeterRegistry from a given map of configuration options. The "registries"
* key in the map is expected to provide a list of meter registry configurations. Each
* configuration is parsed and, if parsed successfully, added to the CompositeMeterRegistry.
*
* @param config The map containing the configurations for composite meter registry.
* @return A CompositeMeterRegistry built from the provided configuration. An empty
* CompositeMeterRegistry is returned if the the map doesn't contain a list of configurations
* extended by registries.
*/
@Override
public CompositeMeterRegistry registry(Map<String, Object> config) {
CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
Object registries = config.get("registries");
if (!(registries instanceof List) || ((List<?>) registries).isEmpty()) {
return meterRegistry;
}
for (Object registryConfig : (List<Object>) registries) {
if (registryConfig instanceof Map || !((Map<?, ?>) registryConfig).isEmpty()) {
MicrometerProvider.parseMeterRegistryConfig((Map<String, Object>) registryConfig)
.ifPresent(meterRegistry::add);
}
}
return meterRegistry;
}

@Override
public String type() {
return "composite";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.metrics;

import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;

/**
* MeterRegistryFactory is an interface that defines methods to build different types of meter
* registries from OpenLineage config.
*/
public interface MeterRegistryFactory<T extends MeterRegistry> {
T registry(Map<String, Object> config);

String type();
}