/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark

import java.io.Serializable

import scala.collection.generic.Growable
import scala.reflect.ClassTag

import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LegacyAccumulatorWrapper}
/**
 * A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
 * but where the result type, `R`, may be different from the element type being added, `T`.
 *
 * You must define how to add data, and how to merge two of these together. For some data types,
 * such as a counter, these might be the same operation. In that case, you can use the simpler
 * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
 * accumulating a set. You will add items to the set, and you will union two sets together.
 *
 * Operations are not thread-safe.
 *
 * @param id ID of this accumulator; for internal use only.
 * @param initialValue initial value of accumulator
 * @param param helper object defining how to add elements of type `R` and `T`
 * @param name human-readable name for use in Spark's web UI
 * @param countFailedValues whether to accumulate values from failed tasks. This is set to true
 *                          for system and time metrics like serialization time or bytes spilled,
 *                          and false for things with absolute values like number of input rows.
 *                          This should be used for internal metrics only.
 * @tparam R the full accumulated data (result type)
 * @tparam T partial data that can be added in
 */
@deprecated("use AccumulatorV2", "2.0.0")
class Accumulable[R, T] private (
    val id: Long,
    // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
    @transient private val initialValue: R,
    param: AccumulableParam[R, T],
    val name: Option[String],
    private[spark] val countFailedValues: Boolean)
  extends Serializable {

  // Auxiliary constructor that allocates a fresh accumulator ID.
  private[spark] def this(
      initialValue: R,
      param: AccumulableParam[R, T],
      name: Option[String],
      countFailedValues: Boolean) = {
    this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues)
  }

  // Auxiliary constructor that defaults countFailedValues to false (user-facing metrics).
  private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
    this(initialValue, param, name, false /* countFailedValues */)
  }

  // Public constructor: unnamed accumulator.
  def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)

  // The "zero" (identity) value for this accumulator, derived from the initial value by `param`.
  val zero = param.zero(initialValue)

  // Every operation below delegates to this wrapper; this legacy class is a thin facade.
  private[spark] val newAcc = new LegacyAccumulatorWrapper(initialValue, param)

  newAcc.metadata = AccumulatorMetadata(id, name, countFailedValues)
  // Register the new accumulator in ctor, to follow the previous behaviour.
  AccumulatorContext.register(newAcc)

  /**
   * Add more data to this accumulator / accumulable
   * @param term the data to add
   */
  def += (term: T) { newAcc.add(term) }

  /**
   * Add more data to this accumulator / accumulable
   * @param term the data to add
   */
  def add(term: T) { newAcc.add(term) }

  /**
   * Merge two accumulable objects together
   *
   * Normally, a user will not want to use this version, but will instead call `+=`.
   * @param term the other `R` that will get merged with this
   */
  def ++= (term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }

  /**
   * Merge two accumulable objects together
   *
   * Normally, a user will not want to use this version, but will instead call `add`.
   * @param term the other `R` that will get merged with this
   */
  def merge(term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }

  /**
   * Access the accumulator's current value; only allowed on driver.
   *
   * @throws UnsupportedOperationException if called from within a task (executor side)
   */
  def value: R = {
    if (newAcc.isAtDriverSide) {
      newAcc.value
    } else {
      throw new UnsupportedOperationException("Can't read accumulator value in task")
    }
  }

  /**
   * Get the current value of this accumulator from within a task.
   *
   * This is NOT the global value of the accumulator. To get the global value after a
   * completed operation on the dataset, call `value`.
   *
   * The typical use of this method is to directly mutate the local value, eg., to add
   * an element to a Set.
   */
  def localValue: R = newAcc.value

  /**
   * Set the accumulator's value; only allowed on driver.
   *
   * @throws UnsupportedOperationException if called from within a task (executor side)
   */
  def value_= (newValue: R) {
    if (newAcc.isAtDriverSide) {
      newAcc._value = newValue
    } else {
      throw new UnsupportedOperationException("Can't assign accumulator value in task")
    }
  }

  /**
   * Set the accumulator's value. For internal use only.
   */
  def setValue(newValue: R): Unit = { newAcc._value = newValue }

  /**
   * Set the accumulator's value. For internal use only.
   *
   * NOTE(review): the unchecked cast to `R` will only surface as a ClassCastException at a
   * later use site due to erasure — callers must guarantee the runtime type.
   */
  private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }

  /**
   * Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
   */
  private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
    // An accumulator is "internal" iff its name carries the internal-metrics prefix.
    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
    new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
  }

  override def toString: String = if (newAcc._value == null) "null" else newAcc._value.toString
}
/**
 * Helper object defining how to accumulate values of a particular type. An implicit
 * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
 *
 * Implementations are shipped with the accumulator, hence the trait extends
 * [[java.io.Serializable]].
 *
 * @tparam R the full accumulated data (result type)
 * @tparam T partial data that can be added in
 */
@deprecated("use AccumulatorV2", "2.0.0")
trait AccumulableParam[R, T] extends Serializable {
  /**
   * Add additional data to the accumulator value. Is allowed to modify and return `r`
   * for efficiency (to avoid allocating objects).
   *
   * @param r the current value of the accumulator
   * @param t the data to be added to the accumulator
   * @return the new value of the accumulator
   */
  def addAccumulator(r: R, t: T): R

  /**
   * Merge two accumulated values together. Is allowed to modify and return the first value
   * for efficiency (to avoid allocating objects).
   *
   * @param r1 one set of accumulated data
   * @param r2 another set of accumulated data
   * @return both data sets merged together
   */
  def addInPlace(r1: R, r2: R): R

  /**
   * Return the "zero" (identity) value for an accumulator type, given its initial value. For
   * example, if R was a vector of N dimensions, this would return a vector of N zeroes.
   */
  def zero(initialValue: R): R
}
/**
 * An [[AccumulableParam]] for growable collections: single elements are appended with `+=`
 * and partial results are merged with `++=`, in both cases mutating and returning the first
 * operand to avoid allocation.
 *
 * @tparam R the collection type being accumulated (viewable as a serializable `Growable`)
 * @tparam T the element type appended to the collection
 */
@deprecated("use AccumulatorV2", "2.0.0")
private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
  extends AccumulableParam[R, T] {

  /** Appends `elem` to `growable` in place and returns the (mutated) collection. */
  def addAccumulator(growable: R, elem: T): R = {
    val sink: Growable[T] = growable
    sink += elem
    growable
  }

  /** Drains `t2` into `t1` in place and returns `t1`. */
  def addInPlace(t1: R, t2: R): R = {
    val sink: Growable[T] = t1
    sink ++= t2
    t1
  }

  /** Produces an empty collection of the same runtime type as `initialValue`. */
  def zero(initialValue: R): R = {
    // `R` cannot be required to be Cloneable, so duplicate the initial value by round-tripping
    // it through Java serialization, then empty the duplicate.
    val serializer = new JavaSerializer(new SparkConf(false)).newInstance()
    val duplicate = serializer.deserialize[R](serializer.serialize(initialValue))
    duplicate.clear() // In case it contained stuff
    duplicate
  }
}