Skip to content

Commit 2bb83cd

Browse files
committed
resolve added unit tests, integration test, and dep conflicts
1 parent 678f3ab commit 2bb83cd

File tree

8 files changed

+501
-163
lines changed

8 files changed

+501
-163
lines changed

plugins/spark/v3.5/integration/build.gradle.kts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,51 @@ dependencies {
5656
exclude("org.apache.logging.log4j", "log4j-core")
5757
exclude("org.slf4j", "jul-to-slf4j")
5858
}
59+
60+
// Add spark-hive for Hudi integration - provides HiveExternalCatalog that Hudi needs
61+
testImplementation("org.apache.spark:spark-hive_${scalaVersion}:${spark35Version}") {
62+
// exclude log4j dependencies to match spark-sql exclusions
63+
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
64+
exclude("org.apache.logging.log4j", "log4j-1.2-api")
65+
exclude("org.apache.logging.log4j", "log4j-core")
66+
exclude("org.slf4j", "jul-to-slf4j")
67+
// exclude old slf4j 1.x to log4j 2.x bridge that conflicts with slf4j 2.x bridge
68+
exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
69+
}
5970
// enforce the usage of log4j 2.24.3. This is for the log4j-api compatibility
6071
// of spark-sql dependency
6172
testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
6273
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")
6374

6475
testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
65-
testImplementation("org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
76+
testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0") {
77+
// exclude log4j dependencies to match spark-sql exclusions and prevent version conflicts
78+
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
79+
exclude("org.apache.logging.log4j", "log4j-1.2-api")
80+
exclude("org.apache.logging.log4j", "log4j-core")
81+
exclude("org.slf4j", "jul-to-slf4j")
82+
exclude("org.slf4j", "slf4j-log4j12")
83+
exclude("org.slf4j", "slf4j-reload4j")
84+
exclude("ch.qos.reload4j", "reload4j")
85+
exclude("log4j", "log4j")
86+
// exclude old slf4j 1.x to log4j 2.x bridge that conflicts with slf4j 2.x bridge
87+
exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
88+
}
89+
90+
// The hudi-spark-bundle includes most Hive libraries but excludes hive-exec to keep size
91+
// manageable
92+
// This matches what Spark 3.5 distribution provides (hive-exec-2.3.9-core.jar)
93+
testImplementation("org.apache.hive:hive-exec:2.3.9:core") {
94+
// Exclude conflicting dependencies to use Spark's versions
95+
exclude("org.apache.hadoop", "*")
96+
exclude("org.apache.commons", "*")
97+
exclude("org.slf4j", "*")
98+
exclude("log4j", "*")
99+
exclude("org.apache.logging.log4j", "*")
100+
exclude("org.pentaho", "*")
101+
exclude("org.apache.calcite", "*")
102+
exclude("org.apache.tez", "*")
103+
}
66104

67105
testImplementation(platform(libs.jackson.bom))
68106
testImplementation("com.fasterxml.jackson.core:jackson-annotations")

plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java

