Skip to content
Permalink
Browse files
HIVE-25162: Add support for CREATE TABLE ... STORED BY ICEBERG statements (#2317) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)
  • Loading branch information
lcspinter committed May 31, 2021
1 parent fd029c5 commit a143f491ffb8c33171c97c8b824ceb413ebeb831
Showing 14 changed files with 229 additions and 33 deletions.
@@ -348,6 +348,7 @@ private void updateHmsTableProperties(org.apache.hadoop.hive.metastore.api.Table
* <li>The base of the properties is the properties stored at the Hive Metastore for the given table
* <li>We add the {@link Catalogs#LOCATION} as the table location
* <li>We add the {@link Catalogs#NAME} as TableIdentifier defined by the database name and table name
* <li>We add the serdeProperties of the HMS table
* <li>We remove some parameters that we don't want to push down to the Iceberg table props
* </ul>
* @param hmsTable Table for which we are calculating the properties
@@ -371,6 +372,15 @@ private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.
properties.put(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString());
}

SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
if (serdeInfo != null) {
serdeInfo.getParameters().entrySet().stream()
.filter(e -> e.getKey() != null && e.getValue() != null).forEach(e -> {
String icebergKey = HiveTableOperations.translateToIcebergProp(e.getKey());
properties.put(icebergKey, e.getValue());
});
}

// Remove HMS table parameters we don't want to propagate to Iceberg
PROPERTIES_TO_REMOVE.forEach(properties::remove);

@@ -193,7 +193,7 @@ public void testCreateTableWithColumnSpecification() throws IOException {
String createSql = "CREATE EXTERNAL TABLE " + identifier +
" (customer_id BIGINT, first_name STRING COMMENT 'This is first name', " +
"last_name STRING COMMENT 'This is last name')" +
" STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
" STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of());
runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
@@ -212,7 +212,7 @@ public void testCreateTableWithColumnSpecificationPartitioned() throws IOExcepti
String createSql = "CREATE EXTERNAL TABLE " + identifier +
" (customer_id BIGINT, first_name STRING COMMENT 'This is first name') " +
"PARTITIONED BY (last_name STRING COMMENT 'This is last name') STORED BY " +
"'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of());
runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
@@ -228,7 +228,7 @@ public void testCreatePartitionedTableByProperty() throws IOException {
Row.of("Green"), Collections.singletonList(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.get(1)),
Row.of("Pink"), Collections.singletonList(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.get(2)));
String createSql = "CREATE EXTERNAL TABLE " + identifier +
" STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
" STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " +
"'" + InputFormatConfig.TABLE_SCHEMA + "'='" +
@@ -249,7 +249,7 @@ public void testCreateTableWithColumnSpecificationMultilevelPartitioned() throws
String createSql = "CREATE EXTERNAL TABLE " + identifier + " (customer_id BIGINT) " +
"PARTITIONED BY (first_name STRING COMMENT 'This is first name', " +
"last_name STRING COMMENT 'This is last name') " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of());
runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
@@ -159,7 +160,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc
TableIdentifier identifier = TableIdentifier.of("default", "customers");

shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
@@ -221,7 +222,7 @@ public void testCreateDropTableNonDefaultCatalog() throws TException, Interrupte
String createSql = "CREATE EXTERNAL TABLE " + identifier +
" (customer_id BIGINT, first_name STRING COMMENT 'This is first name'," +
" last_name STRING COMMENT 'This is last name')" +
" STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
" STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of());
shell.executeStatement(createSql);
@@ -239,12 +240,40 @@ public void testCreateDropTableNonDefaultCatalog() throws TException, Interrupte
);
}

/**
 * Creates a table through the short {@code STORED BY iceBerg} clause and checks that it can be loaded
 * through the test catalog afterwards. The keyword is spelled in mixed case in the statement; presumably
 * this is deliberate, to exercise case-insensitive handling of the keyword (confirm against the parser).
 */
@Test
public void testCreateTableStoredByIceberg() {
  TableIdentifier customers = TableIdentifier.of("default", "customers");

  // Resolve the location clause first so the statement template below stays readable
  String locationClause = testTables.locationForCreateTableSQL(customers);
  String createSql = String.format(
      "CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING, last_name " +
      "STRING) STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
      locationClause, InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);

  shell.executeStatement(createSql);

  // The table must be resolvable once the statement has been executed
  Assert.assertNotNull(testTables.loadTable(customers));
}

/**
 * Creates a table through {@code STORED BY iceberg WITH SERDEPROPERTIES(...)} and checks that the value
 * supplied as a serde property is visible among the properties of the loaded table.
 */
@Test
public void testCreateTableStoredByIcebergWithSerdeProperties() {
  TableIdentifier customers = TableIdentifier.of("default", "customers");
  String fileFormat = "orc";

  // Resolve the location clause first so the statement template below stays readable
  String locationClause = testTables.locationForCreateTableSQL(customers);
  String createSql = String.format(
      "CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING, last_name " +
      "STRING) STORED BY iceberg WITH SERDEPROPERTIES('%s'='%s') %s TBLPROPERTIES ('%s'='%s')",
      TableProperties.DEFAULT_FILE_FORMAT, fileFormat,
      locationClause,
      InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);

  shell.executeStatement(createSql);

  Table loaded = testTables.loadTable(customers);
  Assert.assertNotNull(loaded);
  // The serde property given in the DDL is expected under the same key in the table properties
  Assert.assertEquals(fileFormat, loaded.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
}

@Test
public void testCreateTableWithoutSpec() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");

shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
@@ -260,7 +289,7 @@ public void testCreateTableWithUnpartitionedSpec() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
// We need the location for HadoopTable based tests only
shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
@@ -278,7 +307,7 @@ public void testDeleteBackingTable() throws TException, IOException, Interrupted
TableIdentifier identifier = TableIdentifier.of("default", "customers");

shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
@@ -324,7 +353,7 @@ public void testCreateTableError() {
AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
"Unrecognized token 'WrongSchema'", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema'" +
",'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
@@ -335,7 +364,7 @@ public void testCreateTableError() {
AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
"Please provide ", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
}
@@ -346,7 +375,7 @@ public void testCreateTableError() {
AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
"Table location not set", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
@@ -366,7 +395,7 @@ public void testCreateTableAboveExistingTable() throws IOException {
AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
"customers already exists", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "',' " +
InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
@@ -375,7 +404,7 @@ public void testCreateTableAboveExistingTable() throws IOException {
} else {
// With other catalogs, table creation should succeed
shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
}
@@ -390,7 +419,7 @@ public void testCreatePartitionedTableWithPropertiesAndWithColumnSpecification()
"Provide only one of the following", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT) " +
"PARTITIONED BY (first_name STRING) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")) +
" TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" +
PartitionSpecParser.toJson(spec) + "')");
@@ -409,7 +438,7 @@ public void testCreateTableWithColumnSpecificationHierarchy() {
"memorable_moments MAP < STRING, STRUCT < year: INT, place: STRING, details: STRING >>, " +
"current_address STRUCT < street_address: STRUCT " +
"<street_number: INT, street_name: STRING, street_type: STRING>, country: STRING, postal_code: STRING >) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));

@@ -438,7 +467,7 @@ public void testCreateTableWithAllSupportedTypes() {
shell.executeStatement("CREATE EXTERNAL TABLE all_types (" +
"t_Float FLOaT, t_dOuble DOUBLE, t_boolean BOOLEAN, t_int INT, t_bigint BIGINT, t_binary BINARY, " +
"t_string STRING, t_timestamp TIMESTAMP, t_date DATE, t_decimal DECIMAL(3,2)) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));

@@ -462,7 +491,7 @@ public void testCreateTableWithNotSupportedTypes() {
"Unsupported Hive type", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types " +
"(not_supported " + notSupportedType + ") " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
}
@@ -484,7 +513,7 @@ public void testCreateTableWithNotSupportedTypesWithAutoConversion() {

for (String notSupportedType : notSupportedTypes.keySet()) {
shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types (not_supported " + notSupportedType + ") " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));

@@ -500,7 +529,7 @@ public void testCreateTableWithColumnComments() {
shell.executeStatement("CREATE EXTERNAL TABLE comment_table (" +
"t_int INT COMMENT 'int column', " +
"t_string STRING COMMENT 'string column') " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -520,7 +549,7 @@ public void testCreateTableWithoutColumnComments() {
shell.executeStatement("CREATE EXTERNAL TABLE without_comment_table (" +
"t_int INT, " +
"t_string STRING) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -541,7 +570,7 @@ public void testAlterTableProperties() {
shell.executeStatement("CREATE EXTERNAL TABLE customers (" +
"t_int INT, " +
"t_string STRING) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
String propKey = "dummy";
@@ -571,7 +600,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
TableIdentifier identifier = TableIdentifier.of("default", "customers");

shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' %s" +
"STORED BY ICEBERG %s" +
"TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s')",
testTables.locationForCreateTableSQL(identifier), // we need the location for HadoopTable based tests only
InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA),
@@ -587,6 +616,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
expectedIcebergProperties.put("custom_property", "initial_val");
expectedIcebergProperties.put("EXTERNAL", "TRUE");
expectedIcebergProperties.put("storage_handler", HiveIcebergStorageHandler.class.getName());
expectedIcebergProperties.put(serdeConstants.SERIALIZATION_FORMAT, "1");

// Check the HMS table parameters
org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
@@ -606,7 +636,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
Assert.assertEquals(10, hmsParams.size());
Assert.assertEquals(11, hmsParams.size());
Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -620,6 +650,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
Assert.assertNull(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP));
Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
Assert.assertNotNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
Assert.assertNotNull(hmsParams.get(serdeConstants.SERIALIZATION_FORMAT));
} else {
Assert.assertEquals(8, hmsParams.size());
Assert.assertNull(hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED));
@@ -644,7 +675,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
Assert.assertEquals(13, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals(14, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals("true", hmsParams.get("new_prop_1"));
Assert.assertEquals("false", hmsParams.get("new_prop_2"));
Assert.assertEquals("new_val", hmsParams.get("custom_property"));
@@ -690,7 +721,7 @@ public void testIcebergHMSPropertiesTranslation() throws Exception {

// Create HMS table with a property to be translated
shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'" +
"STORED BY ICEBERG " +
"TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s')",
InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA),
InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC),

0 comments on commit a143f49

Please sign in to comment.