diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 7020d705cac1..dc50a1e34010 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -57,7 +57,6 @@
 import org.apache.iceberg.Scan;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -199,7 +198,12 @@ public List<InputSplit> getSplits(JobContext context) {
     Configuration conf = context.getConfiguration();
     Table table = Optional
         .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
-        .orElseGet(() -> Catalogs.loadTable(conf));
+        .orElseGet(() -> {
+          Table tbl = Catalogs.loadTable(conf);
+          conf.set(InputFormatConfig.TABLE_IDENTIFIER, tbl.name());
+          conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tbl.name(), SerializationUtil.serializeToBase64(tbl));
+          return tbl;
+        });
 
     List<InputSplit> splits = Lists.newArrayList();
     boolean applyResidual = !conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
@@ -215,14 +219,13 @@ public List<InputSplit> getSplits(JobContext context) {
     }
 
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
-      Table serializableTable = SerializableTable.copyOf(table);
       tasksIterable.forEach(task -> {
         if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
             model == InputFormatConfig.InMemoryDataModel.PIG)) {
           // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(serializableTable, conf, task));
+        splits.add(new IcebergSplit(conf, task));
       });
     } catch (IOException e) {
       throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
@@ -296,7 +299,8 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) {
       CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
      this.conf = newContext.getConfiguration();
-      this.table = ((IcebergSplit) split).table();
+      this.table = SerializationUtil.deserializeFromBase64(
+          conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
       HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
       this.tasks = task.files().iterator();
       this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 8bc332eaa431..420bf9ea3b18 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.util.SerializationUtil;
@@ -37,7 +36,6 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
   public static final String[] ANYWHERE =
       new String[]{"*"};
 
-  private Table table;
   private CombinedScanTask task;
 
   private transient String[] locations;
@@ -47,8 +45,7 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
   public IcebergSplit() {
   }
 
-  IcebergSplit(Table table, Configuration conf, CombinedScanTask task) {
-    this.table = table;
+  IcebergSplit(Configuration conf, CombinedScanTask task) {
     this.task = task;
     this.conf = conf;
   }
@@ -83,10 +80,6 @@ public String[] getLocations() {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    byte[] tableData = SerializationUtil.serializeToBytes(table);
-    out.writeInt(tableData.length);
-    out.write(tableData);
-
     byte[] data = SerializationUtil.serializeToBytes(this.task);
     out.writeInt(data.length);
     out.write(data);
@@ -94,16 +87,8 @@
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    byte[] tableData = new byte[in.readInt()];
-    in.readFully(tableData);
-    this.table = SerializationUtil.deserializeFromBytes(tableData);
-
     byte[] data = new byte[in.readInt()];
     in.readFully(data);
     this.task = SerializationUtil.deserializeFromBytes(data);
   }
-
-  public Table table() {
-    return table;
-  }
 }
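Note on the pattern this patch adopts: instead of embedding a SerializableTable copy in every IcebergSplit (re-serialized per split in write()/readFields()), the planner now publishes the table once in the job Configuration, base64-encoded under SERIALIZED_TABLE_PREFIX plus the table name, and each record reader rebuilds it from the conf in initialize(). Below is a minimal sketch of that round trip; it is not part of the patch, and the class and method names (TableConfRoundTrip, publish, resolve) are hypothetical, while the conf keys and SerializationUtil calls are the ones used in the diff above.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.util.SerializationUtil;

class TableConfRoundTrip {

  // Planner side (cf. getSplits above): record the table identifier and
  // stash the base64-serialized table in the conf so it ships to every
  // task with the job, instead of once per split.
  static void publish(Configuration conf, Table table) {
    conf.set(InputFormatConfig.TABLE_IDENTIFIER, table.name());
    conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name(),
        SerializationUtil.serializeToBase64(table));
  }

  // Reader side (cf. initialize above): rebuild the table from the conf
  // alone, without another catalog round trip on the task.
  static Table resolve(Configuration conf) {
    String name = conf.get(InputFormatConfig.TABLE_IDENTIFIER);
    return SerializationUtil.deserializeFromBase64(
        conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + name));
  }
}

With this in place, IcebergSplit's Writable payload carries only the CombinedScanTask, so a scan producing many splits no longer serializes the same table metadata once per split.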