From 830268af8282dea845de50df837121d5310d010d Mon Sep 17 00:00:00 2001 From: Nolan Carroll Date: Fri, 29 Jan 2016 12:26:55 -0500 Subject: [PATCH] Move management of the file stream into DataFileWriter, to allow DataFileWriterBase to act on a stream which it does not own. --- lang/c++/api/DataFile.hh | 36 +++++++++++++++++++++++++++++------- lang/c++/api/Stream.hh | 6 +++--- lang/c++/impl/DataFile.cc | 18 ++++++------------ 3 files changed, 38 insertions(+), 22 deletions(-) diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh index 98779b6b101..3fbbc7321bc 100644 --- a/lang/c++/api/DataFile.hh +++ b/lang/c++/api/DataFile.hh @@ -60,7 +60,7 @@ class AVRO_DECL DataFileWriterBase : boost::noncopyable { const size_t syncInterval_; Codec codec_; - std::auto_ptr stream_; + OutputStream* stream_; std::auto_ptr buffer_; const DataFileSync sync_; int64_t objectCount_; @@ -98,13 +98,14 @@ public: void incr() { ++objectCount_; } + /** - * Constructs a data file writer with the given sync interval and name. + * Constructs a data file writer with the given stream, schema, + * sync interval, and name */ - DataFileWriterBase(const char* filename, const ValidSchema& schema, + DataFileWriterBase(OutputStream *outputStream, const ValidSchema& schema, size_t syncInterval, Codec codec = NULL_CODEC); - ~DataFileWriterBase(); /** * Closes the current file. Once closed this datafile object cannot be * used for writing any more. @@ -128,14 +129,33 @@ public: template class DataFileWriter : boost::noncopyable { std::auto_ptr base_; + std::auto_ptr filestream_; public: /** * Constructs a new data file. */ DataFileWriter(const char* filename, const ValidSchema& schema, - size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : - base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { } + size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) + { + filestream_ = fileOutputStream(filename); + base_ = std::auto_ptr(new DataFileWriterBase(filestream_.get(), schema, syncInterval, codec)); + } + /** + * Constructs a new data file using an OutputStream + */ + DataFileWriter(OutputStream *outputStream, const ValidSchema& schema, + size_t syncInterval = 16 * 1024, Codec code = NULL_CODEC) : + base_(new DataFileWriterBase(outputStream, schema, syncInterval, code)) { } + + + ~DataFileWriter() + { + if (filestream_.get()) { + close(); + } + } + /** * Writes the given piece of data into the file. */ @@ -149,7 +169,9 @@ public: * Closes the current file. Once closed this datafile object cannot be * used for writing any more. */ - void close() { base_->close(); } + void close() { + base_->close(); + } /** * Returns the schema for this data file. diff --git a/lang/c++/api/Stream.hh b/lang/c++/api/Stream.hh index 92b2334d2fb..7aebe29d1e1 100644 --- a/lang/c++/api/Stream.hh +++ b/lang/c++/api/Stream.hh @@ -70,7 +70,7 @@ public: /** * Returns the number of bytes read from this stream so far. * All the bytes made available through next are considered - * to be used unless, retutned back using backup. + * to be used unless, returned back using backup. */ virtual size_t byteCount() const = 0; }; @@ -108,7 +108,7 @@ public: /** * Number of bytes written so far into this stream. The whole buffer * returned by next() is assumed to be written unless some of - * it was retutned using backup(). + * it was returned using backup(). */ virtual uint64_t byteCount() const = 0; @@ -265,7 +265,7 @@ struct StreamReader { } /** - * Get as many byes from the underlying stream as possible in a single + * Get as many bytes from the underlying stream as possible in a single * chunk. * \return true if some data could be obtained. False is no more * data is available on the stream. diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc index 035dd27404a..810fdda3310 100644 --- a/lang/c++/impl/DataFile.cc +++ b/lang/c++/impl/DataFile.cc @@ -63,14 +63,16 @@ static string toString(const ValidSchema& schema) return oss.str(); } -DataFileWriterBase::DataFileWriterBase(const char* filename, +DataFileWriterBase::DataFileWriterBase(OutputStream *outputStream, const ValidSchema& schema, size_t syncInterval, Codec codec) : - filename_(filename), schema_(schema), encoderPtr_(binaryEncoder()), + schema_(schema), + encoderPtr_(binaryEncoder()), syncInterval_(syncInterval), codec_(codec), - stream_(fileOutputStream(filename)), + stream_(outputStream), buffer_(memoryOutputStream()), - sync_(makeSync()), objectCount_(0) + sync_(makeSync()), + objectCount_(0) { if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) { throw Exception(boost::format("Invalid sync interval: %1%. " @@ -92,17 +94,9 @@ DataFileWriterBase::DataFileWriterBase(const char* filename, encoderPtr_->init(*buffer_); } -DataFileWriterBase::~DataFileWriterBase() -{ - if (stream_.get()) { - close(); - } -} - void DataFileWriterBase::close() { flush(); - stream_.reset(); } void DataFileWriterBase::sync()