diff --git a/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java b/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java
new file mode 100644
index 0000000..957b4c3
--- /dev/null
+++ b/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Configuration knobs for writing CSV, passed to {@link DataFrame#writeCsv(String,
+ * CsvWriteOptions)}.
+ *
+ *
+ * <p>Mirrors a subset of DataFusion's {@code DataFrameWriteOptions} and the writer-side {@code
+ * CsvOptions}. All setters return {@code this} for fluent chaining. Defaults: every field {@code
+ * null} or empty (meaning the DataFusion default is used).
+ *
+ *
+ * <p>Path semantics: when {@link #singleFileOutput(boolean)} is {@code true}, the path passed to
+ * {@code writeCsv} is the literal output filename. When left unset (the default) and there are no
+ * partition columns, the path is treated as a directory that DataFusion populates with one or more
+ * part-files.
+ *
+ *
+ * <p>Compression reuses {@link CsvReadOptions.FileCompressionType} -- both the read and write sides
+ * accept the same codec set ({@code UNCOMPRESSED}, {@code GZIP}, {@code BZIP2}, {@code XZ}, {@code
+ * ZSTD}).
+ */
+public final class CsvWriteOptions {
+
+ private Boolean singleFileOutput;
+ private final List<String> partitionCols = new ArrayList<>();
+ private Boolean hasHeader;
+ private Byte delimiter;
+ private Byte quote;
+ private Byte escape;
+ private String nullValue;
+ private CsvReadOptions.FileCompressionType fileCompressionType;
+
+ /**
+ * When {@code true}, write to a single file at the supplied path. When left unset (the default)
+ * and no partition columns are configured, the path is treated as a directory and DataFusion
+ * writes one or more part-files.
+ */
+ public CsvWriteOptions singleFileOutput(boolean v) {
+ this.singleFileOutput = v;
+ return this;
+ }
+
+ /**
+ * Hive-style partition columns. Each column listed here is removed from the data rows and encoded
+ * into the directory layout (one subdirectory per distinct value). Mutually exclusive with {@link
+ * #singleFileOutput(boolean)} -- DataFusion rejects the combination at write time.
+ */
+ public CsvWriteOptions partitionCols(String... cols) {
+ this.partitionCols.clear();
+ for (String c : cols) {
+ this.partitionCols.add(c);
+ }
+ return this;
+ }
+
+ /** Whether to write a header row. Defaults to DataFusion's setting (typically {@code true}). */
+ public CsvWriteOptions hasHeader(boolean v) {
+ this.hasHeader = v;
+ return this;
+ }
+
+ /** Field delimiter byte. Defaults to {@code ','}. */
+ public CsvWriteOptions delimiter(byte b) {
+ this.delimiter = b;
+ return this;
+ }
+
+ /** Quote character byte. Defaults to {@code '"'}. */
+ public CsvWriteOptions quote(byte b) {
+ this.quote = b;
+ return this;
+ }
+
+ /** Escape character byte. Defaults to none. */
+ public CsvWriteOptions escape(byte b) {
+ this.escape = b;
+ return this;
+ }
+
+ /** String to write for SQL NULL values. Defaults to the empty string. */
+ public CsvWriteOptions nullValue(String s) {
+ this.nullValue = s;
+ return this;
+ }
+
+ /** Output compression codec. Defaults to uncompressed. */
+ public CsvWriteOptions fileCompressionType(CsvReadOptions.FileCompressionType t) {
+ this.fileCompressionType = t;
+ return this;
+ }
+
+ byte[] toBytes() {
+ org.apache.datafusion.protobuf.CsvWriteOptionsProto.Builder b =
+ org.apache.datafusion.protobuf.CsvWriteOptionsProto.newBuilder();
+ if (singleFileOutput != null) {
+ b.setSingleFileOutput(singleFileOutput);
+ }
+ b.addAllPartitionCols(partitionCols);
+ if (hasHeader != null) {
+ b.setHasHeader(hasHeader);
+ }
+ if (delimiter != null) {
+ b.setDelimiter(delimiter & 0xFF);
+ }
+ if (quote != null) {
+ b.setQuote(quote & 0xFF);
+ }
+ if (escape != null) {
+ b.setEscape(escape & 0xFF);
+ }
+ if (nullValue != null) {
+ b.setNullValue(nullValue);
+ }
+ if (fileCompressionType != null) {
+ b.setFileCompressionType(toProto(fileCompressionType));
+ }
+ return b.build().toByteArray();
+ }
+
+ private static org.apache.datafusion.protobuf.FileCompressionType toProto(
+ CsvReadOptions.FileCompressionType t) {
+ switch (t) {
+ case UNCOMPRESSED:
+ return org.apache.datafusion.protobuf.FileCompressionType
+ .FILE_COMPRESSION_TYPE_UNCOMPRESSED;
+ case GZIP:
+ return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_GZIP;
+ case BZIP2:
+ return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_BZIP2;
+ case XZ:
+ return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_XZ;
+ case ZSTD:
+ return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_ZSTD;
+ default:
+ throw new IllegalArgumentException("unhandled FileCompressionType: " + t);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/datafusion/DataFrame.java b/core/src/main/java/org/apache/datafusion/DataFrame.java
index dceb497..01af6d0 100644
--- a/core/src/main/java/org/apache/datafusion/DataFrame.java
+++ b/core/src/main/java/org/apache/datafusion/DataFrame.java
@@ -201,6 +201,38 @@ public void writeParquet(String path, ParquetWriteOptions options) {
options.singleFileOutput() != null && options.singleFileOutput());
}
+ /**
+ * Materialize this DataFrame as CSV at {@code path}. The path is treated as a directory unless
+ * overridden via {@link CsvWriteOptions#singleFileOutput(boolean)}. The receiver remains usable
+ * and must still be closed independently.
+ *
+ * @throws RuntimeException if the write fails.
+ */
+ public void writeCsv(String path) {
+ writeCsv(path, new CsvWriteOptions());
+ }
+
+ /**
+ * Materialize this DataFrame as CSV at {@code path} with the supplied {@link CsvWriteOptions}.
+ * The receiver remains usable and must still be closed independently.
+ *
+ * @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}.
+ * @throws RuntimeException if the write fails (path inaccessible, invalid compression spec,
+ * etc.).
+ */
+ public void writeCsv(String path, CsvWriteOptions options) {
+ if (nativeHandle == 0) {
+ throw new IllegalStateException("DataFrame is closed or already collected");
+ }
+ if (path == null) {
+ throw new IllegalArgumentException("writeCsv path must be non-null");
+ }
+ if (options == null) {
+ throw new IllegalArgumentException("writeCsv options must be non-null");
+ }
+ writeCsvWithOptions(nativeHandle, path, options.toBytes());
+ }
+
@Override
public void close() {
if (nativeHandle != 0) {
@@ -237,4 +269,6 @@ private static native void writeParquetWithOptions(
String compression,
boolean singleFileOutputSet,
boolean singleFileOutputValue);
+
+ private static native void writeCsvWithOptions(long handle, String path, byte[] optionsBytes);
}
diff --git a/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java b/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java
new file mode 100644
index 0000000..743729a
--- /dev/null
+++ b/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.datafusion.protobuf.CsvWriteOptionsProto;
+import org.apache.datafusion.protobuf.FileCompressionType;
+import org.junit.jupiter.api.Test;
+
+class CsvWriteOptionsTest {
+
+ @Test
+ void defaultsLeaveEverythingUnset() throws Exception {
+ CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(new CsvWriteOptions().toBytes());
+
+ assertFalse(p.hasSingleFileOutput());
+ assertEquals(0, p.getPartitionColsCount());
+ assertFalse(p.hasHasHeader());
+ assertFalse(p.hasDelimiter());
+ assertFalse(p.hasQuote());
+ assertFalse(p.hasEscape());
+ assertFalse(p.hasNullValue());
+ assertFalse(p.hasFileCompressionType());
+ }
+
+ @Test
+ void fluentSettersRoundTripThroughProto() throws Exception {
+ CsvWriteOptions opts =
+ new CsvWriteOptions()
+ .singleFileOutput(true)
+ .partitionCols("region", "year")
+ .hasHeader(false)
+ .delimiter((byte) '|')
+ .quote((byte) '\'')
+ .escape((byte) '\\')
+ .nullValue("\\N")
+ .fileCompressionType(CsvReadOptions.FileCompressionType.GZIP);
+
+ CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes());
+
+ assertTrue(p.getSingleFileOutput());
+ assertEquals(2, p.getPartitionColsCount());
+ assertEquals("region", p.getPartitionCols(0));
+ assertEquals("year", p.getPartitionCols(1));
+ assertFalse(p.getHasHeader());
+ assertEquals((int) '|', p.getDelimiter());
+ assertEquals((int) '\'', p.getQuote());
+ assertEquals((int) '\\', p.getEscape());
+ assertEquals("\\N", p.getNullValue());
+ assertEquals(FileCompressionType.FILE_COMPRESSION_TYPE_GZIP, p.getFileCompressionType());
+ }
+
+ @Test
+ void partitionColsResetOnSubsequentCalls() throws Exception {
+ CsvWriteOptions opts = new CsvWriteOptions().partitionCols("a", "b", "c").partitionCols("only");
+
+ CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes());
+ assertEquals(1, p.getPartitionColsCount());
+ assertEquals("only", p.getPartitionCols(0));
+ }
+
+ @Test
+ void delimiterAcceptsHighByteWithoutSignExtension() throws Exception {
+ CsvWriteOptions opts = new CsvWriteOptions().delimiter((byte) 0xC2);
+ CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes());
+ assertEquals(0xC2, p.getDelimiter());
+ }
+}
diff --git a/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java b/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java
new file mode 100644
index 0000000..a424a4b
--- /dev/null
+++ b/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class DataFrameWriteCsvTest {
+
+ private static Path writeCsv(Path dir, String name, String contents) throws IOException {
+ Path file = dir.resolve(name);
+ Files.writeString(file, contents);
+ return file;
+ }
+
+ private static long countRowsAt(Path dirOrFile, CsvReadOptions readOpts) throws Exception {
+ try (BufferAllocator allocator = new RootAllocator();
+ SessionContext ctx = new SessionContext()) {
+ ctx.registerCsv("t", dirOrFile.toAbsolutePath().toString(), readOpts);
+ try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM t");
+ ArrowReader reader = df.collect(allocator)) {
+ assertTrue(reader.loadNextBatch());
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ assertEquals(1, root.getRowCount());
+ return ((BigIntVector) root.getVector(0)).get(0);
+ }
+ }
+ }
+
+ @Test
+ void writeCsvRoundTripsRowCount(@TempDir Path tempDir) throws Exception {
+ Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n");
+ Path out = tempDir.resolve("out");
+
+ try (SessionContext ctx = new SessionContext();
+ DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+ df.writeCsv(out.toString());
+ }
+
+ assertEquals(3L, countRowsAt(out, new CsvReadOptions()));
+ }
+
+ @Test
+ void writeCsvSingleFileProducesOneFile(@TempDir Path tempDir) throws Exception {
+ Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n");
+ Path out = tempDir.resolve("out.csv");
+
+ try (SessionContext ctx = new SessionContext();
+ DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+ df.writeCsv(out.toString(), new CsvWriteOptions().singleFileOutput(true));
+ }
+
+ assertTrue(Files.isRegularFile(out), "expected single file at " + out);
+ assertEquals(3L, countRowsAt(out, new CsvReadOptions()));
+ }
+
+ @Test
+ void writeCsvWithCustomDelimiter(@TempDir Path tempDir) throws Exception {
+ Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n");
+ Path out = tempDir.resolve("out.csv");
+
+ try (SessionContext ctx = new SessionContext();
+ DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+ df.writeCsv(
+ out.toString(), new CsvWriteOptions().singleFileOutput(true).delimiter((byte) '|'));
+ }
+
+ String written = Files.readString(out);
+ assertTrue(written.contains("|"), "expected pipe delimiter in output, got: " + written);
+
+ assertEquals(2L, countRowsAt(out, new CsvReadOptions().delimiter((byte) '|')));
+ }
+
+ @Test
+ void writeCsvWithGzipCompressionRoundTrips(@TempDir Path tempDir) throws Exception {
+ Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n4,dan\n");
+ Path out = tempDir.resolve("gz-out");
+
+ try (SessionContext ctx = new SessionContext();
+ DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+ df.writeCsv(
+ out.toString(),
+ new CsvWriteOptions().fileCompressionType(CsvReadOptions.FileCompressionType.GZIP));
+ }
+
+ try (Stream<Path> stream = Files.walk(out)) {
+ List<Path> files = stream.filter(Files::isRegularFile).toList();
+ assertTrue(!files.isEmpty(), "expected at least one part-file under " + out);
+ for (Path p : files) {
+ assertTrue(
+ p.getFileName().toString().endsWith(".gz"),
+ "expected .gz suffix on " + p.getFileName());
+ }
+ }
+
+ CsvReadOptions readOpts =
+ new CsvReadOptions()
+ .fileCompressionType(CsvReadOptions.FileCompressionType.GZIP)
+ .fileExtension(".csv.gz");
+ assertEquals(4L, countRowsAt(out, readOpts));
+ }
+
+ @Test
+ void writeCsvRetainsDataFrame(@TempDir Path tempDir) throws Exception {
+ Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n");
+ Path out = tempDir.resolve("retained");
+
+ try (SessionContext ctx = new SessionContext();
+ DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+ df.writeCsv(out.toString());
+ assertEquals(3L, df.count());
+ }
+ }
+
+ @Test
+ void writeCsvRejectsNullPath(@TempDir Path tempDir) throws Exception {
+ Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n");
+ try (SessionContext ctx = new SessionContext();
+ DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+ assertThrows(IllegalArgumentException.class, () -> df.writeCsv(null));
+ }
+ }
+
+ @Test
+ void writeCsvRejectsNullOptions(@TempDir Path tempDir) throws Exception {
+ Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n");
+ Path out = tempDir.resolve("out");
+ try (SessionContext ctx = new SessionContext();
+ DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+ assertThrows(IllegalArgumentException.class, () -> df.writeCsv(out.toString(), null));
+ }
+ }
+}
diff --git a/native/build.rs b/native/build.rs
index 5a27cb0..b833b3f 100644
--- a/native/build.rs
+++ b/native/build.rs
@@ -19,6 +19,7 @@ fn main() {
const PROTOS: &[&str] = &[
"../proto/session_options.proto",
"../proto/csv_read_options.proto",
+ "../proto/csv_write_options.proto",
"../proto/parquet_read_options.proto",
];
for p in PROTOS {
diff --git a/native/src/csv.rs b/native/src/csv.rs
index 3201951..70b38e1 100644
--- a/native/src/csv.rs
+++ b/native/src/csv.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::common::config::CsvOptions;
+use datafusion::common::parsers::CompressionTypeVariant;
+use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::error::DataFusionError;
use datafusion::prelude::{CsvReadOptions, SessionContext};
@@ -24,7 +27,9 @@ use jni::JNIEnv;
use prost::Message;
use crate::errors::{try_unwrap_or_throw, JniResult};
-use crate::proto_gen::{CsvReadOptionsProto, FileCompressionType as ProtoFileCompressionType};
+use crate::proto_gen::{
+ CsvReadOptionsProto, CsvWriteOptionsProto, FileCompressionType as ProtoFileCompressionType,
+};
use crate::runtime;
use crate::schema::decode_optional_schema;
@@ -128,3 +133,88 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_readCsvWithOpti
})
})
}
+
+fn proto_compression_to_variant(p: ProtoFileCompressionType) -> JniResult<CompressionTypeVariant> {
+ match p {
+ ProtoFileCompressionType::Unspecified => {
+ Err("CsvWriteOptionsProto.file_compression_type is UNSPECIFIED".into())
+ }
+ ProtoFileCompressionType::Uncompressed => Ok(CompressionTypeVariant::UNCOMPRESSED),
+ ProtoFileCompressionType::Gzip => Ok(CompressionTypeVariant::GZIP),
+ ProtoFileCompressionType::Bzip2 => Ok(CompressionTypeVariant::BZIP2),
+ ProtoFileCompressionType::Xz => Ok(CompressionTypeVariant::XZ),
+ ProtoFileCompressionType::Zstd => Ok(CompressionTypeVariant::ZSTD),
+ }
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeCsvWithOptions<'local>(
+ mut env: JNIEnv<'local>,
+ _class: JClass<'local>,
+ handle: jlong,
+ path: JString<'local>,
+ options_bytes: JByteArray<'local>,
+) {
+ try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> {
+ if handle == 0 {
+ return Err("DataFrame handle is null".into());
+ }
+ let df = unsafe { &*(handle as *const DataFrame) }.clone();
+ let path: String = env.get_string(&path)?.into();
+ let bytes: Vec<u8> = env.convert_byte_array(&options_bytes)?;
+ let p = CsvWriteOptionsProto::decode(bytes.as_slice())?;
+
+ // Decode the file_compression_type field eagerly so an unknown wire
+ // value surfaces as a clear error rather than a silent default.
+ let compression = if p.file_compression_type.is_some() {
+ Some(proto_compression_to_variant(p.file_compression_type())?)
+ } else {
+ None
+ };
+
+ let mut write_opts = DataFrameWriteOptions::new();
+ if let Some(v) = p.single_file_output {
+ write_opts = write_opts.with_single_file_output(v);
+ }
+ if !p.partition_cols.is_empty() {
+ write_opts = write_opts.with_partition_by(p.partition_cols.clone());
+ }
+
+ // Build CsvOptions only when at least one writer-side knob is set, so
+ // the DataFusion default is preserved when the caller passes
+ // `new CsvWriteOptions()`.
+ let writer_opts: Option<CsvOptions> = if p.has_header.is_some()
+ || p.delimiter.is_some()
+ || p.quote.is_some()
+ || p.escape.is_some()
+ || p.null_value.is_some()
+ || compression.is_some()
+ {
+ let mut o = CsvOptions::default();
+ if let Some(v) = p.has_header {
+ o = o.with_has_header(v);
+ }
+ if let Some(v) = p.delimiter {
+ o = o.with_delimiter(v as u8);
+ }
+ if let Some(v) = p.quote {
+ o = o.with_quote(v as u8);
+ }
+ if let Some(v) = p.escape {
+ o = o.with_escape(Some(v as u8));
+ }
+ if let Some(v) = p.null_value {
+ o.null_value = Some(v);
+ }
+ if let Some(v) = compression {
+ o = o.with_file_compression_type(v);
+ }
+ Some(o)
+ } else {
+ None
+ };
+
+ runtime().block_on(df.write_csv(&path, write_opts, writer_opts))?;
+ Ok(())
+ })
+}
diff --git a/proto/csv_write_options.proto b/proto/csv_write_options.proto
new file mode 100644
index 0000000..5d294b2
--- /dev/null
+++ b/proto/csv_write_options.proto
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package datafusion_java;
+
+import "csv_read_options.proto";
+
+option java_package = "org.apache.datafusion.protobuf";
+option java_multiple_files = true;
+
+// Options used to write CSV files. Fields with non-null Java defaults are
+// always sent; fields marked `optional` preserve unset-ness so the Rust side
+// can leave a DataFusion default in place. `FileCompressionType` is reused
+// from `csv_read_options.proto` since the codec set is identical to the read
+// side. Note: `partition_cols` is sent as a repeated string so an empty list
+// (the default) round-trips unambiguously.
+message CsvWriteOptionsProto {
+ optional bool single_file_output = 1;
+ repeated string partition_cols = 2;
+ optional bool has_header = 3;
+ optional uint32 delimiter = 4;
+ optional uint32 quote = 5;
+ optional uint32 escape = 6;
+ optional string null_value = 7;
+ optional FileCompressionType file_compression_type = 8;
+}