
Additional cleanup from review
srowen committed Dec 30, 2015
1 parent 1fffa8e commit 5e3318a
Showing 6 changed files with 8 additions and 27 deletions.
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -648,8 +649,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

/**
-* Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
-* so we have to resort to ugly reflection (as usual...).
+* Checks whether HDFS is in safe mode.
*
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
* makes it more public than not.
@@ -663,11 +663,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

// For testing.
private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
-val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
-val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class)
-val action = actionClass.getField("SAFEMODE_GET").get(null)
-val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
-method.invoke(dfs, action).asInstanceOf[Boolean]
+dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
}

}
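For context, here is a minimal sketch of the direct Hadoop 2 call that replaces the reflection removed above; the object name, `main` method, and the non-HDFS fallback are illustrative assumptions, not part of this commit. `SAFEMODE_GET` only queries the NameNode's safe-mode flag, it does not change it.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants

// Illustrative sketch, not part of this commit: query HDFS safe mode directly
// through the Hadoop 2 API instead of loading classes and methods reflectively.
object SafeModeCheckSketch {
  def isInSafeMode(fs: FileSystem): Boolean = fs match {
    // SAFEMODE_GET reads the NameNode's safe-mode flag without changing it
    case dfs: DistributedFileSystem =>
      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
    // Local and other non-HDFS file systems have no safe mode
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    println(s"File system in safe mode: ${isInSafeMode(fs)}")
  }
}
```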
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl, JobContextImpl}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark._
@@ -77,14 +77,6 @@ private[spark] class EventLoggingListener(
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None

-// The Hadoop APIs have changed over time, so we use reflection to figure out
-// the correct method to use to flush a hadoop data stream. See SPARK-1518
-// for details.
-private val hadoopFlushMethod = {
-val cls = classOf[FSDataOutputStream]
-scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
-}
-
private var writer: Option[PrintWriter] = None

// For testing. Keep track of all JSON serialized events that have been logged.
@@ -147,7 +139,7 @@
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
-hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
+hadoopDataStream.foreach(_.hflush())
}
if (testing) {
loggedEvents += eventJson
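A minimal sketch of the direct flush call that replaces the reflective hflush/sync lookup above; the object name, file path, and payload are made-up for illustration. With Hadoop 1.x support gone, `FSDataOutputStream.hflush()` can be called unconditionally.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

// Illustrative sketch, not part of this commit: flush a Hadoop output stream
// directly with hflush() now that the hflush-vs-sync reflection is gone.
object HflushSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("/tmp/event-log-sketch") // made-up path
    val fs = FileSystem.get(path.toUri, conf)
    val out: FSDataOutputStream = fs.create(path, true)
    out.write("{\"Event\":\"SparkListenerLogStart\"}\n".getBytes("UTF-8"))
    // hflush() pushes buffered bytes to the file system so readers can see them
    out.hflush()
    out.close()
  }
}
```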
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl, JobContextImpl}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql._
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer

-import scala.util.Try
-
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FSDataOutputStream

import org.apache.spark.util.Utils

@@ -34,11 +31,6 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf:

private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

-private lazy val hadoopFlushMethod = {
-// Use reflection to get the right flush operation
-val cls = classOf[FSDataOutputStream]
-Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
-}

private var nextOffset = stream.getPos()
private var closed = false
@@ -62,7 +54,7 @@
}

private def flush() {
-hadoopFlushMethod.foreach { _.invoke(stream) }
+stream.hflush()
// Useful for local file system where hflush/sync does not work (HADOOP-7844)
stream.getWrappedStream.flush()
}
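For reference, a minimal sketch of the flush pattern that remains in the write-ahead-log writer after this change; the object and method names are assumptions for illustration. The stream is hflushed, and the wrapped stream is flushed as well because hflush does nothing on the local file system (HADOOP-7844).

```scala
import org.apache.hadoop.fs.FSDataOutputStream

// Illustrative sketch, not part of this commit: the flush pattern kept once the
// reflective hadoopFlushMethod is removed.
object WalFlushSketch {
  def flush(stream: FSDataOutputStream): Unit = {
    // On HDFS this makes the written bytes visible to readers
    stream.hflush()
    // On the local file system hflush is a no-op (HADOOP-7844),
    // so flush the underlying wrapped stream too
    stream.getWrappedStream.flush()
  }
}
```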

