Fix build warnings primarily related to Breeze 0.13 operator changes, "ProcessingTime" method name and Java style problems
srowen committed Apr 28, 2017
1 parent 8c911ad commit bd3de24
Showing 35 changed files with 148 additions and 137 deletions.
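
Most of the arithmetic changes below replace Breeze's pre-0.13 element-wise operators (:*, :+, :/, :-, :^, :>), which emit deprecation warnings under Breeze 0.13, with the new spellings (*:*, +:+, /:/, -:-, ^:^, >:>). As a minimal, self-contained sketch of the rename (illustrative only, not part of the commit):

import breeze.linalg.DenseVector

object Breeze013OperatorSketch {
  def main(args: Array[String]): Unit = {
    val a = DenseVector(1.0, 2.0, 3.0)
    val b = DenseVector(4.0, 5.0, 6.0)

    // Breeze 0.13 element-wise operators; the older forms (a :* b, a :+ b, ...)
    // still compile but produce the build warnings this commit silences.
    val prod = a *:* b    // element-wise product,  previously a :* b
    val sum  = a +:+ b    // element-wise sum,      previously a :+ b
    val quot = a /:/ b    // element-wise quotient, previously a :/ b
    val pow  = a ^:^ 2.0  // element-wise power,    previously a :^ 2.0

    println(Seq(prod, sum, quot, pow).mkString("\n"))
  }
}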
@@ -363,9 +363,9 @@ protected File initRecoveryDb(String dbName) {
// If another DB was initialized first just make sure all the DBs are in the same
// location.
Path newLoc = new Path(_recoveryPath, dbName);
Path copyFrom = new Path(f.toURI());
Path copyFrom = new Path(f.toURI());
if (!newLoc.equals(copyFrom)) {
logger.info("Moving " + copyFrom + " to: " + newLoc);
logger.info("Moving " + copyFrom + " to: " + newLoc);
try {
// The move here needs to handle moving non-empty directories across NFS mounts
FileSystem fs = FileSystem.getLocal(_conf);
@@ -48,7 +48,8 @@ public final class Platform {
boolean _unaligned;
String arch = System.getProperty("os.arch", "");
if (arch.equals("ppc64le") || arch.equals("ppc64")) {
// Since java.nio.Bits.unaligned() doesn't return true on ppc (See JDK-8165231), but ppc64 and ppc64le support it
// Since java.nio.Bits.unaligned() doesn't return true on ppc (See JDK-8165231), but
// ppc64 and ppc64le support it
_unaligned = true;
} else {
try {
@@ -155,7 +155,8 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
for (MemoryConsumer c: consumers) {
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
long key = c.getUsed();
List<MemoryConsumer> list = sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
@@ -1070,11 +1070,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched.dagScheduler = mockDAGScheduler
val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
assert(manager.isZombie === true)
}
})
when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).thenAnswer(
new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
assert(manager.isZombie)
}
})
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
// this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
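
The stubbing change above swaps Mockito's then(...) for the equivalent thenAnswer(...). A rough, self-contained sketch of that stubbing style — the Greeter trait and the values here are hypothetical, not from the commit:

import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

// Hypothetical interface, used only to illustrate when(...).thenAnswer(...)
trait Greeter {
  def greet(name: String): String
}

object ThenAnswerSketch {
  def main(args: Array[String]): Unit = {
    val greeter = mock(classOf[Greeter])
    // thenAnswer computes the stubbed result (or runs assertions) at call time,
    // which is what the TaskSetManagerSuite stub above relies on.
    when(greeter.greet("spark")).thenAnswer(new Answer[String] {
      override def answer(invocation: InvocationOnMock): String =
        "hello, " + invocation.getArguments()(0)
    })
    println(greeter.greet("spark")) // hello, spark
  }
}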
@@ -18,6 +18,7 @@
package org.apache.spark.storage

import scala.collection.mutable
import scala.language.implicitConversions
import scala.util.Random

import org.scalatest.{BeforeAndAfter, Matchers}
@@ -28,8 +28,6 @@
import java.sql.Timestamp;
import java.util.*;

import scala.Tuple2;

/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network.
* <p>
@@ -148,7 +146,7 @@ public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) throws Exceptio
stateUpdateFunc,
Encoders.bean(SessionInfo.class),
Encoders.bean(SessionUpdate.class),
GroupStateTimeout.ProcessingTimeTimeout());
GroupStateTimeout.processingTimeTimeout());

// Start running the query that prints the session updates to the console
StreamingQuery query = sessionUpdates
@@ -74,7 +74,7 @@ object StructuredSessionization {
// and report session updates.
val sessionUpdates = events
.groupByKey(event => event.sessionId)
.mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
.mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.processingTimeTimeout) {

case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

14 changes: 7 additions & 7 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -226,18 +226,18 @@ object PageRank extends Logging {
// Propagates the message along outbound edges
// and adding start nodes back in with activation resetProb
val rankUpdates = rankGraph.aggregateMessages[BV[Double]](
ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
(a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
ctx => ctx.sendToDst(ctx.srcAttr *:* ctx.attr),
(a : BV[Double], b : BV[Double]) => a +:+ b, TripletFields.Src)

rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
(vid, oldRank, msgSumOpt) =>
val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) :* (1.0 - resetProb)
val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) *:* (1.0 - resetProb)
val resetActivations = if (sourcesInitMapBC.value contains vid) {
sourcesInitMapBC.value(vid) :* resetProb
sourcesInitMapBC.value(vid) *:* resetProb
} else {
zero
}
popActivations :+ resetActivations
popActivations +:+ resetActivations
}.cache()

rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -250,9 +250,9 @@
}

// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
val rankSums = rankGraph.vertices.values.fold(zero)(_ :+ _)
val rankSums = rankGraph.vertices.values.fold(zero)(_ +:+ _)
rankGraph.mapVertices { (vid, attr) =>
Vectors.fromBreeze(attr :/ rankSums)
Vectors.fromBreeze(attr /:/ rankSums)
}
}
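
Reading the personalized PageRank update directly from the code above (this commit only changes the operator spellings), each iteration computes, for every vertex v,

    rank_v := (1 - resetProb) * sum over edges (u -> v) of rank_u * w_uv  +  resetProb * s_v

where w_uv is the normalized edge weight and s_v is the per-source activation vector from sourcesInitMapBC (zero for vertices that are not personalization sources).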

@@ -56,7 +56,7 @@ private[ann] class SigmoidLayerModelWithSquaredError
extends FunctionalLayerModel(new FunctionalLayer(new SigmoidFunction)) with LossFunction {
override def loss(output: BDM[Double], target: BDM[Double], delta: BDM[Double]): Double = {
ApplyInPlace(output, target, delta, (o: Double, t: Double) => o - t)
val error = Bsum(delta :* delta) / 2 / output.cols
val error = Bsum(delta *:* delta) / 2 / output.cols
ApplyInPlace(delta, output, delta, (x: Double, o: Double) => x * (o - o * o))
error
}
@@ -119,6 +119,6 @@ private[ann] class SoftmaxLayerModelWithCrossEntropyLoss extends LayerModel with

override def loss(output: BDM[Double], target: BDM[Double], delta: BDM[Double]): Double = {
ApplyInPlace(output, target, delta, (o: Double, t: Double) => o - t)
-Bsum( target :* brzlog(output)) / output.cols
-Bsum( target *:* brzlog(output)) / output.cols
}
}
@@ -472,7 +472,7 @@ class GaussianMixture @Since("2.0.0") (
*/
val cov = {
val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) ^:^ 2.0)
val diagVec = Vectors.fromBreeze(ss)
BLAS.scal(1.0 / numSamples, diagVec)
val covVec = new DenseVector(Array.fill[Double](
@@ -271,7 +271,7 @@ class GaussianMixture private (
private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
val mu = vectorMean(x)
val ss = BDV.zeros[Double](x(0).length)
x.foreach(xi => ss += (xi - mu) :^ 2.0)
x.foreach(xi => ss += (xi - mu) ^:^ 2.0)
diag(ss / x.length.toDouble)
}
}
@@ -314,7 +314,7 @@ class LocalLDAModel private[spark] (
docBound += count * LDAUtils.logSumExp(Elogthetad + localElogbeta(idx, ::).t)
}
// E[log p(theta | alpha) - log q(theta | gamma)]
docBound += sum((brzAlpha - gammad) :* Elogthetad)
docBound += sum((brzAlpha - gammad) *:* Elogthetad)
docBound += sum(lgamma(gammad) - lgamma(brzAlpha))
docBound += lgamma(sum(brzAlpha)) - lgamma(sum(gammad))

@@ -324,7 +324,7 @@
// Bound component for prob(topic-term distributions):
// E[log p(beta | eta) - log q(beta | lambda)]
val sumEta = eta * vocabSize
val topicsPart = sum((eta - lambda) :* Elogbeta) +
val topicsPart = sum((eta - lambda) *:* Elogbeta) +
sum(lgamma(lambda) - lgamma(eta)) +
sum(lgamma(sumEta) - lgamma(sum(lambda(::, breeze.linalg.*))))

@@ -721,7 +721,7 @@ class DistributedLDAModel private[clustering] (
val N_wj = edgeContext.attr
val smoothed_N_wk: TopicCounts = edgeContext.dstAttr + (eta - 1.0)
val smoothed_N_kj: TopicCounts = edgeContext.srcAttr + (alpha - 1.0)
val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k
val phi_wk: TopicCounts = smoothed_N_wk /:/ smoothed_N_k
val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0)
val tokenLogLikelihood = N_wj * math.log(phi_wk.dot(theta_kj))
edgeContext.sendToDst(tokenLogLikelihood)
@@ -748,7 +748,7 @@
if (isTermVertex(vertex)) {
val N_wk = vertex._2
val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0)
val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k
val phi_wk: TopicCounts = smoothed_N_wk /:/ smoothed_N_k
sumPrior + (eta - 1.0) * sum(phi_wk.map(math.log))
} else {
val N_kj = vertex._2
@@ -482,7 +482,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
stats.unpersist()
expElogbetaBc.destroy(false)
val batchResult = statsSum :* expElogbeta.t
val batchResult = statsSum *:* expElogbeta.t

// Note that this is an optimization to avoid batch.count
updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
@@ -522,7 +522,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {

val dalpha = -(gradf - b) / q

if (all((weight * dalpha + alpha) :> 0D)) {
if (all((weight * dalpha + alpha) >:> 0D)) {
alpha :+= weight * dalpha
this.alpha = Vectors.dense(alpha.toArray)
}
@@ -584,22 +584,22 @@ private[clustering] object OnlineLDAOptimizer {
val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K
val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K

val phiNorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100 // ids
val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids
var meanGammaChange = 1D
val ctsVector = new BDV[Double](cts) // ids

// Iterate between gamma and phi until convergence
while (meanGammaChange > 1e-3) {
val lastgamma = gammad.copy
// K K * ids ids
gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phiNorm))) :+ alpha
gammad := (expElogthetad *:* (expElogbetad.t * (ctsVector /:/ phiNorm))) +:+ alpha
expElogthetad := exp(LDAUtils.dirichletExpectation(gammad))
// TODO: Keep more values in log space, and only exponentiate when needed.
phiNorm := expElogbetad * expElogthetad :+ 1e-100
phiNorm := expElogbetad * expElogthetad +:+ 1e-100
meanGammaChange = sum(abs(gammad - lastgamma)) / k
}

val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector :/ phiNorm).asDenseMatrix
val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector /:/ phiNorm).asDenseMatrix
(gammad, sstatsd, ids)
}
}
@@ -29,7 +29,7 @@ private[clustering] object LDAUtils {
*/
private[clustering] def logSumExp(x: BDV[Double]): Double = {
val a = max(x)
a + log(sum(exp(x :- a)))
a + log(sum(exp(x -:- a)))
}

/**
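The logSumExp change above only renames the subtraction operator; the computation itself is the standard numerically stable identity, with a = max_i x_i:

    log(sum_i exp(x_i)) = a + log(sum_i exp(x_i - a))

Subtracting the maximum before exponentiating keeps the intermediate exp values from overflowing.
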
@@ -168,7 +168,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
assert(m1.pi ~== m2.pi relTol 0.01)
assert(m1.theta ~== m2.theta relTol 0.01)
}
val testParams = Seq(
val testParams = Seq[(String, Dataset[_])](
("bernoulli", bernoulliDataset),
("multinomial", dataset)
)
4 changes: 0 additions & 4 deletions pom.xml
@@ -58,10 +58,6 @@
<url>https://issues.apache.org/jira/browse/SPARK</url>
</issueManagement>

<prerequisites>
<maven>${maven.version}</maven>
</prerequisites>

<mailingLists>
<mailingList>
<name>Dev Mailing List</name>
@@ -16,6 +16,8 @@
*/
package org.apache.spark.scheduler.cluster

import scala.language.reflectiveCalls

import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar

@@ -37,7 +37,9 @@ public class GroupStateTimeout {
* `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutDuration()`. See documentation
* on `GroupState` for more details.
*/
public static GroupStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
public static GroupStateTimeout processingTimeTimeout() {
return ProcessingTimeTimeout$.MODULE$;
}

/**
* Timeout based on event-time. The event-time timestamp for timeout can be set for each
@@ -47,8 +49,13 @@ public class GroupStateTimeout {
* received any data, then the group times out. See documentation on
* `GroupState` for more details.
*/
public static GroupStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }
public static GroupStateTimeout eventTimeTimeout() {
return EventTimeTimeout$.MODULE$;
}

/** No timeout. */
public static GroupStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
public static GroupStateTimeout noTimeout() {
return NoTimeout$.MODULE$;
}

}
@@ -26,8 +26,8 @@ public class JavaGroupStateTimeoutSuite {

@Test
public void testTimeouts() {
assert (GroupStateTimeout.ProcessingTimeTimeout() == ProcessingTimeTimeout$.MODULE$);
assert (GroupStateTimeout.EventTimeTimeout() == EventTimeTimeout$.MODULE$);
assert (GroupStateTimeout.NoTimeout() == NoTimeout$.MODULE$);
assert (GroupStateTimeout.processingTimeTimeout() == ProcessingTimeTimeout$.MODULE$);
assert (GroupStateTimeout.eventTimeTimeout() == EventTimeTimeout$.MODULE$);
assert (GroupStateTimeout.noTimeout() == NoTimeout$.MODULE$);
}
}
@@ -41,7 +41,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

/* invalid json with leading nulls would trigger java.io.CharConversionException
in Jackson's JsonFactory.createParser(byte[]) due to RFC-4627 encoding detection */
val badJson = "\0\0\0A\1AAA"
val badJson = "\u0000\u0000\u0000A\u0001AAA"

test("$.store.bicycle") {
checkEvaluation(
@@ -66,7 +66,7 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;

/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -160,7 +159,9 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
if (taskContext != null) {
Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
((AccumulatorV2<Integer, Integer>)accu.get()).add(blocks.size());
@SuppressWarnings("unchecked")
AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
intAccum.add(blocks.size());
}
}
}
@@ -250,7 +250,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
dataAttributes,
OutputMode.Update,
isMapGroupsWithState = true,
GroupStateTimeout.NoTimeout,
GroupStateTimeout.noTimeout,
child = logicalPlan))
}

@@ -50,7 +50,7 @@ private[sql] class GroupStateImpl[S](
optionalValue,
batchProcessingTimeMs = NO_TIMESTAMP,
eventTimeWatermarkMs = NO_TIMESTAMP,
timeoutConf = GroupStateTimeout.NoTimeout,
timeoutConf = GroupStateTimeout.noTimeout,
hasTimedOut = false)
private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
private var defined: Boolean = optionalValue.isDefined
@@ -379,7 +379,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

private var outputMode: OutputMode = OutputMode.Append

private var trigger: Trigger = Trigger.ProcessingTime(0L)
private var trigger: Trigger = Trigger.processingTime(0L)

private var extraOptions = new scala.collection.mutable.HashMap[String, String]

