Skip to content

Commit

Permalink
metrics: implement java/spark metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Mar 17, 2024
1 parent a10759e commit f673013
Show file tree
Hide file tree
Showing 77 changed files with 988 additions and 136 deletions.
5 changes: 5 additions & 0 deletions client/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ext {
junit5Version = '5.10.2'
lombokVersion = '1.18.30'
mockitoVersion = '5.2.0'
micrometerVersion = '1.12.4'
isReleaseVersion = !version.endsWith('SNAPSHOT')
}

Expand All @@ -63,10 +64,13 @@ dependencies {
implementation 'org.apache.httpcomponents:httpclient:4.5.14'
implementation 'commons-logging:commons-logging:1.3.0'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation "io.micrometer:micrometer-core:${micrometerVersion}"

compileOnly 'com.google.code.findbugs:jsr305:3.0.2'
compileOnly 'org.apache.kafka:kafka-clients:3.7.0'
compileOnly 'com.amazonaws:amazon-kinesis-producer:0.15.10'
compileOnly "org.projectlombok:lombok:${lombokVersion}"
compileOnly "io.micrometer:micrometer-registry-statsd:${micrometerVersion}"
annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

codeGenerator project(':generator')
Expand All @@ -77,6 +81,7 @@ dependencies {
testImplementation "org.mockito:mockito-inline:${mockitoVersion}"
testImplementation "org.mockito:mockito-junit-jupiter:${mockitoVersion}"
testImplementation "org.projectlombok:lombok:${lombokVersion}"
testImplementation "io.micrometer:micrometer-registry-statsd:${micrometerVersion}"
testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
}

Expand Down
21 changes: 18 additions & 3 deletions client/java/src/main/java/io/openlineage/client/Clients.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.openlineage.client;

import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.metrics.MetricsResolver;
import io.openlineage.client.transports.NoopTransport;
import io.openlineage.client.transports.Transport;
import io.openlineage.client.transports.TransportFactory;
Expand All @@ -21,12 +22,18 @@ public static OpenLineageClient newClient() {
}

public static OpenLineageClient newClient(ConfigPathProvider configPathProvider) {
String isDisabled = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED");
if (Boolean.parseBoolean(isDisabled)) {
if (isDisabled()) {
return OpenLineageClient.builder().transport(new NoopTransport()).build();
}
final OpenLineageYaml openLineageYaml =
OpenLineageClientUtils.loadOpenLineageYaml(configPathProvider);
return newClient(openLineageYaml);
}

public static OpenLineageClient newClient(OpenLineageYaml openLineageYaml) {
if (isDisabled()) {
return OpenLineageClient.builder().transport(new NoopTransport()).build();
}
final TransportFactory factory = new TransportFactory(openLineageYaml.getTransportConfig());
final Transport transport = factory.build();
// ...
Expand All @@ -37,9 +44,17 @@ public static OpenLineageClient newClient(ConfigPathProvider configPathProvider)
}

Optional.ofNullable(openLineageYaml.getCircuitBreaker())
.map(config -> new CircuitBreakerFactory(config))
.map(CircuitBreakerFactory::new)
.ifPresent(f -> builder.circuitBreaker(f.build()));

Optional.ofNullable(openLineageYaml.getMetricsConfig())
.map(MetricsResolver::addMeterRegistryFromConfig)
.ifPresent(builder::meterRegistry);
return builder.transport(transport).build();
}

private static boolean isDisabled() {
String disabled = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED");
return (Boolean.parseBoolean(disabled));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

package io.openlineage.client;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.client.metrics.MetricsResolver;
import io.openlineage.client.transports.ConsoleTransport;
import io.openlineage.client.transports.Transport;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -18,8 +25,14 @@
public final class OpenLineageClient {
final Transport transport;
final Optional<CircuitBreaker> circuitBreaker;
final MeterRegistry meterRegistry;
final String[] disabledFacets;

Counter emitStart;
Counter emitComplete;
AtomicInteger engagedCircuitBreaker;
Timer emitTime;

/** Creates a new {@code OpenLineageClient} object. */
public OpenLineageClient() {
this(new ConsoleTransport());
Expand All @@ -30,15 +43,23 @@ public OpenLineageClient(@NonNull final Transport transport) {
}

public OpenLineageClient(@NonNull final Transport transport, String... disabledFacets) {
this(transport, null, disabledFacets);
this(transport, null, null, disabledFacets);
}

public OpenLineageClient(
@NonNull final Transport transport, CircuitBreaker circuitBreaker, String... disabledFacets) {
@NonNull final Transport transport,
CircuitBreaker circuitBreaker,
MeterRegistry meterRegistry,
String... disabledFacets) {
this.transport = transport;
this.disabledFacets = Arrays.copyOf(disabledFacets, disabledFacets.length);
this.circuitBreaker = Optional.ofNullable(circuitBreaker);
if (meterRegistry == null) {
meterRegistry = MetricsResolver.getMeterRegistry();
}
this.meterRegistry = meterRegistry;

initializeMetrics();
OpenLineageClientUtils.configureObjectMapper(disabledFacets);
}

Expand All @@ -54,10 +75,15 @@ public void emit(@NonNull OpenLineage.RunEvent runEvent) {
"OpenLineageClient will emit lineage event: {}", OpenLineageClientUtils.toJson(runEvent));
}
if (circuitBreaker.isPresent() && circuitBreaker.get().currentState().isClosed()) {
engagedCircuitBreaker.set(1);
log.warn("OpenLineageClient disabled with circuit breaker");
return;
} else {
engagedCircuitBreaker.set(0);
}
transport.emit(runEvent);
emitStart.increment();
emitTime.record(() -> transport.emit(runEvent));
emitComplete.increment();
}

/**
Expand All @@ -73,10 +99,15 @@ public void emit(@NonNull OpenLineage.DatasetEvent datasetEvent) {
OpenLineageClientUtils.toJson(datasetEvent));
}
if (circuitBreaker.isPresent() && circuitBreaker.get().currentState().isClosed()) {
engagedCircuitBreaker.set(1);
log.warn("OpenLineageClient disabled with circuit breaker");
return;
} else {
engagedCircuitBreaker.set(0);
}
transport.emit(OpenLineageClientUtils.toJson(datasetEvent));
emitStart.increment();
emitTime.record(() -> transport.emit(OpenLineageClientUtils.toJson(datasetEvent)));
emitComplete.increment();
}

/**
Expand All @@ -91,10 +122,33 @@ public void emit(@NonNull OpenLineage.JobEvent jobEvent) {
"OpenLineageClient will emit lineage event: {}", OpenLineageClientUtils.toJson(jobEvent));
}
if (circuitBreaker.isPresent() && circuitBreaker.get().currentState().isClosed()) {
engagedCircuitBreaker.set(1);
log.warn("OpenLineageClient disabled with circuit breaker");
return;
} else {
engagedCircuitBreaker.set(0);
}
transport.emit(OpenLineageClientUtils.toJson(jobEvent));
emitStart.increment();
emitTime.record(() -> transport.emit(OpenLineageClientUtils.toJson(jobEvent)));
emitComplete.increment();
}

public void initializeMetrics() {
emitStart =
this.meterRegistry.counter(
"openlineage.emit.start", "openlineage.transport", transport.getClass().getName());
emitComplete =
this.meterRegistry.counter(
"openlineage.emit.complete", "openlineage.transport", transport.getClass().getName());
engagedCircuitBreaker =
this.meterRegistry.gauge(
"openlineage.circuitbreaker.engaged",
Collections.singletonList(
Tag.of("openlineage.circuitbreaker", circuitBreaker.getClass().getName())),
new AtomicInteger(0));
emitTime =
this.meterRegistry.timer(
"openlineage.emit.time", "openlineage.transport", transport.getClass().getName());
}

/**
Expand All @@ -121,6 +175,7 @@ public static final class Builder {
private Transport transport;
private String[] disabledFacets;
private CircuitBreaker circuitBreaker;
private MeterRegistry meterRegistry;

private Builder() {
this.transport = DEFAULT_TRANSPORT;
Expand All @@ -137,6 +192,11 @@ public Builder circuitBreaker(@NonNull CircuitBreaker circuitBreaker) {
return this;
}

public Builder meterRegistry(@NonNull MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
return this;
}

public Builder disableFacets(@NonNull String... disabledFacets) {
this.disabledFacets = Arrays.copyOf(disabledFacets, disabledFacets.length);
return this;
Expand All @@ -147,7 +207,7 @@ public Builder disableFacets(@NonNull String... disabledFacets) {
* OpenLineageClient.Builder}.
*/
public OpenLineageClient build() {
return new OpenLineageClient(transport, circuitBreaker, disabledFacets);
return new OpenLineageClient(transport, circuitBreaker, meterRegistry, disabledFacets);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ private OpenLineageClientUtils() {}

private static final ObjectMapper MAPPER = newObjectMapper();

private static final ObjectMapper YML = new ObjectMapper(new YAMLFactory());
private static final ObjectMapper JSON = new ObjectMapper(new JsonFactory());
private static final ObjectMapper YML = newObjectMapper(new YAMLFactory());
private static final ObjectMapper JSON = newObjectMapper();

@JsonFilter("disabledFacets")
public class DisabledFacetsMixin {}
Expand All @@ -58,7 +58,17 @@ public class DisabledFacetsMixin {}
* @return A configured {@link ObjectMapper} instance.
*/
public static ObjectMapper newObjectMapper() {
final ObjectMapper mapper = new ObjectMapper();
return newObjectMapper(new JsonFactory());
}

/**
* Creates a new {@link ObjectMapper} instance configured with modules for JDK8 and JavaTime,
* including settings to ignore unknown properties and to not write dates as timestamps.
*
* @return A configured {@link ObjectMapper} instance.
*/
public static ObjectMapper newObjectMapper(JsonFactory jsonFactory) {
final ObjectMapper mapper = new ObjectMapper(jsonFactory);
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new JavaTimeModule());
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@

package io.openlineage.client;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.openlineage.client.circuitBreaker.CircuitBreakerConfig;
import io.openlineage.client.transports.FacetsConfig;
import io.openlineage.client.transports.TransportConfig;
import java.util.Map;
import lombok.Getter;

/** Configuration for {@link OpenLineageClient}. */
@JsonIgnoreProperties
@Getter
public class OpenLineageYaml {
@Getter
@JsonProperty("transport")
private TransportConfig transportConfig;

@Getter
@JsonProperty("facets")
private FacetsConfig facetsConfig;

@Getter
@JsonProperty("circuitBreaker")
private CircuitBreakerConfig circuitBreaker;

@JsonProperty("metrics")
private Map<String, Object> metricsConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

package io.openlineage.client.circuitBreaker;

import static io.openlineage.client.circuitBreaker.RuntimeUtils.freeMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.getGarbageCollectorMXBeans;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.maxMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.totalMemory;
import static io.openlineage.client.utils.RuntimeUtils.freeMemory;
import static io.openlineage.client.utils.RuntimeUtils.getGarbageCollectorMXBeans;
import static io.openlineage.client.utils.RuntimeUtils.maxMemory;
import static io.openlineage.client.utils.RuntimeUtils.totalMemory;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package io.openlineage.client.circuitBreaker;

import static io.openlineage.client.circuitBreaker.RuntimeUtils.freeMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.maxMemory;
import static io.openlineage.client.circuitBreaker.RuntimeUtils.totalMemory;
import static io.openlineage.client.utils.RuntimeUtils.freeMemory;
import static io.openlineage.client.utils.RuntimeUtils.maxMemory;
import static io.openlineage.client.utils.RuntimeUtils.totalMemory;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.metrics;

import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.util.List;
import java.util.Map;

/**
* A builder class that provides implementations to build composite meter registries. This class
* implements the MetricsBuilder interface with CompositeMeterRegistry as its type.
*
* <p>CompositeMeterRegistry is a type of MeterRegistry, that encapsulates two or more meter
* registries into one, and manages unified functionalities across all registries.
*/
public class CompositeMetricsBuilder implements MetricsBuilder<CompositeMeterRegistry> {

/**
* Constructs a CompositeMeterRegistry from a given map of configuration options. The "registries"
* key in the map is expected to provide a list of meter registry configurations. Each
* configuration is parsed and, if parsed successfully, added to the CompositeMeterRegistry.
*
* @param config The map containing the configurations for composite meter registry.
* @return A CompositeMeterRegistry built from the provided configuration. An empty
* CompositeMeterRegistry is returned if the the map doesn't contain a list of configurations
* extended by registries.
*/
@Override
public CompositeMeterRegistry registry(Map<String, Object> config) {
CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
Object registries = config.get("registries");
if (!(registries instanceof List) || ((List<?>) registries).isEmpty()) {
return meterRegistry;
}
for (Object registryConfig : (List<Object>) registries) {
if (registryConfig instanceof Map || !((Map<?, ?>) registryConfig).isEmpty()) {
MetricsResolver.parseMeterRegistryConfig((Map<String, Object>) registryConfig)
.ifPresent(meterRegistry::add);
}
}
return meterRegistry;
}

@Override
public String type() {
return "composite";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.metrics;

import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;

/**
* MetricsBuilder is an interface that defines methods to build different types of meter registries
* from OpenLineage config.
*/
public interface MetricsBuilder<T extends MeterRegistry> {
T registry(Map<String, Object> config);

String type();
}

0 comments on commit f673013

Please sign in to comment.