Commit

Merge branch 'master' of https://github.com/apache/spark.git
robbinspg committed Jun 1, 2016
2 parents 68be9d8 + 6563d72 commit 180a361
Showing 6 changed files with 44 additions and 25 deletions.
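The hunks below all follow the same refactoring: a FileSystem handle obtained once via FileSystem.get(hadoopConf), which always resolves to the default filesystem configured in fs.defaultFS, is replaced by resolving the filesystem from the path actually being operated on via Path.getFileSystem(hadoopConf). A minimal sketch of the difference, written only against the Hadoop API; the object name, configuration, and checkpoint path here are illustrative and not part of the commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsResolutionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical checkpoint file on the local filesystem; fs.defaultFS may point at HDFS.
    val checkpointFile = "file:/tmp/checkpoints/rdd-42"

    // Old pattern: always the default filesystem, regardless of the path's scheme.
    val defaultFs: FileSystem = FileSystem.get(conf)

    // New pattern: the filesystem that owns this particular path.
    val path = new Path(checkpointFile)
    val fs: FileSystem = path.getFileSystem(conf)

    println(s"default fs: ${defaultFs.getUri}, fs for this path: ${fs.getUri}")
    if (fs.exists(path)) {
      fs.delete(path, true) // recursive delete, matching the calls in the hunks below
    }
  }
}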
mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala (6 changes: 3 additions & 3 deletions)

@@ -17,7 +17,7 @@
 
 package org.apache.spark.ml.clustering
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
@@ -696,8 +696,8 @@ class DistributedLDAModel private[ml] (
   @DeveloperApi
   @Since("2.0.0")
   def deleteCheckpointFiles(): Unit = {
-    val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
-    _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+    val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
+    _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, hadoopConf))
     _checkpointFiles = Array.empty[String]
   }
 
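For orientation, a hedged sketch of how deleteCheckpointFiles() is reached from user code; the session, checkpoint directory, and input dataset below are placeholders and not part of this commit:

import org.apache.spark.ml.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.sql.SparkSession

object LdaCheckpointCleanupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lda-checkpoint-sketch").getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/lda-checkpoints") // hypothetical directory

    // Placeholder input: a DataFrame with a "features" column of term-count vectors.
    val dataset = spark.read.parquet("/tmp/lda-input")

    val lda = new LDA()
      .setK(10)
      .setMaxIter(50)
      .setOptimizer("em")          // the EM optimizer yields a DistributedLDAModel
      .setCheckpointInterval(10)   // writes the checkpoints that deleteCheckpointFiles() removes

    val model = lda.fit(dataset).asInstanceOf[DistributedLDAModel]
    model.deleteCheckpointFiles()  // after this change, each file's FileSystem comes from its own path
    spark.stop()
  }
}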
NodeIdCache.scala

@@ -21,7 +21,7 @@ import java.io.IOException
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.tree.{LearningNode, Split}
@@ -77,8 +77,8 @@ private[spark] class NodeIdCache(
   // Indicates whether we can checkpoint
   private val canCheckpoint = nodeIdsForInstances.sparkContext.getCheckpointDir.nonEmpty
 
-  // FileSystem instance for deleting checkpoints as needed
-  private val fs = FileSystem.get(nodeIdsForInstances.sparkContext.hadoopConfiguration)
+  // Hadoop Configuration for deleting checkpoints as needed
+  private val hadoopConf = nodeIdsForInstances.sparkContext.hadoopConfiguration
 
   /**
    * Update the node index values in the cache.
@@ -130,7 +130,9 @@ private[spark] class NodeIdCache(
         val old = checkpointQueue.dequeue()
         // Since the old checkpoint is not deleted by Spark, we'll manually delete it here.
         try {
-          fs.delete(new Path(old.getCheckpointFile.get), true)
+          val path = new Path(old.getCheckpointFile.get)
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
         } catch {
           case e: IOException =>
             logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
@@ -154,7 +156,9 @@
       val old = checkpointQueue.dequeue()
       if (old.getCheckpointFile.isDefined) {
         try {
-          fs.delete(new Path(old.getCheckpointFile.get), true)
+          val path = new Path(old.getCheckpointFile.get)
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
         } catch {
           case e: IOException =>
             logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
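NodeIdCache is only exercised when tree training caches node IDs; a hedged sketch of the configuration that triggers it, with placeholder paths and input data:

import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.sql.SparkSession

object CacheNodeIdsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cache-node-ids-sketch").getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/tree-checkpoints") // needed for NodeIdCache checkpointing

    // Placeholder input with "label" and "features" columns.
    val training = spark.read.parquet("/tmp/tree-input")

    val dt = new DecisionTreeClassifier()
      .setCacheNodeIds(true)       // routes node-ID bookkeeping through NodeIdCache
      .setCheckpointInterval(10)   // old checkpoints are deleted via path.getFileSystem(hadoopConf)

    val model = dt.fit(training)
    println(s"trained a tree of depth ${model.depth}")
    spark.stop()
  }
}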
PeriodicCheckpointer.scala

@@ -19,7 +19,8 @@ package org.apache.spark.mllib.impl
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
@@ -160,21 +161,23 @@ private[mllib] abstract class PeriodicCheckpointer[T](
   private def removeCheckpointFile(): Unit = {
     val old = checkpointQueue.dequeue()
     // Since the old checkpoint is not deleted by Spark, we manually delete it.
-    val fs = FileSystem.get(sc.hadoopConfiguration)
-    getCheckpointFiles(old).foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+    getCheckpointFiles(old).foreach(
+      PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration))
   }
 }
 
 private[spark] object PeriodicCheckpointer extends Logging {
 
   /** Delete a checkpoint file, and log a warning if deletion fails. */
-  def removeCheckpointFile(path: String, fs: FileSystem): Unit = {
+  def removeCheckpointFile(checkpointFile: String, conf: Configuration): Unit = {
     try {
-      fs.delete(new Path(path), true)
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(conf)
+      fs.delete(path, true)
     } catch {
       case e: Exception =>
         logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
-          path)
+          checkpointFile)
     }
   }
 }
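PeriodicCheckpointer.removeCheckpointFile is private[spark], so code outside Spark cannot call it directly; a roughly equivalent best-effort helper built on the same Hadoop calls would look like the sketch below (all names here are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object CheckpointCleanup {
  /** Best-effort delete of a checkpoint file: failures are reported, not rethrown. */
  def removeQuietly(checkpointFile: String, conf: Configuration): Unit = {
    try {
      val path = new Path(checkpointFile)
      val fs = path.getFileSystem(conf)
      fs.delete(path, true)
    } catch {
      case e: Exception =>
        // Mirrors the warn-and-continue behaviour of removeCheckpointFile above.
        System.err.println(s"Could not remove old checkpoint file $checkpointFile: ${e.getMessage}")
    }
  }
}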
PeriodicGraphCheckpointerSuite.scala

@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.impl
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.graphx.{Edge, Graph}
@@ -140,9 +140,11 @@ private object PeriodicGraphCheckpointerSuite {
     // Instead, we check for the presence of the checkpoint files.
     // This test should continue to work even after this graph.isCheckpointed issue
     // is fixed (though it can then be simplified and not look for the files).
-    val fs = FileSystem.get(graph.vertices.sparkContext.hadoopConfiguration)
+    val hadoopConf = graph.vertices.sparkContext.hadoopConfiguration
     graph.getCheckpointFiles.foreach { checkpointFile =>
-      assert(!fs.exists(new Path(checkpointFile)),
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(hadoopConf)
+      assert(!fs.exists(path),
         "Graph checkpoint file should have been removed")
     }
   }
PeriodicRDDCheckpointerSuite.scala

@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.impl
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -127,9 +127,11 @@ private object PeriodicRDDCheckpointerSuite {
     // Instead, we check for the presence of the checkpoint files.
     // This test should continue to work even after this rdd.isCheckpointed issue
     // is fixed (though it can then be simplified and not look for the files).
-    val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
+    val hadoopConf = rdd.sparkContext.hadoopConfiguration
     rdd.getCheckpointFile.foreach { checkpointFile =>
-      assert(!fs.exists(new Path(checkpointFile)), "RDD checkpoint file should have been removed")
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(hadoopConf)
+      assert(!fs.exists(path), "RDD checkpoint file should have been removed")
     }
   }
 
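The RDD suite asserts on files produced by ordinary RDD checkpointing; a hedged standalone sketch of that workflow outside the test harness (directory and data are placeholders):

import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkContext}

object RddCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-checkpoint-sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/rdd-checkpoints") // placeholder directory

    val rdd = sc.parallelize(1 to 100)
    rdd.checkpoint()
    rdd.count() // materializes the RDD and writes the checkpoint

    val hadoopConf = sc.hadoopConfiguration
    rdd.getCheckpointFile.foreach { checkpointFile =>
      val path = new Path(checkpointFile)
      val fs = path.getFileSystem(hadoopConf) // same per-path resolution as in the suite above
      println(s"checkpoint exists: ${fs.exists(path)}")
    }
    sc.stop()
  }
}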
InMemoryCatalog.scala

@@ -22,7 +22,7 @@ import java.io.IOException
 
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
@@ -105,8 +105,6 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
     }
   }
 
-  private val fs = FileSystem.get(hadoopConfig)
-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -120,7 +118,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       }
     } else {
      try {
-        fs.mkdirs(new Path(dbDefinition.locationUri))
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.mkdirs(location)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to create database ${dbDefinition.name} as failed " +
@@ -147,7 +147,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       // Remove the database.
       val dbDefinition = catalog(db).db
       try {
-        fs.delete(new Path(dbDefinition.locationUri), true)
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.delete(location, true)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to drop database ${dbDefinition.name} as failed " +
@@ -203,6 +205,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       if (tableDefinition.tableType == CatalogTableType.MANAGED) {
         val dir = new Path(catalog(db).db.locationUri, table)
         try {
+          val fs = dir.getFileSystem(hadoopConfig)
           fs.mkdirs(dir)
         } catch {
           case e: IOException =>
@@ -223,6 +226,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
         val dir = new Path(catalog(db).db.locationUri, table)
         try {
+          val fs = dir.getFileSystem(hadoopConfig)
           fs.delete(dir, true)
         } catch {
           case e: IOException =>
@@ -248,6 +252,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       val oldDir = new Path(catalog(db).db.locationUri, oldName)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
+        val fs = oldDir.getFileSystem(hadoopConfig)
         fs.rename(oldDir, newDir)
       } catch {
         case e: IOException =>
@@ -338,6 +343,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           p.spec.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
          fs.mkdirs(new Path(tableDir, partitionPath))
         } catch {
           case e: IOException =>
@@ -373,6 +379,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           p.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
           fs.delete(new Path(tableDir, partitionPath), true)
         } catch {
           case e: IOException =>
@@ -409,6 +416,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           newSpec.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
           fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
         } catch {
           case e: IOException =>
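The catalog hunks matter because a database or table location can carry its own URI scheme; a hedged user-level sketch (database name and paths are placeholders, and it assumes the default in-memory catalog rather than Hive), where the directory is now created and removed through the filesystem that owns the location rather than the default one:

import org.apache.spark.sql.SparkSession

object CatalogLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("catalog-location-sketch").getOrCreate()

    // Location on the local filesystem, even if fs.defaultFS points at HDFS.
    spark.sql("CREATE DATABASE sketch_db LOCATION 'file:/tmp/sketch_db'")
    spark.sql("DROP DATABASE sketch_db")
    spark.stop()
  }
}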
