Commit 8dc9c77

[HUDI-2742] Added s3 object filter to support multiple S3EventsHoodieIncrSources sharing a single S3 meta table

h7kanna committed Nov 18, 2021
1 parent 2d3f2a3 · commit 8dc9c77
Showing 1 changed file (S3EventsHoodieIncrSource.java) with 17 additions and 4 deletions.
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
@@ -49,7 +50,6 @@
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;

 /**
  * This source will use the S3 events meta information from the hoodie table generated by {@link S3EventsSource}.
@@ -62,6 +62,10 @@ static class Config {
     // control whether we do existence check for files before consuming them
     static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
     static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;
+
+    // only consume s3 objects whose key starts with this prefix
+    static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix";
+    static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix";
   }

   public S3EventsHoodieIncrSource(
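The two keys above are what let several S3EventsHoodieIncrSource instances share one S3 events meta table: each instance consumes only the objects under its own key prefix, and can render paths with its preferred filesystem scheme. A minimal sketch of setting them through TypedProperties, the same properties object the source reads from (the class name and both prefix values are hypothetical examples, not part of the commit):

    import org.apache.hudi.common.config.TypedProperties;

    public class S3IncrSourceConfigSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Hypothetical prefix: this instance only consumes objects under "tableA/".
        props.setProperty("hoodie.deltastreamer.source.s3incr.key.prefix", "tableA/");
        // Hypothetical scheme: build "s3a://" file paths instead of the default "s3://".
        props.setProperty("hoodie.deltastreamer.source.s3incr.fs.prefix", "s3a");
      }
    }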
@@ -101,9 +105,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
     Dataset<Row> source = metaReader.load(srcPath);
+
+    String filter = "s3.object.size > 0";
+    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
+      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
+    }
+
+    String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
+    String s3Prefix = s3FS + "://";
+
     // Extract distinct file keys from s3 meta hoodie table
     final List<Row> cloudMetaDf = source
-        .filter("s3.object.size > 0")
+        .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
         .collectAsList();
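To make the filter composition concrete: with a key prefix of "tableA/", the Spark SQL expression passed to filter() becomes "s3.object.size > 0 and s3.object.key like 'tableA/%'". A self-contained sketch of the same string-building logic, with a hypothetical inline prefix in place of the props lookup and a plain null/empty check standing in for StringUtils.isNullOrEmpty so it runs without Hudi on the classpath:

    public class FilterSketch {
      public static void main(String[] args) {
        // Hypothetical prefix; the source reads it from Config.S3_KEY_PREFIX.
        String keyPrefix = "tableA/";
        String filter = "s3.object.size > 0";
        if (keyPrefix != null && !keyPrefix.isEmpty()) {
          filter = filter + " and s3.object.key like '" + keyPrefix + "%'";
        }
        // Prints: s3.object.size > 0 and s3.object.key like 'tableA/%'
        System.out.println(filter);
      }
    }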
@@ -113,9 +126,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     for (Row row : cloudMetaDf) {
       // construct file path, row index 0 refers to bucket and 1 refers to key
       String bucket = row.getString(0);
-      String filePath = S3_PREFIX + bucket + "/" + row.getString(1);
+      String filePath = s3Prefix + bucket + "/" + row.getString(1);
       if (checkExists) {
-        FileSystem fs = FSUtils.getFs(S3_PREFIX + bucket, sparkSession.sparkContext().hadoopConfiguration());
+        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
         try {
           if (fs.exists(new Path(filePath))) {
             cloudFiles.add(filePath);
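The scheme substitution above changes only how consumable paths are rendered from meta-table rows. With Config.S3_FS_PREFIX set to a hypothetical "s3a", a row for bucket "my-bucket" and key "tableA/part-0001.parquet" resolves as in this sketch (both row values are made up for illustration):

    public class S3PathSketch {
      public static void main(String[] args) {
        // Hypothetical meta-table row values.
        String bucket = "my-bucket";
        String key = "tableA/part-0001.parquet";
        // Mirrors the commit: the scheme comes from Config.S3_FS_PREFIX,
        // lowercased, with "s3" as the default.
        String s3Prefix = "s3a" + "://";
        String filePath = s3Prefix + bucket + "/" + key;
        // Prints: s3a://my-bucket/tableA/part-0001.parquet
        System.out.println(filePath);
      }
    }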
