[SPARK] write scala integration test
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
pawel-big-lebowski committed Oct 23, 2023
1 parent 782840b commit 483d364
Showing 15 changed files with 531 additions and 56 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## [Unreleased](https://github.com/OpenLineage/OpenLineage/compare/1.4.1...HEAD)

### Added
* **Spark: support `rdd` and `toDF` operations available in Spark Scala API.** [`#2188`](https://github.com/OpenLineage/OpenLineage/pull/2188) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*This PR includes the first Scala integration test, fixes `ExternalRddVisitor` and adds support for extracting inputs from `MapPartitionsRDD` and `ParallelCollectionRDD` plan nodes.*

### Fixed
* **Spark: unify dataset naming for RDD jobs and Spark SQL.** [`#2181`](https://github.com/OpenLineage/OpenLineage/pull/2181) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Use the same mechanism for RDD jobs to extract dataset identifier as used for Spark SQL.*
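
To make the Added entry above concrete, here is a rough, hypothetical Scala sketch (not part of this commit; the object name and paths are invented) of the kind of Scala-API job the new `rdd`/`toDF` support targets:

```scala
import org.apache.spark.sql.SparkSession

object RddToDfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rdd_to_df_example").master("local[*]").getOrCreate()
    import spark.implicits._

    // A local collection: parallelize() creates a ParallelCollectionRDD plan node
    val fromCollection = spark.sparkContext.parallelize(Seq("a", "b", "c"))

    // toDF and .rdd introduce MapPartitionsRDD steps between the source and the final action
    val df = fromCollection.toDF("value")
    df.rdd.saveAsTextFile("/tmp/rdd_to_df_output") // output path is an arbitrary example

    spark.stop()
  }
}
```

`parallelize` yields a `ParallelCollectionRDD`, and both `toDF` and `.rdd` insert `MapPartitionsRDD` steps; these are the plan node types the integration can now walk when extracting inputs.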
6 changes: 4 additions & 2 deletions integration/spark/app/integrations/sparkrdd/1.json
@@ -17,11 +17,13 @@
},
"job" : {
"namespace" : "ns_name",
"name" : "test_rdd.shuffled_map_partitions_hadoop"
"name" : "test_rdd.map_partitions_shuffled_map_partitions_hadoop"
},
"inputs" : [ {
"namespace" : "gs.bucket",
"name" : "gs://bucket/data.txt"
} ],
"outputs" : [ ]
"outputs" : [ {
"namespace" : "file"
}]
}
6 changes: 4 additions & 2 deletions integration/spark/app/integrations/sparkrdd/2.json
@@ -18,11 +18,13 @@
},
"job" : {
"namespace" : "ns_name",
"name" : "test_rdd.shuffled_map_partitions_hadoop"
"name" : "test_rdd.map_partitions_shuffled_map_partitions_hadoop"
},
"inputs" : [ {
"namespace" : "gs.bucket",
"name" : "gs://bucket/data.txt"
} ],
"outputs" : [ ]
"outputs" : [ {
"namespace" : "file"
}]
}
@@ -99,9 +99,11 @@ public void end(SparkListenerStageCompleted stageCompleted) {}
@Override
@SuppressWarnings("PMD") // f.setAccessible(true);
public void setActiveJob(ActiveJob activeJob) {
log.debug("setActiveJob within RddExecutionContext {}", activeJob);
RDD<?> finalRDD = activeJob.finalStage().rdd();
this.jobSuffix = nameRDD(finalRDD);
Set<RDD<?>> rdds = Rdds.flattenRDDs(finalRDD);
log.debug("flattenRDDs {}", rdds);
this.inputs = findInputs(rdds);
Configuration jc = new JobConf();
if (activeJob.finalStage() instanceof ResultStage) {
@@ -197,17 +199,23 @@ static String nameRDD(RDD<?> rdd) {
@Override
public void start(SparkListenerSQLExecutionStart sqlStart) {
// do nothing
log.debug("start SparkListenerSQLExecutionStart {}", sqlStart);
}

@Override
public void end(SparkListenerSQLExecutionEnd sqlEnd) {
// do nothing
log.debug("start SparkListenerSQLExecutionEnd {}", sqlEnd);
}

@Override
public void start(SparkListenerJobStart jobStart) {
if (inputs.isEmpty() && outputs.isEmpty()) {
log.info("RDDs are empty: skipping sending OpenLineage event");
log.debug("start SparkListenerJobStart {}", jobStart);
if (outputs.isEmpty()) {
// Oftentimes the SparkListener is triggered for actions that do not contain any meaningful
// lineage data and are useless in the context of the lineage graph. We assume this occurs
// for RDD operations that have no output dataset.
log.info("Output RDDs are empty: skipping sending OpenLineage event");
return;
}
OpenLineage ol = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
@@ -227,8 +235,12 @@ public void start(SparkListenerJobStart jobStart) {

@Override
public void end(SparkListenerJobEnd jobEnd) {
if (inputs.isEmpty() && outputs.isEmpty() && !(jobEnd.jobResult() instanceof JobFailed)) {
log.info("RDDs are empty: skipping sending OpenLineage event");
log.debug("end SparkListenerJobEnd {}", jobEnd);
if (outputs.isEmpty() && !(jobEnd.jobResult() instanceof JobFailed)) {
// Oftentimes the SparkListener is triggered for actions that do not contain any meaningful
// lineage data and are useless in the context of the lineage graph. We assume this occurs
// for RDD operations that have no output dataset.
log.info("Output RDDs are empty: skipping sending OpenLineage event");
return;
}
OpenLineage ol = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
@@ -346,11 +358,12 @@ protected List<URI> findOutputs(RDD<?> rdd, Configuration config) {
if (outputPath != null) {
return Collections.singletonList(outputPath.toUri());
}
log.debug("Output path is null");
return Collections.emptyList();
}

protected List<URI> findInputs(Set<RDD<?>> rdds) {
log.debug("findInputs within RddExecutionContext");
log.debug("find Inputs within RddExecutionContext {}", rdds);
return PlanUtils.findRDDPaths(rdds.stream().collect(Collectors.toList())).stream()
.map(path -> path.toUri())
.collect(Collectors.toList());
@@ -373,10 +386,12 @@ protected static Path getOutputPath(RDD<?> rdd, Configuration config) {
} else {
jc = new JobConf(config);
}
log.debug("JobConf {}", jc);
path = org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(jc);
if (path == null) {
try {
// old fashioned mapreduce api
log.debug("Path is null, trying to use old fashioned mapreduce api");
path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(new Job(jc));
} catch (IOException exception) {
exception.printStackTrace(System.out);
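For context on the `outputs.isEmpty()` guards added to `start(SparkListenerJobStart)` and `end(SparkListenerJobEnd)` above, here is a hedged Scala illustration (paths invented, assuming a `spark-shell` session where `sc` is the SparkContext) of which RDD actions are now skipped and which still produce an event:

```scala
// Hypothetical spark-shell snippet; the /tmp/scala-test/* paths are examples only.
val words = sc.textFile("/tmp/scala-test/rdd_input1")

// count() materializes no output dataset -> the listener now skips the OpenLineage event
words.count()

// saveAsTextFile() produces an output dataset -> the event is still emitted
words.saveAsTextFile("/tmp/scala-test/word_copy")
```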
@@ -164,7 +164,7 @@ static GenericContainer<?> makePysparkContainerWithDefaultConf(
network, waitMessage, mockServerContainer, sparkSubmit.toArray(new String[0]));
}

static void addSparkConfig(List command, String value) {
public static void addSparkConfig(List command, String value) {
command.add("--conf");
command.add(value);
}
@@ -201,7 +201,7 @@ static void runPysparkContainerWithDefaultConf(
}

@SuppressWarnings("PMD")
private static void consumeOutput(org.testcontainers.containers.output.OutputFrame of) {
static void consumeOutput(org.testcontainers.containers.output.OutputFrame of) {
try {
switch (of.getType()) {
case STDOUT:
@@ -0,0 +1,173 @@
/*
/* Copyright 2018-2023 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.spark.agent;

import static io.openlineage.spark.agent.SparkContainerUtils.addSparkConfig;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockserver.model.HttpRequest.request;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.client.OpenLineageClientUtils;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockserver.client.MockServerClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MockServerContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

/**
* This class runs an integration test for a Spark job written in Scala. All the other tests
* run PySpark scripts instead. Having a Scala job allows us to test the `toDF`/`rdd`
* methods, which are slightly different for Spark jobs written in Scala.
*
* The integration test relies on the bitnami/spark Docker image. It requires the `spark.version`
* system property to specify which Spark version should be tested, as well as the
* `openlineage.spark.jar` system property, which is set in `build.gradle`.
*
* @see https://hub.docker.com/r/bitnami/spark/
*/
@Tag("integration-test")
@Testcontainers
@Slf4j
public class SparkScalaContainerTest {

private static final Network network = Network.newNetwork();

@Container
private static final MockServerContainer openLineageClientMockContainer =
SparkContainerUtils.makeMockServerContainer(network);

private static GenericContainer<?> spark;
private static MockServerClient mockServerClient;
private static final Logger logger = LoggerFactory.getLogger(SparkScalaContainerTest.class);

@BeforeAll
public static void setup() {
mockServerClient =
new MockServerClient(
openLineageClientMockContainer.getHost(),
openLineageClientMockContainer.getServerPort());
mockServerClient
.when(request("/api/v1/lineage"))
.respond(org.mockserver.model.HttpResponse.response().withStatusCode(201));

Awaitility.await().until(openLineageClientMockContainer::isRunning);
}

@AfterEach
public void cleanupSpark() {
mockServerClient.reset();
try {
if (spark != null) spark.stop();
} catch (Exception e) {
logger.error("Unable to shut down pyspark container", e);
}
}

@AfterAll
public static void tearDown() {
try {
openLineageClientMockContainer.stop();
} catch (Exception e) {
logger.error("Unable to shut down openlineage client container", e);
}
network.close();
}

private GenericContainer createSparkContainer(String script) {
return new GenericContainer<>(
DockerImageName.parse("bitnami/spark:" + System.getProperty("spark.version")))
.withNetwork(network)
.withNetworkAliases("spark")
.withFileSystemBind("src/test/resources/spark_scala_scripts", "/opt/spark_scala_scripts")
.withFileSystemBind("src/test/resources/log4j.properties", "/opt/log4j.properties")
.withFileSystemBind("build/libs", "/opt/libs")
.withLogConsumer(SparkContainerUtils::consumeOutput)
.waitingFor(Wait.forLogMessage(".*scala> :quit.*", 1))
.withStartupTimeout(Duration.of(10, ChronoUnit.MINUTES))
.dependsOn(openLineageClientMockContainer)
.withReuse(true)
.withCommand(
sparkShellCommandForScript("/opt/spark_scala_scripts/" + script)
.toArray(new String[] {}));
}

private List<String> sparkShellCommandForScript(String script) {
List<String> command = new ArrayList<>();
addSparkConfig(command, "spark.openlineage.transport.type=http");
addSparkConfig(
command,
"spark.openlineage.transport.url=http://openlineageclient:1080/api/v1/namespaces/scala-test");
addSparkConfig(command, "spark.openlineage.debugFacet=enabled");
addSparkConfig(command, "spark.extraListeners=" + OpenLineageSparkListener.class.getName());
addSparkConfig(command, "spark.sql.warehouse.dir=/tmp/warehouse");
addSparkConfig(command, "spark.sql.shuffle.partitions=1");
addSparkConfig(command, "spark.driver.extraJavaOptions=-Dderby.system.home=/tmp/derby");
addSparkConfig(command, "spark.sql.warehouse.dir=/tmp/warehouse");
addSparkConfig(command, "spark.jars.ivy=/tmp/.ivy2/");
addSparkConfig(command, "spark.openlineage.facets.disabled=");
addSparkConfig(
command, "spark.driver.extraJavaOptions=-Dlog4j.configuration=/opt/log4j.properties");

List<String> sparkShell =
new ArrayList<>(Arrays.asList("./bin/spark-shell", "--master", "local", "-i", script));
sparkShell.addAll(command);
sparkShell.addAll(
Arrays.asList("--jars", "/opt/libs/" + System.getProperty("openlineage.spark.jar")));

log.info("Running spark-shell command: ", String.join(" ", sparkShell));

return sparkShell;
}

@Test
void testScalaUnionRddToParquet() {
spark = createSparkContainer("rdd_union.scala");
spark.start();

await()
.atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(
() -> {
List<OpenLineage.RunEvent> events =
Arrays.stream(
mockServerClient.retrieveRecordedRequests(
request().withPath("/api/v1/lineage")))
.map(r -> r.getBodyAsString())
.map(event -> OpenLineageClientUtils.runEventFromJson(event))
.collect(Collectors.toList());
assertThat(events).isNotEmpty();
RunEvent lastEvent = events.get(events.size() - 1);

assertThat(lastEvent.getOutputs().get(0))
.hasFieldOrPropertyWithValue("namespace", "file")
.hasFieldOrPropertyWithValue("name", "/tmp/scala-test/rdd_output");

assertThat(lastEvent.getInputs().stream().map(d -> d.getName()))
.contains("/tmp/scala-test/rdd_input1", "/tmp/scala-test/rdd_input2");
});
}
}
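
The test above asserts on datasets produced by `rdd_union.scala`, but the script itself is not shown in this excerpt. A minimal sketch consistent with those assertions (purely illustrative; the actual script in the commit may differ) could look like the following, run through `spark-shell -i` with the `--conf` flags and `--jars` assembled in `sparkShellCommandForScript`:

```scala
// Purely illustrative reconstruction of rdd_union.scala based on the assertions in
// testScalaUnionRddToParquet; the real script bundled with the commit may differ.
// spark-shell already provides `spark` (SparkSession) and `sc` (SparkContext).
import spark.implicits._

// Write the two text inputs that the listener is expected to report as input datasets
sc.parallelize(Seq("a", "b")).saveAsTextFile("/tmp/scala-test/rdd_input1")
sc.parallelize(Seq("c", "d")).saveAsTextFile("/tmp/scala-test/rdd_input2")

// Union the two RDDs, lift to a DataFrame with toDF, and write the Parquet output
val union = sc.textFile("/tmp/scala-test/rdd_input1")
  .union(sc.textFile("/tmp/scala-test/rdd_input2"))
union.toDF("value").write.parquet("/tmp/scala-test/rdd_output")
```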
@@ -20,6 +20,7 @@
import io.openlineage.spark.agent.SparkAgentTestExtension;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,6 +37,7 @@
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import scala.Tuple2;
@@ -95,7 +97,7 @@ class LibraryTest {
// }

@Test
void testRdd(SparkSession spark) throws IOException {
void testRdd(@TempDir Path tmpDir, SparkSession spark) throws IOException {
when(SparkAgentTestExtension.OPEN_LINEAGE_SPARK_CONTEXT.getJobNamespace())
.thenReturn("ns_name");
when(SparkAgentTestExtension.OPEN_LINEAGE_SPARK_CONTEXT.getParentJobName())
@@ -111,7 +113,7 @@ void testRdd(SparkSession spark) throws IOException {
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey(Integer::sum)
.count();
.saveAsTextFile(tmpDir.toString() + "/output");

sc.stop();

@@ -364,7 +364,7 @@ void testInsertIntoDataSourceDirVisitor(@TempDir Path tempDir, SparkSession spar
List<InputDataset> inputs = event.getInputs();
assertEquals(1, inputs.size());
assertEquals(FILE, inputs.get(0).getNamespace());
assertEquals(testFile.toAbsolutePath().getParent().toString(), inputs.get(0).getName());
assertEquals(testFile.toAbsolutePath().toString(), inputs.get(0).getName());
}

@Test
@@ -383,9 +383,9 @@ void testWithExternalRdd(@TempDir Path tmpDir, SparkSession spark)

ArgumentCaptor<OpenLineage.RunEvent> lineageEvent =
ArgumentCaptor.forClass(OpenLineage.RunEvent.class);
Mockito.verify(SparkAgentTestExtension.OPEN_LINEAGE_SPARK_CONTEXT, times(6))
Mockito.verify(SparkAgentTestExtension.OPEN_LINEAGE_SPARK_CONTEXT, times(4))
.emit(lineageEvent.capture());
OpenLineage.RunEvent completeEvent = lineageEvent.getAllValues().get(5);
OpenLineage.RunEvent completeEvent = lineageEvent.getAllValues().get(3);
assertThat(completeEvent).hasFieldOrPropertyWithValue(EVENT_TYPE, RunEvent.EventType.COMPLETE);
assertThat(completeEvent.getInputs())
.first()
@@ -481,9 +481,13 @@ void testCreateDataSourceTableAsSelect(@TempDir Path tmpDir, SparkSession spark)

ArgumentCaptor<OpenLineage.RunEvent> lineageEvent =
ArgumentCaptor.forClass(OpenLineage.RunEvent.class);
Mockito.verify(SparkAgentTestExtension.OPEN_LINEAGE_SPARK_CONTEXT, atLeast(6))
Mockito.verify(SparkAgentTestExtension.OPEN_LINEAGE_SPARK_CONTEXT, atLeast(4))
.emit(lineageEvent.capture());
OpenLineage.RunEvent event = lineageEvent.getAllValues().get(5);
OpenLineage.RunEvent event =
lineageEvent.getAllValues().stream()
.filter(ev -> ev.getInputs() != null && !ev.getInputs().isEmpty())
.findFirst()
.get();

assertThat(lineageEvent.getAllValues().get(lineageEvent.getAllValues().size() - 1))
.hasFieldOrPropertyWithValue(EVENT_TYPE, RunEvent.EventType.COMPLETE);
3 changes: 2 additions & 1 deletion integration/spark/app/src/test/resources/log4j.properties
@@ -9,4 +9,5 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:
log4j.logger.io.openlineage=DEBUG
log4j.logger.io.openlineage.spark.shaded=WARN
log4j.logger.org.apache.spark.storage=WARN
log4j.logger.org.apache.spark.scheduler=WARN
log4j.logger.org.apache.spark.scheduler=WARN
