Permalink
| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| package org.apache.spark | |
| import org.apache.spark.annotation.DeveloperApi | |
| import org.apache.spark.util.collection.ExternalAppendOnlyMap | |
| /** | |
| * :: DeveloperApi :: | |
| * A set of functions used to aggregate data. | |
| * | |
| * @param createCombiner function to create the initial value of the aggregation. | |
| * @param mergeValue function to merge a new value into the aggregation result. | |
| * @param mergeCombiners function to merge outputs from multiple mergeValue function. | |
| */ | |
| @DeveloperApi | |
| case class Aggregator[K, V, C] ( | |
| createCombiner: V => C, | |
| mergeValue: (C, V) => C, | |
| mergeCombiners: (C, C) => C) { | |
| def combineValuesByKey( | |
| iter: Iterator[_ <: Product2[K, V]], | |
| context: TaskContext): Iterator[(K, C)] = { | |
| val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) | |
| combiners.insertAll(iter) | |
| updateMetrics(context, combiners) | |
| combiners.iterator | |
| } | |
| def combineCombinersByKey( | |
| iter: Iterator[_ <: Product2[K, C]], | |
| context: TaskContext): Iterator[(K, C)] = { | |
| val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners) | |
| combiners.insertAll(iter) | |
| updateMetrics(context, combiners) | |
| combiners.iterator | |
| } | |
| /** Update task metrics after populating the external map. */ | |
| private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = { | |
| Option(context).foreach { c => | |
| c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) | |
| c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) | |
| c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes) | |
| } | |
| } | |
| } |