Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public boolean isImplicitWithStorage() {
HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
LOG.info("Instantiating index file filter ");
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,14 @@ public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getTaskAttemptNumberSupplier() {
return () -> -1;
}

@Override
public Supplier<Integer> getStageAttemptNumberSupplier() {
return () -> -1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,14 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getTaskAttemptNumberSupplier() {
return () -> -1;
}

@Override
public Supplier<Integer> getStageAttemptNumberSupplier() {
return () -> -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getTaskAttemptNumberSupplier() {
return () -> -1;
}

@Override
public Supplier<Integer> getStageAttemptNumberSupplier() {
return () -> -1;
}
}

protected void initFileSystem(String basePath, StorageConfiguration<?> hadoopConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public Supplier<Long> getAttemptIdSupplier() {
return () -> TaskContext.get().taskAttemptId();
}

@Override
public Supplier<Integer> getTaskAttemptNumberSupplier() {
return () -> TaskContext.get().attemptNumber();
}

@Override
public Supplier<Integer> getStageAttemptNumberSupplier() {
return () -> TaskContext.get().stageAttemptNumber();
}

@Override
public Option<String> getProperty(EngineProperty prop) {
if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public BucketizedBloomCheckPartitioner(int targetPartitions, Map<HoodieFileGroup
// of buckets and assigns buckets in the same order as file groups. If we were to simply round robin, then buckets
// for a file group is more or less guaranteed to be placed on different partitions all the time.
int minBucketsPerPartition = Math.max((int) Math.floor((1.0 * totalBuckets) / partitions), 1);
LOG.info(String.format("TotalBuckets %d, min_buckets/partition %d", totalBuckets, minBucketsPerPartition));
LOG.info("TotalBuckets {}, min_buckets/partition {}, partitions {}", totalBuckets, minBucketsPerPartition, partitions);
int[] bucketsFilled = new int[partitions];
Map<HoodieFileGroupId, AtomicInteger> bucketsFilledPerFileGroup = new HashMap<>();
int partitionIndex = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hudi.table.HoodieTable;

import org.apache.spark.Partitioner;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
Expand Down Expand Up @@ -312,6 +313,10 @@ public HoodieSparkBloomIndexCheckFunction(HoodieTable hoodieTable,

@Override
public Iterator<List<HoodieKeyLookupResult>> call(Iterator<Tuple2<HoodieFileGroupId, String>> fileGroupIdRecordKeyPairIterator) {
TaskContext taskContext = TaskContext.get();
LOG.info("HoodieSparkBloomIndexCheckFunction with stageId : {}, stage attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
taskContext.stageId(), taskContext.stageAttemptNumber(), taskContext.partitionId(), taskContext.attemptNumber(),
taskContext.taskAttemptId());
return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec

// Handle records update with clustering
HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
LOG.info("Num spark partitions for inputRecords before triggering workload profile {}", inputRecordsWithClusteringUpdate.getNumPartitions());

context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start(); // time taken from dedup -> tag location -> building workload profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,13 @@ public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getTaskAttemptNumberSupplier() {
return () -> -1;
}

@Override
public Supplier<Integer> getStageAttemptNumberSupplier() {
return () -> -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,11 @@ public abstract class TaskContextSupplier implements Serializable {
public abstract Supplier<Long> getAttemptIdSupplier();

public abstract Option<String> getProperty(EngineProperty prop);

/**
* @returns the attempt number for the task of interest. Attempt starts with 0 and goes up by 1 on retries.
*/
public abstract Supplier<Integer> getTaskAttemptNumberSupplier();

public abstract Supplier<Integer> getStageAttemptNumberSupplier();
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,5 +550,15 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getTaskAttemptNumberSupplier() {
return () -> 1;
}

@Override
public Supplier<Integer> getStageAttemptNumberSupplier() {
return () -> 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

Expand All @@ -56,6 +59,7 @@
* Read json kafka data.
*/
public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
private static final Logger LOG = LoggerFactory.getLogger(JsonKafkaSource.class);

public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
Expand Down Expand Up @@ -83,6 +87,10 @@ protected JavaRDD<String> toBatch(OffsetRange[] offsetRanges) {
protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
if (this.shouldAddOffsets) {
return kafkaRDD.mapPartitions(partitionIterator -> {
TaskContext taskContext = TaskContext.get();
LOG.info("Converting Kafka source objects to strings with stageId : {}, stage attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
taskContext.stageId(), taskContext.stageAttemptNumber(), taskContext.partitionId(), taskContext.attemptNumber(),
taskContext.taskAttemptId());
ObjectMapper objectMapper = new ObjectMapper();
return new CloseableMappingIterator<>(ClosableIterator.wrap(partitionIterator), consumerRecord -> {
String recordValue = consumerRecord.value().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ protected final InputBatch<T> fetchNewData(Option<String> lastCkptStr, long sour
@Override
protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
try {
return toInputBatch(getOffsetRanges(props, sourceProfileSupplier, offsetGen, metrics,
lastCheckpoint, sourceLimit));
OffsetRange[] offsetRanges = getOffsetRanges(props, sourceProfileSupplier, offsetGen, metrics,
lastCheckpoint, sourceLimit);
return toInputBatch(offsetRanges);
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
} catch (KafkaException ex) {
Expand Down Expand Up @@ -116,7 +117,8 @@ public static OffsetRange[] getOffsetRanges(TypedProperties props,

private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) {
long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
LOG.info("About to read {} from Kafka for topic :{} after offset generation with offset ranges {}",
totalNewMsgs, offsetGen.getTopicName(), Arrays.toString(offsetRanges));
if (totalNewMsgs <= 0) {
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.spark.sql.avro.HoodieAvroDeserializer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Iterator;
Expand All @@ -70,6 +72,8 @@
*/
public class HoodieStreamerUtils {

private static final Logger LOG = LoggerFactory.getLogger(HoodieStreamerUtils.class);

/**
* Generates HoodieRecords for the avro data read from source.
* Takes care of dropping columns, precombine, auto key generation.
Expand All @@ -94,6 +98,10 @@ public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.C
if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
records = avroRDD.mapPartitions(
(FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord,String>>) genericRecordIterator -> {
TaskContext taskContext = TaskContext.get();
LOG.info("Creating HoodieRecords with stageId : {}, stage attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
taskContext.stageId(), taskContext.stageAttemptNumber(), taskContext.partitionId(), taskContext.attemptNumber(),
taskContext.taskAttemptId());
if (autoGenerateRecordKeys) {
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
Expand Down
Loading