Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Stream operations: file access, iteration, splitting.

Summary: Intended to complement and replace strings::byLine.

Test Plan: stream_test

Reviewed By: delong.j@fb.com

FB internal diff: D463341
  • Loading branch information...
commit eed46f426a62a15ffae8a2e73ca48617754f0a76 1 parent 39786ac
@tudor tudor authored jdelong committed
View
11 folly/Range.h
@@ -187,7 +187,8 @@ class Range : private boost::totally_ordered<Range<Iter> > {
// Allow implicit conversion from Range<const char*> (aka StringPiece) to
// Range<const unsigned char*> (aka ByteRange), as they're both frequently
- // used to represent ranges of bytes.
+ // used to represent ranges of bytes. Allow explicit conversion in the other
+ // direction.
template <class OtherIter, typename std::enable_if<
(std::is_same<Iter, const unsigned char*>::value &&
std::is_same<OtherIter, const char*>::value), int>::type = 0>
@@ -196,6 +197,14 @@ class Range : private boost::totally_ordered<Range<Iter> > {
e_(reinterpret_cast<const unsigned char*>(other.end())) {
}
+ template <class OtherIter, typename std::enable_if<
+ (std::is_same<Iter, const char*>::value &&
+ std::is_same<OtherIter, const unsigned char*>::value), int>::type = 0>
+ explicit Range(const Range<OtherIter>& other)
+ : b_(reinterpret_cast<const char*>(other.begin())),
+ e_(reinterpret_cast<const char*>(other.end())) {
+ }
+
void clear() {
b_ = Iter();
e_ = Iter();
View
98 folly/experimental/io/Stream-inl.h
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_STREAM_H_
+#error This file may only be included from Stream.h
+#endif
+
+#include <string.h>
+
+#include <glog/logging.h>
+
+namespace folly {
+
+template <class Stream>
+InputByteStreamSplitter<Stream>::InputByteStreamSplitter(
+ char delimiter, Stream stream)
+ : done_(false),
+ delimiter_(delimiter),
+ stream_(std::move(stream)) {
+}
+
+template <class Stream>
+bool InputByteStreamSplitter<Stream>::operator()(ByteRange& chunk) {
+ DCHECK_EQ(buffer_->length(), 0);
+ chunk.clear();
+ if (rest_.empty()) {
+ if (done_) {
+ return false;
+ } else if (!stream_(rest_)) {
+ done_ = true;
+ return false;
+ }
+ }
+
+ auto p = static_cast<const unsigned char*>(memchr(rest_.data(), delimiter_,
+ rest_.size()));
+ if (p) {
+ chunk.assign(rest_.data(), p);
+ rest_.assign(p + 1, rest_.end());
+ return true;
+ }
+
+ // Incomplete line read, copy to buffer
+ if (!buffer_) {
+ static const size_t kDefaultLineSize = 256;
+ // Arbitrarily assume that we have half of a line in rest_, and
+ // get enough room for twice that.
+ buffer_ = IOBuf::create(std::max(kDefaultLineSize, 2 * rest_.size()));
+ } else {
+ buffer_->reserve(0, rest_.size());
+ }
+ memcpy(buffer_->writableTail(), rest_.data(), rest_.size());
+ buffer_->append(rest_.size());
+
+ while (stream_(rest_)) {
+ auto p = static_cast<const unsigned char*>(
+ memchr(rest_.data(), delimiter_, rest_.size()));
+ if (p) {
+ // Copy everything up to the delimiter and return it
+ size_t n = p - rest_.data();
+ buffer_->reserve(0, n);
+ memcpy(buffer_->writableTail(), rest_.data(), n);
+ buffer_->append(n);
+ chunk.reset(buffer_->data(), buffer_->length());
+ buffer_->trimStart(buffer_->length());
+ rest_.assign(p + 1, rest_.end());
+ return true;
+ }
+
+ // Nope, copy the entire chunk that we read
+ buffer_->reserve(0, rest_.size());
+ memcpy(buffer_->writableTail(), rest_.data(), rest_.size());
+ buffer_->append(rest_.size());
+ }
+
+ // Incomplete last line
+ done_ = true;
+ rest_.clear();
+ chunk.reset(buffer_->data(), buffer_->length());
+ buffer_->trimStart(buffer_->length());
+ return true;
+}
+
+} // namespace folly
+
View
98 folly/experimental/io/Stream.cpp
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/Stream.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <stdexcept>
+#include <system_error>
+
+#include "folly/String.h"
+
+namespace folly {
+
+FileInputByteStream::FileInputByteStream(int fd, bool ownsFd, size_t bufferSize)
+ : fd_(fd),
+ ownsFd_(ownsFd),
+ buffer_(IOBuf::create(bufferSize)) {
+}
+
+FileInputByteStream::FileInputByteStream(int fd, bool ownsFd,
+ std::unique_ptr<IOBuf>&& buffer)
+ : fd_(fd),
+ ownsFd_(ownsFd),
+ buffer_(std::move(buffer)) {
+ buffer_->clear();
+}
+
+bool FileInputByteStream::operator()(ByteRange& chunk) {
+ ssize_t n = ::read(fd_, buffer_->writableTail(), buffer_->capacity());
+ if (n == -1) {
+ throw std::system_error(errno, std::system_category(), "read failed");
+ }
+ chunk.reset(buffer_->tail(), n);
+ return (n != 0);
+}
+
+FileInputByteStream::FileInputByteStream(FileInputByteStream&& other)
+ : fd_(other.fd_),
+ ownsFd_(other.ownsFd_),
+ buffer_(std::move(other.buffer_)) {
+ other.fd_ = -1;
+ other.ownsFd_ = false;
+}
+
+FileInputByteStream& FileInputByteStream::operator=(
+ FileInputByteStream&& other) {
+ if (&other != this) {
+ closeNoThrow();
+ fd_ = other.fd_;
+ ownsFd_ = other.ownsFd_;
+ buffer_ = std::move(other.buffer_);
+ other.fd_ = -1;
+ other.ownsFd_ = false;
+ }
+ return *this;
+}
+
+FileInputByteStream::~FileInputByteStream() {
+ closeNoThrow();
+}
+
+void FileInputByteStream::closeNoThrow() {
+ if (!ownsFd_) {
+ return;
+ }
+ ownsFd_ = false;
+ if (::close(fd_) == -1) {
+ PLOG(ERROR) << "close failed";
+ }
+}
+
+InputByteStreamSplitter<FileInputByteStream> byLine(
+ const char* fileName, char delim) {
+ int fd = ::open(fileName, O_RDONLY);
+ if (fd == -1) {
+ throw std::system_error(errno, std::system_category(), "open failed");
+ }
+ return makeInputByteStreamSplitter(delim, FileInputByteStream(fd, true));
+}
+
+} // namespace folly
+
View
226 folly/experimental/io/Stream.h
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_STREAM_H_
+#define FOLLY_IO_STREAM_H_
+
+#include <boost/iterator/iterator_facade.hpp>
+#include <glog/logging.h>
+
+#include "folly/Range.h"
+#include "folly/FBString.h"
+#include "folly/experimental/io/IOBuf.h"
+
+namespace folly {
+
+/**
+ * An InputByteStream is a functional object with the following signature:
+ *
+ * bool operator()(ByteRange& data);
+ *
+ * Input byte streams must be movable.
+ *
+ * The stream returns false at EOF; otherwise, it returns true and sets data to
+ * the next chunk of data from the stream. The memory that data points to must
+ * remain valid until the next call to the stream. In case of error, the
+ * stream throws an exception.
+ *
+ * The meaning of a "chunk" is left up to the stream implementation. Some
+ * streams return chunks limited to the size of an internal buffer. Other
+ * streams return the entire input as one (potentially huge) ByteRange.
+ * Others assign meaning to chunks: StreamSplitter returns "lines" -- sequences
+ * of bytes between delimiters. This ambiguity is intentional; resolving it
+ * would significantly increase the complexity of the code.
+ *
+ * An OutputByteStream is an object with the following signature:
+ *
+ * void operator()(ByteRange data);
+ * void close();
+ *
+ * Output byte streams must be movable.
+ *
+ * The stream appends a chunk of data to the stream when calling operator().
+ * close() closes the stream, allowing us to detect any errors before
+ * destroying the stream object (to avoid throwing exceptions from the
+ * destructor). The destructor must close the stream if close() was not
+ * explicitly called, and abort the program if closing the stream caused
+ * an error.
+ *
+ * Just like with input byte streams, the meaning of a "chunk" is left up
+ * to the stream implementation. Some streams will just append all chunks
+ * as given; others might assign meaning to chunks and (for example) append
+ * delimiters between chunks.
+ */
+
+template <class Stream> class InputByteStreamIterator;
+
+/**
+ * Convenient base class template to derive all streams from; provides begin()
+ * and end() for iterator access. This class makes use of the curriously
+ * recurring template pattern; your stream class S may derive from
+ * InputByteStreamBase<S>.
+ *
+ * Deriving from InputByteStreamBase<S> is not required, but is convenient.
+ */
+template <class Derived>
+class InputByteStreamBase {
+ public:
+ InputByteStreamIterator<Derived> begin() {
+ return InputByteStreamIterator<Derived>(static_cast<Derived&>(*this));
+ }
+
+ InputByteStreamIterator<Derived> end() {
+ return InputByteStreamIterator<Derived>();
+ }
+
+ InputByteStreamBase() { }
+ InputByteStreamBase(InputByteStreamBase&&) = default;
+ InputByteStreamBase& operator=(InputByteStreamBase&&) = default;
+
+ private:
+ InputByteStreamBase(const InputByteStreamBase&) = delete;
+ InputByteStreamBase& operator=(const InputByteStreamBase&) = delete;
+};
+
+/**
+ * Stream iterator
+ */
+template <class Stream>
+class InputByteStreamIterator
+ : public boost::iterator_facade<
+ InputByteStreamIterator<Stream>,
+ const ByteRange,
+ boost::single_pass_traversal_tag> {
+ public:
+ InputByteStreamIterator() : stream_(nullptr) { }
+
+ explicit InputByteStreamIterator(Stream& stream) : stream_(&stream) {
+ increment();
+ }
+
+ private:
+ friend class boost::iterator_core_access;
+
+ void increment() {
+ DCHECK(stream_);
+ if (stream_ && !(*stream_)(chunk_)) {
+ stream_ = nullptr;
+ }
+ }
+
+ // This is a single pass iterator, so all we care about is that
+ // equal forms an equivalence class on the subset of iterators that it's
+ // defined on. In our case, only identical (same object) iterators and
+ // past-the-end iterators compare equal. (so that it != end() works)
+ bool equal(const InputByteStreamIterator& other) const {
+ return (this == &other) || (!stream_ && !other.stream_);
+ }
+
+ const ByteRange& dereference() const {
+ DCHECK(stream_);
+ return chunk_;
+ }
+
+ Stream* stream_;
+ ByteRange chunk_;
+};
+
+/**
+ * Stream that read()s from a file.
+ */
+class FileInputByteStream : public InputByteStreamBase<FileInputByteStream> {
+ public:
+ static const size_t kDefaultBufferSize = 4096;
+ explicit FileInputByteStream(int fd,
+ bool ownsFd = false,
+ size_t bufferSize = kDefaultBufferSize);
+ FileInputByteStream(int fd, bool ownsFd, std::unique_ptr<IOBuf>&& buffer);
+ FileInputByteStream(FileInputByteStream&& other);
+ FileInputByteStream& operator=(FileInputByteStream&& other);
+ ~FileInputByteStream();
+ bool operator()(ByteRange& chunk);
+
+ private:
+ void closeNoThrow();
+
+ int fd_;
+ bool ownsFd_;
+ std::unique_ptr<IOBuf> buffer_;
+};
+
+/**
+ * Split a stream on a delimiter. Returns "lines" between delimiters;
+ * the delimiters are not included in the returned string.
+ *
+ * Note that the InputByteStreamSplitter acts as a stream itself, and you can
+ * iterate over it.
+ */
+template <class Stream>
+class InputByteStreamSplitter
+ : public InputByteStreamBase<InputByteStreamSplitter<Stream>> {
+ public:
+ InputByteStreamSplitter(char delimiter, Stream stream);
+ bool operator()(ByteRange& chunk);
+
+ InputByteStreamSplitter(InputByteStreamSplitter&&) = default;
+ InputByteStreamSplitter& operator=(InputByteStreamSplitter&&) = default;
+
+ private:
+ InputByteStreamSplitter(const InputByteStreamSplitter&) = delete;
+ InputByteStreamSplitter& operator=(const InputByteStreamSplitter&) = delete;
+
+ bool done_;
+ char delimiter_;
+ Stream stream_;
+ std::unique_ptr<IOBuf> buffer_;
+ ByteRange rest_;
+};
+
+/**
+ * Shortcut to create a stream splitter around a stream and deduce
+ * the type of the template argument.
+ */
+template <class Stream>
+InputByteStreamSplitter<Stream> makeInputByteStreamSplitter(
+ char delimiter, Stream stream) {
+ return InputByteStreamSplitter<Stream>(delimiter, std::move(stream));
+}
+
+/**
+ * Create a stream that splits a file into chunks (default: lines, with
+ * '\n' as the delimiter)
+ */
+InputByteStreamSplitter<FileInputByteStream> byLine(
+ const char* fileName, char delim='\n');
+
+// overload for std::string
+inline InputByteStreamSplitter<FileInputByteStream> byLine(
+ const std::string& fileName, char delim='\n') {
+ return byLine(fileName.c_str(), delim);
+}
+
+// overload for fbstring
+inline InputByteStreamSplitter<FileInputByteStream> byLine(
+ const fbstring& fileName, char delim='\n') {
+ return byLine(fileName.c_str(), delim);
+}
+
+} // namespace folly
+
+#include "folly/experimental/io/Stream-inl.h"
+
+#endif /* FOLLY_IO_STREAM_H_ */
+
View
117 folly/experimental/io/test/StreamTest.cpp
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/Stream.h"
+
+#include <vector>
+#include <string>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Benchmark.h"
+#include "folly/experimental/TestUtil.h"
+
+using namespace folly;
+
+namespace {
+
+std::vector<std::string> streamSplit(const std::string& str, char delimiter,
+ size_t maxChunkSize = (size_t)-1) {
+ size_t pos = 0;
+ auto cb = [&] (ByteRange& sp) mutable -> bool {
+ if (pos == str.size()) return false;
+ size_t n = std::min(str.size() - pos, maxChunkSize);
+ sp.reset(reinterpret_cast<const unsigned char*>(&(str[pos])), n);
+ pos += n;
+ return true;
+ };
+
+ std::vector<std::string> result;
+ for (auto line : makeInputByteStreamSplitter(delimiter, cb)) {
+ result.push_back(StringPiece(line).str());
+ }
+
+ return result;
+}
+
+} // namespace
+
+TEST(InputByteStreamSplitter, Empty) {
+ {
+ auto pieces = streamSplit("", ',');
+ EXPECT_EQ(0, pieces.size());
+ }
+
+ // The last delimiter is eaten, just like std::getline
+ {
+ auto pieces = streamSplit(",", ',');
+ EXPECT_EQ(1, pieces.size());
+ EXPECT_EQ("", pieces[0]);
+ }
+
+ {
+ auto pieces = streamSplit(",,", ',');
+ EXPECT_EQ(2, pieces.size());
+ EXPECT_EQ("", pieces[0]);
+ EXPECT_EQ("", pieces[1]);
+ }
+}
+
+TEST(InputByteStreamSplitter, Simple) {
+ std::string str = "hello,, world, goodbye, meow";
+
+ for (size_t chunkSize = 1; chunkSize <= str.size(); ++chunkSize) {
+ auto pieces = streamSplit(str, ',', chunkSize);
+ EXPECT_EQ(5, pieces.size());
+ EXPECT_EQ("hello", pieces[0]);
+ EXPECT_EQ("", pieces[1]);
+ EXPECT_EQ(" world", pieces[2]);
+ EXPECT_EQ(" goodbye", pieces[3]);
+ EXPECT_EQ(" meow", pieces[4]);
+ }
+}
+
+TEST(ByLine, Simple) {
+ test::TemporaryFile file("ByLine");
+ static const std::string lines(
+ "Hello world\n"
+ "This is the second line\n"
+ "\n"
+ "\n"
+ "a few empty lines above\n"
+ "incomplete last line");
+ EXPECT_EQ(lines.size(), write(file.fd(), lines.data(), lines.size()));
+
+ auto expected = streamSplit(lines, '\n');
+ std::vector<std::string> found;
+ for (auto& line : byLine(file.path())) {
+ found.push_back(StringPiece(line).str());
+ }
+
+ EXPECT_TRUE(expected == found);
+}
+
+int main(int argc, char *argv[]) {
+ testing::InitGoogleTest(&argc, argv);
+ google::ParseCommandLineFlags(&argc, &argv, true);
+ auto ret = RUN_ALL_TESTS();
+ if (!ret) {
+ folly::runBenchmarksOnFlag();
+ }
+ return ret;
+}
+
Please sign in to comment.
Something went wrong with that request. Please try again.