[HUDI-5157] Adding capability to remove all meta fields from source hudi table with Hudi incr source #7132

Merged (1 commit) on Nov 22, 2022
@@ -89,6 +89,12 @@ public static class Config {
*/
static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";

/**
* Drops all meta fields from the source hudi table while ingesting into sink hudi table.
*/
static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source";
public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = false;
}

public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
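A minimal sketch of how the new flag might be set and read, assuming a `getBoolean(key, default)` style lookup like the one used in the next hunk. `java.util.Properties` stands in for `TypedProperties` here, and the class name and the `getBoolean` helper are hypothetical, written only for illustration.

```java
import java.util.Properties;

class DropMetaFieldsConfigSketch {
    // Key and default copied from the Config class in this diff.
    static final String KEY =
        "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source";
    static final boolean DEFAULT = false;

    // Hypothetical helper: returns the default when the key is absent,
    // otherwise parses the stored string as a boolean.
    static boolean getBoolean(Properties props, String key, boolean defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Key not set: falls back to the default.
        System.out.println(getBoolean(props, KEY, DEFAULT));
        // Opt in to dropping every meta field, including the partition path.
        props.setProperty(KEY, "true");
        System.out.println(getBoolean(props, KEY, DEFAULT));
    }
}
```

Because the default is `false`, existing pipelines that do not set the key would keep the earlier behavior of retaining the partition path column.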
@@ -174,10 +180,13 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
*
* log.info("Validated Source Schema :" + validated.schema());
*/
boolean dropAllMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE);

// Remove Hoodie meta columns except partition path from input source
final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream()
.filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new));
String[] colsToDrop = dropAllMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
Member

Do we know what was the reason we strictly dropped partition path in the first place?

Contributor Author

I could not decode that. I don't see a need unless we want to carry over the partitioning from tableA to tableB.

final Dataset<Row> src = source.drop(colsToDrop);
// log.info("Final Schema from Source is :" + src.schema());
return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
}
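The column selection in the hunk above can be sketched without Spark or Hudi on the classpath. The meta column names below are local stand-ins assumed to mirror `HoodieRecord.HOODIE_META_COLUMNS`, and the class and method names are hypothetical. The sketch folds the ternary into a single filter predicate, which should select the same columns as the two branches in the diff.

```java
import java.util.Arrays;
import java.util.List;

class MetaColumnSelectionSketch {
    // Assumed stand-ins for the HoodieRecord constants referenced in the diff.
    static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path";
    static final List<String> HOODIE_META_COLUMNS = Arrays.asList(
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        PARTITION_PATH_METADATA_FIELD,
        "_hoodie_file_name");

    // When dropAllMetaFields is true every meta column is selected for dropping;
    // otherwise the partition path column is kept out of the drop list.
    static String[] colsToDrop(boolean dropAllMetaFields) {
        return HOODIE_META_COLUMNS.stream()
            .filter(c -> dropAllMetaFields || !c.equals(PARTITION_PATH_METADATA_FIELD))
            .toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", colsToDrop(false)));
        System.out.println(String.join(",", colsToDrop(true)));
    }
}
```

In the actual source the resulting array is passed to `source.drop(colsToDrop)`, so the only behavioral difference between the two settings is whether the partition path meta column survives into the sink table.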