[FLINK-13192][hive] Add tests for different Hive table formats #9264

Closed
wants to merge 5 commits
4 changes: 0 additions & 4 deletions flink-connectors/flink-connector-hive/pom.xml
@@ -415,10 +415,6 @@ under the License.
<groupId>org.apache.tez</groupId>
<artifactId>tez-common</artifactId>
</exclusion>
-<exclusion>
-<groupId>org.apache.tez</groupId>
-<artifactId>tez-mapreduce</artifactId>
-</exclusion>
<exclusion>
<!-- This dependency is no longer shipped with the JDK since Java 9.-->
<groupId>jdk.tools</groupId>
HiveTableOutputFormat.java
@@ -58,9 +58,10 @@
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -122,7 +123,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// number of non-partitioning columns
private transient int numNonPartitionColumns;

-private transient AbstractSerDe serializer;
+// SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
+private transient Serializer recordSerDe;
Contributor: Maybe the type here should be just "Object".

Contributor (author): It has to be a Serializer because we need it to serialize records. Besides, using Object would mean calling the serialize method via reflection, and doing that for every record might hurt performance.
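For illustration only (not part of this patch): a minimal sketch of the two options discussed above, calling serialize through the Serializer interface versus reflectively on an Object-typed reference. It assumes the standard Hive serde2 API; the class and helper-method names are made up for the sketch.

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;

import java.lang.reflect.Method;

class SerializeCallSketch {

	// Typed path: a plain interface call per record, checked at compile time.
	static Writable viaInterface(Serializer serDe, Object row, ObjectInspector oi) throws SerDeException {
		return serDe.serialize(row, oi);
	}

	// Object-typed path: even with the Method cached, every record pays
	// Method.invoke overhead and loses compile-time type checking.
	static Writable viaReflection(Object serDe, Method cachedSerialize, Object row, ObjectInspector oi) throws Exception {
		return (Writable) cachedSerialize.invoke(serDe, row, oi);
	}

	// Lookup would have to be done once up front, e.g. in open().
	static Method lookupSerialize(Object serDe) throws NoSuchMethodException {
		return serDe.getClass().getMethod("serialize", Object.class, ObjectInspector.class);
	}
}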

//StructObjectInspector represents the hive row structure.
private transient StructObjectInspector rowObjectInspector;
private transient Class<? extends Writable> outputClass;
@@ -257,11 +259,14 @@ public void configure(Configuration parameters) {
public void open(int taskNumber, int numTasks) throws IOException {
try {
StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
-serializer = (AbstractSerDe) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
-ReflectionUtils.setConf(serializer, jobConf);
+Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
+Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " + serdeLib.getClass().getName());
+recordSerDe = (Serializer) serdeLib;
+ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties, for now assume they're same as table properties
-SerDeUtils.initializeSerDe(serializer, jobConf, tableProperties, null);
-outputClass = serializer.getSerializedClass();
+SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+outputClass = recordSerDe.getSerializedClass();
} catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
throw new FlinkRuntimeException("Error initializing Hive serializer", e);
}
@@ -331,7 +336,7 @@ public void writeRecord(Row record) throws IOException {
partitionToWriter.put(partName, partitionWriter);
}
}
-partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
+partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
} catch (IOException | SerDeException e) {
throw new IOException("Could not write Record.", e);
} catch (MetaException e) {
HiveCatalog.java
@@ -81,6 +81,7 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.thrift.TException;
@@ -323,7 +324,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
checkNotNull(tablePath, "tablePath cannot be null");

Table hiveTable = getHiveTable(tablePath);
-return instantiateCatalogTable(hiveTable);
+return instantiateCatalogTable(hiveTable, hiveConf);
}

@Override
@@ -394,7 +395,7 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, b
return;
}

-CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable);
+CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable, hiveConf);

if (existingTable.getClass() != newCatalogTable.getClass()) {
throw new CatalogException(
@@ -493,7 +494,7 @@ public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
}
}

-private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
+private static CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) {
boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;

// Table properties
@@ -506,8 +507,22 @@ private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
String comment = properties.remove(HiveCatalogConfig.COMMENT);

// Table schema
+List<FieldSchema> fields;
+if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf,
+hiveTable.getSd().getSerdeInfo().getSerializationLib())) {
+// get schema from metastore
+fields = hiveTable.getSd().getCols();
+} else {
+// get schema from deserializer
+try {
+fields = MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
+MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
+} catch (SerDeException | MetaException e) {
+throw new CatalogException("Failed to get Hive table schema from deserializer", e);
+}
+}
TableSchema tableSchema =
-HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+HiveTableUtil.createTableSchema(fields, hiveTable.getPartitionKeys());

// Partition keys
List<String> partitionKeys = new ArrayList<>();
(Hive connector test class)
@@ -36,6 +36,8 @@
import org.junit.runner.RunWith;

import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -100,10 +102,50 @@ public void testGetNonExistingFunction() throws Exception {
hiveShell.execute("drop database db1 cascade");
}

+@Test
+public void testDifferentFormats() throws Exception {
+String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv"};
+for (String format : formats) {
+readWriteFormat(format);
+}
+}
+
+private void readWriteFormat(String format) throws Exception {
+TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
+hiveShell.execute("create database db1");
+
+// create source and dest tables
+String suffix;
+if (format.equals("csv")) {
+suffix = "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
+} else {
+suffix = "stored as " + format;
+}
+hiveShell.execute("create table db1.src (i int,s string) " + suffix);
+hiveShell.execute("create table db1.dest (i int,s string) " + suffix);
+
+// prepare source data with Hive
+hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+
+// populate dest table with source table
+tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
+tableEnv.execute("test_" + format);
+
+// verify data on hive side
+verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
+
+hiveShell.execute("drop database db1 cascade");
+}

private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
return tableEnv;
}

+private void verifyHiveQueryResult(String query, List<String> expected) {
+assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+}
}