From 07c3c5d797f612f9b73b2805b65275c2029c2442 Mon Sep 17 00:00:00 2001 From: leesf <490081539@qq.com> Date: Sun, 29 Mar 2020 16:54:00 +0800 Subject: [PATCH] [HUDI-679] Make io package Spark free (#1460) * [HUDI-679] Make io package Spark free --- .../org/apache/hudi/cli/SparkHelpers.scala | 3 +- .../hudi/client/SparkTaskContextSupplier.java | 42 +++++++++++++++++++ .../hudi/execution/BulkInsertMapFunction.java | 2 +- .../CopyOnWriteLazyInsertIterable.java | 10 +++-- .../MergeOnReadLazyInsertIterable.java | 9 ++-- .../apache/hudi/io/HoodieAppendHandle.java | 14 +++---- .../apache/hudi/io/HoodieCreateHandle.java | 14 +++---- .../org/apache/hudi/io/HoodieMergeHandle.java | 14 +++---- .../org/apache/hudi/io/HoodieWriteHandle.java | 25 ++++++++--- .../hudi/io/storage/HoodieParquetWriter.java | 10 +++-- .../storage/HoodieStorageWriterFactory.java | 13 +++--- .../hudi/table/HoodieCopyOnWriteTable.java | 8 ++-- .../hudi/table/HoodieMergeOnReadTable.java | 4 +- .../org/apache/hudi/table/HoodieTable.java | 7 ++++ .../client/TestUpdateSchemaEvolution.java | 4 +- .../hudi/common/HoodieClientTestHarness.java | 3 ++ .../hudi/common/HoodieClientTestUtils.java | 4 +- .../TestHoodieStorageWriterFactory.java | 6 ++- .../hudi/table/TestCopyOnWriteTable.java | 2 +- 19 files changed, 136 insertions(+), 58 deletions(-) create mode 100644 hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala index 4c8e4c1b089d..6fdac1ce8a27 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala @@ -22,6 +22,7 @@ import org.apache.avro.generic.IndexedRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.avro.HoodieAvroWriteSupport +import org.apache.hudi.client.SparkTaskContextSupplier import org.apache.hudi.common.HoodieJsonPayload import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory} import org.apache.hudi.common.model.HoodieRecord @@ -45,7 +46,7 @@ object SparkHelpers { HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE); val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter) val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble) - val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema) + val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier()) for (rec <- sourceRecords) { val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString if (!keysToSkip.contains(key)) { diff --git a/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java new file mode 100644 index 000000000000..601dd98a2ad6 --- /dev/null +++ 
b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.spark.TaskContext; + +import java.io.Serializable; +import java.util.function.Supplier; + +/** + * Spark task context supplier. + */ +public class SparkTaskContextSupplier implements Serializable { + + public Supplier getPartitionIdSupplier() { + return () -> TaskContext.getPartitionId(); + } + + public Supplier getStageIdSupplier() { + return () -> TaskContext.get().stageId(); + } + + public Supplier getAttemptIdSupplier() { + return () -> TaskContext.get().taskAttemptId(); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java index 249ff3d04c19..5d4391ca4958 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java @@ -51,6 +51,6 @@ public BulkInsertMapFunction(String instantTime, HoodieWriteConfig config, Hoodi @Override public Iterator> call(Integer partition, Iterator> sortedRecordItr) { return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable, - fileIDPrefixes.get(partition)); + fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java index bdcea618d36e..8f98496fce57 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java @@ -18,6 +18,7 @@ package org.apache.hudi.execution; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.model.HoodieRecord; @@ -50,15 +51,18 @@ public class CopyOnWriteLazyInsertIterable protected final HoodieTable hoodieTable; protected final String idPrefix; protected int numFilesWritten; + protected SparkTaskContextSupplier sparkTaskContextSupplier; public CopyOnWriteLazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, - String instantTime, HoodieTable hoodieTable, String idPrefix) { + String instantTime, HoodieTable hoodieTable, String idPrefix, + SparkTaskContextSupplier sparkTaskContextSupplier) { super(sortedRecordItr); this.hoodieConfig = config; this.instantTime = instantTime; this.hoodieTable = hoodieTable; this.idPrefix = idPrefix; 
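The new class above is the crux of the change: every Spark-specific value the io layer used to pull from org.apache.spark.TaskContext is now fetched through a plain java.util.function.Supplier, so the handles and storage writers touched below depend only on this small client-side class. As a minimal sketch of what that buys, here is a hypothetical fixed-value supplier (not part of this patch) that lets handle code run where no Spark task is active, for example in a unit test. It assumes the suppliers are typed Supplier of Integer for the partition/stage ids and Supplier of Long for the attempt id, which matches how HoodieWriteHandle consumes them later in this patch.

package org.apache.hudi.client;

import java.util.function.Supplier;

// Hypothetical, not part of this patch: a supplier that hands out fixed ids so
// io-layer code can be exercised without an active Spark TaskContext.
public class FixedTaskContextSupplier extends SparkTaskContextSupplier {

  @Override
  public Supplier<Integer> getPartitionIdSupplier() {
    return () -> 0;   // single local "partition"
  }

  @Override
  public Supplier<Integer> getStageIdSupplier() {
    return () -> 0;   // no stage concept outside Spark
  }

  @Override
  public Supplier<Long> getAttemptIdSupplier() {
    return () -> 0L;  // first and only attempt
  }
}

Because the base class is a concrete class rather than an interface, this sketch still compiles against hudi-client; lifting the three suppliers behind an engine-neutral interface would be the natural next step if other execution engines were added.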
this.numFilesWritten = 0; + this.sparkTaskContextSupplier = sparkTaskContextSupplier; } // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. @@ -137,7 +141,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult payload // lazily initialize the handle, for the first time if (handle == null) { handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - getNextFileId(idPrefix)); + getNextFileId(idPrefix), sparkTaskContextSupplier); } if (handle.canWrite(payload.record)) { @@ -148,7 +152,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult payload statuses.add(handle.close()); // Need to handle the rejected payload & open new handle handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - getNextFileId(idPrefix)); + getNextFileId(idPrefix), sparkTaskContextSupplier); handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java index 11c00350ec65..02a9eada81f9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java @@ -18,6 +18,7 @@ package org.apache.hudi.execution; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -35,8 +36,8 @@ public class MergeOnReadLazyInsertIterable extends CopyOnWriteLazyInsertIterable { public MergeOnReadLazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, - String instantTime, HoodieTable hoodieTable, String idPfx) { - super(sortedRecordItr, config, instantTime, hoodieTable, idPfx); + String instantTime, HoodieTable hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier); } @Override @@ -53,7 +54,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult payload // lazily initialize the handle, for the first time if (handle == null) { handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable, - insertPayload.getPartitionPath(), getNextFileId(idPrefix)); + insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier); } if (handle.canWrite(insertPayload)) { // write the payload, if the handle has capacity @@ -64,7 +65,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult payload statuses.add(handle.getWriteStatus()); // Need to handle the rejected payload & open new handle handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable, - insertPayload.getPartitionPath(), getNextFileId(idPrefix)); + insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier); handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. 
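The lazy-insert iterables now receive the supplier from whoever drives the write (BulkInsertMapFunction above passes hoodieTable.getSparkTaskContextSupplier()) and hand it to every HoodieCreateHandle or HoodieAppendHandle they open, so no handle touches TaskContext directly. A sketch of that calling convention follows; the wrapper class and method name are illustrative only, while the constructor arguments mirror the new CopyOnWriteLazyInsertIterable signature above.

import java.util.Iterator;
import java.util.List;

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
import org.apache.hudi.table.HoodieTable;

public class InsertIteratorExample {

  // Illustrative only: open an insert iterator the way BulkInsertMapFunction now does.
  public static <T extends HoodieRecordPayload> Iterator<List<WriteStatus>> openInsertIterator(
      Iterator<HoodieRecord<T>> recordItr, HoodieWriteConfig config, String instantTime,
      HoodieTable<T> hoodieTable, String idPrefix) {
    // The supplier travels table -> iterable -> handle; io code never imports Spark.
    SparkTaskContextSupplier supplier = hoodieTable.getSparkTaskContextSupplier();
    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, hoodieTable, idPrefix, supplier);
  }
}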
} } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index f1bd57c81fcb..0c0873443958 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieDeltaWriteStat; @@ -49,7 +50,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import org.apache.spark.util.SizeEstimator; import java.io.IOException; @@ -101,16 +101,16 @@ public class HoodieAppendHandle extends HoodieWri private long insertRecordsWritten = 0; public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, - String partitionPath, String fileId, Iterator> recordItr) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + String partitionPath, String fileId, Iterator> recordItr, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); writeStatus.setStat(new HoodieDeltaWriteStat()); this.fileId = fileId; this.recordItr = recordItr; } public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, - String partitionPath, String fileId) { - this(config, instantTime, hoodieTable, partitionPath, fileId, null); + String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier); } private void init(HoodieRecord record) { @@ -137,7 +137,7 @@ private void init(HoodieRecord record) { //save hoodie partition meta in the partition path HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); - partitionMetadata.trySave(TaskContext.getPartitionId()); + partitionMetadata.trySave(getPartitionId()); this.writer = createLogWriter(fileSlice, baseInstantTime); this.currentLogFile = writer.getLogFile(); ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion()); @@ -163,7 +163,7 @@ private Option getIndexedRecord(HoodieRecord hoodieRecord) { // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get())); String seqId = - HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement()); + HoodieRecord.generateSequenceId(instantTime, getPartitionId(), recordIndex.getAndIncrement()); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(), fileId); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 1ab22e0f09e5..dd8bdac248d8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ 
-18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -38,7 +39,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import java.io.IOException; import java.util.Iterator; @@ -56,8 +56,8 @@ public class HoodieCreateHandle extends HoodieWri private boolean useWriterSchema = false; public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, - String partitionPath, String fileId) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); @@ -66,10 +66,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa try { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); - partitionMetadata.trySave(TaskContext.getPartitionId()); + partitionMetadata.trySave(getPartitionId()); createMarkerFile(partitionPath); this.storageWriter = - HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema); + HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier); } catch (IOException e) { throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e); } @@ -80,8 +80,8 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa * Called by the compactor code path. 
*/ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, - String partitionPath, String fileId, Iterator> recordIterator) { - this(config, instantTime, hoodieTable, partitionPath, fileId); + String partitionPath, String fileId, Iterator> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier); this.recordIterator = recordIterator; this.useWriterSchema = true; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 3a81340264dc..5b95cd0fa54e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.SparkConfigUtils; import org.apache.hudi.common.model.HoodieBaseFile; @@ -46,7 +47,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import java.io.IOException; import java.util.HashSet; @@ -71,8 +71,8 @@ public class HoodieMergeHandle extends HoodieWrit private boolean useWriterSchema; public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, - Iterator> recordItr, String partitionPath, String fileId) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + Iterator> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); init(fileId, recordItr); init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get()); } @@ -82,8 +82,8 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab */ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map> keyToNewRecords, String partitionPath, String fileId, - HoodieBaseFile dataFileToBeMerged) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); this.keyToNewRecords = keyToNewRecords; this.useWriterSchema = true; init(fileId, this.partitionPath, dataFileToBeMerged); @@ -111,7 +111,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); - partitionMetadata.trySave(TaskContext.getPartitionId()); + partitionMetadata.trySave(getPartitionId()); oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath); String relativePath = new Path((partitionPath.isEmpty() ? 
"" : partitionPath + "/") @@ -132,7 +132,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo // Create the writer for writing the new version file storageWriter = - HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema); + HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier); } catch (IOException io) { LOG.error("Error in update task at commit " + instantTime, io); writeStatus.setGlobalError(io); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 336e508e5e41..dd67a6ac150f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -38,7 +39,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import java.io.IOException; @@ -55,26 +55,27 @@ public abstract class HoodieWriteHandle extends H protected final String partitionPath; protected final String fileId; protected final String writeToken; + protected final SparkTaskContextSupplier sparkTaskContextSupplier; public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, - String fileId, HoodieTable hoodieTable) { + String fileId, HoodieTable hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) { super(config, instantTime, hoodieTable); this.partitionPath = partitionPath; this.fileId = fileId; - this.writeToken = makeSparkWriteToken(); this.originalSchema = new Schema.Parser().parse(config.getSchema()); this.writerSchema = createHoodieWriteSchema(originalSchema); this.timer = new HoodieTimer().startTimer(); this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(), !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction()); + this.sparkTaskContextSupplier = sparkTaskContextSupplier; + this.writeToken = makeWriteToken(); } /** * Generate a write token based on the currently running spark task and its place in the spark dag. 
*/ - private static String makeSparkWriteToken() { - return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), - TaskContext.get().taskAttemptId()); + private String makeWriteToken() { + return FSUtils.makeWriteToken(getPartitionId(), getStageId(), getAttemptId()); } public static Schema createHoodieWriteSchema(Schema originalSchema) { @@ -171,4 +172,16 @@ protected GenericRecord rewriteRecord(GenericRecord record) { protected FileSystem getFileSystem() { return hoodieTable.getMetaClient().getFs(); } + + protected int getPartitionId() { + return sparkTaskContextSupplier.getPartitionIdSupplier().get(); + } + + protected int getStageId() { + return sparkTaskContextSupplier.getStageIdSupplier().get(); + } + + protected long getAttemptId() { + return sparkTaskContextSupplier.getAttemptIdSupplier().get(); + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java index ad8987a54ebb..473a8062f56a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.storage; import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -32,7 +33,6 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.spark.TaskContext; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; @@ -52,9 +52,10 @@ public class HoodieParquetWriter HoodieStorageWriter getStorageWriter( - String instantTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema) - throws IOException { + String instantTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema, + SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { final String name = path.getName(); final String extension = FSUtils.isLogFile(path) ? 
HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name); if (PARQUET.getFileExtension().equals(extension)) { - return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable); + return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier); } throw new UnsupportedOperationException(extension + " format not supported yet."); } private static HoodieStorageWriter newParquetStorageWriter( - String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable) - throws IOException { + String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, + SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { BloomFilter filter = BloomFilterFactory .createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(), config.getDynamicBloomFilterMaxNumEntries(), @@ -63,6 +64,6 @@ private static HoodieSt config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(), config.getParquetCompressionRatio()); - return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema); + return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, sparkTaskContextSupplier); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 0b16efe803b9..250bc51c8a6e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -222,13 +222,13 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle ups } protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Iterator> recordItr) { - return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId); + return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId, sparkTaskContextSupplier); } protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Map> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) { return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, - partitionPath, fileId, dataFileToBeMerged); + partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier); } public Iterator> handleInsert(String instantTime, String idPfx, Iterator> recordItr) @@ -238,13 +238,13 @@ public Iterator> handleInsert(String instantTime, String idPfx LOG.info("Empty partition"); return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); } - return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx); + return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier); } public Iterator> handleInsert(String instantTime, String partitionPath, String fileId, Iterator> recordItr) { HoodieCreateHandle createHandle = - new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr); + new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr, sparkTaskContextSupplier); createHandle.write(); return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index a7c5a6856d88..4690382901c4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -108,7 +108,7 @@ public Iterator> handleUpdate(String instantTime, String parti return super.handleUpdate(instantTime, partitionPath, fileId, recordItr); } else { HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, instantTime, this, - partitionPath, fileId, recordItr); + partitionPath, fileId, recordItr, sparkTaskContextSupplier); appendHandle.doAppend(); appendHandle.close(); return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator(); @@ -120,7 +120,7 @@ public Iterator> handleInsert(String instantTime, String idPfx throws Exception { // If canIndexLogFiles, write inserts to log files else write inserts to parquet files if (index.canIndexLogFiles()) { - return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx); + return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier); } else { return super.handleInsert(instantTime, idPfx, recordItr); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index e38510ffdcd7..a05e28f2e2aa 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -84,6 +85,8 @@ public abstract class HoodieTable implements Seri private SerializableConfiguration hadoopConfiguration; private transient FileSystemViewManager viewManager; + protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier(); + protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) { this.config = config; this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration()); @@ -448,4 +451,8 @@ private boolean waitForCondition(String partitionPath, Stream oldRecords = ParquetUtils.readAvroRecords(conf, diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java index 4e5721f58c9c..c5c7d8248841 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java @@ -17,6 +17,7 @@ package org.apache.hudi.common; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.TestHoodieClientBase; import org.apache.hudi.common.minicluster.HdfsTestService; import org.apache.hudi.common.model.HoodieTestUtils; @@ -55,6 +56,8 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient HoodieTableMetaClient metaClient; private static AtomicInteger instantGen = new AtomicInteger(1); + protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); + public String getNextInstant() { return String.format("%09d", instantGen.getAndIncrement()); } diff --git 
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java index b85de2cbba62..b4601c299783 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.common; import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.filter.BloomFilter; @@ -230,7 +231,8 @@ public static String writeParquetFile(String basePath, String partitionPath, Str ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); HoodieParquetWriter writer = - new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, schema); + new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, + schema, new SparkTaskContextSupplier()); int seqId = 1; for (HoodieRecord record : records) { GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java index 6758377b3ea9..a1492dd43eb6 100755 --- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java @@ -18,6 +18,7 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.TestHoodieClientBase; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieWriteConfig; @@ -44,15 +45,16 @@ public void testGetStorageWriter() throws IOException { final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"); final HoodieWriteConfig cfg = getConfig(); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); HoodieStorageWriter parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, - parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA); + parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); Assert.assertTrue(parquetWriter instanceof HoodieParquetWriter); // other file format exception. 
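HoodieStorageWriterFactory (changed earlier in this patch) keys off the file extension and currently only knows how to build parquet writers; the test below asserts exactly that, and that the factory now also needs the supplier. The following is an illustrative guard a caller could put in front of the factory. The helper name is hypothetical, HoodieFileFormat is assumed to be the enum behind the factory's PARQUET/HOODIE_LOG constants, and FSUtils is assumed at its 0.5.x location.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieStorageWriter;
import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
import org.apache.hudi.table.HoodieTable;

public class StorageWriterExample {

  // Illustrative only: fail fast on non-base-file paths instead of letting the
  // factory throw UnsupportedOperationException deep inside a write handle.
  public static <T extends HoodieRecordPayload> HoodieStorageWriter<IndexedRecord> openBaseFileWriter(
      String instantTime, Path path, HoodieTable<T> table, HoodieWriteConfig config, Schema schema,
      SparkTaskContextSupplier supplier) throws IOException {
    String extension = FSUtils.getFileExtension(path.getName());
    if (!HoodieFileFormat.PARQUET.getFileExtension().equals(extension)) {
      throw new IllegalArgumentException(extension + " is not a supported base file format here");
    }
    return HoodieStorageWriterFactory.getStorageWriter(instantTime, path, table, config, schema, supplier);
  }
}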
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); try { HoodieStorageWriter logWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath, - table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA); + table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); fail("should fail since log storage writer is not supported yet."); } catch (Exception e) { Assert.assertTrue(e instanceof UnsupportedOperationException); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index c88233df87f3..9ff0f971ed3b 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -103,7 +103,7 @@ public void testMakeNewPath() throws Exception { when(record.getPartitionPath()).thenReturn(partitionPath); String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); - HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName); + HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, supplier); return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken); }).collect().get(0);
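The write token itself is unchanged by this patch: HoodieWriteHandle still formats it with FSUtils.makeWriteToken, producing tokens of the form partitionId-stageId-attemptId (the 1-0-1 embedded in f1_1-0-1_000.parquet above), it just reads the three ids from the supplier instead of TaskContext. That is why the token this test computes directly from TaskContext must keep matching the path the handle builds. A compact sketch of the equivalent plumbing, assuming the typed suppliers from the earlier sketch and the 0.5.x FSUtils package:

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.util.FSUtils;

public class WriteTokenExample {

  // Mirrors what HoodieWriteHandle.makeWriteToken() now does: ids come from the
  // supplier, formatting stays in FSUtils.
  public static String makeWriteToken(SparkTaskContextSupplier supplier) {
    int partitionId = supplier.getPartitionIdSupplier().get();
    int stageId = supplier.getStageIdSupplier().get();
    long attemptId = supplier.getAttemptIdSupplier().get();
    return FSUtils.makeWriteToken(partitionId, stageId, attemptId);
  }
}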