Merge pull request apache#12 from harishreedharan/hdfs-wal

Initial HDFS Readers and Writers implementation.

tdas committed Oct 2, 2014
2 parents 2fedb5d + d86a518 commit 01d2bf7

Showing 5 changed files with 244 additions and 0 deletions.
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

private[streaming] case class FileSegment(path: String, offset: Long, length: Int)
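
A FileSegment records where a single log entry lives: the file path, the byte offset at which the entry's length header starts, and the length of the payload in bytes. A minimal sketch of constructing one from within the package (the path and numbers are hypothetical):

// Hypothetical segment: a 128-byte record whose length header
// starts at byte offset 2048 of the given log file.
val segment = FileSegment("/tmp/wal/log-1", 2048L, 128)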
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}

private[streaming] object HdfsUtils {

  def getOutputStream(path: String): FSDataOutputStream = {
    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    val dfsPath = new Path(path)
    val conf = new Configuration()
    val dfs = this.synchronized {
      dfsPath.getFileSystem(conf)
    }
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false)) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }

  def getInputStream(path: String): FSDataInputStream = {
    val dfsPath = new Path(path)
    val dfs = this.synchronized {
      dfsPath.getFileSystem(new Configuration())
    }
    dfs.open(dfsPath)
  }

  def checkState(state: Boolean, errorMsg: => String) {
    if (!state) {
      throw new IllegalStateException(errorMsg)
    }
  }

}
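
As a rough illustration, the two helpers pair up into a write-then-read round trip. The path below is hypothetical, and the sketch assumes either a fresh file or append support on the target filesystem:

val out = HdfsUtils.getOutputStream("/tmp/wal/log-1")
out.writeInt(42) // raw int, just to exercise the stream
out.hflush()
out.close()

val in = HdfsUtils.getInputStream("/tmp/wal/log-1")
assert(in.readInt() == 42)
in.close()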
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.Closeable

private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeable {

  private val instream = HdfsUtils.getInputStream(path)
  private var closed = false

  def read(segment: FileSegment): Array[Byte] = synchronized {
    assertOpen()
    instream.seek(segment.offset)
    val nextLength = instream.readInt()
    HdfsUtils.checkState(nextLength == segment.length,
      "Expected message length to be " + segment.length + ", but was " + nextLength)
    val buffer = new Array[Byte](nextLength)
    instream.readFully(buffer)
    buffer
  }

  override def close(): Unit = synchronized {
    closed = true
    instream.close()
  }

  private def assertOpen() {
    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
  }
}

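A hedged usage sketch: given the FileSegment the writer returned for a record, the random reader seeks straight to it and validates the stored length (the path and segment values are hypothetical):

val reader = new WriteAheadLogRandomReader("/tmp/wal/log-1")
try {
  val bytes = reader.read(FileSegment("/tmp/wal/log-1", 0L, 5))
  println(new String(bytes, "UTF-8"))
} finally {
  reader.close()
}
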
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.Closeable

private[streaming] class WriteAheadLogReader(path: String)
  extends Iterator[Array[Byte]] with Closeable {

  private val instream = HdfsUtils.getInputStream(path)
  private var closed = false
  private var nextItem: Option[Array[Byte]] = None

  override def hasNext: Boolean = synchronized {
    assertOpen()
    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
      true
    } else if (instream.available() < 4) {
      // Length of the next block (an Int = 4 bytes) is not yet available
      false
    } else {
      val length = instream.readInt()
      if (instream.available() < length) {
        // The block itself is incomplete; note that the 4-byte length header
        // has already been consumed at this point (see the TODOs in next())
        false
      } else {
        val buffer = new Array[Byte](length)
        instream.readFully(buffer)
        nextItem = Some(buffer)
        true
      }
    }
  }

  override def next(): Array[Byte] = synchronized {
    // TODO: Possible error case where there are not enough bytes in the stream
    // TODO: How to handle that?
    val data = nextItem.getOrElse {
      throw new IllegalStateException("next called without calling hasNext or after hasNext " +
        "returned false")
    }
    nextItem = None // Ensure the next hasNext call loads new data.
    data
  }

  override def close(): Unit = synchronized {
    closed = true
    instream.close()
  }

  private def assertOpen() {
    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the " +
      "file.")
  }

}
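
Because the sequential reader is an Iterator[Array[Byte]], the usual iterator idioms apply. A minimal sketch (hypothetical path):

val reader = new WriteAheadLogReader("/tmp/wal/log-1")
try {
  reader.foreach(bytes => println(s"read record of ${bytes.length} bytes"))
} finally {
  reader.close()
}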
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.Closeable

private[streaming] class WriteAheadLogWriter(path: String) extends Closeable {
  private val stream = HdfsUtils.getOutputStream(path)
  private var nextOffset = stream.getPos
  private var closed = false

  // Data is always written as:
  // - Length - Int (4 bytes)
  // - Data - of length = Length
  def write(data: Array[Byte]): FileSegment = synchronized {
    assertOpen()
    val segment = FileSegment(path, nextOffset, data.length)
    stream.writeInt(data.length)
    stream.write(data)
    stream.hflush()
    nextOffset = stream.getPos
    segment
  }

  // Note: an override cannot narrow the visibility of Closeable.close,
  // so this must stay public
  override def close(): Unit = synchronized {
    closed = true
    stream.close()
  }

  private def assertOpen() {
    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
  }
}

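Tying the writer and readers together, a hedged end-to-end sketch of the record format (a 4-byte length prefix followed by the payload; the path is hypothetical):

val writer = new WriteAheadLogWriter("/tmp/wal/log-1")
val segment = writer.write("hello".getBytes("UTF-8"))
writer.close()

val reader = new WriteAheadLogRandomReader("/tmp/wal/log-1")
try {
  assert(new String(reader.read(segment), "UTF-8") == "hello")
} finally {
  reader.close()
}
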