[#3305] test(spark-connector): move spark connector integration test from integration-test module to spark-connector #3307

Merged 2 commits on May 23, 2024.
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -54,6 +54,7 @@ kafka = "3.4.0"
curator = "2.12.0"
awaitility = "4.2.1"
servlet = "3.1.0"
jodd = "3.5.2"
Contributor Author: TrinoQueryRunner uses this lib.


protobuf-plugin = "0.9.2"
spotless-plugin = '6.11.0'
@@ -98,6 +99,7 @@ jersey-media-json-jackson = { group = "org.glassfish.jersey.media", name = "jers
jersey-hk2 = { group = "org.glassfish.jersey.inject", name = "jersey-hk2", version.ref = "jersey" }
jersey-test-framework-core = { group = "org.glassfish.jersey.test-framework", name = "jersey-test-framework-core", version.ref = "jersey" }
jersey-test-framework-provider-jetty = { group = "org.glassfish.jersey.test-framework.providers", name = "jersey-test-framework-provider-jetty", version.ref = "jersey" }
jodd-core = { group = "org.jodd", name = "jodd-core", version.ref = "jodd" }
mockito-core = { group = "org.mockito", name = "mockito-core", version.ref = "mockito" }
hive2-metastore = { group = "org.apache.hive", name = "hive-metastore", version.ref = "hive2"}
hive2-exec = { group = "org.apache.hive", name = "hive-exec", version.ref = "hive2"}
25 changes: 1 addition & 24 deletions integration-test/build.gradle.kts
@@ -11,13 +11,6 @@ plugins {
id("idea")
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val kyuubiVersion: String = libs.versions.kyuubi.get()
val icebergVersion: String = libs.versions.iceberg.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

dependencies {
testAnnotationProcessor(libs.lombok)

@@ -95,6 +88,7 @@
}
testImplementation(libs.httpclient5)
testImplementation(libs.jline.terminal)
testImplementation(libs.jodd.core)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.minikdc) {
@@ -104,23 +98,6 @@
testImplementation(libs.mybatis)
testImplementation(libs.mysql.driver)

testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
exclude("org.apache.hadoop", "hadoop-client-api")
exclude("org.apache.hadoop", "hadoop-client-runtime")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
exclude("org.apache.avro")
exclude("org.apache.hadoop")
exclude("org.apache.zookeeper")
exclude("io.dropwizard.metrics")
exclude("org.rocksdb")
}
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")

testImplementation(libs.okhttp3.loginterceptor)
testImplementation(libs.postgresql.driver)
testImplementation(libs.rauschig)
12 changes: 8 additions & 4 deletions spark-connector/spark-connector-runtime/build.gradle.kts
@@ -10,15 +10,19 @@ plugins {
alias(libs.plugins.shadow)
}

dependencies {
implementation(project(":spark-connector:spark-connector"))
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg.get()
val baseName = "${rootProject.name}-spark-connector-runtime-${sparkMajorVersion}_$scalaVersion"

dependencies {
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(project(":spark-connector:spark-connector"))

implementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
Contributor: Why do we need to package the Iceberg runtime jar into the Spark connector?

Contributor Author: We'd better not package the Iceberg runtime jar, but there are some limits now: we currently register the Iceberg extensions on startup. I prefer to do it in #3396.

Contributor: Is it much work? I think it should be fixed in this version; we cannot ship a Spark connector with the Iceberg runtime packaged in, as it will introduce lots of issues.

Contributor Author: Not much work, but we have to stay consistent about this:

  1. if we don't register the Iceberg extensions, Iceberg catalogs couldn't be dynamically loaded in the future
  2. when not registering the Iceberg extensions, I'd prefer to add a configuration option

Contributor: Please fix this before 0.5.1 is shipped.

Contributor Author: OK.

}

tasks.withType<ShadowJar>(ShadowJar::class.java) {
isZip64 = true
configurations = listOf(project.configurations.runtimeClasspath.get())
92 changes: 87 additions & 5 deletions spark-connector/spark-connector/build.gradle.kts
@@ -19,24 +19,106 @@ val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg.get()
val kyuubiVersion: String = libs.versions.kyuubi.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

dependencies {
implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(libs.guava)
implementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
implementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")

// unable to run ITs in embedded mode if the client-java-runtime and common modules are included
compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")
annotationProcessor(libs.lombok)
compileOnly(libs.lombok)

testAnnotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)

testImplementation(project(":api"))
testImplementation(project(":clients:client-java"))
testImplementation(project(":core"))
testImplementation(project(":common"))
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))

testImplementation(libs.hive2.common) {
exclude("com.sun.jersey")
exclude("org.apache.curator")
// use hadoop from Spark
exclude("org.apache.hadoop")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
}
testImplementation(libs.hive2.metastore) {
exclude("co.cask.tephra")
exclude("com.github.joshelser")
exclude("com.google.code.findbugs", "jsr305")
exclude("com.google.code.findbugs", "sr305")
exclude("com.tdunning", "json")
exclude("com.zaxxer", "HikariCP")
exclude("com.sun.jersey")
exclude("io.dropwizard.metricss")
exclude("javax.transaction", "transaction-api")
exclude("org.apache.avro")
exclude("org.apache.curator")
exclude("org.apache.hbase")
exclude("org.apache.hadoop")
exclude("org.apache.parquet", "parquet-hadoop-bundle")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.slf4j")
}
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
testImplementation("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")
testImplementation(libs.mysql.driver)
testImplementation(libs.testcontainers)

// org.apache.iceberg.rest.RESTSerializers#registerAll(ObjectMapper) has a different method signature in the iceberg-core and iceberg-spark-runtime packages; we must make sure iceberg-core comes first so the MiniGravitino server can start up (see the classpath sanity check after this dependencies block).
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
// include spark-sql, spark-catalyst, hive-common, hdfs-client
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
// conflict with Gravitino server jersey
exclude("org.glassfish.jersey.core")
exclude("org.glassfish.jersey.containers")
exclude("org.glassfish.jersey.inject")
exclude("com.sun.jersey")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")

testRuntimeOnly(libs.junit.jupiter.engine)
}
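
When two jars on the classpath contain the same class, the JVM loads it from whichever jar appears first, so a quick sanity check is to ask the loaded class where it came from. A minimal sketch of such a check; the class name is taken from the comment above, and it assumes both Iceberg artifacts are already on the classpath:

public class ClasspathOrderCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    // Prints the jar the JVM actually resolved RESTSerializers from; if the
    // dependency ordering above is correct, this points at iceberg-core.
    Class<?> cls = Class.forName("org.apache.iceberg.rest.RESTSerializers");
    System.out.println(cls.getProtectionDomain().getCodeSource().getLocation());
  }
}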

tasks.test {
val skipUTs = project.hasProperty("skipTests")
if (skipUTs) {
// Only run integration tests
include("**/integration/**")
}

val skipITs = project.hasProperty("skipITs")
if (skipITs) {
// Exclude integration tests
exclude("**/integration/**")
} else {
dependsOn(tasks.jar)

doFirst {
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12")
}

val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
init(this)
}
}
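
With this wiring, a plain test invocation runs both unit and integration tests; passing -PskipITs excludes everything under **/integration/**, while -PskipTests inverts the filter so that only the integration tests run. Note the property name: skipTests here skips the unit tests rather than all tests.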

tasks.clean {
delete("spark-warehouse")
}
@@ -17,7 +17,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.ws.rs.NotSupportedException;
import lombok.Getter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -117,7 +116,7 @@ public Transform[] toGravitinoPartitionings(
getFieldNameFromGravitinoNamedReference(
(NamedReference) toGravitinoNamedReference(transform.references()[0])));
} else {
throw new NotSupportedException(
throw new UnsupportedOperationException(
"Doesn't support Spark transform: " + transform.name());
}
})
@@ -146,7 +145,7 @@ public DistributionAndSortOrdersInfo toGravitinoDistributionAndSortOrders(
Distribution distribution = toGravitinoDistribution(bucketTransform);
distributionAndSortOrdersInfo.setDistribution(distribution);
} else {
throw new NotSupportedException(
throw new UnsupportedOperationException(
"Only support BucketTransform and SortedBucketTransform, but get: "
+ transform.name());
}
@@ -300,7 +299,7 @@ private static org.apache.spark.sql.connector.expressions.Transform toSparkBucke
}
// Spark doesn't support EVEN or RANGE distribution
default:
throw new NotSupportedException(
throw new UnsupportedOperationException(
"Doesn't support distribution strategy: " + distribution.strategy());
}
}
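
The change in this file (and in the files below) replaces javax.ws.rs.NotSupportedException, which in JAX-RS signals an HTTP 415 Unsupported Media Type response, with java.lang.UnsupportedOperationException, the standard JDK exception for an operation an implementation does not support; this also removes a web-framework import from converter and test code.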
@@ -16,7 +16,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.ws.rs.NotSupportedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.connector.catalog.TableCatalog;

@@ -106,7 +105,7 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
gravitinoTableProperties.put(
HivePropertiesConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat);
} else {
throw new NotSupportedException("Doesn't support hive file format: " + fileFormat);
throw new UnsupportedOperationException("Doesn't support hive file format: " + fileFormat);
}
}

@@ -17,7 +17,6 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.NotSupportedException;
import org.apache.spark.sql.connector.expressions.BucketTransform;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.FieldReference;
@@ -106,11 +105,11 @@ void testGravitinoToSparkDistributionWithoutSortOrder(boolean supportsBucketPart
if (!supportsBucketPartition) {
// range and even distribution
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() -> sparkTransformConverter.toSparkTransform(null, Distributions.RANGE, null));
Distribution evenDistribution = Distributions.even(bucketNum, NamedReference.field(""));
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() -> sparkTransformConverter.toSparkTransform(null, evenDistribution, null));
}
}
@@ -9,7 +9,6 @@
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.ws.rs.NotSupportedException;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.jupiter.api.Assertions;
@@ -31,7 +30,7 @@ void testTableFormat() {
Assertions.assertEquals(
"PARQUET", hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT));
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() ->
hivePropertiesConverter.toGravitinoTableProperties(
ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "notExists")));
@@ -49,7 +48,7 @@
HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_TEXTFILE,
hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT));
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() ->
hivePropertiesConverter.toGravitinoTableProperties(
ImmutableMap.of(TableCatalog.PROP_PROVIDER, "notExists")));
@@ -2,11 +2,11 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark;
package com.datastrato.gravitino.spark.connector.integration.test;

import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
@@ -3,18 +3,18 @@
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.integration.test.spark;
package com.datastrato.gravitino.spark.connector.integration.test;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.auxiliary.AuxiliaryServiceManager;
import com.datastrato.gravitino.client.GravitinoMetalake;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT;
import com.datastrato.gravitino.server.web.JettyServerConfig;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkUtilIT;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import java.io.IOException;
import java.util.Collections;
@@ -102,16 +102,16 @@ public static void startIntegrationTest() {}
public static void stopIntegrationTest() {}

private void initMetalakeAndCatalogs() {
client.createMetalake(metalakeName, "", Collections.emptyMap());
GravitinoMetalake metalake = client.loadMetalake(metalakeName);
AbstractIT.client.createMetalake(metalakeName, "", Collections.emptyMap());
GravitinoMetalake metalake = AbstractIT.client.loadMetalake(metalakeName);
Map<String, String> properties = getCatalogConfigs();
metalake.createCatalog(
getCatalogName(), Catalog.Type.RELATIONAL, getProvider(), "", properties);
}

private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT, just construct gravitinoUrl
int gravitinoPort = getGravitinoServerPort();
int gravitinoPort = AbstractIT.getGravitinoServerPort();
gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
icebergRestServiceUri = getIcebergRestServiceUri();
}
@@ -2,14 +2,14 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark.hive;
package com.datastrato.gravitino.spark.connector.integration.test.hive;

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConstants;
import com.datastrato.gravitino.spark.connector.integration.test.SparkCommonIT;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.ArrayList;
@@ -2,7 +2,7 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark.iceberg;
package com.datastrato.gravitino.spark.connector.integration.test.iceberg;

import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.google.common.collect.Maps;