Lines changed: 27 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,10 @@
2424
import io.quarkus.test.junit.QuarkusIntegrationTest;
2525
import java.io.File;
2626
import java.nio.file.Path;
27-
import java.util.Arrays;
2827
import java.util.List;
2928
import org.apache.commons.io.FileUtils;
3029
import org.apache.polaris.service.it.env.IntegrationTestsHelper;
31-
import org.apache.spark.sql.AnalysisException;
32-
import org.apache.spark.sql.Dataset;
33-
import org.apache.spark.sql.Row;
34-
import org.apache.spark.sql.RowFactory;
3530
import org.apache.spark.sql.SparkSession;
36-
import org.apache.spark.sql.types.DataTypes;
37-
import org.apache.spark.sql.types.Metadata;
38-
import org.apache.spark.sql.types.StructField;
39-
import org.apache.spark.sql.types.StructType;
4031
import org.junit.jupiter.api.AfterEach;
4132
import org.junit.jupiter.api.BeforeEach;
4233
import org.junit.jupiter.api.Test;
@@ -48,28 +39,30 @@ public class SparkHudiIT extends SparkIntegrationBase {
4839
@Override
4940
protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
5041
return builder
51-
.config(
52-
"spark.sql.extensions",
53-
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
54-
.config(
55-
"spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
56-
.config(
57-
String.format("spark.sql.catalog.%s", catalogName),
58-
"org.apache.polaris.spark.SparkCatalog")
59-
.config("spark.sql.warehouse.dir", warehouseDir.toString())
60-
.config(String.format("spark.sql.catalog.%s.type", catalogName), "rest")
61-
.config(
62-
String.format("spark.sql.catalog.%s.uri", catalogName),
63-
endpoints.catalogApiEndpoint().toString())
64-
.config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName)
65-
.config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL")
66-
.config(
67-
String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId())
68-
.config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken)
69-
.config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey")
70-
.config(
71-
String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret")
72-
.config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2");
42+
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
43+
.config(
44+
"spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
45+
.config(
46+
String.format("spark.sql.catalog.%s", catalogName),
47+
"org.apache.polaris.spark.SparkCatalog")
48+
.config("spark.sql.warehouse.dir", warehouseDir.toString())
49+
.config(String.format("spark.sql.catalog.%s.type", catalogName), "rest")
50+
.config(
51+
String.format("spark.sql.catalog.%s.uri", catalogName),
52+
endpoints.catalogApiEndpoint().toString())
53+
.config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName)
54+
.config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL")
55+
.config(
56+
String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId())
57+
.config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken)
58+
.config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey")
59+
.config(
60+
String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret")
61+
.config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2")
62+
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
63+
.config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
64+
// for intial integration test have disabled for now, to revisit enabling in future
65+
.config("hoodie.metadata.enable", "false");
7366
}
7467

7568
private String defaultNs;
@@ -85,7 +78,7 @@ private String getTableNameWithRandomSuffix() {
8578

8679
@BeforeEach
8780
public void createDefaultResources(@TempDir Path tempDir) {
88-
spark.sparkContext().setLogLevel("WARN");
81+
spark.sparkContext().setLogLevel("INFO");
8982
defaultNs = generateName("hudi");
9083
// create a default namespace
9184
sql("CREATE NAMESPACE %s", defaultNs);
@@ -110,7 +103,7 @@ public void testBasicTableOperations() {
110103
"CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
111104
huditb1, getTableLocation(huditb1));
112105
sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
113-
List<Object[]> results = sql("SELECT * FROM %s WHERE id > 1 ORDER BY id DESC", huditb1);
106+
List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY id DESC", huditb1);
114107
assertThat(results.size()).isEqualTo(1);
115108
assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
116109

@@ -145,63 +138,6 @@ public void testBasicTableOperations() {
145138
assertThat(tables.size()).isEqualTo(0);
146139
}
147140

148-
@Test
149-
public void testAlterOperations() {
150-
String huditb = getTableNameWithRandomSuffix();
151-
sql(
152-
"CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
153-
huditb, getTableLocation(huditb));
154-
sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb);
155-
156-
// test alter columns
157-
// add two new columns to the table
158-
sql("Alter TABLE %s ADD COLUMNS (city STRING, age INT)", huditb);
159-
// add one more row to the table
160-
sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", huditb);
161-
// verify the table now have 4 columns with correct result
162-
List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", huditb);
163-
assertThat(results.size()).isEqualTo(3);
164-
assertThat(results).contains(new Object[] {1, "anna", null, null});
165-
assertThat(results).contains(new Object[] {2, "bob", null, null});
166-
assertThat(results).contains(new Object[] {3, "john", "SFO", 20});
167-
168-
// drop and rename column require set the hoodie.keep.max.commits property
169-
sql("ALTER TABLE %s SET TBLPROPERTIES ('hoodie.keep.max.commits' = '50')", huditb);
170-
// drop column age
171-
sql("Alter TABLE %s DROP COLUMN age", huditb);
172-
// verify the table now have 3 columns with correct result
173-
results = sql("SELECT * FROM %s ORDER BY id", huditb);
174-
assertThat(results.size()).isEqualTo(3);
175-
assertThat(results).contains(new Object[] {1, "anna", null});
176-
assertThat(results).contains(new Object[] {2, "bob", null});
177-
assertThat(results).contains(new Object[] {3, "john", "SFO"});
178-
179-
// rename column city to address
180-
sql("Alter TABLE %s RENAME COLUMN city TO address", huditb);
181-
// verify column address exists
182-
results = sql("SELECT id, address FROM %s ORDER BY id", huditb);
183-
assertThat(results.size()).isEqualTo(3);
184-
assertThat(results).contains(new Object[] {1, null});
185-
assertThat(results).contains(new Object[] {2, null});
186-
assertThat(results).contains(new Object[] {3, "SFO"});
187-
188-
// test alter properties
189-
sql(
190-
"ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 'test-owner' = 'test-user')",
191-
huditb);
192-
List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", huditb);
193-
// find the table properties result
194-
String properties = null;
195-
for (Object[] info : tableInfo) {
196-
if (info[0].equals("Table Properties")) {
197-
properties = (String) info[1];
198-
break;
199-
}
200-
}
201-
assertThat(properties).contains("description=people table,test-owner=test-user");
202-
sql("DROP TABLE %s", huditb);
203-
}
204-
205141
@Test
206142
public void testUnsupportedAlterTableOperations() {
207143
String huditb = getTableNameWithRandomSuffix();
@@ -215,7 +151,7 @@ public void testUnsupportedAlterTableOperations() {
215151

216152
// ALTER TABLE ... SET LOCATION ... fails
217153
assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", huditb))
218-
.isInstanceOf(AnalysisException.class);
154+
.isInstanceOf(UnsupportedOperationException.class);
219155

220156
sql("DROP TABLE %s", huditb);
221157
}
@@ -235,50 +171,4 @@ public void testUnsupportedTableCreateOperations() {
235171
huditb, getTableLocation(huditb)))
236172
.isInstanceOf(IllegalArgumentException.class);
237173
}
238-
239-
@Test
240-
public void testDataframeSaveOperations() {
241-
List<Row> data = Arrays.asList(RowFactory.create("Alice", 30), RowFactory.create("Bob", 25));
242-
StructType schema =
243-
new StructType(
244-
new StructField[] {
245-
new StructField("name", DataTypes.StringType, false, Metadata.empty()),
246-
new StructField("age", DataTypes.IntegerType, false, Metadata.empty())
247-
});
248-
Dataset<Row> df = spark.createDataFrame(data, schema);
249-
250-
String huditb = getTableNameWithRandomSuffix();
251-
// saveAsTable requires support for hudi requires CTAS support for third party catalog
252-
// in hudi catalog, which is currently not supported.
253-
assertThatThrownBy(
254-
() ->
255-
df.write()
256-
.format("hudi")
257-
.option("path", getTableLocation(huditb))
258-
.saveAsTable(huditb))
259-
.isInstanceOf(IllegalArgumentException.class);
260-
261-
// verify regular dataframe saving still works
262-
df.write().format("hudi").save(getTableLocation(huditb));
263-
264-
// verify the partition dir is created
265-
List<String> subDirs = listDirs(getTableLocation(huditb));
266-
assertThat(subDirs).contains(".hoodie");
267-
268-
// verify we can create a table out of the exising hudi location
269-
sql("CREATE TABLE %s USING HUDI LOCATION '%s'", huditb, getTableLocation(huditb));
270-
List<Object[]> tables = sql("SHOW TABLES");
271-
assertThat(tables.size()).isEqualTo(1);
272-
assertThat(tables).contains(new Object[] {defaultNs, huditb, false});
273-
274-
sql("INSERT INTO %s VALUES ('Anna', 11)", huditb);
275-
276-
List<Object[]> results = sql("SELECT * FROM %s ORDER BY name", huditb);
277-
assertThat(results.size()).isEqualTo(3);
278-
assertThat(results.get(0)).isEqualTo(new Object[] {"Alice", 30});
279-
assertThat(results.get(1)).isEqualTo(new Object[] {"Anna", 11});
280-
assertThat(results.get(2)).isEqualTo(new Object[] {"Bob", 25});
281-
282-
sql("DROP TABLE %s", huditb);
283-
}
284174
}

