Skip to content

Commit

Permalink
fix serialization issue
Browse files: browse the repository at this point in the history
  • Loading branch information
nikhilsimha committed May 14, 2024
1 parent d691502 commit 6f56099
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions spark/src/main/scala/ai/chronon/spark/GroupBy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory
import java.util
import scala.collection.{Seq, mutable}
import scala.util.ScalaJavaConversions.{JListOps, ListOps, MapOps}
import scala.util.Try

class GroupBy(val aggregations: Seq[api.Aggregation],
val keyColumns: Seq[String],
Expand Down Expand Up @@ -515,22 +516,21 @@ object GroupBy {
// Generate the mutation Df if required, aligning its columns with inputDf so that the aggregator needs no additional schema.
val mutationSources = groupByConf.sources.toScala.filter { _.isSetEntities }
val mutationsColumnOrder = inputDf.columns ++ Constants.MutationFields.map(_.name)
val mutationQueriesTry = Try(
mutationSources.map(ms =>
renderDataSourceQuery(groupByConf,
ms,
groupByConf.getKeyColumns.toScala,
queryRange.shift(1),
tableUtils,
groupByConf.maxWindow,
groupByConf.inferredAccuracy,
mutations = true)))

def mutationDfFn(): DataFrame = {
val df: DataFrame = if (groupByConf.inferredAccuracy == api.Accuracy.TEMPORAL && mutationSources.nonEmpty) {
val mutationDf = mutationSources
.map {
renderDataSourceQuery(groupByConf,
_,
groupByConf.getKeyColumns.toScala,
queryRange.shift(1),
tableUtils,
groupByConf.maxWindow,
groupByConf.inferredAccuracy,
mutations = true)
}
.map {
tableUtils.sql
}
val mutationDf = mutationQueriesTry.get
.map { tableUtils.sql }
.reduce { (df1, df2) =>
val columns1 = df1.schema.fields.map(_.name)
df1.union(df2.selectExpr(columns1: _*))
Expand Down

0 comments on commit 6f56099

Please sign in to comment.