
Commit

fix conflicts
scwf committed Feb 4, 2015
2 parents 015fe2f + e380d2d commit dd34ebf
Showing 139 changed files with 5,845 additions and 2,138 deletions.
115 changes: 115 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -0,0 +1,115 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark;

import org.apache.spark.scheduler.*;

/**
 * Class that allows users to receive all SparkListener events.
 * Users should override the onEvent method.
 *
 * This is a concrete Java class in order to ensure that we don't forget to update it when adding
 * new methods to SparkListener: forgetting to add a method will result in a compilation error (if
 * this was a concrete Scala class, default implementations of new event handlers would be inherited
 * from the SparkListener trait).
 */
public class SparkFirehoseListener implements SparkListener {

    public void onEvent(SparkListenerEvent event) { }

    @Override
    public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        onEvent(stageCompleted);
    }

    @Override
    public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
        onEvent(stageSubmitted);
    }

    @Override
    public final void onTaskStart(SparkListenerTaskStart taskStart) {
        onEvent(taskStart);
    }

    @Override
    public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
        onEvent(taskGettingResult);
    }

    @Override
    public final void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        onEvent(taskEnd);
    }

    @Override
    public final void onJobStart(SparkListenerJobStart jobStart) {
        onEvent(jobStart);
    }

    @Override
    public final void onJobEnd(SparkListenerJobEnd jobEnd) {
        onEvent(jobEnd);
    }

    @Override
    public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
        onEvent(environmentUpdate);
    }

    @Override
    public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
        onEvent(blockManagerAdded);
    }

    @Override
    public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
        onEvent(blockManagerRemoved);
    }

    @Override
    public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
        onEvent(unpersistRDD);
    }

    @Override
    public final void onApplicationStart(SparkListenerApplicationStart applicationStart) {
        onEvent(applicationStart);
    }

    @Override
    public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        onEvent(applicationEnd);
    }

    @Override
    public final void onExecutorMetricsUpdate(
            SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
        onEvent(executorMetricsUpdate);
    }

    @Override
    public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
        onEvent(executorAdded);
    }

    @Override
    public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
        onEvent(executorRemoved);
    }
}
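Every on* override above is final and funnels into onEvent, so a subclass has exactly one method to implement and cannot accidentally handle only part of the event stream. As a minimal sketch of how such a listener might be wired up (the EventCountingListener class and its counter are made up here for illustration):

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.{SparkConf, SparkContext, SparkFirehoseListener}
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical listener that counts every scheduler event it receives.
class EventCountingListener extends SparkFirehoseListener {
  private val count = new AtomicLong(0)

  override def onEvent(event: SparkListenerEvent): Unit = {
    count.incrementAndGet()
  }

  def eventsSeen: Long = count.get()
}

val conf = new SparkConf().setAppName("firehose-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.addSparkListener(new EventCountingListener)

Once registered on the context's listener bus via addSparkListener, the instance sees every event type listed above through the single onEvent override.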
126 changes: 0 additions & 126 deletions core/src/main/java/org/apache/spark/TaskContext.java

This file was deleted.

7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -804,6 +804,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     assertNotStopped()
+    // The call to new NewHadoopJob automatically adds security credentials to conf,
+    // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
@@ -826,7 +828,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kClass: Class[K],
       vClass: Class[V]): RDD[(K, V)] = {
     assertNotStopped()
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+    // Add necessary security credentials to the JobConf. Required to access secure HDFS.
+    val jconf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jconf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
   }

   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
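For callers, the practical effect of this hunk is that an RDD built from a plain Configuration can now read from secure (Kerberized) HDFS without any manual token handling. A minimal calling-side sketch, assuming an existing SparkContext named sc and a made-up HDFS path; the property key shown is the Hadoop 2 name for the new-API input directory:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopConf = new Configuration(sc.hadoopConfiguration)
// Hadoop 2 property name for the new-API input directory (assumption; adjust to your Hadoop version).
hadoopConf.set("mapreduce.input.fileinputformat.inputdir", "hdfs://namenode:8020/data/logs")

// With the fix above, newAPIHadoopRDD wraps this Configuration in a JobConf and
// attaches the current user's credentials before handing it to NewHadoopRDD.
val rdd = sc.newAPIHadoopRDD(
  hadoopConf,
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])

rdd.take(5).foreach { case (offset, line) => println(s"$offset: $line") }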
