[HUDI-679] Make io package Spark free (#1460)
leesf committed Mar 29, 2020
1 parent ac73bdc commit 07c3c5d
Showing 19 changed files with 136 additions and 58 deletions.
@@ -22,6 +22,7 @@ import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.HoodieRecord
@@ -45,7 +46,7 @@ object SparkHelpers {
HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema)
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client;

import org.apache.spark.TaskContext;

import java.io.Serializable;
import java.util.function.Supplier;

/**
* Spark task context supplier.
*/
public class SparkTaskContextSupplier implements Serializable {

public Supplier<Integer> getPartitionIdSupplier() {
return () -> TaskContext.getPartitionId();
}

public Supplier<Integer> getStageIdSupplier() {
return () -> TaskContext.get().stageId();
}

public Supplier<Long> getAttemptIdSupplier() {
return () -> TaskContext.get().taskAttemptId();
}
}
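
The supplier above is now the only write-path class that touches org.apache.spark.TaskContext directly; the io handles ask it for partition, stage and attempt IDs instead of importing TaskContext themselves. A minimal usage sketch (not part of this commit, and assuming it runs inside a Spark task where TaskContext.get() is non-null):

    SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
    // Each supplier defers to the live TaskContext of the currently running Spark task.
    int partitionId = supplier.getPartitionIdSupplier().get();  // TaskContext.getPartitionId()
    int stageId = supplier.getStageIdSupplier().get();          // TaskContext.get().stageId()
    long attemptId = supplier.getAttemptIdSupplier().get();     // TaskContext.get().taskAttemptId()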
@@ -51,6 +51,6 @@ public BulkInsertMapFunction(String instantTime, HoodieWriteConfig config, Hoodi
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition));
fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
}
}
@@ -18,6 +18,7 @@

package org.apache.hudi.execution;

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.model.HoodieRecord;
@@ -50,15 +51,18 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
protected final HoodieTable<T> hoodieTable;
protected final String idPrefix;
protected int numFilesWritten;
protected SparkTaskContextSupplier sparkTaskContextSupplier;

public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String instantTime, HoodieTable<T> hoodieTable, String idPrefix) {
String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
SparkTaskContextSupplier sparkTaskContextSupplier) {
super(sortedRecordItr);
this.hoodieConfig = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
this.numFilesWritten = 0;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
}

// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -137,7 +141,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix));
getNextFileId(idPrefix), sparkTaskContextSupplier);
}

if (handle.canWrite(payload.record)) {
@@ -148,7 +152,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix));
getNextFileId(idPrefix), sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
@@ -18,6 +18,7 @@

package org.apache.hudi.execution;

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -35,8 +36,8 @@
public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {

public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String instantTime, HoodieTable<T> hoodieTable, String idPfx) {
super(sortedRecordItr, config, instantTime, hoodieTable, idPfx);
String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier);
}

@Override
@@ -53,7 +54,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
insertPayload.getPartitionPath(), getNextFileId(idPrefix));
insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
@@ -64,7 +65,7 @@ protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
insertPayload.getPartitionPath(), getNextFileId(idPrefix));
insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
@@ -49,7 +50,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.util.SizeEstimator;

import java.io.IOException;
@@ -101,16 +101,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
private long insertRecordsWritten = 0;

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
super(config, instantTime, partitionPath, fileId, hoodieTable);
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
}

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
this(config, instantTime, hoodieTable, partitionPath, fileId, null);
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
}

private void init(HoodieRecord record) {
@@ -137,7 +137,7 @@ private void init(HoodieRecord record) {
//save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
partitionMetadata.trySave(getPartitionId());
this.writer = createLogWriter(fileSlice, baseInstantTime);
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
@@ -163,7 +163,7 @@ private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
String seqId =
HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement());
HoodieRecord.generateSequenceId(instantTime, getPartitionId(), recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(),
hoodieRecord.getPartitionPath(), fileId);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId);
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -38,7 +39,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;

import java.io.IOException;
import java.util.Iterator;
@@ -56,8 +56,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
private boolean useWriterSchema = false;

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
super(config, instantTime, partitionPath, fileId, hoodieTable);
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);

@@ -66,10 +66,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
try {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath);
this.storageWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema);
HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -80,8 +80,8 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
* Called by the compactor code path.
*/
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator) {
this(config, instantTime, hoodieTable, partitionPath, fileId);
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier);
this.recordIterator = recordIterator;
this.useWriterSchema = true;
}
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -46,7 +47,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;

import java.io.IOException;
import java.util.HashSet;
@@ -71,8 +71,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private boolean useWriterSchema;

public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId) {
super(config, instantTime, partitionPath, fileId, hoodieTable);
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
init(fileId, recordItr);
init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
}
@@ -82,8 +82,8 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
*/
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged) {
super(config, instantTime, partitionPath, fileId, hoodieTable);
HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
@@ -111,7 +111,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo

HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
partitionMetadata.trySave(getPartitionId());

oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
@@ -132,7 +132,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo

// Create the writer for writing the new version file
storageWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema);
HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -38,7 +39,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;

import java.io.IOException;

@@ -55,26 +55,27 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
protected final SparkTaskContextSupplier sparkTaskContextSupplier;

public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T> hoodieTable) {
String fileId, HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
this.writeToken = makeSparkWriteToken();
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
this.writeToken = makeWriteToken();
}

/**
* Generate a write token based on the currently running spark task and its place in the spark dag.
*/
private static String makeSparkWriteToken() {
return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
TaskContext.get().taskAttemptId());
private String makeWriteToken() {
return FSUtils.makeWriteToken(getPartitionId(), getStageId(), getAttemptId());
}

public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -171,4 +172,16 @@ protected GenericRecord rewriteRecord(GenericRecord record) {
protected FileSystem getFileSystem() {
return hoodieTable.getMetaClient().getFs();
}

protected int getPartitionId() {
return sparkTaskContextSupplier.getPartitionIdSupplier().get();
}

protected int getStageId() {
return sparkTaskContextSupplier.getStageIdSupplier().get();
}

protected long getAttemptId() {
return sparkTaskContextSupplier.getAttemptIdSupplier().get();
}
}
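
With the supplier injected through the constructor, HoodieWriteHandle and its subclasses build write tokens and sequence IDs via the protected getPartitionId/getStageId/getAttemptId helpers rather than a static TaskContext call. A hypothetical sketch (not part of this commit) of how a caller without a live Spark context, e.g. a unit test, could stub the supplier with fixed IDs by overriding its getters; the names and values here are illustrative only:

    // Assumes java.util.function.Supplier is imported; the stub returns constant IDs.
    SparkTaskContextSupplier stubSupplier = new SparkTaskContextSupplier() {
      @Override
      public Supplier<Integer> getPartitionIdSupplier() {
        return () -> 0;
      }
      @Override
      public Supplier<Integer> getStageIdSupplier() {
        return () -> 1;
      }
      @Override
      public Supplier<Long> getAttemptIdSupplier() {
        return () -> 0L;
      }
    };
    // A handle constructed with stubSupplier would derive its write token from these
    // fixed values instead of querying TaskContext, which is the decoupling this commit enables.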
