Skip to content

Commit

Permalink
Add spark.feathr.output.parallelism to control output partitions
Browse files: browse the repository at this point in the history
  • Loading branch information
jaymo001 committed Jun 13, 2023
1 parent cb6032d commit 4e031dd
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,19 @@ object SparkIOUtils {
}
}
if(!dfWritten) {
val num_parts = parameters.get(FeathrUtils.DEBUG_OUTPUT_PART_NUM).getOrElse("10").toInt
// Honor the debug output part num config
val coalescedDf = outputDF.coalesce(num_parts)
val ss = SparkSession.builder().getOrCreate()
val num_parts = FeathrUtils.getFeathrJobParam(ss, FeathrUtils.OUTPUT_PARALLELISM).toInt
val repartitionedDf = outputDF.repartition(num_parts)
outputLocation match {
case SimplePath(path) => {
val output_format = coalescedDf.sqlContext.getConf("spark.feathr.outputFormat", "avro")
val output_format = repartitionedDf.sqlContext.getConf("spark.feathr.outputFormat", "avro")
// if the output format is set by spark configurations "spark.feathr.outputFormat"
// we will use that as the job output format; otherwise use avro as default for backward compatibility
if(!outputDF.isEmpty) {
coalescedDf.write.mode(SaveMode.Overwrite).format(output_format).save(path)
repartitionedDf.write.mode(SaveMode.Overwrite).format(output_format).save(path)
}
}
case _ => outputLocation.writeDf(SparkSession.builder().getOrCreate(), coalescedDf, None)
case _ => outputLocation.writeDf(SparkSession.builder().getOrCreate(), repartitionedDf, None)
}
}
outputDF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ private[feathr] object FeathrUtils {
val DEBUG_FEATURE_NAMES = "debug.feature.names"
val DEBUG_OUTPUT_PATH = "debug.output.path"
val DEBUG_OUTPUT_PART_NUM = "debug.output.num.parts"
val OUTPUT_PARALLELISM = "output.parallelism"
val FEATHR_PARAMS_PREFIX = "spark.feathr."
/*
* The execution config controls feathr-offline behavior when loading date partitioned feature data.
Expand Down Expand Up @@ -67,6 +68,7 @@ private[feathr] object FeathrUtils {
// Check point every {CHECKPOINT_FREQUENCY} dataframes
CHECKPOINT_FREQUENCY -> (SQLConf.buildConf(getFullConfigKeyName(CHECKPOINT_FREQUENCY )).stringConf.createOptional, "10"),
DEBUG_OUTPUT_PART_NUM -> (SQLConf.buildConf(getFullConfigKeyName(DEBUG_OUTPUT_PART_NUM )).stringConf.createOptional, "200"),
OUTPUT_PARALLELISM -> (SQLConf.buildConf(getFullConfigKeyName(OUTPUT_PARALLELISM )).stringConf.createOptional, "200"),
FAIL_ON_MISSING_PARTITION -> (SQLConf.buildConf(getFullConfigKeyName(FAIL_ON_MISSING_PARTITION )).stringConf.createOptional, "false"),
SEQ_JOIN_ARRAY_EXPLODE_ENABLED -> (SQLConf.buildConf(getFullConfigKeyName(SEQ_JOIN_ARRAY_EXPLODE_ENABLED )).stringConf.createOptional, "true"),
ENABLE_SALTED_JOIN -> (SQLConf.buildConf(getFullConfigKeyName(ENABLE_SALTED_JOIN )).stringConf.createOptional, "false"),
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=1.0.5-rc1
version=1.0.5-rc2
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12

0 comments on commit 4e031dd

Please sign in to comment.