[HUDI-1511] InstantGenerateOperator support multiple parallelism #2434

Merged: 8 commits, Jan 22, 2021

loukey-lj
Contributor

InstantGenerateOperator now supports a parallelism greater than 1.
When InstantGenerateOperator runs with more than one subtask, subtask 0 acts as the main subtask, and only the main subtask creates a new instant.
A new instant is created only if at least one subtask received data in the current checkpoint. Every subtask creates a tmp file
whose name is made up of the checkpoint ID, the subtask index, and the number of records received.
The main subtask checks every subtask's file and parses the file names to decide whether it should create a new instant.
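
The marker-file idea described above can be sketched roughly as follows. This is only an illustration under assumptions, not the code from this PR: the class `InstantMarkerSketch` and its methods are hypothetical, the file system is replaced by a plain list of file names, and the field order (subtask index, checkpoint ID, record count) follows the `String.format` call quoted later in the review.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not the PR's actual code. */
final class InstantMarkerSketch {
    static final String DELIMITER = "_";

    // Assumed layout: <subtaskIndex>_<checkpointId>_<recordCount>
    private static final Pattern MARKER =
        Pattern.compile("(\\d+)" + DELIMITER + "(\\d+)" + DELIMITER + "(\\d+)");

    /** Builds the marker file name a subtask would write at a checkpoint. */
    static String markerName(int subtaskIndex, long checkpointId, long recordCount) {
        return String.format("%d%s%d%s%d",
            subtaskIndex, DELIMITER, checkpointId, DELIMITER, recordCount);
    }

    /**
     * Main-subtask check: returns true if any subtask reported at least one
     * record for the given checkpoint. Throws if fewer than `parallelism`
     * markers are present, so a caller could wait and retry.
     */
    static boolean receivedData(List<String> markerNames, long checkpointId, int parallelism) {
        int seen = 0;
        boolean anyRecords = false;
        for (String name : markerNames) {
            Matcher m = MARKER.matcher(name);
            if (!m.matches() || Long.parseLong(m.group(2)) != checkpointId) {
                continue; // unrelated file, or a marker from another checkpoint
            }
            seen++;
            anyRecords |= Long.parseLong(m.group(3)) > 0;
        }
        if (seen < parallelism) {
            throw new IllegalStateException(
                "only " + seen + " of " + parallelism + " markers present");
        }
        return anyRecords;
    }
}
```

With two subtasks at checkpoint 7, markers named `0_7_0` and `1_7_3` would make `receivedData` return true, so the main subtask would start a new instant. If both counts were 0, it would skip the instant.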

@loukey-lj loukey-lj closed this Jan 12, 2021
@loukey-lj
Contributor Author

reopen with new changes

Contributor

@yanghua yanghua left a comment

@loukey-lj Thanks for your contribution. I did a quick check and left some comments.

@@ -71,6 +71,7 @@

private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
public static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate";
Contributor

Renaming to instant_generate_tmp looks better?

Contributor

move it to hudi-flink module would be better

Contributor Author

moved to InstantGenerateOperator

@@ -272,7 +273,7 @@ public HoodieWrapperFileSystem getFs() {

/**
* Return raw file-system.
*
Contributor

ditto

Contributor Author

ok

Contributor

Or just revert this file's change via git command.

@@ -420,6 +421,12 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado
}
}

// Always create instantGenerateFolder which is needed for InstantGenerateOperator
final Path instantGenerateFolder = new Path(basePath, HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME);
Contributor

@wangxianghu Do we have a better place to put this change? It's common package.

Contributor

@wangxianghu Do we have a better place to put this change? It's common package.

How about StreamerUtil? This operation is only used in HoodieFlinkStreamer. WDYT @loukey-lj?

@@ -18,6 +18,18 @@

package org.apache.hudi.operator;

import org.apache.flink.api.common.state.ListState;
Contributor

Let's revert the unnecessary changes. Pay attention to the import order.

Contributor Author

reverted

TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
if (isMain) {
// retry times
retryTimes = Integer.valueOf(cfg.blockRetryTime);
Contributor

We may need to verify the config option to check that its value is valid?

Contributor Author

The config has a default value.

// writeClient
writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
// retry interval
retryInterval = Integer.valueOf(cfg.blockRetryInterval);
Contributor

ditto?

Contributor Author

The config has a default value.

// last instant completed, set it empty
latestInstant = "";
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + checkpointId + UNDERLINE + batchSize;
Contributor

Let's use String.format()?

Contributor Author

agree

fs.create(path,true);
LOG.info("subtask [{}] at checkpoint [{}] created generate file [{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
if (isMain) {
boolean hasData = generateFilePasre(checkpointId);
Contributor

receivedDataInCurrentCP?

Contributor Author

agree


}

private boolean generateFilePasre(long checkpointId) throws InterruptedException, IOException {
Contributor

checkReceivedData?

Contributor Author

agree

@codecov-io

codecov-io commented Jan 12, 2021

Codecov Report

Merging #2434 (ed0b594) into master (56866a1) will decrease coverage by 43.87%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master   #2434       +/-   ##
============================================
- Coverage     53.56%   9.68%   -43.88%     
+ Complexity     2774      48     -2726     
============================================
  Files           348      53      -295     
  Lines         16117    1930    -14187     
  Branches       1641     230     -1411     
============================================
- Hits           8633     187     -8446     
+ Misses         6785    1730     -5055     
+ Partials        699      13      -686     
Flag Coverage Δ Complexity Δ
hudicli ? ?
hudiclient ? ?
hudicommon ? ?
hudihadoopmr ? ?
hudispark ? ?
huditimelineservice ? ?
hudiutilities 9.68% <ø> (-60.38%) 0.00 <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ Complexity Δ
...va/org/apache/hudi/utilities/IdentitySplitter.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-2.00%)
...va/org/apache/hudi/utilities/schema/SchemaSet.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-3.00%)
...a/org/apache/hudi/utilities/sources/RowSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
.../org/apache/hudi/utilities/sources/AvroSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
.../org/apache/hudi/utilities/sources/JsonSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
...rg/apache/hudi/utilities/sources/CsvDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-10.00%)
...g/apache/hudi/utilities/sources/JsonDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
...apache/hudi/utilities/sources/JsonKafkaSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-6.00%)
...pache/hudi/utilities/sources/ParquetDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-5.00%)
...lities/schema/SchemaProviderWithPostProcessor.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-3.00%)
... and 328 more

Contributor

@yanghua yanghua left a comment

@loukey-lj Left some comments.

latestInstant = "";
}
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
String instantGenerateInfoFileName = String.format("%d_%d_%d",indexOfThisSubtask,checkpointId,batchSize);
Contributor

Many lines have the same issue, please change the code style like this: xxx,xxx -> xxx, xxx.

Contributor Author

formatted

return hasData;
}

private void createinstantGenerateTmpFolder() throws IOException {
Contributor

-> createInstantGenerateTmpDir?

Contributor Author

ok

}
}

boolean hasData = false;
Contributor

-> receivedData?

Contributor Author

ok

if (context.isRestored()) {
Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
latestInstantIterator.forEachRemaining(x -> latestInstant = x);
LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
Contributor

Restoring the latest instant [{}] from the state. sounds better?

Contributor Author

ok

}
}

@Override
public void open() throws Exception {
super.open();
isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
Contributor

We call getRuntimeContext().getIndexOfThisSubtask() several times. Can we define a variable to store it?

Contributor Author

It was called in both the open and initializeState methods. initializeState runs before open, so I removed the call in the open method.

Contributor

@yanghua yanghua left a comment

@loukey-lj Left some more comments.

@@ -102,65 +105,76 @@ public void open() throws Exception {
// Hadoop FileSystem
fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());

TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
if (isMain) {
TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
Contributor

IMO, here we can set the argument of FlinkTaskContextSupplier to the instance of RuntimeContext, right?

Contributor

wdyt about this review suggestion?

Contributor Author

done

// init table, create it if not exists.
initTable();

// create instantGenerateTmpFolder
Contributor

About the comment, we'd better avoid copying the method name, create temp folder for generating instant seems better?

Contributor Author

changed to 'create instant marker directory'

fileStatuses = fs.listStatus(generatePath, new PathFilter() {
@Override
public boolean accept(Path pathname) {
return pathname.getName().contains(String.format("_%d_", checkpointId));
Contributor

You have defined the separator by UNDERLINE, it would be better to use that. wdyt?

Contributor

Can this suggestion be accepted?

Contributor Author

ok

private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
FileStatus[] fileStatuses = null;
Path generatePath = new Path(INSTANT_GENERATE_FOLDER_NAME);
Contributor

@loukey-lj how about moving this path to .hoodie/.aux

Contributor Author

agree

private Integer retryTimes;
private Integer retryInterval;
private static final String UNDERLINE = "_";
private static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate_tmp";
Contributor

INSTANT_GENERATE_FOLDER_NAME = ".instant_marker" ?
WDYT ? @yanghua

Contributor

sounds good

private static final String UNDERLINE = "_";
private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
private transient boolean isMain = false;
private transient AtomicLong batchSize = new AtomicLong(0);
Contributor

I still think cpRecordCounter sounds better.

Contributor Author

agree

} else {
latestInstantList.set(0, latestInstant);
LOG.info("Records size [{}] checkpointId [{}]", batchSize, checkpointId);
Contributor

Task instance %d received %d records in checkpoint [%d] looks better?

Contributor Author

done

Contributor

@yanghua yanghua left a comment

@loukey-lj We are close to merging this PR, and there are several minor issues that need to be fixed.

// no data no new instant
if (!bufferedRecords.isEmpty()) {
latestInstant = startNewInstant(checkpointId);
// no data no new instant
Contributor

IMO, we can move boolean receivedDataInCurrentCP = checkReceivedData(checkpointId); to this line. This way we can delay calling the method.

Contributor Author

done

Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
// mk generate file by each subtask
fs.create(path, true);
LOG.info("Subtask [{}] at checkpoint [{}] created generate file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
Contributor

generate -> marker?

Contributor Author

done

}

if (tryTimes >= 5) {
LOG.warn("waiting generate file, checkpointId [{}]", checkpointId);
Contributor

generate -> marker

Contributor Author

done

}
String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
// mk generate file by each subtask
Contributor

generate -> marker?

Contributor Author

done


if (tryTimes >= 5) {
LOG.warn("waiting marker file, checkpointId [{}]", checkpointId);
tryTimes = 0;
Contributor

Is there a bug here? When tryTimes >= 5, we only reset it to 0 and do not break? If the logic is correct, why do we need this counter?
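
One possible way to resolve the question above is to separate the two concerns: a hard upper bound on attempts and a periodic warning. The sketch below is an assumption for illustration, not what the PR ended up doing. `BoundedRetrySketch`, `waitFor`, and the `warnEvery` parameter are hypothetical names; `maxRetries` and `intervalMillis` loosely correspond to the `retryTimes` and `retryInterval` fields shown earlier.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical bounded-retry helper; illustrative only. */
final class BoundedRetrySketch {
    /**
     * Polls `condition` up to `maxRetries` times, sleeping `intervalMillis`
     * between attempts and logging a warning every `warnEvery` failed attempts.
     * Returns true as soon as the condition holds, false once retries run out
     * or the thread is interrupted.
     */
    static boolean waitFor(BooleanSupplier condition, int maxRetries,
                           long intervalMillis, int warnEvery) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (attempt % warnEvery == 0) {
                // The warning is throttled by the modulo, so no counter reset is needed.
                System.err.println("still waiting for marker files, attempt " + attempt);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return false; // the caller decides whether to fail the checkpoint
    }
}
```

Here the attempt counter serves both purposes without being reset: it ends the loop at `maxRetries`, and the modulo check limits how often the warning is logged.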

Contributor

@yanghua yanghua left a comment

@loukey-lj Thanks for your effort! LGTM now. @wangxianghu Do you still have any concerns?

@wangxianghu
Contributor

@loukey-lj Thanks for your effort! LGTM now. @wangxianghu Do you still have any concerns?
Nothing else, thanks @loukey-lj @yanghua

@yanghua
Contributor

yanghua commented Jan 20, 2021

Hi @vinothchandar Can we merge this PR, right now?

@vinothchandar
Member

@yanghua please wait till I make the release branch. We also need to stabilize CI before landing anymore. it will be very hard to track down otherwise.

@vinothchandar
Member

@yanghua should be good to land now if you are happy with it

@yanghua
Contributor

yanghua commented Jan 22, 2021

@yanghua should be good to land now if you are happy with it

ack, thanks.

@yanghua yanghua merged commit b64d22e into apache:master Jan 22, 2021