@@ -26,14 +26,18 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -42,6 +46,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
@@ -50,7 +55,6 @@
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;

/**
* This source uses the S3 events meta information from the hoodie table generated by {@link S3EventsSource}.
*/
@@ -71,6 +75,12 @@ static class Config {
static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix";
// control whether to ignore the s3 objects with this substring
static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring";
/**
* {@value #SPARK_DATASOURCE_OPTIONS} is a JSON string of options passed to the reader while loading the dataset.
* Example delta streamer conf:
* - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
*/
static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
}
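
For illustration, a minimal sketch of supplying the new option programmatically (the class name and option values here are assumed examples; in practice the value arrives via --hoodie-conf or the DeltaStreamer properties file, as the Javadoc above shows):

import java.util.Properties;

public class S3IncrOptionsExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // JSON map of Spark DataFrameReader options; keys and values are illustrative.
    props.setProperty(
        "hoodie.deltastreamer.source.s3incr.spark.datasource.options",
        "{\"header\":\"true\",\"encoding\":\"UTF-8\"}");
    System.out.println(props.getProperty(
        "hoodie.deltastreamer.source.s3incr.spark.datasource.options"));
  }
}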

public S3EventsHoodieIncrSource(
@@ -81,6 +91,22 @@ public S3EventsHoodieIncrSource(
super(props, sparkContext, sparkSession, schemaProvider);
}

private DataFrameReader getDataFrameReader(String fileFormat) {
DataFrameReader dataFrameReader = sparkSession.read().format(fileFormat);
if (!StringUtils.isNullOrEmpty(props.getString(Config.SPARK_DATASOURCE_OPTIONS, null))) {
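// parse the user-supplied JSON option string into a map of Spark reader options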
final ObjectMapper mapper = new ObjectMapper();
Map<String, String> sparkOptionsMap = null;
try {
sparkOptionsMap = mapper.readValue(props.getString(Config.SPARK_DATASOURCE_OPTIONS), Map.class);
} catch (IOException e) {
throw new HoodieException(String.format("Failed to parse sparkOptions: %s", props.getString(Config.SPARK_DATASOURCE_OPTIONS)), e);
}
LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
dataFrameReader = dataFrameReader.options(sparkOptionsMap);
}
return dataFrameReader;
}
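
As a standalone check of the parsing step above, a minimal runnable sketch (class name is hypothetical) of turning the JSON option string into a reader-options map with Jackson, mirroring the raw Map.class read used here:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Map;

public class SparkOptionsParseDemo {
  public static void main(String[] args) throws IOException {
    String json = "{\"header\":\"true\",\"encoding\":\"UTF-8\"}";
    // Raw Map read, as in getDataFrameReader; the unchecked cast is suppressed
    // because Jackson deserializes a flat JSON object to String keys and values.
    @SuppressWarnings("unchecked")
    Map<String, String> options = new ObjectMapper().readValue(json, Map.class);
    options.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}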

@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
@@ -125,7 +151,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryTypeAndInstantEndpts.getRight().getLeft()));
}

if (source.isEmpty()) {
return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
}
@@ -141,7 +167,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
}
// add file format filtering by default
filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
filter = filter + " and s3.object.key like '%" + fileFormat + "%'";

String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
String s3Prefix = s3FS + "://";
@@ -174,7 +200,8 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
}
Option<Dataset<Row>> dataset = Option.empty();
if (!cloudFiles.isEmpty()) {
DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
}
return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
}
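
For context, a self-contained sketch of what the final load amounts to once the parsed options are applied (the local session, CSV format, and file path are all assumed for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.HashMap;
import java.util.Map;

public class ReaderOptionsDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("reader-options-demo")
        .getOrCreate();
    // Same shape as the map parsed from SPARK_DATASOURCE_OPTIONS.
    Map<String, String> options = new HashMap<>();
    options.put("header", "true");
    options.put("encoding", "UTF-8");
    // Equivalent to getDataFrameReader("csv").load(...) with the options applied.
    Dataset<Row> df = spark.read().format("csv").options(options).load("/tmp/example.csv");
    df.printSchema();
    spark.stop();
  }
}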