plugins/spark/v3.5/integration/src/intTest/resources/logback.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ out the configuration if you would like ot see all spark debug log during the ru
3232
</encoder>
3333
</appender>
3434

35+
<!-- Hudi-specific loggers for test -->
36+
<logger name="org.apache.hudi" level="INFO"/>
37+
3538
<root level="ERROR">
3639
<appender-ref ref="CONSOLE"/>
3740
</root>

plugins/spark/v3.5/spark/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ dependencies {
7676
}
7777

7878
implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")
79-
compileOnly("org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
80-
testImplementation("org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
79+
compileOnly("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0")
80+
testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0")
8181

8282
implementation(
8383
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"

plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iceberg.spark.SupportsReplaceView;
3131
import org.apache.iceberg.util.PropertyUtil;
3232
import org.apache.polaris.spark.utils.DeltaHelper;
33+
import org.apache.polaris.spark.utils.HudiCatalogUtils;
3334
import org.apache.polaris.spark.utils.HudiHelper;
3435
import org.apache.polaris.spark.utils.PolarisCatalogUtils;
3536
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -194,7 +195,6 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
194195
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
195196
return deltaCatalog.alterTable(ident, changes);
196197
} else if (PolarisCatalogUtils.useHudi(provider)) {
197-
// check to see if this alters hudi metadata
198198
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
199199
return hudiCatalog.alterTable(ident, changes);
200200
}
@@ -286,18 +286,24 @@ public Map<String, String> loadNamespaceMetadata(String[] namespace)
286286
public void createNamespace(String[] namespace, Map<String, String> metadata)
287287
throws NamespaceAlreadyExistsException {
288288
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
289+
HudiCatalogUtils.createNamespace(namespace, metadata);
289290
}
290291

291292
@Override
292293
public void alterNamespace(String[] namespace, NamespaceChange... changes)
293294
throws NoSuchNamespaceException {
294295
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
296+
HudiCatalogUtils.alterNamespace(namespace, changes);
295297
}
296298

297299
@Override
298300
public boolean dropNamespace(String[] namespace, boolean cascade)
299301
throws NoSuchNamespaceException {
300-
return this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
302+
boolean result = this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
303+
if (result) {
304+
HudiCatalogUtils.dropNamespace(namespace, cascade);
305+
}
306+
return result;
301307
}
302308

303309
@Override

0 commit comments

Comments
 (0)