/*
 * Copyright 2019 Databricks, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.delta.storage

import org.apache.spark.sql.delta.DeltaLog
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.Utils
/**
 * General interface for all critical file system operations required to read and write the
 * [[DeltaLog]]. The correctness of the [[DeltaLog]] is predicated on the atomicity and
 * durability guarantees of the implementation of this interface. Specifically,
 *
 * 1. Atomic visibility of files: Any file written through this store must
 *    be made visible atomically. In other words, this should not generate partial files.
 *
 * 2. Mutual exclusion: Only one writer must be able to create (or rename) a file at the final
 *    destination.
 *
 * 3. Consistent listing: Once a file has been written in a directory, all future listings for
 *    that directory must return that file.
 */
trait LogStore {

  /** Read the given `path`. */
  final def read(path: String): Seq[String] = read(new Path(path))

  /** Read the given `path`. */
  def read(path: Path): Seq[String]

  /**
   * Write the given `actions` to the given `path` without overwriting any existing file.
   * The implementation must throw a [[java.nio.file.FileAlreadyExistsException]] if the file
   * already exists. Furthermore, the implementation must ensure that the entire file is made
   * visible atomically, that is, it should not generate partial files.
   */
  final def write(path: String, actions: Iterator[String]): Unit = write(new Path(path), actions)

  /**
   * Write the given `actions` to the given `path`, overwriting any existing file only if
   * `overwrite` is true. The implementation must throw a
   * [[java.nio.file.FileAlreadyExistsException]] if the file already exists and
   * `overwrite` is false. Furthermore, the implementation must ensure that the entire file is
   * made visible atomically, that is, it should not generate partial files.
   */
  def write(path: Path, actions: Iterator[String], overwrite: Boolean = false): Unit

  /**
   * List the paths in the same directory that are lexicographically greater than or equal to
   * the given `path` (in UTF-8 sort order). The result should also be sorted by file name.
   */
  final def listFrom(path: String): Iterator[FileStatus] = listFrom(new Path(path))

  /**
   * List the paths in the same directory that are lexicographically greater than or equal to
   * the given `path` (in UTF-8 sort order). The result should also be sorted by file name.
   */
  def listFrom(path: Path): Iterator[FileStatus]

  /** Invalidate any caching that the implementation may be using. */
  def invalidateCache(): Unit

  /** Resolve the fully qualified path for the given `path`. */
  def resolvePathOnPhysicalStorage(path: Path): Path = {
    throw new UnsupportedOperationException()
  }

  /**
   * Whether a partial write is visible when writing to `path`.
   *
   * Because the answer depends on the underlying file system, the `path` is required here to
   * identify that file system, even though in most cases a log store deals with only one.
   *
   * The default value is provided here only for legacy reasons and will be removed.
   * Any LogStore implementation should override this method instead of relying on the default.
   */
  def isPartialWriteVisible(path: Path): Boolean = true
}
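
// Illustrative sketch of how a caller might exercise the LogStore API above: append a
// commit file and read it back. The object name, path, and JSON payload are hypothetical;
// the behavior relied on (FileAlreadyExistsException on conflict, consistent listing) is
// exactly the contract documented on the trait.
private[storage] object LogStoreExample {
  def appendAndReadBack(logStore: LogStore): Seq[String] = {
    val commit = new Path("/tmp/delta-table/_delta_log/00000000000000000000.json")
    // Mutual exclusion: if another writer created this file first, this call must
    // throw java.nio.file.FileAlreadyExistsException (overwrite defaults to false).
    logStore.write(commit, Iterator("""{"commitInfo":{"operation":"WRITE"}}"""))
    // Consistent listing: the file just written must appear in subsequent listings.
    logStore.listFrom(commit).foreach(status => println(status.getPath))
    logStore.read(commit)
  }
}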
object LogStore extends LogStoreProvider
    with Logging {

  def apply(sc: SparkContext): LogStore = {
    apply(sc.getConf, sc.hadoopConfiguration)
  }

  def apply(sparkConf: SparkConf, hadoopConf: Configuration): LogStore = {
    createLogStore(sparkConf, hadoopConf)
  }
}
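
// Illustrative sketch: selecting a non-default LogStore implementation through SparkConf,
// using the config key defined in LogStoreProvider below. S3SingleDriverLogStore is assumed
// to be one of the implementations shipped alongside this file; any class exposing a
// (SparkConf, Configuration) constructor can be plugged in the same way.
private[storage] object LogStoreConfigExample {
  def customStore(): LogStore = {
    val sparkConf = new SparkConf().set(
      "spark.delta.logStore.class",
      "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    // The reflective lookup in createLogStore instantiates the configured class.
    LogStore(sparkConf, new Configuration())
  }
}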
trait LogStoreProvider {
  val logStoreClassConfKey: String = "spark.delta.logStore.class"
  val defaultLogStoreClass: String = classOf[HDFSLogStore].getName

  def createLogStore(spark: SparkSession): LogStore = {
    val sc = spark.sparkContext
    createLogStore(sc.getConf, sc.hadoopConfiguration)
  }

  def createLogStore(sparkConf: SparkConf, hadoopConf: Configuration): LogStore = {
    val logStoreClassName = sparkConf.get(logStoreClassConfKey, defaultLogStoreClass)
    val logStoreClass = Utils.classForName(logStoreClassName)
    logStoreClass.getConstructor(classOf[SparkConf], classOf[Configuration])
      .newInstance(sparkConf, hadoopConf).asInstanceOf[LogStore]
  }
}
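
// Illustrative sketch: the minimal shape a custom LogStore needs so that createLogStore
// can instantiate it reflectively, namely a (SparkConf, Configuration) constructor.
// This hypothetical implementation simply delegates every operation to HDFSLogStore,
// the default implementation referenced by defaultLogStoreClass above.
private[storage] class ExampleDelegatingLogStore(
    sparkConf: SparkConf,
    hadoopConf: Configuration)
  extends LogStore {

  private val delegate = new HDFSLogStore(sparkConf, hadoopConf)

  override def read(path: Path): Seq[String] = delegate.read(path)

  override def write(path: Path, actions: Iterator[String], overwrite: Boolean): Unit =
    delegate.write(path, actions, overwrite)

  override def listFrom(path: Path): Iterator[FileStatus] = delegate.listFrom(path)

  override def invalidateCache(): Unit = delegate.invalidateCache()

  override def resolvePathOnPhysicalStorage(path: Path): Path =
    delegate.resolvePathOnPhysicalStorage(path)

  override def isPartialWriteVisible(path: Path): Boolean =
    delegate.isPartialWriteVisible(path)
}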