[apache#3305] test(spark-connector): move spark connector integration test from integration-test module to spark-connector (apache#3307)

move spark connector integration test from integration-test module to spark-connector

Why are the changes needed?

Fix: apache#3305

Does this PR introduce any user-facing change?

no

How was this patch tested?

By moving the test location; the existing tests now run from the new module.
FANNG1 committed May 23, 2024
1 parent cfa17b3 commit 2d9ec39
Showing 18 changed files with 134 additions and 74 deletions.
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -54,6 +54,7 @@ kafka = "3.4.0"
curator = "2.12.0"
awaitility = "4.2.1"
servlet = "3.1.0"
jodd = "3.5.2"

protobuf-plugin = "0.9.2"
spotless-plugin = '6.11.0'
@@ -98,6 +99,7 @@ jersey-media-json-jackson = { group = "org.glassfish.jersey.media", name = "jersey-media-json-jackson", version.ref = "jersey" }
jersey-hk2 = { group = "org.glassfish.jersey.inject", name = "jersey-hk2", version.ref = "jersey" }
jersey-test-framework-core = { group = "org.glassfish.jersey.test-framework", name = "jersey-test-framework-core", version.ref = "jersey" }
jersey-test-framework-provider-jetty = { group = "org.glassfish.jersey.test-framework.providers", name = "jersey-test-framework-provider-jetty", version.ref = "jersey" }
jodd-core = { group = "org.jodd", name = "jodd-core", version.ref = "jodd" }
mockito-core = { group = "org.mockito", name = "mockito-core", version.ref = "mockito" }
hive2-metastore = { group = "org.apache.hive", name = "hive-metastore", version.ref = "hive2"}
hive2-exec = { group = "org.apache.hive", name = "hive-exec", version.ref = "hive2"}
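For context, the two added lines extend the Gradle version catalog: jodd = "3.5.2" defines a version, and the jodd-core entry defines a library alias bound to it. A module then consumes the alias through the generated type-safe accessor, as in this minimal sketch (the real consumption appears in the integration-test diff below):

  // build.gradle.kts of a consuming module (sketch);
  // libs.jodd.core resolves to org.jodd:jodd-core:3.5.2 via gradle/libs.versions.toml.
  dependencies {
      testImplementation(libs.jodd.core)
  }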
25 changes: 1 addition & 24 deletions integration-test/build.gradle.kts
@@ -11,13 +11,6 @@ plugins {
id("idea")
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val kyuubiVersion: String = libs.versions.kyuubi.get()
val icebergVersion: String = libs.versions.iceberg.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

dependencies {
testAnnotationProcessor(libs.lombok)

@@ -95,6 +88,7 @@ dependencies {
}
testImplementation(libs.httpclient5)
testImplementation(libs.jline.terminal)
testImplementation(libs.jodd.core)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.minikdc) {
@@ -104,23 +98,6 @@
testImplementation(libs.mybatis)
testImplementation(libs.mysql.driver)

testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
exclude("org.apache.hadoop", "hadoop-client-api")
exclude("org.apache.hadoop", "hadoop-client-runtime")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
exclude("org.apache.avro")
exclude("org.apache.hadoop")
exclude("org.apache.zookeeper")
exclude("io.dropwizard.metrics")
exclude("org.rocksdb")
}
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")

testImplementation(libs.okhttp3.loginterceptor)
testImplementation(libs.postgresql.driver)
testImplementation(libs.rauschig)
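Note that the Spark, Iceberg, Kyuubi, and scala-collection-compat test dependencies deleted above are not dropped from the build; they reappear in spark-connector/spark-connector/build.gradle.kts in the hunks below.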
12 changes: 8 additions & 4 deletions spark-connector/spark-connector-runtime/build.gradle.kts
@@ -10,15 +10,19 @@ plugins {
alias(libs.plugins.shadow)
}

dependencies {
implementation(project(":spark-connector:spark-connector"))
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg.get()
val baseName = "${rootProject.name}-spark-connector-runtime-${sparkMajorVersion}_$scalaVersion"

dependencies {
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(project(":spark-connector:spark-connector"))

implementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
}

tasks.withType<ShadowJar>(ShadowJar::class.java) {
isZip64 = true
configurations = listOf(project.configurations.runtimeClasspath.get())
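The ShadowJar hunk is cut off above. As a hedged sketch only (the archive naming and relocation prefix are assumptions, not part of this diff), a runtime-jar configuration of this shape typically continues along these lines:

  tasks.withType<ShadowJar>(ShadowJar::class.java) {
      isZip64 = true
      configurations = listOf(project.configurations.runtimeClasspath.get())
      // assumption: archive named from the baseName computed above
      archiveBaseName.set(baseName)
      // assumption: illustrative relocation only; the real list is not visible here
      relocate("com.google.common", "com.datastrato.gravitino.shaded.com.google.common")
  }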
92 changes: 87 additions & 5 deletions spark-connector/spark-connector/build.gradle.kts
@@ -19,24 +19,106 @@ val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg.get()
val kyuubiVersion: String = libs.versions.kyuubi.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

dependencies {
implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(libs.guava)
implementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
implementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")

// Unable to run the integration tests in embedded mode if the client-java-runtime and common modules are included.
compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")
annotationProcessor(libs.lombok)
compileOnly(libs.lombok)

testAnnotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)

testImplementation(project(":api"))
testImplementation(project(":clients:client-java"))
testImplementation(project(":core"))
testImplementation(project(":common"))
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))

testImplementation(libs.hive2.common) {
exclude("com.sun.jersey")
exclude("org.apache.curator")
// use hadoop from Spark
exclude("org.apache.hadoop")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
}
testImplementation(libs.hive2.metastore) {
exclude("co.cask.tephra")
exclude("com.github.joshelser")
exclude("com.google.code.findbugs", "jsr305")
exclude("com.google.code.findbugs", "sr305")
exclude("com.tdunning", "json")
exclude("com.zaxxer", "HikariCP")
exclude("com.sun.jersey")
exclude("io.dropwizard.metricss")
exclude("javax.transaction", "transaction-api")
exclude("org.apache.avro")
exclude("org.apache.curator")
exclude("org.apache.hbase")
exclude("org.apache.hadoop")
exclude("org.apache.parquet", "parquet-hadoop-bundle")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.slf4j")
}
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
testImplementation("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")
testImplementation(libs.mysql.driver)
testImplementation(libs.testcontainers)

// org.apache.iceberg.rest.RESTSerializers#registerAll(ObjectMapper) has a different method signature in iceberg-core and iceberg-spark-runtime; iceberg-core must be declared first so the MiniGravitino server can start up (see the note after this dependencies block).
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
// includes spark-sql, spark-catalyst, hive-common, hdfs-client
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
// conflict with Gravitino server jersey
exclude("org.glassfish.jersey.core")
exclude("org.glassfish.jersey.containers")
exclude("org.glassfish.jersey.inject")
exclude("com.sun.jersey")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")

testRuntimeOnly(libs.junit.jupiter.engine)
}
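A brief note on the ordering constraint flagged in the comment above: Gradle generally preserves declaration order on a configuration's resolved classpath, so listing iceberg-core before iceberg-spark-runtime makes iceberg-core's org.apache.iceberg.rest.RESTSerializers the one loaded when both jars provide the class, which is what the MiniGravitino server needs at startup.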

tasks.test {
val skipUTs = project.hasProperty("skipTests")
if (skipUTs) {
// Only run integration tests
include("**/integration/**")
}

val skipITs = project.hasProperty("skipITs")
if (skipITs) {
// Exclude integration tests
exclude("**/integration/**")
} else {
dependsOn(tasks.jar)

doFirst {
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12")
}

val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
init(this)
}
}
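In practice (invocation sketch; the task path follows from the module's location at spark-connector/spark-connector): running ./gradlew :spark-connector:spark-connector:test -PskipITs excludes everything under **/integration/**, while -PskipTests inverts the filter so only the integration suites run, with the Hive docker image pinned in the doFirst block above.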

tasks.clean {
delete("spark-warehouse")
}
@@ -17,7 +17,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.ws.rs.NotSupportedException;
import lombok.Getter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -117,7 +116,7 @@ public Transform[] toGravitinoPartitionings(
getFieldNameFromGravitinoNamedReference(
(NamedReference) toGravitinoNamedReference(transform.references()[0])));
} else {
throw new NotSupportedException(
throw new UnsupportedOperationException(
"Doesn't support Spark transform: " + transform.name());
}
})
@@ -146,7 +145,7 @@ public DistributionAndSortOrdersInfo toGravitinoDistributionAndSortOrders(
Distribution distribution = toGravitinoDistribution(bucketTransform);
distributionAndSortOrdersInfo.setDistribution(distribution);
} else {
throw new NotSupportedException(
throw new UnsupportedOperationException(
"Only support BucketTransform and SortedBucketTransform, but get: "
+ transform.name());
}
@@ -300,7 +299,7 @@ private static org.apache.spark.sql.connector.expressions.Transform toSparkBucke
}
// Spark doesn't support EVEN or RANGE distribution
default:
throw new NotSupportedException(
throw new UnsupportedOperationException(
"Doesn't support distribution strategy: " + distribution.strategy());
}
}
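The exception swap in this hunk (and in the files that follow) replaces javax.ws.rs.NotSupportedException, a JAX-RS type that models an HTTP 415 response and pulls in the javax.ws.rs API jar, with the JDK's built-in UnsupportedOperationException, which conveys the intended "operation not supported" meaning with no extra dependency; the now-unused import is dropped at the top of each affected file.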
@@ -16,7 +16,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.ws.rs.NotSupportedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.connector.catalog.TableCatalog;

@@ -106,7 +105,7 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
gravitinoTableProperties.put(
HivePropertiesConstants.GRAVITINO_HIVE_FORMAT, gravitinoFormat);
} else {
throw new NotSupportedException("Doesn't support hive file format: " + fileFormat);
throw new UnsupportedOperationException("Doesn't support hive file format: " + fileFormat);
}
}

@@ -17,7 +17,6 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.NotSupportedException;
import org.apache.spark.sql.connector.expressions.BucketTransform;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.FieldReference;
@@ -106,11 +105,11 @@ void testGravitinoToSparkDistributionWithoutSortOrder(boolean supportsBucketPart
if (!supportsBucketPartition) {
// range and even distribution
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() -> sparkTransformConverter.toSparkTransform(null, Distributions.RANGE, null));
Distribution evenDistribution = Distributions.even(bucketNum, NamedReference.field(""));
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() -> sparkTransformConverter.toSparkTransform(null, evenDistribution, null));
}
}
@@ -9,7 +9,6 @@
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.ws.rs.NotSupportedException;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.jupiter.api.Assertions;
@@ -31,7 +30,7 @@ void testTableFormat() {
Assertions.assertEquals(
"PARQUET", hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT));
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() ->
hivePropertiesConverter.toGravitinoTableProperties(
ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_STORED_AS, "notExists")));
@@ -49,7 +48,7 @@
hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_FORMAT),
HivePropertiesConstants.GRAVITINO_HIVE_FORMAT_TEXTFILE);
Assertions.assertThrowsExactly(
NotSupportedException.class,
UnsupportedOperationException.class,
() ->
hivePropertiesConverter.toGravitinoTableProperties(
ImmutableMap.of(TableCatalog.PROP_PROVIDER, "notExists")));
@@ -2,11 +2,11 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark;
package com.datastrato.gravitino.spark.connector.integration.test;

import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
@@ -3,7 +3,7 @@
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.integration.test.spark;
package com.datastrato.gravitino.spark.connector.integration.test;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
@@ -12,10 +12,10 @@
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT;
import com.datastrato.gravitino.server.web.JettyServerConfig;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkUtilIT;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import java.io.IOException;
import java.util.Collections;
@@ -103,8 +103,8 @@ public static void startIntegrationTest() {}
public static void stopIntegrationTest() {}

private void initMetalakeAndCatalogs() {
client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap());
GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
AbstractIT.client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap());
GravitinoMetalake metalake = AbstractIT.client.loadMetalake(NameIdentifier.of(metalakeName));
Map<String, String> properties = getCatalogConfigs();
metalake.createCatalog(
NameIdentifier.of(metalakeName, getCatalogName()),
@@ -116,7 +116,7 @@ private void initMetalakeAndCatalogs() {

private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT, just construct gravitinoUrl
int gravitinoPort = getGravitinoServerPort();
int gravitinoPort = AbstractIT.getGravitinoServerPort();
gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
icebergRestServiceUri = getIcebergRestServiceUri();
}
@@ -2,14 +2,14 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark.hive;
package com.datastrato.gravitino.spark.connector.integration.test.hive;

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConstants;
import com.datastrato.gravitino.spark.connector.integration.test.SparkCommonIT;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.ArrayList;
@@ -2,7 +2,7 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark.iceberg;
package com.datastrato.gravitino.spark.connector.integration.test.iceberg;

import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.google.common.collect.Maps;