[Spark] fix databricks environment
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
pawel-big-lebowski committed Mar 27, 2024
1 parent fc48021 commit bdabc88
Showing 5 changed files with 98 additions and 35 deletions.
@@ -17,6 +17,7 @@
import io.openlineage.client.OpenLineage.OutputDataset;
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.client.OpenLineage.RunEvent.EventType;
+import io.openlineage.client.OpenLineage.RunFacet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -81,6 +82,41 @@ public void testCreateTableAsSelect() {

assertThat(inputDataset.getNamespace()).isEqualTo("dbfs");
assertThat(inputDataset.getName()).isEqualTo("/user/hive/warehouse/temp");

+    // test DatabricksEnvironmentFacetBuilder handler
+    RunEvent eventWithDatabricksProperties =
+        runEvents.stream()
+            .filter(
+                r ->
+                    r.getRun()
+                        .getFacets()
+                        .getAdditionalProperties()
+                        .containsKey("environment-properties"))
+            .findFirst()
+            .get();
+
+    RunFacet environmentFacet =
+        eventWithDatabricksProperties
+            .getRun()
+            .getFacets()
+            .getAdditionalProperties()
+            .get("environment-properties");
+
+    Map<String, Object> properties =
+        (Map<String, Object>)
+            environmentFacet.getAdditionalProperties().get("environment-properties");
+
+    assertThat(properties.get("spark.databricks.job.type")).isEqualTo("python");
+
+    List<Object> mounts = (List<Object>) properties.get("mountPoints");
+
+    assertThat(mounts).isNotEmpty();
+    Map<String, String> mountInfo = (Map<String, String>) mounts.get(0);
+
+    assertThat(mountInfo).containsKeys("mountPoint", "source");
+
+    assertThat(mountInfo.get("mountPoint")).startsWith("/databricks");
+    assertThat(mountInfo.get("source")).startsWith("databricks");
}

@Test
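The assertions added above imply that the Databricks run event carries an "environment-properties" run facet shaped roughly as follows. This is a sketch only: the two keys are taken from the assertions, java.util.Map and java.util.List are already imported in the test, and the sample mount values are assumed for illustration.

    // Sketch of the payload the new assertions expect; mount values assumed.
    Map<String, Object> environmentProperties =
        Map.of(
            "spark.databricks.job.type", "python",
            "mountPoints",
            List.of(Map.of("mountPoint", "/databricks-datasets", "source", "databricks-datasets")));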
@@ -52,7 +52,7 @@ public class DatabricksUtils {
public static final String CLUSTER_NAME = "openlineage-test-cluster";
public static final Map<String, String> PLATFORM_VERSIONS =
Stream.of(
new AbstractMap.SimpleEntry<>("3.4.1", "13.3.x-scala2.12"),
new AbstractMap.SimpleEntry<>("3.4.2", "13.3.x-scala2.12"),
new AbstractMap.SimpleEntry<>("3.5.0", "14.2.x-scala2.12"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public static final String NODE_TYPE = "Standard_DS3_v2";
@@ -204,17 +204,18 @@ private static String getClusterName() {

private static String getSparkPlatformVersion() {
if (!PLATFORM_VERSIONS.containsKey(System.getProperty(SPARK_VERSION))) {
log.error("Unsupported spark_version for databricks test");
log.error("Unsupported spark_version for databricks test {}", SPARK_VERSION);
}

log.info("Databricks version {}", PLATFORM_VERSIONS.get(System.getProperty(SPARK_VERSION)));
return PLATFORM_VERSIONS.get(System.getProperty(SPARK_VERSION));
}

@SneakyThrows
private static void uploadOpenlineageJar(WorkspaceClient workspace) {
Path jarFile =
Files.list(Paths.get("../build/libs/"))
.filter(p -> p.getFileName().toString().startsWith("openlineage-spark-"))
.filter(p -> p.getFileName().toString().startsWith("openlineage-spark_"))
.filter(p -> p.getFileName().toString().endsWith("jar"))
.findAny()
.orElseThrow(() -> new RuntimeException("openlineage-spark jar not found"));
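The tightened prefix in uploadOpenlineageJar suggests the shaded jar name now carries a Scala binary suffix, so it starts with "openlineage-spark_" rather than "openlineage-spark-". A quick standalone sketch, using the java.nio.file imports already present in DatabricksUtils (the exact file name below is assumed for illustration, combining the Scala binary version with the snapshot version from gradle.properties):

    // Sketch: the new prefix matches a Scala-suffixed jar name (name assumed).
    Path jar = Paths.get("../build/libs/openlineage-spark_2.12-1.11.0-SNAPSHOT.jar");
    String name = jar.getFileName().toString();
    boolean matches = name.startsWith("openlineage-spark_") && name.endsWith("jar");
    // matches == true; the old "openlineage-spark-" prefix would no longer match.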
2 changes: 1 addition & 1 deletion integration/spark/databricks/open-lineage-init-script.sh
@@ -6,7 +6,7 @@
STAGE_DIR="/dbfs/databricks/openlineage"

echo "BEGIN: Upload Spark Listener JARs"
-cp -f $STAGE_DIR/openlineage-spark-*.jar /mnt/driver-daemon/jars || { echo "Error copying Spark Listener library file"; exit 1;}
+cp -f $STAGE_DIR/openlineage-spark_*.jar /mnt/driver-daemon/jars || { echo "Error copying Spark Listener library file"; exit 1;}
echo "END: Upload Spark Listener JARs"

echo "BEGIN: Modify Spark config settings"
2 changes: 1 addition & 1 deletion integration/spark/gradle.properties
@@ -2,7 +2,7 @@ jdk8.build=true
version=1.11.0-SNAPSHOT
org.gradle.jvmargs=-Xmx4G

-spark.version=3.3.4
+spark.version=3.4.1
scala.binary.version=2.12

shared.spark.version=3.2.4
@@ -5,10 +5,9 @@

package io.openlineage.spark.agent.facets.builder;

-import com.databricks.sdk.scala.dbutils.DbfsUtils;
-import com.databricks.sdk.scala.dbutils.MountInfo;
import io.openlineage.spark.agent.facets.EnvironmentFacet;
import io.openlineage.spark.agent.models.DatabricksMountpoint;
+import io.openlineage.spark.agent.util.ReflectionUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.CustomFacetBuilder;
import io.openlineage.spark.api.OpenLineageContext;
@@ -17,24 +16,27 @@
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
-import lombok.extern.slf4j.Slf4j;
import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;

/**
* {@link CustomFacetBuilder} that generates a {@link EnvironmentFacet} when using OpenLineage on
* Databricks.
*/
-@Slf4j
public class DatabricksEnvironmentFacetBuilder
extends CustomFacetBuilder<SparkListenerJobStart, EnvironmentFacet> {
private Map<String, Object> dbProperties;
-  private Class dbutilsClass;
-  private DbfsUtils dbutils;

+  private static final Logger log =
+      LoggerFactory.getLogger(DatabricksEnvironmentFacetBuilder.class);

public static boolean isDatabricksRuntime() {
return System.getenv().containsKey("DATABRICKS_RUNTIME_VERSION");
@@ -88,59 +90,83 @@ private Map<String, Object> getDatabricksEnvironmentalAttributes(SparkListenerJo
* storage. However, that dbutils object is not available inside a spark listener. We must
* access it via reflection.
*/
-    try {
-      Optional<DbfsUtils> dbfsUtils = getDbfsUtils();
-      if (!dbfsUtils.isPresent()) {
-        dbProperties.put("mountPoints", new ArrayList<DatabricksMountpoint>());
-      } else {
-        dbProperties.put("mountPoints", getDatabricksMountpoints(dbfsUtils.get()));
-      }
+    dbProperties.put("mountPoints", getDatabricksMountpoints());

-    } catch (Exception e) {
-      log.warn("Failed to load dbutils in OpenLineageListener:", e);
-      dbProperties.put("mountPoints", new ArrayList<DatabricksMountpoint>());
-    }
return dbProperties;
}

// Starting in Databricks Runtime 11, there is a new constructor for DbFsUtils
// If running on an older version, the constructor has no parameters.
// If running on DBR 11 or above, you need to specify whether you allow mount operations (true or
// false)
-  private static Optional<DbfsUtils> getDbfsUtils()
-      throws ClassNotFoundException, InstantiationException, IllegalAccessException,
-          IllegalArgumentException, InvocationTargetException {
-    Class dbutilsClass = Class.forName("com.databricks.dbutils_v1.impl.DbfsUtilsImpl");
+  private static List<DatabricksMountpoint> getDatabricksMountpoints() {
+    Class dbutilsClass;
+    try {
+      dbutilsClass = Class.forName("com.databricks.dbutils_v1.impl.DbfsUtilsImpl");
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      log.warn("Class com.databricks.dbutils_v1.impl.DbfsUtilsImpl not found", e);
+      return Collections.emptyList();
+    }
Constructor[] dbutilsConstructors = dbutilsClass.getDeclaredConstructors();
if (dbutilsConstructors.length == 0) {
log.warn(
"Failed to load dbutils in OpenLineageListener as there were no declared constructors");
-      return Optional.empty();
+      return Collections.emptyList();
}
Constructor firstConstructor = dbutilsConstructors[0];
Parameter[] constructorParams = firstConstructor.getParameters();
+    Object dbfsUtils; // com.databricks.dbutils_v1.impl.DBUtilsV1Impl
if (constructorParams.length == 0) {
log.debug("DbUtils constructor had no parameters");
-      return Optional.of((DbfsUtils) firstConstructor.newInstance());
+      try {
+        dbfsUtils = firstConstructor.newInstance();
+      } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+        log.warn("DbUtils method thrown {}", e);
+        return Collections.emptyList();
+      }
} else if (constructorParams.length == 1
&& constructorParams[0].getName().equals("allowMountOperations")) {
log.debug("DbUtils constructor had one parameter named allowMountOperations");
-      return Optional.of((DbfsUtils) firstConstructor.newInstance(true));
+      try {
+        dbfsUtils = firstConstructor.newInstance(true);
+      } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+        log.warn("DbUtils method thrown {}", e);
+        return Collections.emptyList();
+      }
} else {
log.warn(
"dbutils had {} constructors and the first constructor had {} params",
dbutilsConstructors.length,
constructorParams.length);
-      return Optional.empty();
+      return Collections.emptyList();
}
}

-  private static List<DatabricksMountpoint> getDatabricksMountpoints(DbfsUtils dbutils) {
List<DatabricksMountpoint> mountpoints = new ArrayList<>();
-    List<MountInfo> mountsList = ScalaConversionUtils.fromSeq(dbutils.mounts());
-    for (MountInfo mount : mountsList) {
-      mountpoints.add(new DatabricksMountpoint(mount.mountPoint(), mount.source()));
+
+    // list of com.databricks.backend.daemon.dbutils.MountInfo
+    List<Object> mountsList =
+        ScalaConversionUtils.fromSeq(
+            (Seq<Object>) ReflectionUtils.tryExecuteMethod(dbfsUtils, "mounts").get());
+
+    for (Object mount : mountsList) {
+      Optional<Object> mountPoint = ReflectionUtils.tryExecuteMethod(mount, "mountPoint");
+      Optional<Object> source = ReflectionUtils.tryExecuteMethod(mount, "source");
+
+      if (mountPoint.isPresent()
+          && mountPoint.get() != null
+          && source.isPresent()
+          && source.get() != null) {
+        mountpoints.add(
+            new DatabricksMountpoint(mountPoint.get().toString(), source.get().toString()));
+      } else {
+        log.warn(
+            "Couldn't extract mountPoint and source through reflection. "
+                + "mountPoint = {}, source = {}",
+            mountPoint,
+            source);
+      }
}

return mountpoints;
}
}
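The rewrite above leans on ReflectionUtils.tryExecuteMethod to call dbutils methods without any compile-time dependency on Databricks classes. As a minimal sketch of the pattern, here is a hypothetical re-implementation, assuming the helper performs a zero-argument method lookup and invocation and yields an empty Optional on failure; the real io.openlineage.spark.agent.util.ReflectionUtils may differ:

    // Hypothetical sketch, not the actual OpenLineage utility: invoke a no-arg
    // method by name, swallowing reflective failures.
    static java.util.Optional<Object> tryExecuteMethod(Object target, String methodName) {
      try {
        java.lang.reflect.Method method = target.getClass().getMethod(methodName);
        return java.util.Optional.ofNullable(method.invoke(target));
      } catch (ReflectiveOperationException e) {
        return java.util.Optional.empty();
      }
    }

Used this way, tryExecuteMethod(dbfsUtils, "mounts") yields the Scala Seq of mount infos when the Databricks classes are present at runtime, and an empty Optional when the method is missing or the call fails, which is what lets the builder degrade gracefully off-platform.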
