ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support #10230

Closed

Conversation

@lidavidm (Member) commented May 3, 2021

This refactors the CSV write support to expose an explicit CSV writer class, and adds Python bindings and Datasets support.


@lidavidm (Member, Author) commented May 4, 2021

@emkornfield would you be free to take a look (at least the CSV side, if not the Datasets side)? No rush of course.

@jorisvandenbossche (Member):
Does this also enable writing CSV with the dataset API in Python? (write_dataset(..., format="csv"))

@lidavidm (Member, Author) commented May 4, 2021

@jorisvandenbossche I missed that: CsvFileFormat.make_write_options in Python needs to be updated as well.

@nealrichardson (Contributor):
There's probably a very small amount of wiring to propagate this up to the R write_dataset() function; up to you if you want to handle it here or make another JIRA for it.

@lidavidm (Member, Author) commented May 4, 2021

I threw in R support and found & fixed a bug with scanning CSV datasets with manually-specified names.

@lidavidm force-pushed the arrow-12512 branch 2 times, most recently from ddcf58c to 893db1f on May 6, 2021.
@westonpace (Member) left a comment:

I haven't looked at the CSV writer much before, so some of my comments may be about that existing code rather than this change.

Do we have a JIRA for allowing incremental writes to a CSV file (using file append)? Or would that be possible today?

MemoryPool* pool) {
static Result<std::shared_ptr<CSVConverter>> Make(
io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
Member:

Maybe take in IOContext instead of MemoryPool*? If you later decide to add support for cancellation it'll save you from having to change the API.
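A minimal sketch of what that suggestion could look like (the signature below is hypothetical, not the PR's actual code; io::IOContext lives in arrow/io/interfaces.h and can be constructed from just a MemoryPool*):

#include "arrow/io/interfaces.h"  // io::IOContext, io::OutputStream

// Hypothetical variant of Make() taking an IOContext instead of a bare
// MemoryPool*. IOContext bundles the pool with an executor and a stop token,
// so cancellation could be added later without breaking this signature.
static Result<std::shared_ptr<CSVConverter>> Make(
    io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
    std::shared_ptr<Schema> schema, const io::IOContext& io_context,
    const WriteOptions& options);

// A caller that only has a pool can still write:
//   Make(sink, owned_sink, schema, io::IOContext(pool), options);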

}

return Status::OK();
}

Status Close() override { return Status::OK(); }
Member:

No need to close owned_sink_?

Member (Author):

The IPC reader doesn't do this either, oddly. I guess it is not a Rust-style 'exclusively owned' sink but merely 'keep this sink alive'. (Though that does beg the question: what's the point? Either you're the only one keeping it alive, and so you should close it, or you aren't the only one, and you don't need a shared_ptr. I would guess it's just less of a footgun to have a strong reference than a potentially dangling one, though.)

Contributor:

In other places I can think of (Buffer comes to mind), this is handled by passing a unique_ptr instead of a shared_ptr.

Member:

Right, we don't close either in other "writer" classes. This is more flexible, though of course in the general case not very useful.
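For illustration, the keep-alive pattern being discussed, as a minimal sketch (class and member names are hypothetical; it only mirrors the semantics described above, where the writer never closes the stream):

#include <memory>
#include "arrow/io/interfaces.h"
#include "arrow/status.h"

class ExampleWriter {
 public:
  // Borrowing variant: the caller guarantees `sink` outlives the writer.
  explicit ExampleWriter(arrow::io::OutputStream* sink) : sink_(sink) {}

  // Shared variant: the writer keeps a strong reference so the stream
  // cannot dangle, but it still does not Close() it -- the caller does.
  explicit ExampleWriter(std::shared_ptr<arrow::io::OutputStream> owned_sink)
      : sink_(owned_sink.get()), owned_sink_(std::move(owned_sink)) {}

  arrow::Status Close() {
    // Deliberately does not close the sink, matching the IPC writer.
    return arrow::Status::OK();
  }

 private:
  arrow::io::OutputStream* sink_;
  std::shared_ptr<arrow::io::OutputStream> owned_sink_;  // may be null
};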

TableBatchReader reader(table);
reader.set_chunksize(options.batch_size);
RETURN_NOT_OK(PrepareForContentsWrite(options, out));
reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
Member:

Seems a little odd to have two options to control batch_size. I suppose it's a "default" batch size and a "specific for this table" batch size?

Member (Author):

There's a bit of an impedance mismatch because I elected to reuse the ipc::RecordBatchWriter interface, which has parameters like that in the API. I could at least introduce an overload that doesn't require specifying it for convenience.
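A convenience overload along those lines might look like this (hypothetical sketch; it simply defers to the two-argument form shown in the diff above):

// Hypothetical convenience overload: callers who are happy with
// WriteOptions::batch_size need not pass a chunk size at all. A
// non-positive value falls through to options_.batch_size in
// `max_chunksize > 0 ? max_chunksize : options_.batch_size`.
Status WriteTable(const Table& table) {
  return WriteTable(table, /*max_chunksize=*/-1);
}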


Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
Status PrepareForContentsWrite() {
Member:

Does data_buffer_ ever revert back to nullptr? Why isn't it just initialized once at construction?

Contributor:

I think at one point I might have been using it as a signal to see if the header was written, but I can't really remember. I agree it is strange and I don't have a strong justification for this pattern. It might have been to avoid having to make a factory function for the private class.

Member:

Sounds good to me.

@@ -355,7 +370,9 @@ class CSVConverter {
return header_length + (kQuoteDelimiterCount * schema_->num_fields());
}

Status WriteHeader(io::OutputStream* out) {
Status WriteHeader() {
if (header_written_) return Status::OK();
Member:

Would it be clearer to return Invalid here to inform the caller they are doing something odd? Or is it sometimes hard for the caller to know when the header will be written?
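For comparison, the stricter variant being floated would look roughly like this (hypothetical; DoWriteHeader stands in for the actual serialization code):

Status WriteHeader() {
  if (header_written_) {
    // Alternative to the silent no-op: surface the double write to the caller.
    return Status::Invalid("CSV header was already written");
  }
  header_written_ = true;
  return DoWriteHeader();  // hypothetical helper doing the real work
}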

new CsvFileWriteOptions(shared_from_this()));
csv_options->options =
std::make_shared<csv::WriteOptions>(csv::WriteOptions::Defaults());
csv_options->pool = default_memory_pool();
Member:

I'm a little surprised that pool is not a property of FileWriteOptions.

class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
public:
/// Options passed to csv::MakeCSVWriter. use_threads is ignored
std::shared_ptr<csv::WriteOptions> options;
Member:

options is a little ambiguous. Perhaps format_options or csv_options or writer_options?


cdef class WriteOptions(_Weakrefable):
cdef:
unique_ptr[CCSVWriteOptions] options
Member:

Why does this need to be a unique_ptr? CCSVWriteOptions is pretty trivial.

Member (Author):

Mostly for consistency with the other options, and in case we add things to WriteOptions that would make it a non-standard layout type, in which case Cython will generate a lot of compiler warnings as it relies on sizeof.

@@ -1747,8 +1749,15 @@ cdef class CsvFileFormat(FileFormat):
FileFormat.init(self, sp)
self.csv_format = <CCsvFileFormat*> sp.get()

def make_write_options(self):
raise NotImplemented("writing CSV datasets")
def make_write_options(self, WriteOptions options=None,
Member:

This is kind of confusing having a method named make_write_options that takes in an instance of WriteOptions. Perhaps in C++ it wouldn't be so bad but for Python I think we might want something more understandable.

Member (Author):

I could have it take **kwargs which get forwarded to csv.WriteOptions; now that I look, that's what ParquetFileFormat does.

table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))
], names=["f1", "f2", "part"])
Member:

The column here is named part, which makes me think it is going to be used for partitioning, but that isn't actually done. I'm not sure this is a problem so much as an observation.

@@ -403,34 +415,44 @@ class CSVConverter {
}

static constexpr int64_t kColumnSizeGuess = 8;
io::OutputStream* sink_;
std::shared_ptr<io::OutputStream> owned_sink_;
Contributor:

shared_ptr seems strange in general for an OutputStream, which for the most part should have only one owner.

Member (Author):

I agree it seems weird, but both the IPC and Parquet writers use shared_ptr for this.

Member:

Well, except that in Python any object can be shared, even if it's logically "owned" by something.

ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
CSVConverter::Make(table.schema(), pool));
return converter->WriteCSV(table, options, output);
ASSIGN_OR_RAISE(auto converter, MakeCSVWriter(output, table.schema(), options));
Contributor:

nit: should converter now be writer? (same question below)

@@ -83,6 +82,35 @@ struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
csv::ReadOptions read_options = csv::ReadOptions::Defaults();
};

class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
public:
/// Options passed to csv::MakeCSVWriter. use_threads is ignored
Contributor:

Is use_threads used elsewhere? The way the code is structured threads could be used for the casts, so if it is important we might want to file a follow-up JIRA.

Member (Author):

I copied this from the equivalent IPC struct - it doesn't apply here since there's no such parameter of course.

else:
raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")


cdef class CsvWriter(_CRecordBatchWriter):
Contributor:

nit: as much as I appreciate the Csv naming convention, I think CSV is used everywhere else?

@@ -1819,6 +1824,28 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
self.read_options)


cdef class CsvFileWriteOptions(FileWriteOptions):
Contributor:

same nit on Csv vs CSV

Member (Author):

Unfortunately in the context of datasets (and only datasets) all other classes already use Csv.

@emkornfield (Contributor):

Took a quick pass through; seems OK to me (I didn't look at the R stuff at all), and I agree with Weston's comments.

@lidavidm (Member, Author):

Thanks for the reviews. I think I've addressed all feedback, minus the shared_ptr - while this is weird, it is the pattern used by IPC and Parquet as well, and I think we may as well be consistent across the formats. (Also, IPC exposes both the output-owning and output-borrowing APIs too, even though it expects the caller to close the stream in both cases.)

@lidavidm (Member, Author):

Just to follow up - APIs like FileSystem return shared_ptr<OutputStream> so it would be very annoying to take unique_ptr. And we could just take only OutputStream* but IMO even if the caller is supposed to keep the pointer alive, it's safer to offer the option to take a shared_ptr by default to minimize mistakes.

@lidavidm (Member, Author):

Rebased/fixed conflicts.

@lidavidm (Member, Author):

Rebased/fixed conflicts again.

On the shared_ptr vs raw pointer: I think the other issue is that we allow constructing a shared_ptr<Writer>, at which point we should hold a shared_ptr<OutputStream> or unique_ptr<OutputStream>. But the filesystem interfaces only give out a shared_ptr<OutputStream>, hence we can only take a shared_ptr here.

We could perhaps not allow you to get a shared_ptr<Writer> and only let you use Writer or Writer*, and then only accept an OutputStream*.
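A sketch of the call pattern behind that reasoning (file path and error handling are illustrative; MakeCSVWriter is the factory this PR adds, and FileSystem::OpenOutputStream hands out a shared_ptr):

#include "arrow/csv/writer.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/result.h"
#include "arrow/table.h"

arrow::Status WriteExample(const std::shared_ptr<arrow::Table>& table) {
  arrow::fs::LocalFileSystem fs;
  // The filesystem API returns shared ownership, so a unique_ptr-taking
  // writer factory would be awkward to call from here.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::OutputStream> sink,
                        fs.OpenOutputStream("/tmp/example.csv"));
  ARROW_ASSIGN_OR_RAISE(
      auto writer, arrow::csv::MakeCSVWriter(sink, table->schema(),
                                             arrow::csv::WriteOptions::Defaults()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table));
  ARROW_RETURN_NOT_OK(writer->Close());
  return sink->Close();  // the caller, not the writer, closes the stream
}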

@emkornfield (Contributor):

I don't think I had any objections before, but I can re-review if you want. The writer API stuff is unfortunate but doesn't need to be addressed here.

@lidavidm (Member, Author):

Ah, ok, I was unsure how to deal with that part. I'd appreciate another look (though I don't think anything substantial has changed; I did have to rebase a few times) before I merge, though.

@lidavidm (Member, Author):

Rebased and fixed conflicts here.

@pitrou (Member) left a comment:

LGTM.

std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
std::shared_ptr<ResizableBuffer> data_buffer_;
const std::shared_ptr<Schema> schema_;
MemoryPool* pool_;
WriteOptions options_;
Member:

Nit: const?

Member (Author):

I added the const.

}
return std::unique_ptr<CSVConverter>(
new CSVConverter(std::move(schema), std::move(populators), pool));
auto writer = std::shared_ptr<CSVWriterImpl>(new CSVWriterImpl(
Contributor:

nit: std::make_shared?

std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
: column_populators_(std::move(populators)),
offsets_(0, 0, ::arrow::stl::allocator<char*>(pool)),
CSVWriterImpl(io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
Contributor:

I guess this would need to be public to use std::make_shared.
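The trade-off in this nit, sketched with a hypothetical class: std::make_shared constructs the object itself, so it cannot reach a private constructor, while the factory-function pattern keeps the constructor private at the cost of a separate allocation for the control block.

#include <memory>

class Widget {
 public:
  static std::shared_ptr<Widget> Make(int value) {
    // std::make_shared<Widget>(value) would not compile here: it needs a
    // public constructor. The explicit new keeps construction behind Make().
    return std::shared_ptr<Widget>(new Widget(value));
  }

 private:
  explicit Widget(int value) : value_(value) {}
  int value_;
};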


/// \brief Create a new CSV writer.
///
/// \param[in] sink output stream to write to
Contributor:

also note that ownership is not taken here?

CCSVWriteOptions c_write_options
CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool)
_get_write_options(write_options, &c_write_options)
c_write_options.io_context = CIOContext(c_memory_pool)
Contributor:

IOContext is new to me in general. Should we be making new APIs take that instead of MemoryPool?

Member (Author):

I would say yes, since it wraps up a memory pool, thread pool, and cancellation token all in one.
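A minimal sketch of that bundling, assuming the IOContext(MemoryPool*, StopToken) overload in arrow/io/interfaces.h:

#include "arrow/io/interfaces.h"  // arrow::io::IOContext
#include "arrow/memory_pool.h"    // arrow::default_memory_pool
#include "arrow/util/cancel.h"    // arrow::StopSource

// One IOContext carries the memory pool and a cancellation token (and, via
// other overloads, an executor), so an API that accepts it picks up all of
// those capabilities through a single parameter.
arrow::io::IOContext MakeCancellableContext(arrow::StopSource& stop_source) {
  return arrow::io::IOContext(arrow::default_memory_pool(), stop_source.token());
}

// Another thread can later call stop_source.RequestStop() to cancel
// in-flight I/O started with this context.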

@emkornfield (Contributor):

A few random nits, but the core C++ looks OK to me.

@pitrou closed this in 0ebed2b on Jul 5, 2021.
@pitrou (Member) commented Jul 5, 2021

Thanks for the updates @lidavidm !
