[HUDI-2738] Remove the bucketAssignFunction useless context
yuzhaojing committed Nov 11, 2021
1 parent 90f9b45 commit 536f346
Showing 4 changed files with 8 additions and 91 deletions.
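In short: BucketAssignFunction is a KeyedProcessFunction whose input stream is already keyed by record key, so Flink sets the correct current key on the keyed state backend before every processElement call. The hand-rolled BucketAssignOperator existed only to expose a Context whose setCurrentKey let the function re-key state by hand; that call was redundant. This commit deletes the operator and the context plumbing, wires the function into Flink's stock KeyedProcessOperator, and reworks the test harness to record keys itself.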
org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -35,7 +35,6 @@
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueState;
@@ -70,8 +69,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     extends KeyedProcessFunction<K, I, O>
     implements CheckpointedFunction, CheckpointListener {
 
-  private BucketAssignOperator.Context context;
-
   /**
    * Index cache(speed-up) state for the underneath file based(BloomFilter) indices.
    * When a record came in, we do these check:
@@ -158,7 +155,6 @@ public void initializeState(FunctionInitializationContext context) {
   public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
     if (value instanceof IndexRecord) {
       IndexRecord<?> indexRecord = (IndexRecord<?>) value;
-      this.context.setCurrentKey(indexRecord.getRecordKey());
       this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
     } else {
       processRecord((HoodieRecord<?>) value, out);
@@ -198,7 +194,6 @@ private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exce
       }
     } else {
       location = getNewRecordLocation(partitionPath);
-      this.context.setCurrentKey(recordKey);
     }
     // always refresh the index
     if (isChangingRecords) {
@@ -243,13 +238,4 @@ public void notifyCheckpointComplete(long checkpointId) {
   public void close() throws Exception {
     this.bucketAssigner.close();
   }
-
-  public void setContext(BucketAssignOperator.Context context) {
-    this.context = context;
-  }
-
-  @VisibleForTesting
-  public void clearIndexState() {
-    this.indexState.clear();
-  }
 }
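Why those setCurrentKey calls were safe to delete: per-key state such as indexState is a ValueState that the state backend scopes to whatever key the enclosing keyed operator set before invoking processElement. The following is a minimal sketch of that mechanism, assuming Flink 1.13-era APIs; KeyedIndexFunction and its field names are illustrative stand-ins, not Hudi code.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative stand-in for BucketAssignFunction. Because the input stream is
// keyBy'd on the record key, Flink scopes this ValueState to the key of the
// element currently being processed; no manual setCurrentKey call is needed.
public class KeyedIndexFunction extends KeyedProcessFunction<String, String, String> {

  private transient ValueState<String> indexState;

  @Override
  public void open(Configuration parameters) {
    // Keyed state: each distinct key gets its own slot in the state backend.
    indexState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("indexState", String.class));
  }

  @Override
  public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
    // ctx.getCurrentKey() is already the key the stream was keyBy'd on; the
    // update below writes only to this key's slot.
    indexState.update(value);
    out.collect(ctx.getCurrentKey() + " -> " + value);
  }
}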

org/apache/hudi/sink/partitioner/BucketAssignOperator.java: This file was deleted.

Pipelines.java
@@ -35,7 +35,6 @@
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.table.format.FilePathUtils;
 
@@ -44,6 +43,7 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -163,7 +163,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
         .transform(
             "bucket_assigner",
             TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
         .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
         // shuffle by fileId(bucket id)
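With the custom operator gone, the wiring is the stock pattern: keyBy the stream, then pass the function to Flink's KeyedProcessOperator via transform(). A self-contained sketch of the same pattern, reusing the illustrative KeyedIndexFunction from the sketch above (stream contents, operator name, and job name are made up):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;

public final class WiringSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> assigned = env
        .fromElements("key1|a", "key2|b", "key1|c")
        // Shuffle by record key, as the Hudi pipeline does before bucket_assigner.
        .keyBy(record -> record.split("\\|")[0])
        .transform(
            "bucket_assigner",
            TypeInformation.of(String.class),
            // Stock Flink operator; it sets the current key before each element.
            new KeyedProcessOperator<>(new KeyedIndexFunction()))
        .setParallelism(1);

    assigned.print();
    env.execute("keyed-process-operator-sketch");
  }
}

Because transform() is called on a keyed stream, the operator keys the state backend per element, which is exactly what the deleted BucketAssignOperator.Context.setCurrentKey used to do by hand.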
StreamWriteFunctionWrapper.java (test harness)
@@ -27,7 +27,6 @@
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -91,7 +90,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   /**
    * BucketAssignOperator context.
    **/
-  private MockBucketAssignOperatorContext bucketAssignOperatorContext;
+  private final MockBucketAssignFunctionContext bucketAssignFunctionContext;
   /**
    * Stream write function.
    */
@@ -125,11 +124,10 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
     this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
-    this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
+    this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
     this.stateInitializationContext = new MockStateInitializationContext();
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
     this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
-    this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
     this.output = new CollectorOutput<>(new ArrayList<>());
     this.streamConfig = new StreamConfig(conf);
     streamConfig.setOperatorID(new OperatorID());
@@ -155,7 +153,6 @@ public void openFunction() throws Exception {
     bucketAssignerFunction = new BucketAssignFunction<>(conf);
     bucketAssignerFunction.setRuntimeContext(runtimeContext);
     bucketAssignerFunction.open(conf);
-    bucketAssignerFunction.setContext(bucketAssignOperatorContext);
     bucketAssignerFunction.initializeState(this.stateInitializationContext);
 
     setupWriteFunction();
@@ -187,15 +184,16 @@ public void close() {
         if (streamElement.isRecord()) {
           HoodieRecord<?> bootstrapRecord = (HoodieRecord<?>) streamElement.asRecord().getValue();
           bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
+          bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
         }
       }
 
       bootstrapOperator.processElement(new StreamRecord<>(hoodieRecord));
       list.clear();
-      this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey());
     }
 
     bucketAssignerFunction.processElement(hoodieRecord, null, collector);
+    bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
     writeFunction.processElement(hoodieRecords[0], null, null);
   }
 
@@ -267,13 +265,8 @@ public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }
 
-  public void clearIndexState() {
-    this.bucketAssignerFunction.clearIndexState();
-    this.bucketAssignOperatorContext.clearIndexState();
-  }
-
   public boolean isKeyInState(HoodieKey hoodieKey) {
-    return this.bucketAssignOperatorContext.isKeyInState(hoodieKey.getRecordKey());
+    return this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey());
   }
 
   public boolean isConforming() {
@@ -303,18 +296,13 @@ private void setupWriteFunction() throws Exception {
   // Inner Class
   // -------------------------------------------------------------------------
 
-  private static class MockBucketAssignOperatorContext implements BucketAssignOperator.Context {
+  private static class MockBucketAssignFunctionContext {
    private final Set<Object> updateKeys = new HashSet<>();
 
-    @Override
     public void setCurrentKey(Object key) {
       this.updateKeys.add(key);
     }
 
-    public void clearIndexState() {
-      this.updateKeys.clear();
-    }
-
     public boolean isKeyInState(String key) {
       return this.updateKeys.contains(key);
     }
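The test changes mirror the production ones: MockBucketAssignFunctionContext no longer implements an operator interface and is just a set that records keys, and the wrapper now records each record key itself immediately after calling processElement instead of waiting for the function to call back through a context. The clearIndexState helpers disappear along with the @VisibleForTesting hook they exercised.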
