Group by upload: use repartition to increase parallelism #601
@@ -1,6 +1,12 @@
 package ai.chronon.spark

-import ai.chronon.aggregator.windowing.{FinalBatchIr, FiveMinuteResolution, Resolution, SawtoothOnlineAggregator}
+import ai.chronon.aggregator.windowing.{
+  BatchIr,
+  FinalBatchIr,
+  FiveMinuteResolution,
+  Resolution,
+  SawtoothOnlineAggregator
+}
 import ai.chronon.api
 import ai.chronon.api.{Accuracy, Constants, DataModel, GroupByServingInfo, QueryUtils, ThriftJsonCodec}
 import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps}
@@ -60,11 +66,25 @@ class GroupByUpload(endPartition: String, groupBy: GroupBy) extends Serializable
         .serialize(sawtoothOnlineAggregator.init)
         .capacity()}
         |""".stripMargin)
-    val outputRdd = groupBy.inputDf.rdd
-      .keyBy(keyBuilder)
-      .mapValues(SparkConversions.toChrononRow(_, groupBy.tsIndex))
-      .aggregateByKey(sawtoothOnlineAggregator.init)( // shuffle point
-        seqOp = sawtoothOnlineAggregator.update, combOp = sawtoothOnlineAggregator.merge)
+
+    def seqOp(batchIr: BatchIr, row: Row): BatchIr = {
+      sawtoothOnlineAggregator.update(batchIr, SparkConversions.toChrononRow(row, groupBy.tsIndex))
+    }
+
+    val parallelism = sparkSession.sparkContext.getConf.getInt("spark.default.parallelism", 1000)
+    val inputPartition = groupBy.inputDf.rdd.getNumPartitions
+    val keyedInputRdd = groupBy.inputDf.rdd.keyBy(keyBuilder)
+    // shuffle point: the input rdd has fewer partitions because its rows are compact;
+    // converting rows to chronon rows increases their size, so we repartition first
+    // to reduce memory overhead and improve performance
+    val keyedInputRddRepartitioned = if (inputPartition < (parallelism / 10)) {
+      keyedInputRdd
+        .repartition(parallelism)
+    } else {
+      keyedInputRdd
+    }
+    val outputRdd = keyedInputRddRepartitioned
+      .aggregateByKey(sawtoothOnlineAggregator.init)(seqOp = seqOp, combOp = sawtoothOnlineAggregator.merge)
       .mapValues(sawtoothOnlineAggregator.normalizeBatchIr)
       .map {
         case (keyWithHash: KeyWithHash, finalBatchIr: FinalBatchIr) =>

Review comment on `.repartition(parallelism)`: I think this needs to be configurable (OPT_IN) before merging - we are going to add a shuffle step to ALL the upload jobs. By default it should be opt-out.
Reply: Sounds good. Let me make it configurable.
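The hunk above makes two related moves: it repartitions the compact keyed RDD before `aggregateByKey` when the input has far fewer partitions than `spark.default.parallelism`, and it defers the widening Spark-row-to-Chronon-row conversion into `seqOp` so it runs after the shuffle. A minimal standalone sketch of the same pattern; the input RDD, the `widen` helper, and the key function here are hypothetical stand-ins, and only the `repartition`/`aggregateByKey` shape mirrors the diff:

import org.apache.spark.sql.SparkSession

object RepartitionBeforeAggregate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repartition-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical stand-in for the keyed input: compact rows, few partitions.
    val keyedInput = sc.parallelize(1 to 100000).keyBy(_ % 17)

    // Same heuristic as the diff: repartition only when the input has far fewer
    // partitions than the configured default parallelism.
    val parallelism = sc.getConf.getInt("spark.default.parallelism", 1000)
    val repartitioned =
      if (keyedInput.getNumPartitions < parallelism / 10) keyedInput.repartition(parallelism)
      else keyedInput

    // The widening conversion (toChrononRow in the PR) happens inside seqOp, i.e.
    // after the shuffle, so the data moved between executors stays compact.
    def widen(v: Int): Array[Long] = Array.fill(8)(v.toLong) // hypothetical stand-in
    val aggregated = repartitioned.aggregateByKey(0L)(
      seqOp = (ir, v) => ir + widen(v).sum,
      combOp = _ + _
    )

    aggregated.take(3).foreach(println)
    spark.stop()
  }
}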
@@ -75,7 +95,6 @@ class GroupByUpload(endPartition: String, groupBy: GroupBy) extends Serializable
     }
     KvRdd(outputRdd, groupBy.keySchema, irSchema)
   }
-
 }

 object GroupByUpload {
Review comment: do we need to make this `10` configurable?
Reply: yeah we can make it configurable
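Both review threads ask for configurability: the repartition step should be opt-in rather than applied to all upload jobs, and the hard-coded `10` divisor should be tunable. One possible shape, assuming hypothetical config keys whose names are not from the PR:

import org.apache.spark.SparkConf

object UploadRepartitionConf {
  // Hypothetical keys; the actual names would be settled in a follow-up commit.
  val EnabledKey = "spark.chronon.group_by_upload.repartition"
  val DivisorKey = "spark.chronon.group_by_upload.repartition_divisor"

  def shouldRepartition(conf: SparkConf, inputPartitions: Int): Boolean = {
    val enabled = conf.getBoolean(EnabledKey, defaultValue = false) // opt-in: off unless set
    val parallelism = conf.getInt("spark.default.parallelism", 1000)
    val divisor = conf.getInt(DivisorKey, 10) // replaces the hard-coded 10
    enabled && inputPartitions < parallelism / divisor
  }
}

With a guard like this, the repartition branch in GroupByUpload would only fire when the flag is explicitly set, addressing the reviewer's concern about adding a shuffle step to every upload job by default.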