From 6c070e896153ca2296c38fb5811082844d03e922 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Fri, 15 May 2026 04:31:38 +0000 Subject: [PATCH] feat(dataframe): add writeCsv with CsvWriteOptions Closes #38 Mirrors writeParquet's surface for CSV: writeCsv(path) and writeCsv(path, options). Options class exposes singleFileOutput, partitionCols, hasHeader, delimiter, quote, escape, nullValue, and fileCompressionType. Uses the proto-over-JNI pattern (CsvWriteOptionsProto) rather than writeParquet's wide-JNI signature because the writer-side option set is much wider; a single byte[] keeps the JNI signature stable as more fields are added later. FileCompressionType is reused from csv_read_options.proto since the codec set is identical between read and write. Option on the Rust side stays None when no writer-side knob is set, so DataFusion's runtime defaults are preserved. Tests: 4 new options round-trip tests plus 7 integration tests (default round-trip, single-file, custom delimiter, gzip round-trip, retain-after-write, null path/options rejection). make test passes; cargo clippy/fmt and spotless are clean. --- .../apache/datafusion/CsvWriteOptions.java | 158 +++++++++++++++++ .../java/org/apache/datafusion/DataFrame.java | 34 ++++ .../datafusion/CsvWriteOptionsTest.java | 88 ++++++++++ .../datafusion/DataFrameWriteCsvTest.java | 165 ++++++++++++++++++ native/build.rs | 1 + native/src/csv.rs | 92 +++++++++- proto/csv_write_options.proto | 42 +++++ 7 files changed, 579 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/apache/datafusion/CsvWriteOptions.java create mode 100644 core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java create mode 100644 core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java create mode 100644 proto/csv_write_options.proto diff --git a/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java b/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java new file mode 100644 index 0000000..957b4c3 --- /dev/null +++ b/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import java.util.ArrayList; +import java.util.List; + +/** + * Configuration knobs for writing CSV, passed to {@link DataFrame#writeCsv(String, + * CsvWriteOptions)}. + * + *

Mirrors a subset of DataFusion's {@code DataFrameWriteOptions} and the writer-side {@code + * CsvOptions}. All setters return {@code this} for fluent chaining. Defaults: every field {@code + * null} or empty (meaning the DataFusion default is used). + * + *

