[FLINK-13192][hive] Add tests for different Hive table formats
This closes apache#9264
lirui-apache authored and KurtYoung committed Aug 6, 2019
1 parent 789c605 commit 6c25805
Showing 4 changed files with 74 additions and 16 deletions.
6 changes: 1 addition & 5 deletions flink-connectors/flink-connector-hive/pom.xml
@@ -416,11 +416,7 @@ under the License.
<artifactId>tez-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.tez</groupId>
<artifactId>tez-mapreduce</artifactId>
</exclusion>
<exclusion>
<!-- This dependency is not available with java 9.-->
<!-- This dependency is no longer shipped with the JDK since Java 9.-->
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
@@ -58,9 +58,10 @@
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -122,7 +123,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// number of non-partitioning columns
private transient int numNonPartitionColumns;

private transient AbstractSerDe serializer;
// SerDe classes in Hive-1.2.1 and Hive-2.3.4 can differ, so stick to the Serializer/Deserializer interfaces they share
private transient Serializer recordSerDe;
// StructObjectInspector represents the Hive row structure.
private transient StructObjectInspector rowObjectInspector;
private transient Class<? extends Writable> outputClass;
@@ -257,11 +259,14 @@ public void configure(Configuration parameters) {
public void open(int taskNumber, int numTasks) throws IOException {
try {
StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
serializer = (AbstractSerDe) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
ReflectionUtils.setConf(serializer, jobConf);
Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
"Expected a SerDe library implementing both Serializer and Deserializer, but got " + serdeLib.getClass().getName());
recordSerDe = (Serializer) serdeLib;
ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties; for now, assume they're the same as the table properties
SerDeUtils.initializeSerDe(serializer, jobConf, tableProperties, null);
outputClass = serializer.getSerializedClass();
SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
outputClass = recordSerDe.getSerializedClass();
} catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
throw new FlinkRuntimeException("Error initializing Hive serializer", e);
}
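
The change above deliberately avoids casting to AbstractSerDe, which is not a reliable common ancestor across the bundled Hive versions; only the Serializer and Deserializer interfaces are stable. A minimal standalone sketch of the same pattern, assuming Hive libraries on the classpath (the class and helper names here are illustrative, not part of the patch):

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.Serializer;

// Illustrative helper, not part of the patch.
public final class SerDeLoaderSketch {

	private SerDeLoaderSketch() {}

	/** Loads a SerDe reflectively and validates it against the version-stable interfaces. */
	static Serializer loadSerDe(String serdeClassName, Configuration conf, Properties tableProps) throws Exception {
		Object serde = Class.forName(serdeClassName).newInstance();
		// Check both interfaces instead of casting to AbstractSerDe, whose
		// hierarchy differs between Hive-1.2.1 and Hive-2.3.4.
		if (!(serde instanceof Serializer) || !(serde instanceof Deserializer)) {
			throw new IllegalArgumentException(
				"SerDe must implement both Serializer and Deserializer: " + serde.getClass().getName());
		}
		// Initialization is exposed through the Deserializer view of the same object.
		SerDeUtils.initializeSerDe((Deserializer) serde, conf, tableProps, null);
		// Callers then serialize through the Serializer view, e.g.:
		// Writable w = serializer.serialize(row, rowObjectInspector);
		return (Serializer) serde;
	}
}
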
@@ -331,7 +336,7 @@ public void writeRecord(Row record) throws IOException {
partitionToWriter.put(partName, partitionWriter);
}
}
partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
} catch (IOException | SerDeException e) {
throw new IOException("Could not write Record.", e);
} catch (MetaException e) {
@@ -81,6 +81,7 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.thrift.TException;
@@ -323,7 +324,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
checkNotNull(tablePath, "tablePath cannot be null");

Table hiveTable = getHiveTable(tablePath);
return instantiateCatalogTable(hiveTable);
return instantiateCatalogTable(hiveTable, hiveConf);
}

@Override
@@ -394,7 +395,7 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, b
return;
}

CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable);
CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable, hiveConf);

if (existingTable.getClass() != newCatalogTable.getClass()) {
throw new CatalogException(
@@ -493,7 +494,7 @@ public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
}
}

private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
private static CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) {
boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;

// Table properties
@@ -506,8 +507,22 @@ private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
String comment = properties.remove(HiveCatalogConfig.COMMENT);

// Table schema
List<FieldSchema> fields;
if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf,
hiveTable.getSd().getSerdeInfo().getSerializationLib())) {
// get schema from metastore
fields = hiveTable.getSd().getCols();
} else {
// get schema from deserializer
try {
fields = MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
} catch (SerDeException | MetaException e) {
throw new CatalogException("Failed to get Hive table schema from deserializer", e);
}
}
TableSchema tableSchema =
HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
HiveTableUtil.createTableSchema(fields, hiveTable.getPartitionKeys());

// Partition keys
List<String> partitionKeys = new ArrayList<>();
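
The new schema branch is what makes the csv case in the tests below work: OpenCSVSerde is not listed in Hive's hive.serdes.using.metastore.for.schema property and it reports every column as STRING, so the metastore columns cannot be trusted and the schema has to come from the deserializer. A small self-contained illustration of the check (assumes hive-exec on the classpath; the class name is made up for the example):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Table;

// Illustrative only: which SerDes let the catalog trust the metastore schema.
public final class SchemaSourceSketch {

	public static void main(String[] args) {
		HiveConf conf = new HiveConf();
		// OrcSerde is in hive.serdes.using.metastore.for.schema by default -> prints true.
		System.out.println(Table.hasMetastoreBasedSchema(
			conf, "org.apache.hadoop.hive.ql.io.orc.OrcSerde"));
		// OpenCSVSerde is not -> prints false, so fall back to
		// MetaStoreUtils.getFieldsFromDeserializer as the patch does.
		System.out.println(Table.hasMetastoreBasedSchema(
			conf, "org.apache.hadoop.hive.serde2.OpenCSVSerde"));
	}
}
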
@@ -36,6 +36,8 @@
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -100,10 +102,50 @@ public void testGetNonExistingFunction() throws Exception {
hiveShell.execute("drop database db1 cascade");
}

@Test
public void testDifferentFormats() throws Exception {
String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv"};
for (String format : formats) {
readWriteFormat(format);
}
}

private void readWriteFormat(String format) throws Exception {
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();

hiveShell.execute("create database db1");

// create source and dest tables
String suffix;
if (format.equals("csv")) {
suffix = "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
} else {
suffix = "stored as " + format;
}
hiveShell.execute("create table db1.src (i int,s string) " + suffix);
hiveShell.execute("create table db1.dest (i int,s string) " + suffix);

// prepare source data with Hive
hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");

// populate dest table with source table
tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
tableEnv.execute("test_" + format);

// verify data on hive side
verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));

hiveShell.execute("drop database db1 cascade");
}

private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
return tableEnv;
}

private void verifyHiveQueryResult(String query, List<String> expected) {
assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
}
}
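
For csv, the helper switches from a "stored as" clause to an explicit "row format serde", presumably because the Hive versions under test have no csv keyword for "stored as". Fully inlined, the csv iteration is equivalent to the sketch below (illustration only; it reuses the test's existing hiveShell field and helpers):

private void readWriteCsvInlined() throws Exception {
	TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
	hiveShell.execute("create database db1");
	// OpenCSVSerde must be named explicitly; "stored as csv" is not a recognized format here.
	hiveShell.execute("create table db1.src (i int,s string) "
		+ "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'");
	hiveShell.execute("create table db1.dest (i int,s string) "
		+ "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'");
	// prepare source data with Hive, then copy it to dest via Flink SQL
	hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
	tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
	tableEnv.execute("test_csv");
	// set-based comparison, since Hive gives no ordering guarantee on the result rows
	verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
	hiveShell.execute("drop database db1 cascade");
}
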
