Skip to content

Commit

Permalink
[MINOR][REFACTORING] KeyValueGroupedDataset.mapGroupsWithState uses f…
Browse files Browse the repository at this point in the history
…latMapGroupsWithState
  • Loading branch information
jaceklaskowski committed Jul 15, 2017
1 parent cb8d5cc commit ce51466
Showing 1 changed file with 2 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
def mapGroupsWithState[S: Encoder, U: Encoder](
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
Dataset[U](
sparkSession,
FlatMapGroupsWithState[K, V, S, U](
flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
groupingAttributes,
dataAttributes,
OutputMode.Update,
isMapGroupsWithState = true,
GroupStateTimeout.NoTimeout,
child = logicalPlan))
flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(flatMapFunc)
}

/**
Expand All @@ -278,16 +269,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
timeoutConf: GroupStateTimeout)(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
Dataset[U](
sparkSession,
FlatMapGroupsWithState[K, V, S, U](
flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
groupingAttributes,
dataAttributes,
OutputMode.Update,
isMapGroupsWithState = true,
timeoutConf,
child = logicalPlan))
flatMapGroupsWithState(OutputMode.Update, timeoutConf)(flatMapFunc)
}

/**
Expand Down

0 comments on commit ce51466

Please sign in to comment.