Path semantics: when {@link #singleFileOutput(boolean)} is {@code true}, the path passed to + * {@code writeCsv} is the literal output filename. When left unset (the default) and there are no + * partition columns, the path is treated as a directory that DataFusion populates with one or more + * part-files. + * + *

Compression reuses {@link CsvReadOptions.FileCompressionType} -- both the read and write sides + * accept the same codec set ({@code UNCOMPRESSED}, {@code GZIP}, {@code BZIP2}, {@code XZ}, {@code + * ZSTD}). + */ +public final class CsvWriteOptions { + + private Boolean singleFileOutput; + private final List partitionCols = new ArrayList<>(); + private Boolean hasHeader; + private Byte delimiter; + private Byte quote; + private Byte escape; + private String nullValue; + private CsvReadOptions.FileCompressionType fileCompressionType; + + /** + * When {@code true}, write to a single file at the supplied path. When left unset (the default) + * and no partition columns are configured, the path is treated as a directory and DataFusion + * writes one or more part-files. + */ + public CsvWriteOptions singleFileOutput(boolean v) { + this.singleFileOutput = v; + return this; + } + + /** + * Hive-style partition columns. Each column listed here is removed from the data rows and encoded + * into the directory layout (one subdirectory per distinct value). Mutually exclusive with {@link + * #singleFileOutput(boolean)} -- DataFusion rejects the combination at write time. + */ + public CsvWriteOptions partitionCols(String... cols) { + this.partitionCols.clear(); + for (String c : cols) { + this.partitionCols.add(c); + } + return this; + } + + /** Whether to write a header row. Defaults to DataFusion's setting (typically {@code true}). */ + public CsvWriteOptions hasHeader(boolean v) { + this.hasHeader = v; + return this; + } + + /** Field delimiter byte. Defaults to {@code ','}. */ + public CsvWriteOptions delimiter(byte b) { + this.delimiter = b; + return this; + } + + /** Quote character byte. Defaults to {@code '"'}. */ + public CsvWriteOptions quote(byte b) { + this.quote = b; + return this; + } + + /** Escape character byte. Defaults to none. */ + public CsvWriteOptions escape(byte b) { + this.escape = b; + return this; + } + + /** String to write for SQL NULL values. Defaults to the empty string. */ + public CsvWriteOptions nullValue(String s) { + this.nullValue = s; + return this; + } + + /** Output compression codec. Defaults to uncompressed. */ + public CsvWriteOptions fileCompressionType(CsvReadOptions.FileCompressionType t) { + this.fileCompressionType = t; + return this; + } + + byte[] toBytes() { + org.apache.datafusion.protobuf.CsvWriteOptionsProto.Builder b = + org.apache.datafusion.protobuf.CsvWriteOptionsProto.newBuilder(); + if (singleFileOutput != null) { + b.setSingleFileOutput(singleFileOutput); + } + b.addAllPartitionCols(partitionCols); + if (hasHeader != null) { + b.setHasHeader(hasHeader); + } + if (delimiter != null) { + b.setDelimiter(delimiter & 0xFF); + } + if (quote != null) { + b.setQuote(quote & 0xFF); + } + if (escape != null) { + b.setEscape(escape & 0xFF); + } + if (nullValue != null) { + b.setNullValue(nullValue); + } + if (fileCompressionType != null) { + b.setFileCompressionType(toProto(fileCompressionType)); + } + return b.build().toByteArray(); + } + + private static org.apache.datafusion.protobuf.FileCompressionType toProto( + CsvReadOptions.FileCompressionType t) { + switch (t) { + case UNCOMPRESSED: + return org.apache.datafusion.protobuf.FileCompressionType + .FILE_COMPRESSION_TYPE_UNCOMPRESSED; + case GZIP: + return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_GZIP; + case BZIP2: + return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_BZIP2; + case XZ: + return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_XZ; + case ZSTD: + return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_ZSTD; + default: + throw new IllegalArgumentException("unhandled FileCompressionType: " + t); + } + } +} diff --git a/core/src/main/java/org/apache/datafusion/DataFrame.java b/core/src/main/java/org/apache/datafusion/DataFrame.java index dceb497..01af6d0 100644 --- a/core/src/main/java/org/apache/datafusion/DataFrame.java +++ b/core/src/main/java/org/apache/datafusion/DataFrame.java @@ -201,6 +201,38 @@ public void writeParquet(String path, ParquetWriteOptions options) { options.singleFileOutput() != null && options.singleFileOutput()); } + /** + * Materialize this DataFrame as CSV at {@code path}. The path is treated as a directory unless + * overridden via {@link CsvWriteOptions#singleFileOutput(boolean)}. The receiver remains usable + * and must still be closed independently. + * + * @throws RuntimeException if the write fails. + */ + public void writeCsv(String path) { + writeCsv(path, new CsvWriteOptions()); + } + + /** + * Materialize this DataFrame as CSV at {@code path} with the supplied {@link CsvWriteOptions}. + * The receiver remains usable and must still be closed independently. + * + * @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}. + * @throws RuntimeException if the write fails (path inaccessible, invalid compression spec, + * etc.). + */ + public void writeCsv(String path, CsvWriteOptions options) { + if (nativeHandle == 0) { + throw new IllegalStateException("DataFrame is closed or already collected"); + } + if (path == null) { + throw new IllegalArgumentException("writeCsv path must be non-null"); + } + if (options == null) { + throw new IllegalArgumentException("writeCsv options must be non-null"); + } + writeCsvWithOptions(nativeHandle, path, options.toBytes()); + } + @Override public void close() { if (nativeHandle != 0) { @@ -237,4 +269,6 @@ private static native void writeParquetWithOptions( String compression, boolean singleFileOutputSet, boolean singleFileOutputValue); + + private static native void writeCsvWithOptions(long handle, String path, byte[] optionsBytes); } diff --git a/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java b/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java new file mode 100644 index 0000000..743729a --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.datafusion.protobuf.CsvWriteOptionsProto; +import org.apache.datafusion.protobuf.FileCompressionType; +import org.junit.jupiter.api.Test; + +class CsvWriteOptionsTest { + + @Test + void defaultsLeaveEverythingUnset() throws Exception { + CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(new CsvWriteOptions().toBytes()); + + assertFalse(p.hasSingleFileOutput()); + assertEquals(0, p.getPartitionColsCount()); + assertFalse(p.hasHasHeader()); + assertFalse(p.hasDelimiter()); + assertFalse(p.hasQuote()); + assertFalse(p.hasEscape()); + assertFalse(p.hasNullValue()); + assertFalse(p.hasFileCompressionType()); + } + + @Test + void fluentSettersRoundTripThroughProto() throws Exception { + CsvWriteOptions opts = + new CsvWriteOptions() + .singleFileOutput(true) + .partitionCols("region", "year") + .hasHeader(false) + .delimiter((byte) '|') + .quote((byte) '\'') + .escape((byte) '\\') + .nullValue("\\N") + .fileCompressionType(CsvReadOptions.FileCompressionType.GZIP); + + CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes()); + + assertTrue(p.getSingleFileOutput()); + assertEquals(2, p.getPartitionColsCount()); + assertEquals("region", p.getPartitionCols(0)); + assertEquals("year", p.getPartitionCols(1)); + assertFalse(p.getHasHeader()); + assertEquals((int) '|', p.getDelimiter()); + assertEquals((int) '\'', p.getQuote()); + assertEquals((int) '\\', p.getEscape()); + assertEquals("\\N", p.getNullValue()); + assertEquals(FileCompressionType.FILE_COMPRESSION_TYPE_GZIP, p.getFileCompressionType()); + } + + @Test + void partitionColsResetOnSubsequentCalls() throws Exception { + CsvWriteOptions opts = new CsvWriteOptions().partitionCols("a", "b", "c").partitionCols("only"); + + CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes()); + assertEquals(1, p.getPartitionColsCount()); + assertEquals("only", p.getPartitionCols(0)); + } + + @Test + void delimiterAcceptsHighByteWithoutSignExtension() throws Exception { + CsvWriteOptions opts = new CsvWriteOptions().delimiter((byte) 0xC2); + CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes()); + assertEquals(0xC2, p.getDelimiter()); + } +} diff --git a/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java b/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java new file mode 100644 index 0000000..a424a4b --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class DataFrameWriteCsvTest { + + private static Path writeCsv(Path dir, String name, String contents) throws IOException { + Path file = dir.resolve(name); + Files.writeString(file, contents); + return file; + } + + private static long countRowsAt(Path dirOrFile, CsvReadOptions readOpts) throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + ctx.registerCsv("t", dirOrFile.toAbsolutePath().toString(), readOpts); + try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM t"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertEquals(1, root.getRowCount()); + return ((BigIntVector) root.getVector(0)).get(0); + } + } + } + + @Test + void writeCsvRoundTripsRowCount(@TempDir Path tempDir) throws Exception { + Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n"); + Path out = tempDir.resolve("out"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) { + df.writeCsv(out.toString()); + } + + assertEquals(3L, countRowsAt(out, new CsvReadOptions())); + } + + @Test + void writeCsvSingleFileProducesOneFile(@TempDir Path tempDir) throws Exception { + Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n"); + Path out = tempDir.resolve("out.csv"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) { + df.writeCsv(out.toString(), new CsvWriteOptions().singleFileOutput(true)); + } + + assertTrue(Files.isRegularFile(out), "expected single file at " + out); + assertEquals(3L, countRowsAt(out, new CsvReadOptions())); + } + + @Test + void writeCsvWithCustomDelimiter(@TempDir Path tempDir) throws Exception { + Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n"); + Path out = tempDir.resolve("out.csv"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) { + df.writeCsv( + out.toString(), new CsvWriteOptions().singleFileOutput(true).delimiter((byte) '|')); + } + + String written = Files.readString(out); + assertTrue(written.contains("|"), "expected pipe delimiter in output, got: " + written); + + assertEquals(2L, countRowsAt(out, new CsvReadOptions().delimiter((byte) '|'))); + } + + @Test + void writeCsvWithGzipCompressionRoundTrips(@TempDir Path tempDir) throws Exception { + Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n4,dan\n"); + Path out = tempDir.resolve("gz-out"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) { + df.writeCsv( + out.toString(), + new CsvWriteOptions().fileCompressionType(CsvReadOptions.FileCompressionType.GZIP)); + } + + try (Stream stream = Files.walk(out)) { + List files = stream.filter(Files::isRegularFile).toList(); + assertTrue(!files.isEmpty(), "expected at least one part-file under " + out); + for (Path p : files) { + assertTrue( + p.getFileName().toString().endsWith(".gz"), + "expected .gz suffix on " + p.getFileName()); + } + } + + CsvReadOptions readOpts = + new CsvReadOptions() + .fileCompressionType(CsvReadOptions.FileCompressionType.GZIP) + .fileExtension(".csv.gz"); + assertEquals(4L, countRowsAt(out, readOpts)); + } + + @Test + void writeCsvRetainsDataFrame(@TempDir Path tempDir) throws Exception { + Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n"); + Path out = tempDir.resolve("retained"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) { + df.writeCsv(out.toString()); + assertEquals(3L, df.count()); + } + } + + @Test + void writeCsvRejectsNullPath(@TempDir Path tempDir) throws Exception { + Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n"); + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) { + assertThrows(IllegalArgumentException.class, () -> df.writeCsv(null)); + } + } + + @Test + void writeCsvRejectsNullOptions(@TempDir Path tempDir) throws Exception { + Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n"); + Path out = tempDir.resolve("out"); + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) { + assertThrows(IllegalArgumentException.class, () -> df.writeCsv(out.toString(), null)); + } + } +} diff --git a/native/build.rs b/native/build.rs index 5a27cb0..b833b3f 100644 --- a/native/build.rs +++ b/native/build.rs @@ -19,6 +19,7 @@ fn main() { const PROTOS: &[&str] = &[ "../proto/session_options.proto", "../proto/csv_read_options.proto", + "../proto/csv_write_options.proto", "../proto/parquet_read_options.proto", ]; for p in PROTOS { diff --git a/native/src/csv.rs b/native/src/csv.rs index 3201951..70b38e1 100644 --- a/native/src/csv.rs +++ b/native/src/csv.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use datafusion::common::config::CsvOptions; +use datafusion::common::parsers::CompressionTypeVariant; +use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::error::DataFusionError; use datafusion::prelude::{CsvReadOptions, SessionContext}; @@ -24,7 +27,9 @@ use jni::JNIEnv; use prost::Message; use crate::errors::{try_unwrap_or_throw, JniResult}; -use crate::proto_gen::{CsvReadOptionsProto, FileCompressionType as ProtoFileCompressionType}; +use crate::proto_gen::{ + CsvReadOptionsProto, CsvWriteOptionsProto, FileCompressionType as ProtoFileCompressionType, +}; use crate::runtime; use crate::schema::decode_optional_schema; @@ -128,3 +133,88 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_readCsvWithOpti }) }) } + +fn proto_compression_to_variant(p: ProtoFileCompressionType) -> JniResult { + match p { + ProtoFileCompressionType::Unspecified => { + Err("CsvWriteOptionsProto.file_compression_type is UNSPECIFIED".into()) + } + ProtoFileCompressionType::Uncompressed => Ok(CompressionTypeVariant::UNCOMPRESSED), + ProtoFileCompressionType::Gzip => Ok(CompressionTypeVariant::GZIP), + ProtoFileCompressionType::Bzip2 => Ok(CompressionTypeVariant::BZIP2), + ProtoFileCompressionType::Xz => Ok(CompressionTypeVariant::XZ), + ProtoFileCompressionType::Zstd => Ok(CompressionTypeVariant::ZSTD), + } +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeCsvWithOptions<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + path: JString<'local>, + options_bytes: JByteArray<'local>, +) { + try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> { + if handle == 0 { + return Err("DataFrame handle is null".into()); + } + let df = unsafe { &*(handle as *const DataFrame) }.clone(); + let path: String = env.get_string(&path)?.into(); + let bytes: Vec = env.convert_byte_array(&options_bytes)?; + let p = CsvWriteOptionsProto::decode(bytes.as_slice())?; + + // Decode the file_compression_type field eagerly so an unknown wire + // value surfaces as a clear error rather than a silent default. + let compression = if p.file_compression_type.is_some() { + Some(proto_compression_to_variant(p.file_compression_type())?) + } else { + None + }; + + let mut write_opts = DataFrameWriteOptions::new(); + if let Some(v) = p.single_file_output { + write_opts = write_opts.with_single_file_output(v); + } + if !p.partition_cols.is_empty() { + write_opts = write_opts.with_partition_by(p.partition_cols.clone()); + } + + // Build CsvOptions only when at least one writer-side knob is set, so + // the DataFusion default is preserved when the caller passes + // `new CsvWriteOptions()`. + let writer_opts: Option = if p.has_header.is_some() + || p.delimiter.is_some() + || p.quote.is_some() + || p.escape.is_some() + || p.null_value.is_some() + || compression.is_some() + { + let mut o = CsvOptions::default(); + if let Some(v) = p.has_header { + o = o.with_has_header(v); + } + if let Some(v) = p.delimiter { + o = o.with_delimiter(v as u8); + } + if let Some(v) = p.quote { + o = o.with_quote(v as u8); + } + if let Some(v) = p.escape { + o = o.with_escape(Some(v as u8)); + } + if let Some(v) = p.null_value { + o.null_value = Some(v); + } + if let Some(v) = compression { + o = o.with_file_compression_type(v); + } + Some(o) + } else { + None + }; + + runtime().block_on(df.write_csv(&path, write_opts, writer_opts))?; + Ok(()) + }) +} diff --git a/proto/csv_write_options.proto b/proto/csv_write_options.proto new file mode 100644 index 0000000..5d294b2 --- /dev/null +++ b/proto/csv_write_options.proto @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package datafusion_java; + +import "csv_read_options.proto"; + +option java_package = "org.apache.datafusion.protobuf"; +option java_multiple_files = true; + +// Options used to write CSV files. Fields with non-null Java defaults are +// always sent; fields marked `optional` preserve unset-ness so the Rust side +// can leave a DataFusion default in place. `FileCompressionType` is reused +// from `csv_read_options.proto` since the codec set is identical to the read +// side. Note: `partition_cols` is sent as a repeated string so an empty list +// (the default) round-trips unambiguously. +message CsvWriteOptionsProto { + optional bool single_file_output = 1; + repeated string partition_cols = 2; + optional bool has_header = 3; + optional uint32 delimiter = 4; + optional uint32 quote = 5; + optional uint32 escape = 6; + optional string null_value = 7; + optional FileCompressionType file_compression_type = 8; +}