Skip to content

Commit

Permalink
Rename the `serializer` field to `recordSerDe` in HiveTableOutputFormat
Browse files Browse the repository at this point in the history
  • Loading branch information
lirui-apache committed Aug 2, 2019
1 parent 01a11b8 commit a20e191
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
private transient int numNonPartitionColumns;

// SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
private transient Serializer serializer;
private transient Serializer recordSerDe;
//StructObjectInspector represents the hive row structure.
private transient StructObjectInspector rowObjectInspector;
private transient Class<? extends Writable> outputClass;
Expand Down Expand Up @@ -259,13 +259,13 @@ public void configure(Configuration parameters) {
public void open(int taskNumber, int numTasks) throws IOException {
try {
StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
serializer = (Serializer) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
Preconditions.checkArgument(serializer instanceof Deserializer,
"Expect to get a SerDe, but actually got " + serializer.getClass().getName());
ReflectionUtils.setConf(serializer, jobConf);
recordSerDe = (Serializer) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
Preconditions.checkArgument(recordSerDe instanceof Deserializer,
"Expect to get a SerDe, but actually got " + recordSerDe.getClass().getName());
ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties, for now assume they're same as table properties
SerDeUtils.initializeSerDe((Deserializer) serializer, jobConf, tableProperties, null);
outputClass = serializer.getSerializedClass();
SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
outputClass = recordSerDe.getSerializedClass();
} catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
throw new FlinkRuntimeException("Error initializing Hive serializer", e);
}
Expand Down Expand Up @@ -335,7 +335,7 @@ public void writeRecord(Row record) throws IOException {
partitionToWriter.put(partName, partitionWriter);
}
}
partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
} catch (IOException | SerDeException e) {
throw new IOException("Could not write Record.", e);
} catch (MetaException e) {
Expand Down

0 comments on commit a20e191

Please sign in to comment.