[HUDI-2742] - Added s3 object filter to support multiple S3EventsHood… #4025

Merged (1 commit) on Nov 20, 2021
@@ -23,6 +23,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
@@ -49,7 +50,6 @@
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;

/**
* This source will use the S3 events meta information from the hoodie table generated by {@link S3EventsSource}.
@@ -62,6 +62,10 @@ static class Config {
// control whether we do existence check for files before consuming them
static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;

+// only consume s3 objects whose keys start with this prefix, when set; S3_FS_PREFIX picks the filesystem scheme (e.g. s3, s3a) used to build their paths
+static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix";
+static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix";
}

public S3EventsHoodieIncrSource(
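
As a usage sketch (not part of this PR's diff): both new keys are plain string properties, so a DeltaStreamer caller could set them on its TypedProperties before constructing the source. The prefix value and scheme below are made-up examples.

import org.apache.hudi.common.config.TypedProperties;

// Hypothetical example; "data/raw/" and "s3a" are illustrative values only.
TypedProperties props = new TypedProperties();
// consume only objects whose keys start with data/raw/
props.setProperty("hoodie.deltastreamer.source.s3incr.key.prefix", "data/raw/");
// build paths as s3a://bucket/key rather than the default s3://bucket/key
props.setProperty("hoodie.deltastreamer.source.s3incr.fs.prefix", "s3a");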
@@ -101,9 +105,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
.option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
Dataset<Row> source = metaReader.load(srcPath);

+String filter = "s3.object.size > 0";
+if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) {
+filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
+}
+
+String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
+String s3Prefix = s3FS + "://";

// Extract distinct file keys from s3 meta hoodie table
final List<Row> cloudMetaDf = source
-.filter("s3.object.size > 0")
+.filter(filter)
.select("s3.bucket.name", "s3.object.key")
.distinct()
.collectAsList();
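
To make the effect of the filter built above concrete, here is a small self-contained sketch of the same concatenation with an assumed prefix value; only the resulting string matters.

// Assumed: key.prefix is set to "data/raw/" (illustrative value only).
String keyPrefix = "data/raw/";
String filter = "s3.object.size > 0";
if (keyPrefix != null && !keyPrefix.isEmpty()) {
  filter = filter + " and s3.object.key like '" + keyPrefix + "%'";
}
// filter == "s3.object.size > 0 and s3.object.key like 'data/raw/%'",
// which Spark evaluates as a SQL expression over the nested s3 event columns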
@@ -113,9 +126,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
for (Row row : cloudMetaDf) {
// construct file path, row index 0 refers to bucket and 1 refers to key
String bucket = row.getString(0);
-String filePath = S3_PREFIX + bucket + "/" + row.getString(1);
+String filePath = s3Prefix + bucket + "/" + row.getString(1);
if (checkExists) {
-FileSystem fs = FSUtils.getFs(S3_PREFIX + bucket, sparkSession.sparkContext().hadoopConfiguration());
+FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
try {
if (fs.exists(new Path(filePath))) {
cloudFiles.add(filePath);
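
For reference, a sketch of what the loop above produces per row, assuming fs.prefix is set to s3a and a row carries bucket my-bucket and key data/raw/part-0001.parquet (all illustrative values):

String s3Prefix = "s3a" + "://";               // from s3incr.fs.prefix
String bucket = "my-bucket";                   // row.getString(0)
String key = "data/raw/part-0001.parquet";     // row.getString(1)
String filePath = s3Prefix + bucket + "/" + key;
// filePath == "s3a://my-bucket/data/raw/part-0001.parquet"; with the
// exists-check enabled, it is only consumed if FileSystem.exists() is true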