
[Feature] Map-Reduce Aggregation Redesign #13694

@hanahmily

Search before asking

  • I had searched in the issues and found no similar feature requirement.

Description

Current Architecture

The Func[N] interface (In, Val, Reset) is used as a monolithic accumulator everywhere:

  • Data node aggregation plans (via Analyze)
  • Liaison node aggregation plans (via DistributedAnalyze)
  • TopN post-processor

The distributed push-down is handled with hacks:

  • needCompletePushDownAgg only supports MAX/MIN/SUM/COUNT (not MEAN)
  • COUNT is silently converted to SUM at the liaison (measure_analyzer.go:185-186)
  • The liaison only deduplicates push-down results instead of properly reducing them

New Interface Design

Intermediate type in aggregation.go

// Partial represents the intermediate result of a Map phase.
// For most functions, only Value is meaningful.
// For MEAN, both Value (sum) and Count are used.
type Partial[N Number] struct {
    Value N
    Count N
}

Map interface (replaces Func, used by data node, standalone, and TopN)

// Map accumulates raw values and produces aggregation results.
// It serves as the local accumulator for raw data points.
type Map[N Number] interface {
    // In feeds a raw value into the accumulator.
    In(N)
    // Val returns the locally finalized aggregation result.
    // For MEAN, this computes sum/count. For others, same as Partial().Value.
    Val() N
    // Partial returns the intermediate result for the reduce phase.
    Partial() Partial[N]
    // Reset clears the accumulator for reuse.
    Reset()
}

Reduce interface (new, for liaison node use)

// Reduce combines intermediate results from Map phases into a final value.
type Reduce[N Number] interface {
    // Combine feeds an intermediate result from a Map phase.
    Combine(Partial[N])
    // Val returns the final aggregated value.
    Val() N
    // Reset clears the accumulator for reuse.
    Reset()
}

Factory functions

func NewMap[N Number](af modelv1.AggregationFunction) (Map[N], error)
func NewReduce[N Number](af modelv1.AggregationFunction) (Reduce[N], error)

Concrete Implementations in function.go

SUM

  • Map: Accumulates running sum. Partial() returns {Value: sum}.
  • Reduce: Sums incoming Value fields. Val() returns total sum.

COUNT

  • Map: Increments counter. Partial() returns {Value: count}.
  • Reduce: Sums incoming Value fields (same logic as SUM reduce). Val() returns total count.
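A minimal Go sketch of the SUM and COUNT pairs. It assumes a `Number` constraint of `int64 | float64` (the real constraint in aggregation.go may differ), and the type names `sumMap`, `countMap`, and `sumReduce` are hypothetical. It shows why COUNT can reuse the SUM reducer: each data node reports how many points it saw, and per-node counts combine by addition.

```go
package main

import "fmt"

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

// Partial mirrors the proposed intermediate type.
type Partial[N Number] struct {
	Value N
	Count N
}

// sumMap keeps a running sum of raw values.
type sumMap[N Number] struct{ sum N }

func (m *sumMap[N]) In(v N)              { m.sum += v }
func (m *sumMap[N]) Val() N              { return m.sum }
func (m *sumMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.sum} }
func (m *sumMap[N]) Reset()              { m.sum = 0 }

// countMap ignores the raw value and increments a counter.
type countMap[N Number] struct{ count N }

func (m *countMap[N]) In(N)                { m.count++ }
func (m *countMap[N]) Val() N              { return m.count }
func (m *countMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.count} }
func (m *countMap[N]) Reset()              { m.count = 0 }

// sumReduce adds the Value field of each partial. It serves both SUM
// and COUNT, since per-node counts combine by addition.
type sumReduce[N Number] struct{ total N }

func (r *sumReduce[N]) Combine(p Partial[N]) { r.total += p.Value }
func (r *sumReduce[N]) Val() N               { return r.total }
func (r *sumReduce[N]) Reset()               { r.total = 0 }

func main() {
	node1, node2 := &countMap[int64]{}, &countMap[int64]{}
	for _, v := range []int64{5, 7, 9} {
		node1.In(v)
	}
	node2.In(100)

	var total sumReduce[int64]
	total.Combine(node1.Partial())
	total.Combine(node2.Partial())
	fmt.Println(total.Val()) // 4
}
```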

MAX

  • Map: Tracks local maximum. Partial() returns {Value: max}.
  • Reduce: Tracks maximum of incoming Value fields. Val() returns global max.

MIN

  • Map: Tracks local minimum. Partial() returns {Value: min}.
  • Reduce: Tracks minimum of incoming Value fields. Val() returns global min.
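A sketch of MAX and MIN sharing one pair of types, parameterized by a comparison function. It rests on two assumptions the proposal does not state: the first raw value seeds the accumulator (so an all-negative input to MAX is not masked by a zero initial value), and every partial that reaches the reducer came from a Map that saw at least one value, because `Partial` has no field to mark an empty result. `extremeMap`, `extremeReduce`, `greater`, and `less` are hypothetical names, and `Number` is assumed to be `int64 | float64`.

```go
package main

import "fmt"

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

type Partial[N Number] struct {
	Value N
	Count N
}

// extremeMap tracks a local MAX or MIN, depending on better.
// The first raw value seeds the accumulator.
type extremeMap[N Number] struct {
	cur    N
	seen   bool
	better func(a, b N) bool // reports whether a should replace b
}

func (m *extremeMap[N]) In(v N) {
	if !m.seen || m.better(v, m.cur) {
		m.cur, m.seen = v, true
	}
}
func (m *extremeMap[N]) Val() N              { return m.cur }
func (m *extremeMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.cur} }
func (m *extremeMap[N]) Reset()              { m.cur, m.seen = 0, false }

// extremeReduce applies the same comparison to incoming partials.
type extremeReduce[N Number] struct {
	cur    N
	seen   bool
	better func(a, b N) bool
}

func (r *extremeReduce[N]) Combine(p Partial[N]) {
	if !r.seen || r.better(p.Value, r.cur) {
		r.cur, r.seen = p.Value, true
	}
}
func (r *extremeReduce[N]) Val() N { return r.cur }
func (r *extremeReduce[N]) Reset() { r.cur, r.seen = 0, false }

func greater[N Number](a, b N) bool { return a > b }
func less[N Number](a, b N) bool    { return a < b }

func main() {
	// Two data nodes that hold only negative values.
	n1 := &extremeMap[int64]{better: greater[int64]}
	n2 := &extremeMap[int64]{better: greater[int64]}
	for _, v := range []int64{-8, -3} {
		n1.In(v)
	}
	for _, v := range []int64{-5, -1} {
		n2.In(v)
	}

	r := &extremeReduce[int64]{better: greater[int64]}
	r.Combine(n1.Partial())
	r.Combine(n2.Partial())
	fmt.Println(r.Val()) // -1
}
```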

MEAN (the key case this enables)

  • Map: Tracks sum and count. Partial() returns {Value: sum, Count: count}.
  • Reduce: Accumulates total sum and total count from incoming partials. Val() returns totalSum / totalCount.
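A sketch of the MEAN pair that restates the proposed `Map` and `Reduce` interfaces so the compiler can check the types against them. `Number` is assumed to be `int64 | float64`; returning 0 for an empty accumulator is an assumption (the proposal does not say), and for `int64` the final division truncates. The numbers in `main` show why the reducer must carry sums and counts: averaging the local means 2 and 10 would give 6, while the true mean of {1, 2, 3, 10} is 16 / 4 = 4.

```go
package main

import "fmt"

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

type Partial[N Number] struct {
	Value N
	Count N
}

// Map and Reduce restate the proposed interfaces.
type Map[N Number] interface {
	In(N)
	Val() N
	Partial() Partial[N]
	Reset()
}

type Reduce[N Number] interface {
	Combine(Partial[N])
	Val() N
	Reset()
}

// meanMap tracks a local sum and count.
type meanMap[N Number] struct{ sum, count N }

func (m *meanMap[N]) In(v N) {
	m.sum += v
	m.count++
}

// Val finalizes locally; returning 0 when empty is an assumption.
func (m *meanMap[N]) Val() N {
	if m.count == 0 {
		return 0
	}
	return m.sum / m.count
}

func (m *meanMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.sum, Count: m.count} }
func (m *meanMap[N]) Reset()              { m.sum, m.count = 0, 0 }

// meanReduce adds sums and counts separately and divides only at the end.
type meanReduce[N Number] struct{ sum, count N }

func (r *meanReduce[N]) Combine(p Partial[N]) {
	r.sum += p.Value
	r.count += p.Count
}

func (r *meanReduce[N]) Val() N {
	if r.count == 0 {
		return 0
	}
	return r.sum / r.count
}

func (r *meanReduce[N]) Reset() { r.sum, r.count = 0, 0 }

// Compile-time checks that the types satisfy the interfaces.
var (
	_ Map[float64]    = (*meanMap[float64])(nil)
	_ Reduce[float64] = (*meanReduce[float64])(nil)
)

func main() {
	n1, n2 := &meanMap[float64]{}, &meanMap[float64]{}
	for _, v := range []float64{1, 2, 3} {
		n1.In(v)
	}
	n2.In(10)

	var total meanReduce[float64]
	total.Combine(n1.Partial())
	total.Combine(n2.Partial())
	fmt.Println(total.Val()) // 4
}
```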

Serialization Helpers in aggregation.go

// PartialToFieldValues converts a Partial to field values for wire transport.
func PartialToFieldValues[N Number](p Partial[N]) ([]*modelv1.FieldValue, error)

// FieldValuesToPartial converts field values from wire transport to a Partial.
func FieldValuesToPartial[N Number](fvs []*modelv1.FieldValue) (Partial[N], error)

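For non-MEAN functions, PartialToFieldValues produces a single FieldValue; for MEAN, it produces two (sum and count), and FieldValuesToPartial reverses the mapping. Because Partial does not record which function produced it, the encoder will need the aggregation function from its caller (for example, through an additional modelv1.AggregationFunction argument) to choose between the two layouts.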
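A round-trip sketch of the two helpers. `fieldValue` is a hypothetical stand-in for `*modelv1.FieldValue` (a struct with one integer and one float slot), and the `isMean` parameter is an assumption made because `Partial` alone cannot tell MEAN apart from the other functions. The decoder infers the layout from the number of fields. `Number` is assumed to be `int64 | float64`.

```go
package main

import (
	"errors"
	"fmt"
)

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

type Partial[N Number] struct {
	Value N
	Count N
}

// fieldValue is a hypothetical stand-in for *modelv1.FieldValue:
// exactly one of Int or Float is set.
type fieldValue struct {
	Int   *int64
	Float *float64
}

func toField[N Number](x N) fieldValue {
	switch v := any(x).(type) {
	case int64:
		return fieldValue{Int: &v}
	case float64:
		return fieldValue{Float: &v}
	}
	panic("unreachable: N is int64 or float64")
}

func fromField[N Number](fv fieldValue) (N, error) {
	switch {
	case fv.Int != nil:
		return N(*fv.Int), nil
	case fv.Float != nil:
		return N(*fv.Float), nil
	}
	return 0, errors.New("field value has no payload")
}

// partialToFieldValues emits one field for SUM/COUNT/MAX/MIN and two
// (sum, count) for MEAN. isMean is a hypothetical parameter.
func partialToFieldValues[N Number](p Partial[N], isMean bool) []fieldValue {
	if isMean {
		return []fieldValue{toField(p.Value), toField(p.Count)}
	}
	return []fieldValue{toField(p.Value)}
}

// fieldValuesToPartial infers the layout from the number of fields.
func fieldValuesToPartial[N Number](fvs []fieldValue) (Partial[N], error) {
	var p Partial[N]
	if len(fvs) != 1 && len(fvs) != 2 {
		return p, fmt.Errorf("expected 1 or 2 field values, got %d", len(fvs))
	}
	v, err := fromField[N](fvs[0])
	if err != nil {
		return p, err
	}
	p.Value = v
	if len(fvs) == 2 {
		if p.Count, err = fromField[N](fvs[1]); err != nil {
			return p, err
		}
	}
	return p, nil
}

func main() {
	wire := partialToFieldValues(Partial[float64]{Value: 16, Count: 4}, true)
	p, err := fieldValuesToPartial[float64](wire)
	fmt.Println(len(wire), p, err) // 2 {16 4} <nil>
}
```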

Interface Usage by Context

| Context | Interface | Methods used |
|---|---|---|
| Standalone (single node via Analyze) | Map | In(), Val(), Reset() |
| Distributed - data node | Map | In(), Partial(), Reset() |
| Distributed - liaison node | Reduce | Combine(), Val(), Reset() |
| TopN post-processor | Map | In(), Val(), Reset() |

Usage Changes

Standalone / Data node: measure_plan_aggregation.go

The aggregationPlan and its iterators (aggGroupIterator, aggAllIterator) use Map[N] instead of Func[N]:

  • Standalone (single node): Call mapFunc.In(v) for each raw value, then mapFunc.Val() for the final result. This is a drop-in replacement for the old Func, with the same In/Val/Reset contract.
  • Distributed data node: Call mapFunc.In(v) for each raw value, then mapFunc.Partial() to produce the intermediate result. Convert via PartialToFieldValues for the wire response.
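A simplified, batch-style sketch of the data node side; the real iterators (aggGroupIterator, aggAllIterator) stream their input and will look different. `point`, `aggregateGroups`, and `meanMap` are hypothetical, and `Number` is assumed to be `int64 | float64`. The point it illustrates is that both modes share the Reset/In loop and differ only in whether the result is read with Val() or Partial().

```go
package main

import "fmt"

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

type Partial[N Number] struct {
	Value N
	Count N
}

type Map[N Number] interface {
	In(N)
	Val() N
	Partial() Partial[N]
	Reset()
}

// meanMap is a minimal MEAN Map for illustration.
type meanMap struct{ sum, count float64 }

func (m *meanMap) In(v float64) {
	m.sum += v
	m.count++
}

func (m *meanMap) Val() float64 {
	if m.count == 0 {
		return 0
	}
	return m.sum / m.count
}

func (m *meanMap) Partial() Partial[float64] { return Partial[float64]{Value: m.sum, Count: m.count} }
func (m *meanMap) Reset()                    { *m = meanMap{} }

// point is a hypothetical raw data point: a group key and a field value.
type point struct {
	group string
	value float64
}

// aggregateGroups runs one Map over each group, reusing it via Reset,
// and hands the filled Map to emit. Only emit differs between modes.
func aggregateGroups(points []point, m Map[float64], emit func(group string, m Map[float64])) {
	byGroup := map[string][]float64{}
	var order []string // first-seen order keeps output deterministic
	for _, p := range points {
		if _, ok := byGroup[p.group]; !ok {
			order = append(order, p.group)
		}
		byGroup[p.group] = append(byGroup[p.group], p.value)
	}
	for _, g := range order {
		m.Reset()
		for _, v := range byGroup[g] {
			m.In(v)
		}
		emit(g, m)
	}
}

func main() {
	pts := []point{{"a", 1}, {"b", 10}, {"a", 3}}

	// Standalone: finalize locally with Val.
	final := map[string]float64{}
	aggregateGroups(pts, &meanMap{}, func(g string, m Map[float64]) { final[g] = m.Val() })

	// Distributed data node: ship Partial to the liaison.
	partials := map[string]Partial[float64]{}
	aggregateGroups(pts, &meanMap{}, func(g string, m Map[float64]) { partials[g] = m.Partial() })

	fmt.Println(final["a"], partials["a"]) // 2 {4 2}
}
```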

Liaison node: measure_plan_aggregation.go

The liaison-side aggregation plan and iterators use Reduce[N]:

  • Receive intermediate results from data nodes
  • Call reduceFunc.Combine(partial) for each intermediate
  • Call reduceFunc.Val() to produce the final result

This likely means splitting the current aggregationPlan into two variants (map vs reduce) or parameterizing it with a mode flag, since the data node plan and liaison node plan serve different roles.
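A sketch of the liaison loop, assuming each data node response has already been decoded into (group, Partial) pairs. Because partials for the same group arrive from several nodes, it keeps one Reduce per group key rather than a single accumulator. `groupPartial`, `reduceGroups`, and `meanReduce` are hypothetical, and `Number` is assumed to be `int64 | float64`.

```go
package main

import "fmt"

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

type Partial[N Number] struct {
	Value N
	Count N
}

type Reduce[N Number] interface {
	Combine(Partial[N])
	Val() N
	Reset()
}

// meanReduce is a minimal MEAN Reduce for illustration.
type meanReduce struct{ sum, count float64 }

func (r *meanReduce) Combine(p Partial[float64]) {
	r.sum += p.Value
	r.count += p.Count
}

func (r *meanReduce) Val() float64 {
	if r.count == 0 {
		return 0
	}
	return r.sum / r.count
}

func (r *meanReduce) Reset() { *r = meanReduce{} }

// groupPartial is a hypothetical decoded entry from one data node response.
type groupPartial struct {
	group   string
	partial Partial[float64]
}

// reduceGroups keeps one Reduce per group key, combines every partial
// from every data node, and finalizes each group with Val.
func reduceGroups(responses [][]groupPartial, newReduce func() Reduce[float64]) map[string]float64 {
	reducers := map[string]Reduce[float64]{}
	for _, resp := range responses {
		for _, gp := range resp {
			r, ok := reducers[gp.group]
			if !ok {
				r = newReduce()
				reducers[gp.group] = r
			}
			r.Combine(gp.partial)
		}
	}
	out := make(map[string]float64, len(reducers))
	for g, r := range reducers {
		out[g] = r.Val()
	}
	return out
}

func main() {
	node1 := []groupPartial{{"a", Partial[float64]{Value: 4, Count: 2}}, {"b", Partial[float64]{Value: 10, Count: 1}}}
	node2 := []groupPartial{{"a", Partial[float64]{Value: 8, Count: 2}}}
	res := reduceGroups([][]groupPartial{node1, node2}, func() Reduce[float64] { return &meanReduce{} })
	fmt.Println(res["a"], res["b"]) // 3 10
}
```

A map of per-group reducers is one way to handle interleaved responses; a plan that receives groups already sorted could instead reuse a single Reduce and call Reset between groups.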

Distributed plan: measure_plan_distributed.go

  • Remove the needCompletePushDownAgg flag (all aggregation functions can now be pushed down)
  • Remove the COUNT-to-SUM conversion hack in measure_analyzer.go:185-186
  • The distributed plan always pushes down the Map phase and the liaison always runs Reduce
  • The deduplication logic (deduplicateAggregatedDataPointsWithShard) remains for replica handling
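A sketch of why deduplication still has to run before Reduce. It assumes, since the proposal does not spell it out, that a replicated shard can be reported by more than one data node and that dedup keys on (shard, group). `shardPartial`, `dedupByShard`, and `sumValues` are hypothetical and are not the actual `deduplicateAggregatedDataPointsWithShard`. MAX and MIN tolerate duplicates, but SUM, COUNT, and MEAN would double-count them.

```go
package main

import "fmt"

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

type Partial[N Number] struct {
	Value N
	Count N
}

// shardPartial is a hypothetical partial tagged with its source shard.
type shardPartial struct {
	shardID uint32
	group   string
	partial Partial[int64]
}

// dedupByShard keeps the first partial seen for each (shard, group) pair,
// so a shard answered by several replicas contributes only once.
func dedupByShard(in []shardPartial) []shardPartial {
	type key struct {
		shard uint32
		group string
	}
	seen := make(map[key]bool, len(in))
	var out []shardPartial
	for _, sp := range in {
		k := key{sp.shardID, sp.group}
		if seen[k] {
			continue
		}
		seen[k] = true
		out = append(out, sp)
	}
	return out
}

// sumValues stands in for a COUNT (sum-of-counts) Reduce.
func sumValues(in []shardPartial) int64 {
	var total int64
	for _, sp := range in {
		total += sp.partial.Value
	}
	return total
}

func main() {
	responses := []shardPartial{
		{0, "svc", Partial[int64]{Value: 3}}, // shard 0, replica A
		{0, "svc", Partial[int64]{Value: 3}}, // shard 0, replica B
		{1, "svc", Partial[int64]{Value: 2}},
	}
	fmt.Println(sumValues(responses), sumValues(dedupByShard(responses))) // 8 5
}
```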

TopN post-processor: topn_post_processor.go

Replace the int64Func aggregation.Func[int64] field on topNAggregatorItem with mapFunc aggregation.Map[int64]. TopN feeds raw values into the aggregation, so it needs Map semantics (not Reduce). For COUNT, Map.In() correctly increments a counter rather than summing values:

type topNAggregatorItem struct {
    mapFunc aggregation.Map[int64]
    // ... other fields unchanged
}

Usage changes:

  • exist.mapFunc.In(item.val) instead of exist.int64Func.In(item.val) (same signature)
  • item.mapFunc.Val() instead of item.int64Func.Val() (same signature)
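A sketch of the TopN change with `topNAggregatorItem` trimmed to its `mapFunc` field. `rawItem`, `aggregate`, `sumMap`, and `countMap` are hypothetical, and the real post-processor also tracks ranking state that is omitted here. Feeding the same raw values through COUNT and SUM Maps shows the difference described above: COUNT yields the number of values per key (2 for svc-a), while summing those values yields 200.

```go
package main

import "fmt"

// Number is an assumed constraint; the real one may differ.
type Number interface{ int64 | float64 }

type Partial[N Number] struct {
	Value N
	Count N
}

type Map[N Number] interface {
	In(N)
	Val() N
	Partial() Partial[N]
	Reset()
}

type sumMap struct{ sum int64 }

func (m *sumMap) In(v int64)              { m.sum += v }
func (m *sumMap) Val() int64              { return m.sum }
func (m *sumMap) Partial() Partial[int64] { return Partial[int64]{Value: m.sum} }
func (m *sumMap) Reset()                  { m.sum = 0 }

// countMap ignores the value and counts how many were fed in.
type countMap struct{ n int64 }

func (m *countMap) In(int64)                { m.n++ }
func (m *countMap) Val() int64              { return m.n }
func (m *countMap) Partial() Partial[int64] { return Partial[int64]{Value: m.n} }
func (m *countMap) Reset()                  { m.n = 0 }

// topNAggregatorItem is trimmed to the field this change touches.
type topNAggregatorItem struct {
	mapFunc Map[int64]
}

// rawItem is a hypothetical (key, value) pair fed to the post-processor.
type rawItem struct {
	key string
	val int64
}

// aggregate folds raw values per key through a fresh Map per key.
func aggregate(items []rawItem, newMap func() Map[int64]) map[string]int64 {
	byKey := map[string]*topNAggregatorItem{}
	for _, item := range items {
		exist, ok := byKey[item.key]
		if !ok {
			exist = &topNAggregatorItem{mapFunc: newMap()}
			byKey[item.key] = exist
		}
		exist.mapFunc.In(item.val)
	}
	out := make(map[string]int64, len(byKey))
	for k, it := range byKey {
		out[k] = it.mapFunc.Val()
	}
	return out
}

func main() {
	items := []rawItem{{"svc-a", 120}, {"svc-a", 80}, {"svc-b", 300}}
	counts := aggregate(items, func() Map[int64] { return &countMap{} })
	sums := aggregate(items, func() Map[int64] { return &sumMap{} })
	fmt.Println(counts["svc-a"], sums["svc-a"]) // 2 200
}
```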

Execution Flow (distributed)

sequenceDiagram
    participant Client
    participant Liaison as Liaison_Node
    participant Data1 as Data_Node_1
    participant Data2 as Data_Node_2

    Client->>Liaison: QueryRequest with Agg
    Liaison->>Data1: InternalQueryRequest (push down agg)
    Liaison->>Data2: InternalQueryRequest (push down agg)

    Note over Data1: Map phase
    Data1->>Data1: mapFunc.In(rawValue) per data point
    Data1->>Data1: mapFunc.Partial() -> intermediate
    Data1-->>Liaison: InternalQueryResponse with Partial results

    Note over Data2: Map phase
    Data2->>Data2: mapFunc.In(rawValue) per data point
    Data2->>Data2: mapFunc.Partial() -> intermediate
    Data2-->>Liaison: InternalQueryResponse with Partial results

    Note over Liaison: Reduce phase
    Liaison->>Liaison: reduceFunc.Combine(partial1)
    Liaison->>Liaison: reduceFunc.Combine(partial2)
    Liaison->>Liaison: reduceFunc.Val() -> final result
    Liaison-->>Client: QueryResponse
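A single-process Go simulation of the flow in the diagram, with goroutines standing in for data nodes and a buffered channel standing in for InternalQueryResponse. `Partial` is specialized to `float64` for brevity, and `dataNode`, `liaison`, `meanMap`, and `meanReduce` are hypothetical names; networking, grouping, and replica deduplication are not modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// Partial is specialized to float64 in this sketch.
type Partial struct{ Value, Count float64 }

// meanMap is the data node side of MEAN.
type meanMap struct{ sum, count float64 }

func (m *meanMap) In(v float64) {
	m.sum += v
	m.count++
}

func (m *meanMap) Partial() Partial { return Partial{Value: m.sum, Count: m.count} }

// meanReduce is the liaison side of MEAN.
type meanReduce struct{ sum, count float64 }

func (r *meanReduce) Combine(p Partial) {
	r.sum += p.Value
	r.count += p.Count
}

func (r *meanReduce) Val() float64 {
	if r.count == 0 {
		return 0
	}
	return r.sum / r.count
}

// dataNode runs the Map phase over its local points and sends the partial.
func dataNode(points []float64, out chan<- Partial, wg *sync.WaitGroup) {
	defer wg.Done()
	var m meanMap
	for _, v := range points {
		m.In(v)
	}
	out <- m.Partial()
}

// liaison fans the query out to every data node, then runs the Reduce phase.
func liaison(nodes [][]float64) float64 {
	out := make(chan Partial, len(nodes)) // buffered: senders never block
	var wg sync.WaitGroup
	for _, pts := range nodes {
		wg.Add(1)
		go dataNode(pts, out, &wg)
	}
	wg.Wait()
	close(out)

	var r meanReduce
	for p := range out {
		r.Combine(p)
	}
	return r.Val()
}

func main() {
	fmt.Println(liaison([][]float64{{1, 2, 3}, {10}})) // 4
}
```

Since Combine only adds, the order in which data node responses arrive does not change the result (up to floating-point rounding).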

Related to #13291.

Use case

No response

Related issues

No response

Are you willing to submit a pull request to implement this on your own?

  • Yes I am willing to submit a pull request on my own!

Code of Conduct

Metadata

  • Assignees: No one assigned
  • Labels: database (BanyanDB - SkyWalking native database), feature (New feature)
  • Type: No type
  • Projects: No projects
  • Relationships: None yet
  • Development: No branches or pull requests