Refactor IcebergObjectInspector and implement custom object inspectors
guilload committed Jun 26, 2020
1 parent c5b6cd8 commit f7c5c39
Showing 21 changed files with 1,244 additions and 455 deletions.

This file was deleted.

This file was deleted.

50 changes: 7 additions & 43 deletions mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
@@ -21,15 +21,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
- import java.sql.Date;
- import java.sql.Timestamp;
- import java.time.LocalDate;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.time.OffsetDateTime;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
@@ -38,36 +29,33 @@
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;
- import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
- import org.apache.iceberg.types.Type;
- import org.apache.iceberg.types.Types;
+ import org.apache.iceberg.mr.mapred.serde.objectinspector.IcebergObjectInspector;

public class IcebergSerDe extends AbstractSerDe {

- private Schema schema;
private ObjectInspector inspector;
- private List<Object> row;

@Override
public void initialize(@Nullable Configuration configuration, Properties serDeProperties) throws SerDeException {
- Table table = null;
+ final Table table;

try {
table = TableResolver.resolveTableFromConfiguration(configuration, serDeProperties);
} catch (IOException e) {
throw new UncheckedIOException("Unable to resolve table from configuration: ", e);
}
- this.schema = table.schema();

try {
- this.inspector = new IcebergObjectInspectorGenerator().createObjectInspector(schema);
+ this.inspector = IcebergObjectInspector.create(table.schema());
} catch (Exception e) {
throw new SerDeException(e);
}
}

@Override
public Class<? extends Writable> getSerializedClass() {
- return null;
+ return IcebergWritable.class;
}

@Override
@@ -82,31 +70,7 @@ public SerDeStats getSerDeStats() {

@Override
public Object deserialize(Writable writable) {
- IcebergWritable icebergWritable = (IcebergWritable) writable;
- List<Types.NestedField> fields = icebergWritable.schema().columns();
-
- if (row == null || row.size() != fields.size()) {
-   row = new ArrayList<Object>(fields.size());
- } else {
-   row.clear();
- }
- for (int i = 0; i < fields.size(); i++) {
-   Object obj = ((IcebergWritable) writable).record().get(i);
-   Type fieldType = fields.get(i).type();
-   if (fieldType.equals(Types.DateType.get())) {
-     row.add(Date.valueOf((LocalDate) obj));
-   } else if (fieldType.equals(Types.TimestampType.withoutZone())) {
-     row.add(Timestamp.valueOf((LocalDateTime) obj));
-   } else if (fieldType.equals(Types.TimestampType.withZone())) {
-     LocalDateTime timestamp = ((OffsetDateTime) obj).toLocalDateTime();
-     row.add(Timestamp.valueOf(timestamp));
-   } else if (fieldType.equals(Types.TimeType.get())) {
-     row.add(((LocalTime) obj).toString());
-   } else {
-     row.add(obj);
-   }
- }
- return Collections.unmodifiableList(row);
+ return ((IcebergWritable) writable).record();
}

@Override
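The per-field conversions that used to live in deserialize() (LocalDate to java.sql.Date, LocalDateTime/OffsetDateTime to java.sql.Timestamp, LocalTime to String) are gone from the SerDe; per the commit title, that responsibility moves into the custom object inspectors behind IcebergObjectInspector.create(). As a rough illustration only, a date inspector along these lines would cover the DATE case, assuming Hive's serde2 primitive inspector API; the class name below is hypothetical and not necessarily what this commit adds:

import java.time.LocalDate;

import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

// Hypothetical sketch of a custom date inspector: converts the LocalDate values
// produced by Iceberg generic records into the java.sql.Date instances Hive expects.
public final class ExampleIcebergDateObjectInspector extends AbstractPrimitiveJavaObjectInspector
    implements DateObjectInspector {

  ExampleIcebergDateObjectInspector() {
    super(TypeInfoFactory.dateTypeInfo);
  }

  @Override
  public java.sql.Date getPrimitiveJavaObject(Object o) {
    return o == null ? null : java.sql.Date.valueOf((LocalDate) o);
  }

  @Override
  public DateWritable getPrimitiveWritableObject(Object o) {
    return o == null ? null : new DateWritable(getPrimitiveJavaObject(o));
  }

  @Override
  public Object copyObject(Object o) {
    return o; // LocalDate is immutable, no defensive copy needed
  }
}

Presumably IcebergObjectInspector.create(schema) walks the Iceberg schema and picks the matching inspector for each column type, which is why the SerDe can now hand the raw record straight to Hive.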
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java
@@ -33,6 +33,11 @@ public class IcebergWritable implements Writable {
private Record record;
private Schema schema;

+ public IcebergWritable(Record record, Schema schema) {
+   this.record = record;
+   this.schema = schema;
+ }

@SuppressWarnings("checkstyle:HiddenField")
public void wrapRecord(Record record) {
this.record = record;
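With the new constructor, a populated writable can be built in one step instead of creating an empty instance and wrapping the record and schema afterwards. A minimal sketch, assuming the Iceberg generics API (GenericRecord) and a made-up two-column schema:

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

class IcebergWritableExample {

  // Made-up schema, for illustration only.
  private static final Schema SCHEMA = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()),
      Types.NestedField.optional(2, "data", Types.StringType.get()));

  static IcebergWritable newWritable() {
    Record record = GenericRecord.create(SCHEMA);
    record.setField("id", 1L);
    record.setField("data", "a");
    // The constructor added in this commit wraps record and schema in one call.
    return new IcebergWritable(record, SCHEMA);
  }
}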
21 changes: 10 additions & 11 deletions mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
@@ -20,11 +20,11 @@
package org.apache.iceberg.mr.mapred;

import java.io.IOException;
+ import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.Table;
- import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,27 +45,26 @@ static Table resolveTableFromJob(JobConf conf) throws IOException {

static Table resolveTableFromConfiguration(Configuration conf, Properties properties) throws IOException {
String catalogName = properties.getProperty(InputFormatConfig.CATALOG_NAME, InputFormatConfig.HADOOP_TABLES);
- String tableLocation = properties.getProperty(InputFormatConfig.TABLE_LOCATION);
- String tableName = properties.getProperty(InputFormatConfig.TABLE_NAME);
- Preconditions.checkNotNull(tableLocation, "Table location is not set.");
- Preconditions.checkNotNull(tableName, "Table name is not set.");

switch (catalogName) {
case InputFormatConfig.HADOOP_TABLES:
+   String tableLocation = properties.getProperty(InputFormatConfig.TABLE_LOCATION);
+   Preconditions.checkNotNull(tableLocation, "Table location is not set.");
HadoopTables tables = new HadoopTables(conf);
return tables.load(tableLocation);

case InputFormatConfig.HIVE_CATALOG:
+   String tableName = properties.getProperty(InputFormatConfig.TABLE_NAME);
+   Preconditions.checkNotNull(tableName, "Table name is not set.");
//TODO Implement HiveCatalog
return null;
default:
-   throw new NoSuchTableException("Table does not exist at location: " + tableLocation);
+   throw new RuntimeException("Catalog " + catalogName + " not supported.");
}
}

protected static String extractProperty(JobConf conf, String key) {
- String value = conf.get(key);
- if (value == null) {
-   throw new IllegalArgumentException("Property not set in JobConf: " + key);
- }
- return value;
+ return Optional.ofNullable(conf.get(key))
+     .orElseThrow(() -> new IllegalArgumentException("Property not set in JobConf: " + key));
}
}

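Table resolution is now driven entirely by the catalog name: the HadoopTables branch only requires InputFormatConfig.TABLE_LOCATION, the (still unimplemented) Hive catalog branch only requires InputFormatConfig.TABLE_NAME, and any other catalog fails fast with a RuntimeException. A rough usage sketch for the HadoopTables path, assuming a caller in the same org.apache.iceberg.mr.mapred package (the resolver methods are package-private) and a hypothetical warehouse location:

package org.apache.iceberg.mr.mapred;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.InputFormatConfig;

class TableResolverExample {

  static Table loadHadoopTable(Configuration conf) throws IOException {
    Properties props = new Properties();
    // HADOOP_TABLES is already the default catalog; set it explicitly for clarity.
    props.setProperty(InputFormatConfig.CATALOG_NAME, InputFormatConfig.HADOOP_TABLES);
    // Only the table location is required for the HadoopTables branch after this change.
    props.setProperty(InputFormatConfig.TABLE_LOCATION, "hdfs://namenode:8020/warehouse/db/table");
    return TableResolver.resolveTableFromConfiguration(conf, props);
  }
}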