From 9267c4b23e411349a4be9556a44e37f31dedd5ac Mon Sep 17 00:00:00 2001 From: harshal patil Date: Fri, 25 Mar 2022 15:10:39 +0530 Subject: [PATCH] [HUDI-3745] Support for spark datasource options in S3EventsHoodieIncrSource --- .../sources/S3EventsHoodieIncrSource.java | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 2f7d9898b95b0..483e44830c7c1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -26,14 +26,18 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; +import com.esotericsoftware.minlog.Log; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -42,6 +46,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT; @@ -50,7 +55,6 @@ import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT; - /** * This source will use the S3 events meta information from hoodie table generate by {@link S3EventsSource}. */ @@ -71,6 +75,12 @@ static class Config { static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix"; // control whether to ignore the s3 objects with this substring static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring"; + /** + *{@value #SPARK_DATASOURCE_OPTIONS} is json string, passed to the reader while loading dataset. + * Example delta streamer conf + * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"} + */ + static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options"; } public S3EventsHoodieIncrSource( @@ -81,6 +91,22 @@ public S3EventsHoodieIncrSource( super(props, sparkContext, sparkSession, schemaProvider); } + private DataFrameReader getDataFrameReader(String fileFormat) { + DataFrameReader dataFrameReader = sparkSession.read().format(fileFormat); + if (!StringUtils.isNullOrEmpty(props.getString(Config.SPARK_DATASOURCE_OPTIONS, null))) { + final ObjectMapper mapper = new ObjectMapper(); + Map sparkOptionsMap = null; + try { + sparkOptionsMap = mapper.readValue(props.getString(Config.SPARK_DATASOURCE_OPTIONS), Map.class); + } catch (IOException e) { + throw new HoodieException(String.format("Failed to parse sparkOptions: %s", props.getString(Config.SPARK_DATASOURCE_OPTIONS)), e); + } + Log.info(String.format("sparkOptions loaded: %s", sparkOptionsMap)); + dataFrameReader = dataFrameReader.options(sparkOptionsMap); + } + return dataFrameReader; + } + @Override public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH)); @@ -125,7 +151,7 @@ public Pair>, String> fetchNextBatch(Option lastCkpt .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryTypeAndInstantEndpts.getRight().getLeft())); } - + if (source.isEmpty()) { return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight()); } @@ -141,7 +167,7 @@ public Pair>, String> fetchNextBatch(Option lastCkpt filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'"; } // add file format filtering by default - filter = filter + " and s3.object.key like '%" + fileFormat + "%'"; + filter = filter + " and s3.object.key like '%" + fileFormat + "%'"; String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase(); String s3Prefix = s3FS + "://"; @@ -174,7 +200,8 @@ public Pair>, String> fetchNextBatch(Option lastCkpt } Option> dataset = Option.empty(); if (!cloudFiles.isEmpty()) { - dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0]))); + DataFrameReader dataFrameReader = getDataFrameReader(fileFormat); + dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0]))); } return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight()); }