metrics, spark: add support for Metrics mechanism to Spark integration
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
mobuchowski committed Apr 4, 2024
1 parent 9cb16d0 commit 6d93dfe
Showing 64 changed files with 490 additions and 82 deletions.
@@ -25,6 +25,10 @@ public static long maxMemory() {
return Runtime.getRuntime().maxMemory();
}

public static double getMemoryFractionUsage() {
return (100.0 * ((double) totalMemory())) / ((double) maxMemory());
}

public static List<GarbageCollectorMXBean> getGarbageCollectorMXBeans() {
return ManagementFactory.getGarbageCollectorMXBeans();
}
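The helper added above compares the heap the JVM has currently allocated (totalMemory()) with the configured ceiling (maxMemory()) and returns the ratio as a percentage; the Spark listener later in this commit records it at application start and end. A minimal usage sketch (the printed value is illustrative):

// Returns allocated heap as a percentage of max heap, e.g. 42.5.
double usage = RuntimeUtils.getMemoryFractionUsage();
System.out.printf("heap allocated: %.1f%% of max%n", usage);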
@@ -117,7 +117,7 @@ void testSimpleMetricsConfigFromYaml() {
@Test
void testCompositeMetricsConfigFromYaml() {
OpenLineageClient client =
Clients.newClient(new TestConfigPathProvider("config/metrics-complex.yaml"));
Clients.newClient(new TestConfigPathProvider("config/metrics-composite.yaml"));
CompositeMeterRegistry meterRegistry = (CompositeMeterRegistry) client.meterRegistry;
assertThat(meterRegistry.getRegistries().iterator().next())
.isInstanceOfSatisfying(
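The fixture rename above (metrics-complex.yaml to metrics-composite.yaml) matches what the test asserts: the client builds a Micrometer CompositeMeterRegistry, which fans every meter out to each configured backend. A minimal sketch of that behavior using in-memory registries (the counter name is taken from this commit; the SimpleMeterRegistry instances are stand-ins for real backends):

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

CompositeMeterRegistry composite = new CompositeMeterRegistry();
SimpleMeterRegistry first = new SimpleMeterRegistry();
SimpleMeterRegistry second = new SimpleMeterRegistry();
composite.add(first);
composite.add(second);

// One increment through the composite is visible in every child registry.
Counter counter = composite.counter("openlineage.spark.event.sql.start");
counter.increment();
assert first.counter("openlineage.spark.event.sql.start").count() == 1.0;
assert second.counter("openlineage.spark.event.sql.start").count() == 1.0;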
3 changes: 3 additions & 0 deletions integration/spark/app/build.gradle
@@ -27,6 +27,7 @@ ext {
mockitoVersion = '4.11.0'
postgresqlVersion = '42.7.1'
testcontainersVersion = '1.19.3'
micrometerVersion = '1.12.4'
}

// This workaround is needed because the version of Snappy that Spark 2.4.x runs with,
@@ -82,6 +83,7 @@ dependencies {
implementation(project(path: ":spark34", configuration: activeRuntimeElementsConfiguration))
implementation(project(path: ":spark35", configuration: activeRuntimeElementsConfiguration))
implementation("org.apache.httpcomponents.client5:httpclient5:5.3")
implementation("io.micrometer:micrometer-core:${micrometerVersion}")

compileOnly("org.apache.spark:spark-core_${scala}:${spark}")
compileOnly("org.apache.spark:spark-sql_${scala}:${spark}")
@@ -120,6 +122,7 @@ dependencies {
exclude group: 'com.fasterxml.jackson.dataformat'
exclude group: 'org.mock-server.mockserver-client-java'
}
testImplementation("io.micrometer:micrometer-registry-statsd:${micrometerVersion}")

testRuntimeOnly("org.slf4j:slf4j-api:1.7.36")
testRuntimeOnly(platform("org.apache.logging.log4j:log4j-bom:2.22.1"))
@@ -0,0 +1,34 @@
{
  "eventType": "COMPLETE",
  "run": {
    "facets": {
      "debug": {
        "metrics": {
          "metrics": [{
            "name": "openlineage.spark.event.sql.start",
            "tags": [{
              "key": "openlineage.spark.disabled.facets"
            }, {
              "key": "openlineage.spark.integration.version"
            }, {
              "key": "openlineage.spark.version"
            }]
          }, {
            "name": "openlineage.spark.unknownFacet.time"
          }]
        }
      }
    }
  },
  "job": {
    "namespace": "testEmitMetrics",
    "name": "open_lineage_integration_emit_metrics.execute_create_table_command.emit_test_create_table_emit_test"
  },
  "inputs": [],
  "outputs": [
    {
      "namespace": "file",
      "name": "/tmp/emit_test/create_table_emit_test"
    }
  ]
}
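This new fixture pins down the expected shape of the debug run facet: each emitted metric appears by name, and the three common tags configured in initializeMetrics (later in this commit) travel with it. A hypothetical spot-check of the fixture with Jackson (the file name is assumed; readTree throws IOException):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;

JsonNode event = new ObjectMapper().readTree(new File("expected_event.json")); // assumed name
JsonNode metrics = event.at("/run/facets/debug/metrics/metrics");
// First entry: the SQL-start counter, carrying the three common tags.
assert "openlineage.spark.event.sql.start".equals(metrics.get(0).get("name").asText());
assert metrics.get(0).get("tags").size() == 3;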
@@ -125,13 +125,7 @@ public static OpenLineageYaml extractOpenlineageConfFromSparkConf(SparkConf conf
}

private static List<Tuple2<String, String>> filterProperties(SparkConf conf) {
-    return Arrays.stream(conf.getAllWithPrefix("spark.openlineage."))
-        .filter(
-            e ->
-                e._1.startsWith("transport")
-                    || e._1.startsWith("facets")
-                    || e._1.startsWith("circuitBreaker"))
-        .collect(Collectors.toList());
+    return Arrays.stream(conf.getAllWithPrefix("spark.openlineage.")).collect(Collectors.toList());
}

private static List<String> getJsonPath(String keyPath) {
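Dropping the prefix whitelist is what lets the new metrics section (and any future config section) be set from SparkConf: every spark.openlineage.* key is now forwarded to the client configuration, not just transport, facets, and circuitBreaker. A sketch of the effect (the metrics key shape here is an assumption for illustration only):

import java.util.Arrays;
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.openlineage.transport.type", "console")
    .set("spark.openlineage.metrics.micrometer.type", "simple"); // hypothetical key

// Before this change the second entry was silently dropped; now both survive.
Arrays.stream(conf.getAllWithPrefix("spark.openlineage."))
    .forEach(t -> System.out.println(t._1() + " = " + t._2()));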
@@ -9,11 +9,18 @@
import static io.openlineage.spark.agent.util.ScalaConversionUtils.asJavaOptional;
import static io.openlineage.spark.agent.util.TimeUtils.toZonedTime;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.openlineage.client.Environment;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageYaml;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.circuitBreaker.NoOpCircuitBreaker;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.client.utils.RuntimeUtils;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
@@ -69,13 +76,16 @@ public class OpenLineageSparkListener extends org.apache.spark.scheduler.SparkLi

private static CircuitBreaker circuitBreaker = new NoOpCircuitBreaker();

-  String sparkVersion = package$.MODULE$.SPARK_VERSION();
+  private static MeterRegistry meterRegistry;
+
+  private static String sparkVersion = package$.MODULE$.SPARK_VERSION();

private static final boolean isDisabled = checkIfDisabled();

/** called by the tests */
public static void init(ContextFactory contextFactory) {
OpenLineageSparkListener.contextFactory = contextFactory;
meterRegistry = contextFactory.getMeterRegistry();
clear();
}

@@ -96,17 +106,20 @@ public void onOtherEvent(SparkListenerEvent event) {
private static void sparkSQLExecStart(SparkListenerSQLExecutionStart startEvent) {
getSparkSQLExecutionContext(startEvent.executionId())
.ifPresent(
-            context ->
-                circuitBreaker.run(
-                    () -> {
-                      context.start(startEvent);
-                      return null;
-                    }));
+            context -> {
+              meterRegistry.counter("openlineage.spark.event.sql.start").increment();
+              circuitBreaker.run(
+                  () -> {
+                    context.start(startEvent);
+                    return null;
+                  });
+            });
}

/** called by the SparkListener when a spark-sql (Dataset api) execution ends */
private static void sparkSQLExecEnd(SparkListenerSQLExecutionEnd endEvent) {
ExecutionContext context = sparkSqlExecutionRegistry.remove(endEvent.executionId());
meterRegistry.counter("openlineage.spark.event.sql.end").increment();
if (context != null) {
circuitBreaker.run(
() -> {
@@ -133,6 +146,7 @@ public void onJobStart(SparkListenerJobStart jobStart) {
return;
}
initializeContextFactoryIfNotInitialized();
meterRegistry.counter("openlineage.spark.event.job.start").increment();
Optional<ActiveJob> activeJob =
asJavaOptional(
SparkSession.getDefaultSession()
@@ -192,6 +206,7 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) {
return;
}
ExecutionContext context = rddExecutionRegistry.remove(jobEnd.jobId());
meterRegistry.counter("openlineage.spark.event.job.end").increment();
circuitBreaker.run(
() -> {
if (context != null) {
@@ -257,6 +272,10 @@ private static void clear() {

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
meterRegistry.counter("openlineage.spark.event.app.end").increment();
meterRegistry
.counter("openlineage.spark.event.app.end.memoryusage")
.increment(RuntimeUtils.getMemoryFractionUsage());
circuitBreaker.run(
() -> {
emitApplicationEndEvent(applicationEnd.time());
@@ -271,14 +290,13 @@ public static void close() {
clear();
}

-  /**
-   * Check the {@link SparkConf} for open lineage configuration.
-   *
-   * @param applicationStart
-   */
@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
initializeContextFactoryIfNotInitialized(applicationStart.appName());
meterRegistry.counter("openlineage.spark.event.app.start").increment();
meterRegistry
.counter("openlineage.spark.event.app.start.memoryusage")
.increment(RuntimeUtils.getMemoryFractionUsage());
circuitBreaker.run(
() -> {
emitApplicationStartEvent(applicationStart.time());
@@ -314,14 +332,45 @@ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, Strin
}
try {
ArgumentParser args = ArgumentParser.parse(sparkConf);
-      contextFactory = new ContextFactory(new EventEmitter(args, appName));
+      // Needs to be done before initializing OpenLineageClient
+      initializeMetrics(args.getOpenLineageYaml());
+      contextFactory = new ContextFactory(new EventEmitter(args, appName), meterRegistry);
circuitBreaker =
new CircuitBreakerFactory(args.getOpenLineageYaml().getCircuitBreaker()).build();
} catch (URISyntaxException e) {
log.error("Unable to parse open lineage endpoint. Lineage events will not be collected", e);
}
}

private static void initializeMetrics(OpenLineageYaml openLineageYaml) {
meterRegistry =
MicrometerProvider.addMeterRegistryFromConfig(openLineageYaml.getMetricsConfig());
String disabledFacets;
if (openLineageYaml.getFacetsConfig() != null
&& openLineageYaml.getFacetsConfig().getDisabledFacets() != null) {
disabledFacets = String.join(";", openLineageYaml.getFacetsConfig().getDisabledFacets());
} else {
disabledFacets = "";
}
meterRegistry
.config()
.commonTags(
Tags.of(
Tag.of("openlineage.spark.integration.version", Versions.getVersion()),
Tag.of("openlineage.spark.version", sparkVersion),
Tag.of("openlineage.spark.disabled.facets", disabledFacets)));
((CompositeMeterRegistry) meterRegistry)
.getRegistries()
.forEach(
r ->
r.config()
.commonTags(
Tags.of(
Tag.of("openlineage.spark.integration.version", Versions.getVersion()),
Tag.of("openlineage.spark.version", sparkVersion),
Tag.of("openlineage.spark.disabled.facets", disabledFacets))));
}

private void emitApplicationEvent(Long time, OpenLineage.RunEvent.EventType eventType) {
OpenLineage openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
EventEmitter emitter = contextFactory.openLineageEventEmitter;
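Taken together, the listener now counts every lifecycle event (sql.start, sql.end, job.start, job.end, app.start, app.end) and records heap usage at application boundaries, applying three common tags both to the composite registry and to each child, presumably so that meters registered directly against a child registry carry them as well. A condensed sketch of the pattern (the registry type and tag values are illustrative; the real registry comes from MicrometerProvider.addMeterRegistryFromConfig):

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

MeterRegistry registry = new SimpleMeterRegistry(); // stand-in for the configured registry
registry.config().commonTags(Tags.of(
    Tag.of("openlineage.spark.integration.version", "1.11.0"), // illustrative value
    Tag.of("openlineage.spark.version", "3.5.0"),              // illustrative value
    Tag.of("openlineage.spark.disabled.facets", "")));

registry.counter("openlineage.spark.event.app.start").increment();
// Counters accept fractional increments, which is how the heap-usage
// percentage from RuntimeUtils.getMemoryFractionUsage() is recorded.
registry.counter("openlineage.spark.event.app.start.memoryusage").increment(42.5);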
@@ -5,6 +5,7 @@

package io.openlineage.spark.agent.lifecycle;

import io.micrometer.core.instrument.MeterRegistry;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.Versions;
@@ -14,6 +15,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.Optional;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.spark.sql.SparkSession;
@@ -25,10 +27,12 @@
public class ContextFactory {

public final EventEmitter openLineageEventEmitter;
@Getter private final MeterRegistry meterRegistry;
private final OpenLineageEventHandlerFactory handlerFactory;

-  public ContextFactory(EventEmitter openLineageEventEmitter) {
+  public ContextFactory(EventEmitter openLineageEventEmitter, MeterRegistry meterRegistry) {
this.openLineageEventEmitter = openLineageEventEmitter;
this.meterRegistry = meterRegistry;
handlerFactory = new InternalEventHandlerFactory();
}

@@ -54,6 +58,7 @@ public Optional<ExecutionContext> createSparkSQLExecutionContext(long executionI
.getCustomEnvironmentVariables()
.orElse(Collections.emptyList()))
.vendors(Vendors.getVendors())
.meterRegistry(meterRegistry)
.build();
OpenLineageRunEventBuilder runEventBuilder =
new OpenLineageRunEventBuilder(olContext, handlerFactory);
@@ -79,6 +84,7 @@ public Optional<ExecutionContext> createSparkSQLExecutionContext(
.getCustomEnvironmentVariables()
.orElse(Collections.emptyList()))
.vendors(Vendors.getVendors())
.meterRegistry(meterRegistry)
.build();
OpenLineageRunEventBuilder runEventBuilder =
new OpenLineageRunEventBuilder(olContext, handlerFactory);
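The factory now owns the registry and threads it into every OpenLineageContext it builds; Lombok's @Getter generates the accessor the listener uses in init(...). A sketch of the new wiring (the emitter here is assumed to exist):

ContextFactory factory = new ContextFactory(emitter, new SimpleMeterRegistry());
MeterRegistry registry = factory.getMeterRegistry(); // generated by @Getter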
@@ -5,6 +5,7 @@

package io.openlineage.spark.agent;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@@ -13,6 +14,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.openlineage.client.Environment;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.filters.EventFilterUtils;
@@ -84,6 +86,7 @@ void setup() {
.sparkContext(sparkSession.sparkContext())
.openLineage(new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI))
.queryExecution(qe)
.meterRegistry(new SimpleMeterRegistry())
.build();
}

@@ -113,7 +116,7 @@ void testSqlEventWithJobEventEmitsOnce() {
.getOutputDatasetQueryPlanVisitors()
.add(new InsertIntoHadoopFsRelationVisitor(olContext));
ExecutionContext executionContext =
-        new StaticExecutionContextFactory(emitter)
+        new StaticExecutionContextFactory(emitter, new SimpleMeterRegistry())
.createSparkSQLExecutionContext(1L, emitter, qe, olContext);

SparkListenerSQLExecutionStart event = mock(SparkListenerSQLExecutionStart.class);
@@ -138,11 +141,12 @@ void testSqlEventWithJobEventEmitsOnce() {
}

@Test
-  void testOpenlineageDisableDisablesExecution() throws URISyntaxException {
+  void testOpenLineageDisableDisablesExecution() throws URISyntaxException {
try (MockedStatic mocked = mockStatic(Environment.class)) {
when(Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED")).thenReturn("true");

ContextFactory contextFactory = mock(ContextFactory.class);
when(contextFactory.getMeterRegistry()).thenReturn(new SimpleMeterRegistry());

OpenLineageSparkListener.init(contextFactory);
OpenLineageSparkListener listener = new OpenLineageSparkListener();
@@ -187,7 +191,8 @@ void testSparkSQLEndGetsQueryExecutionFromEvent() {
.getOutputDatasetQueryPlanVisitors()
.add(new InsertIntoHadoopFsRelationVisitor(olContext));
OpenLineageSparkListener listener = new OpenLineageSparkListener();
-    OpenLineageSparkListener.init(new StaticExecutionContextFactory(emitter));
+    OpenLineageSparkListener.init(
+        new StaticExecutionContextFactory(emitter, new SimpleMeterRegistry()));

SparkListenerSQLExecutionEnd event = mock(SparkListenerSQLExecutionEnd.class);
try (MockedStatic<EventFilterUtils> utils = mockStatic(EventFilterUtils.class)) {
@@ -209,8 +214,9 @@

@Test
void testApplicationStartEvent() {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
OpenLineageSparkListener listener = new OpenLineageSparkListener();
-    OpenLineageSparkListener.init(new StaticExecutionContextFactory(emitter));
+    OpenLineageSparkListener.init(new StaticExecutionContextFactory(emitter, meterRegistry));
SparkListenerApplicationStart event = mock(SparkListenerApplicationStart.class);

listener.onApplicationStart(event);
@@ -224,7 +230,8 @@
@Test
void testApplicationEndEvent() {
OpenLineageSparkListener listener = new OpenLineageSparkListener();
-    OpenLineageSparkListener.init(new StaticExecutionContextFactory(emitter));
+    OpenLineageSparkListener.init(
+        new StaticExecutionContextFactory(emitter, new SimpleMeterRegistry()));
SparkListenerApplicationEnd event = mock(SparkListenerApplicationEnd.class);

listener.onApplicationEnd(event);
@@ -234,4 +241,19 @@

verify(emitter, times(1)).emit(lineageEvent.capture());
}

@Test
void testCheckSparkApplicationEventsAreEmitted() {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
OpenLineageSparkListener listener = new OpenLineageSparkListener();
OpenLineageSparkListener.init(new StaticExecutionContextFactory(emitter, meterRegistry));
SparkListenerApplicationStart startEvent = mock(SparkListenerApplicationStart.class);
SparkListenerApplicationEnd endEvent = mock(SparkListenerApplicationEnd.class);

listener.onApplicationStart(startEvent);
listener.onApplicationEnd(endEvent);

assertThat(meterRegistry.counter("openlineage.spark.event.app.start").count()).isEqualTo(1.0);
assertThat(meterRegistry.counter("openlineage.spark.event.app.end").count()).isEqualTo(1.0);
}
}
