
Commit 5445c48

add initial tests

1 parent 98908b3 commit 5445c48

File tree

8 files changed
+364 -48 lines changed

plugins/spark/v3.5/integration/build.gradle.kts

Lines changed: 1 addition & 1 deletion

@@ -62,7 +62,7 @@ dependencies {
   testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")

   testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
-  testImplementation("org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
+  testImplementation("org.apache.hudi:hudi-spark-datasource:0.15.0")

   testImplementation(platform(libs.jackson.bom))
   testImplementation("com.fasterxml.jackson.core:jackson-annotations")
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkHudiIT.java

Lines changed: 255 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.polaris.spark.quarkus.it;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.quarkus.test.junit.QuarkusIntegrationTest;
import java.io.File;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.polaris.service.it.env.IntegrationTestsHelper;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

@QuarkusIntegrationTest
public class SparkHudiIT extends SparkIntegrationBase {
  private String defaultNs;
  private String tableRootDir;

  private String getTableLocation(String tableName) {
    return String.format("%s/%s", tableRootDir, tableName);
  }

  private String getTableNameWithRandomSuffix() {
    return generateName("huditb");
  }

  @BeforeEach
  public void createDefaultResources(@TempDir Path tempDir) {
    spark.sparkContext().setLogLevel("WARN");
    defaultNs = generateName("hudi");
    // create a default namespace
    sql("CREATE NAMESPACE %s", defaultNs);
    sql("USE NAMESPACE %s", defaultNs);
    tableRootDir =
        IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
  }

  @AfterEach
  public void cleanupHudiData() {
    // clean up hudi data
    File dirToDelete = new File(tableRootDir);
    FileUtils.deleteQuietly(dirToDelete);
    sql("DROP NAMESPACE %s", defaultNs);
  }

  @Test
  public void testBasicTableOperations() {
    // create a regular hudi table
    String huditb1 = "huditb1";
    sql(
        "CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
        huditb1, getTableLocation(huditb1));
    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
    List<Object[]> results = sql("SELECT * FROM %s WHERE id > 1 ORDER BY id DESC", huditb1);
    assertThat(results.size()).isEqualTo(1);
    assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});

    // create a partitioned hudi table
    String huditb2 = "huditb2";
    sql(
        "CREATE TABLE %s (name STRING, age INT, country STRING) USING HUDI PARTITIONED BY (country) LOCATION '%s'",
        huditb2, getTableLocation(huditb2));
    sql(
        "INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'), ('yan', 16, 'CHINA')",
        huditb2);
    results = sql("SELECT name, country FROM %s ORDER BY age", huditb2);
    assertThat(results.size()).isEqualTo(3);
    assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
    assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
    assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});

    // verify the partition dirs are created
    List<String> subDirs = listDirs(getTableLocation(huditb2));
    assertThat(subDirs).contains(".hoodie", "country=CHINA", "country=US");

    // test listTables
    List<Object[]> tables = sql("SHOW TABLES");
    assertThat(tables.size()).isEqualTo(2);
    assertThat(tables)
        .contains(
            new Object[] {defaultNs, huditb1, false}, new Object[] {defaultNs, huditb2, false});

    sql("DROP TABLE %s", huditb1);
    sql("DROP TABLE %s", huditb2);
    tables = sql("SHOW TABLES");
    assertThat(tables.size()).isEqualTo(0);
  }

  @Test
  public void testAlterOperations() {
    String huditb = getTableNameWithRandomSuffix();
    sql(
        "CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
        huditb, getTableLocation(huditb));
    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb);

    // test altering columns
    // add two new columns to the table
    sql("ALTER TABLE %s ADD COLUMNS (city STRING, age INT)", huditb);
    // add one more row to the table
    sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", huditb);
    // verify the table now has 4 columns with the correct result
    List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", huditb);
    assertThat(results.size()).isEqualTo(3);
    assertThat(results).contains(new Object[] {1, "anna", null, null});
    assertThat(results).contains(new Object[] {2, "bob", null, null});
    assertThat(results).contains(new Object[] {3, "john", "SFO", 20});

    // dropping and renaming columns require setting the hoodie.keep.max.commits property
    sql("ALTER TABLE %s SET TBLPROPERTIES ('hoodie.keep.max.commits' = '50')", huditb);
    // drop column age
    sql("ALTER TABLE %s DROP COLUMN age", huditb);
    // verify the table now has 3 columns with the correct result
    results = sql("SELECT * FROM %s ORDER BY id", huditb);
    assertThat(results.size()).isEqualTo(3);
    assertThat(results).contains(new Object[] {1, "anna", null});
    assertThat(results).contains(new Object[] {2, "bob", null});
    assertThat(results).contains(new Object[] {3, "john", "SFO"});

    // rename column city to address
    sql("ALTER TABLE %s RENAME COLUMN city TO address", huditb);
    // verify column address exists
    results = sql("SELECT id, address FROM %s ORDER BY id", huditb);
    assertThat(results.size()).isEqualTo(3);
    assertThat(results).contains(new Object[] {1, null});
    assertThat(results).contains(new Object[] {2, null});
    assertThat(results).contains(new Object[] {3, "SFO"});

    // test altering properties
    sql(
        "ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 'test-owner' = 'test-user')",
        huditb);
    List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", huditb);
    // find the table properties result
    String properties = null;
    for (Object[] info : tableInfo) {
      if (info[0].equals("Table Properties")) {
        properties = (String) info[1];
        break;
      }
    }
    assertThat(properties).contains("description=people table,test-owner=test-user");
    sql("DROP TABLE %s", huditb);
  }

  @Test
  public void testUnsupportedAlterTableOperations() {
    String huditb = getTableNameWithRandomSuffix();
    sql(
        "CREATE TABLE %s (name STRING, age INT, country STRING) USING HUDI PARTITIONED BY (country) LOCATION '%s'",
        huditb, getTableLocation(huditb));

    // ALTER TABLE ... RENAME TO ... fails
    assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_hudi", huditb))
        .isInstanceOf(UnsupportedOperationException.class);

    // ALTER TABLE ... SET LOCATION ... fails
    assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", huditb))
        .isInstanceOf(AnalysisException.class);

    sql("DROP TABLE %s", huditb);
  }

  @Test
  public void testUnsupportedTableCreateOperations() {
    String huditb = getTableNameWithRandomSuffix();
    // creating a hudi table with no location fails
    assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING HUDI", huditb))
        .isInstanceOf(UnsupportedOperationException.class);

    // CTAS fails
    assertThatThrownBy(
            () ->
                sql(
                    "CREATE TABLE %s USING HUDI LOCATION '%s' AS SELECT 1 AS id",
                    huditb, getTableLocation(huditb)))
        .isInstanceOf(IllegalArgumentException.class);
  }

  @Test
  public void testDataframeSaveOperations() {
    List<Row> data = Arrays.asList(RowFactory.create("Alice", 30), RowFactory.create("Bob", 25));
    StructType schema =
        new StructType(
            new StructField[] {
              new StructField("name", DataTypes.StringType, false, Metadata.empty()),
              new StructField("age", DataTypes.IntegerType, false, Metadata.empty())
            });
    Dataset<Row> df = spark.createDataFrame(data, schema);

    String huditb = getTableNameWithRandomSuffix();
    // saveAsTable for hudi requires CTAS support for third-party catalogs in the
    // hudi catalog, which is currently not supported.
    assertThatThrownBy(
            () ->
                df.write()
                    .format("hudi")
                    .option("path", getTableLocation(huditb))
                    .saveAsTable(huditb))
        .isInstanceOf(IllegalArgumentException.class);

    // verify regular dataframe saving still works
    df.write().format("hudi").save(getTableLocation(huditb));

    // verify the .hoodie metadata dir is created
    List<String> subDirs = listDirs(getTableLocation(huditb));
    assertThat(subDirs).contains(".hoodie");

    // verify we can create a table out of the existing hudi location
    sql("CREATE TABLE %s USING HUDI LOCATION '%s'", huditb, getTableLocation(huditb));
    List<Object[]> tables = sql("SHOW TABLES");
    assertThat(tables.size()).isEqualTo(1);
    assertThat(tables).contains(new Object[] {defaultNs, huditb, false});

    sql("INSERT INTO %s VALUES ('Anna', 11)", huditb);

    List<Object[]> results = sql("SELECT * FROM %s ORDER BY name", huditb);
    assertThat(results.size()).isEqualTo(3);
    assertThat(results.get(0)).isEqualTo(new Object[] {"Alice", 30});
    assertThat(results.get(1)).isEqualTo(new Object[] {"Anna", 11});
    assertThat(results.get(2)).isEqualTo(new Object[] {"Bob", 25});

    sql("DROP TABLE %s", huditb);
  }
}

plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@ protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String
     return builder
         .config(
             "spark.sql.extensions",
-            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
+            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
         .config(
             "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config(
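
The Hudi session extension now rides alongside the Iceberg and Delta extensions in a single comma-separated spark.sql.extensions value. As a rough illustration (not part of this commit; the app name and master below are placeholders), a standalone local session can be wired the same way:

import org.apache.spark.sql.SparkSession;

public final class HudiExtensionDemo {
  public static void main(String[] args) {
    // Register all three session extensions, mirroring withCatalog() above.
    SparkSession spark =
        SparkSession.builder()
            .appName("hudi-extension-demo") // placeholder
            .master("local[1]") // placeholder
            .config(
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
                    + "io.delta.sql.DeltaSparkSessionExtension,"
                    + "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
            .getOrCreate();
    spark.sql("SELECT 1").show(); // sanity check that the session starts
    spark.stop();
  }
}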

plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java

Lines changed: 5 additions & 0 deletions

@@ -193,7 +193,12 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
       // using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
       TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
       return deltaCatalog.alterTable(ident, changes);
+    } else if (PolarisCatalogUtils.useHudi(provider)) {
+      // route alterations of hudi tables through the hudi catalog
+      TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+      return hudiCatalog.alterTable(ident, changes);
     }
+
     return this.polarisSparkCatalog.alterTable(ident);
   }
 }
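
PolarisCatalogUtils.useHudi(provider) is referenced here but not shown in this hunk; as a hedged sketch only (the actual implementation may differ), it is presumably a simple provider-name check:

// Sketch only -- assumed shape of PolarisCatalogUtils.useHudi; the real
// implementation is not shown in this commit.
public static boolean useHudi(String provider) {
  return "hudi".equalsIgnoreCase(provider);
}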

plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java

Lines changed: 30 additions & 28 deletions

@@ -80,37 +80,14 @@ public static boolean isTableWithSparkManagedLocation(Map<String, String> proper
    * Load spark table using DataSourceV2.
    *
    * @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
-   *     DeltaTableV2. For hudi it should return HoodieInternalV2Table
+   *     DeltaTableV2. For hudi it should return HoodieInternalV2Table.
    */
   public static Table loadSparkTable(GenericTable genericTable, Identifier identifier) {
-    SparkSession sparkSession = SparkSession.active();
-    if (genericTable.getFormat().toLowerCase(Locale.getDefault()).equals("hudi")) {
-      // Hudi does not use table provider interface so will need to catch it here
-      Map<String, String> tableProperties = Maps.newHashMap();
-      tableProperties.putAll(genericTable.getProperties());
-      tableProperties.put(
-          TABLE_PATH_KEY, genericTable.getProperties().get(TableCatalog.PROP_LOCATION));
-
-      TableIdentifier tableIdentifier =
-          new TableIdentifier(identifier.name(), Option.apply(identifier.namespace()[0]));
-
-      CatalogTable catalogTable = null;
-      try {
-        catalogTable = sparkSession.sessionState().catalog().getTableMetadata(tableIdentifier);
-      } catch (NoSuchDatabaseException e) {
-        throw new RuntimeException(
-            "No database found for the given tableIdentifier:" + tableIdentifier, e);
-      } catch (NoSuchTableException e) {
-        LOG.debug("No table currently exists, initial create table");
-      }
-
-      return new HoodieInternalV2Table(
-          sparkSession,
-          genericTable.getProperties().get(TableCatalog.PROP_LOCATION),
-          Option.apply(catalogTable),
-          Option.apply(identifier.toString()),
-          new CaseInsensitiveStringMap(tableProperties));
+    if (genericTable.getFormat().equalsIgnoreCase("hudi")) {
+      // hudi does not implement the table provider interface, so it is handled separately here
+      return loadHudiSparkTable(genericTable, identifier);
     }
+    SparkSession sparkSession = SparkSession.active();
     TableProvider provider =
         DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
             .get();
@@ -130,6 +107,31 @@ public static Table loadSparkTable(GenericTable genericTable, Identifier identif
             provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
   }

+  public static Table loadHudiSparkTable(GenericTable genericTable, Identifier identifier) {
+    SparkSession sparkSession = SparkSession.active();
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.putAll(genericTable.getProperties());
+    tableProperties.put(
+        TABLE_PATH_KEY, genericTable.getProperties().get(TableCatalog.PROP_LOCATION));
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(identifier.name(), Option.apply(namespacePath));
+    CatalogTable catalogTable = null;
+    try {
+      catalogTable = sparkSession.sessionState().catalog().getTableMetadata(tableIdentifier);
+    } catch (NoSuchDatabaseException e) {
+      throw new RuntimeException(
+          "No database found for the given tableIdentifier: " + tableIdentifier, e);
+    } catch (NoSuchTableException e) {
+      LOG.debug("No table currently exists; this is an initial create table");
+    }
+    return new HoodieInternalV2Table(
+        sparkSession,
+        genericTable.getProperties().get(TableCatalog.PROP_LOCATION),
+        Option.apply(catalogTable),
+        Option.apply(identifier.toString()),
+        new CaseInsensitiveStringMap(tableProperties));
+  }
+
   /**
    * Get the catalogAuth field inside the RESTSessionCatalog used by Iceberg Spark Catalog use
    * reflection. TODO: Deprecate this function once the iceberg client is updated to 1.9.0 to use
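
One behavioral change worth noting: the old inline code mapped only identifier.namespace()[0] into the Hudi TableIdentifier, while the extracted loadHudiSparkTable joins every namespace level with dots. A small self-contained illustration (the identifier names below are hypothetical):

import org.apache.spark.sql.connector.catalog.Identifier;

public final class NamespaceMappingDemo {
  public static void main(String[] args) {
    // Hypothetical identifier with a two-level namespace.
    Identifier ident = Identifier.of(new String[] {"db", "schema"}, "tbl");
    // New behavior: join every level -> "db.schema"
    String namespacePath = String.join(".", ident.namespace());
    // Old behavior: only the first level -> "db"
    String firstLevel = ident.namespace()[0];
    System.out.println(namespacePath + " vs " + firstLevel);
  }
}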
Lines changed: 37 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.polaris.spark;

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableChange;

/**
 * A fake Hudi catalog used for testing. It is a no-op class that passes all calls straight
 * through to the delegate CatalogPlugin configured as part of DelegatingCatalogExtension.
 */
public class NoopHudiCatalog extends DelegatingCatalogExtension {
  @Override
  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
    // deliberately ignore the requested changes and just reload the table via the delegate
    return super.loadTable(ident);
  }
}
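
For context on how such a fake gets exercised (an assumption for illustration; this commit does not show the test wiring): Spark instantiates a DelegatingCatalogExtension for a configured catalog name and injects the underlying catalog via setDelegateCatalog, so registering NoopHudiCatalog in place of a session catalog turns every call into a pass-through:

import org.apache.spark.sql.SparkSession;

public final class NoopHudiCatalogDemo {
  public static void main(String[] args) {
    // Assumed wiring, illustration only: Spark constructs NoopHudiCatalog and
    // hands it the built-in session catalog as its delegate.
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]") // placeholder
            .config(
                "spark.sql.catalog.spark_catalog",
                "org.apache.polaris.spark.NoopHudiCatalog")
            .getOrCreate();
    // With this wiring, ALTER TABLE statements routed through the catalog
    // reload the table instead of applying the requested changes.
    spark.stop();
  }
}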
