Skip to content

Commit

Permalink
Refactored write ahead stuff from streaming.storage to streaming.util
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Oct 21, 2014
1 parent b06be2b commit 4ab602a
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 22 deletions.
Expand Up @@ -14,6 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage
package org.apache.spark.streaming.util

private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage
package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage
package org.apache.spark.streaming.util

import java.nio.ByteBuffer

Expand All @@ -25,18 +25,17 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.Logging
import org.apache.spark.streaming.storage.WriteAheadLogManager._
import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils
import WriteAheadLogManager._

/**
* This class manages write ahead log files.
* - Writes records (bytebuffers) to periodically rotating log files.
* - Recovers the log files and the reads the recovered records upon failures.
* - Cleans up old log files.
*
* Uses [[org.apache.spark.streaming.storage.WriteAheadLogWriter]] to write
* and [[org.apache.spark.streaming.storage.WriteAheadLogReader]] to read.
* Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
* and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
*
*@param logDirectory Directory when rotating log files will be created.
* @param hadoopConf Hadoop configuration for reading/writing log files.
Expand Down Expand Up @@ -199,7 +198,7 @@ private[streaming] class WriteAheadLogManager(
}
}

private[storage] object WriteAheadLogManager {
private[util] object WriteAheadLogManager {

case class LogInfo(startTime: Long, endTime: Long, path: String)

Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage
package org.apache.spark.streaming.util

import java.io.Closeable
import java.nio.ByteBuffer
Expand All @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration

/**
* A random access reader for reading write ahead log files written using
* [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. Given the file segment info,
* [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info,
* this reads the record (bytebuffer) from the log file.
*/
private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
Expand Down
Expand Up @@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage
package org.apache.spark.streaming.util

import java.io.{EOFException, Closeable}
import java.io.{Closeable, EOFException}
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging

/**
* A reader for reading write ahead log files written using
* [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. This reads
* [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
* the records (bytebuffers) in the log file sequentially and return them as an
* iterator of bytebuffers.
*/
Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage
package org.apache.spark.streaming.util

import java.io._
import java.net.URI
Expand All @@ -24,7 +24,6 @@ import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
import org.apache.spark.streaming.storage.FileSegment

/**
* A writer for writing byte-buffers to a write ahead log file.
Expand Down
Expand Up @@ -14,26 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage
package org.apache.spark.streaming.util

import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.postfixOps
import scala.util.Random

import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually._

import WriteAheadLogSuite._
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration

import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import WriteAheadLogSuite._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually._

/**
* This testsuite tests all classes related to write ahead logs.
Expand Down

0 comments on commit 4ab602a

Please sign in to comment.