Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions core/src/main/java/org/apache/datafusion/CsvWriteOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import java.util.ArrayList;
import java.util.List;

/**
* Configuration knobs for writing CSV, passed to {@link DataFrame#writeCsv(String,
* CsvWriteOptions)}.
*
* <p>Mirrors a subset of DataFusion's {@code DataFrameWriteOptions} and the writer-side {@code
* CsvOptions}. All setters return {@code this} for fluent chaining. Defaults: every field {@code
* null} or empty (meaning the DataFusion default is used).
*
* <p>Path semantics: when {@link #singleFileOutput(boolean)} is {@code true}, the path passed to
* {@code writeCsv} is the literal output filename. When left unset (the default) and there are no
* partition columns, the path is treated as a directory that DataFusion populates with one or more
* part-files.
*
* <p>Compression reuses {@link CsvReadOptions.FileCompressionType} -- both the read and write sides
* accept the same codec set ({@code UNCOMPRESSED}, {@code GZIP}, {@code BZIP2}, {@code XZ}, {@code
* ZSTD}).
*/
public final class CsvWriteOptions {

  // All fields are nullable/empty by default; null means "use DataFusion's default"
  // and the corresponding proto field is left unset in toBytes().
  private Boolean singleFileOutput;
  private final List<String> partitionCols = new ArrayList<>();
  private Boolean hasHeader;
  private Byte delimiter;
  private Byte quote;
  private Byte escape;
  private String nullValue;
  private CsvReadOptions.FileCompressionType fileCompressionType;

  /**
   * When {@code true}, write to a single file at the supplied path. When left unset (the default)
   * and no partition columns are configured, the path is treated as a directory and DataFusion
   * writes one or more part-files.
   *
   * @param v whether output goes to exactly one file
   * @return this options object, for fluent chaining
   */
  public CsvWriteOptions singleFileOutput(boolean v) {
    this.singleFileOutput = v;
    return this;
  }

  /**
   * Hive-style partition columns. Each column listed here is removed from the data rows and encoded
   * into the directory layout (one subdirectory per distinct value). Mutually exclusive with {@link
   * #singleFileOutput(boolean)} -- DataFusion rejects the combination at write time.
   *
   * <p>Replaces any previously configured partition columns. Fails fast (before mutating this
   * object) if {@code cols} or any element is {@code null}.
   *
   * @param cols partition column names, in layout order
   * @return this options object, for fluent chaining
   * @throws NullPointerException if {@code cols} or any element is {@code null}
   */
  public CsvWriteOptions partitionCols(String... cols) {
    // List.of is null-hostile, so validation happens before we touch partitionCols;
    // a bad argument leaves the existing configuration intact.
    List<String> validated = List.of(cols);
    this.partitionCols.clear();
    this.partitionCols.addAll(validated);
    return this;
  }

  /**
   * Whether to write a header row. Defaults to DataFusion's setting (typically {@code true}).
   *
   * @param v {@code true} to emit a header row
   * @return this options object, for fluent chaining
   */
  public CsvWriteOptions hasHeader(boolean v) {
    this.hasHeader = v;
    return this;
  }

  /**
   * Field delimiter byte. Defaults to {@code ','}.
   *
   * @param b single-byte delimiter
   * @return this options object, for fluent chaining
   */
  public CsvWriteOptions delimiter(byte b) {
    this.delimiter = b;
    return this;
  }

  /**
   * Quote character byte. Defaults to {@code '"'}.
   *
   * @param b single-byte quote character
   * @return this options object, for fluent chaining
   */
  public CsvWriteOptions quote(byte b) {
    this.quote = b;
    return this;
  }

  /**
   * Escape character byte. Defaults to none.
   *
   * @param b single-byte escape character
   * @return this options object, for fluent chaining
   */
  public CsvWriteOptions escape(byte b) {
    this.escape = b;
    return this;
  }

  /**
   * String to write for SQL NULL values. Defaults to the empty string.
   *
   * @param s replacement text for NULLs
   * @return this options object, for fluent chaining
   */
  public CsvWriteOptions nullValue(String s) {
    this.nullValue = s;
    return this;
  }

  /**
   * Output compression codec. Defaults to uncompressed.
   *
   * @param t codec shared with the read side (see class javadoc)
   * @return this options object, for fluent chaining
   */
  public CsvWriteOptions fileCompressionType(CsvReadOptions.FileCompressionType t) {
    this.fileCompressionType = t;
    return this;
  }

  /**
   * Serializes the configured options to a {@code CsvWriteOptionsProto} byte array for the JNI
   * boundary. Fields left unset here remain unset in the proto, which the native side interprets
   * as "use the DataFusion default".
   */
  byte[] toBytes() {
    org.apache.datafusion.protobuf.CsvWriteOptionsProto.Builder b =
        org.apache.datafusion.protobuf.CsvWriteOptionsProto.newBuilder();
    if (singleFileOutput != null) {
      b.setSingleFileOutput(singleFileOutput);
    }
    b.addAllPartitionCols(partitionCols);
    if (hasHeader != null) {
      b.setHasHeader(hasHeader);
    }
    if (delimiter != null) {
      // Mask to avoid sign extension: byte 0xC2 must arrive as 194, not -62.
      b.setDelimiter(delimiter & 0xFF);
    }
    if (quote != null) {
      b.setQuote(quote & 0xFF);
    }
    if (escape != null) {
      b.setEscape(escape & 0xFF);
    }
    if (nullValue != null) {
      b.setNullValue(nullValue);
    }
    if (fileCompressionType != null) {
      b.setFileCompressionType(toProto(fileCompressionType));
    }
    return b.build().toByteArray();
  }

  /** Maps the public compression enum onto its protobuf counterpart, one-to-one. */
  private static org.apache.datafusion.protobuf.FileCompressionType toProto(
      CsvReadOptions.FileCompressionType t) {
    switch (t) {
      case UNCOMPRESSED:
        return org.apache.datafusion.protobuf.FileCompressionType
            .FILE_COMPRESSION_TYPE_UNCOMPRESSED;
      case GZIP:
        return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_GZIP;
      case BZIP2:
        return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_BZIP2;
      case XZ:
        return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_XZ;
      case ZSTD:
        return org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_ZSTD;
      default:
        // Defensive: unreachable unless a new enum constant is added without updating this switch.
        throw new IllegalArgumentException("unhandled FileCompressionType: " + t);
    }
  }
}
34 changes: 34 additions & 0 deletions core/src/main/java/org/apache/datafusion/DataFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,38 @@ public void writeParquet(String path, ParquetWriteOptions options) {
options.singleFileOutput() != null && options.singleFileOutput());
}

/**
 * Materialize this DataFrame as CSV at {@code path} using default write options. Equivalent to
 * {@code writeCsv(path, new CsvWriteOptions())}: the path is treated as a directory unless
 * overridden via {@link CsvWriteOptions#singleFileOutput(boolean)}. The receiver remains usable
 * and must still be closed independently.
 *
 * @param path destination directory (or file, if single-file output is later configured)
 * @throws RuntimeException if the write fails.
 */
public void writeCsv(String path) {
  this.writeCsv(path, new CsvWriteOptions());
}

/**
 * Materialize this DataFrame as CSV at {@code path}, configured by {@code options}. The receiver
 * remains usable afterwards and must still be closed independently.
 *
 * @param path destination path; directory vs. single-file semantics follow {@code options}
 * @param options CSV writer configuration, serialized and handed across the JNI boundary
 * @throws IllegalStateException if this DataFrame has already been closed or collected.
 * @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}.
 * @throws RuntimeException if the write fails (path inaccessible, invalid compression spec,
 *     etc.).
 */
public void writeCsv(String path, CsvWriteOptions options) {
  // Guard the handle first: a closed frame is a misuse regardless of the arguments.
  if (nativeHandle == 0) {
    throw new IllegalStateException("DataFrame is closed or already collected");
  }
  if (path == null) {
    throw new IllegalArgumentException("writeCsv path must be non-null");
  }
  if (options == null) {
    throw new IllegalArgumentException("writeCsv options must be non-null");
  }
  byte[] serializedOptions = options.toBytes();
  writeCsvWithOptions(nativeHandle, path, serializedOptions);
}

@Override
public void close() {
if (nativeHandle != 0) {
Expand Down Expand Up @@ -237,4 +269,6 @@ private static native void writeParquetWithOptions(
String compression,
boolean singleFileOutputSet,
boolean singleFileOutputValue);

private static native void writeCsvWithOptions(long handle, String path, byte[] optionsBytes);
}
88 changes: 88 additions & 0 deletions core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.datafusion.protobuf.CsvWriteOptionsProto;
import org.apache.datafusion.protobuf.FileCompressionType;
import org.junit.jupiter.api.Test;

class CsvWriteOptionsTest {

  /** With no setters invoked, every optional proto field must be absent after serialization. */
  @Test
  void defaultsLeaveEverythingUnset() throws Exception {
    byte[] encoded = new CsvWriteOptions().toBytes();
    CsvWriteOptionsProto proto = CsvWriteOptionsProto.parseFrom(encoded);

    assertFalse(proto.hasSingleFileOutput());
    assertEquals(0, proto.getPartitionColsCount());
    assertFalse(proto.hasHasHeader());
    assertFalse(proto.hasDelimiter());
    assertFalse(proto.hasQuote());
    assertFalse(proto.hasEscape());
    assertFalse(proto.hasNullValue());
    assertFalse(proto.hasFileCompressionType());
  }

  /** Every setter's value must survive the round trip into the serialized proto. */
  @Test
  void fluentSettersRoundTripThroughProto() throws Exception {
    CsvWriteOptions options =
        new CsvWriteOptions()
            .singleFileOutput(true)
            .partitionCols("region", "year")
            .hasHeader(false)
            .delimiter((byte) '|')
            .quote((byte) '\'')
            .escape((byte) '\\')
            .nullValue("\\N")
            .fileCompressionType(CsvReadOptions.FileCompressionType.GZIP);

    CsvWriteOptionsProto proto = CsvWriteOptionsProto.parseFrom(options.toBytes());

    assertTrue(proto.getSingleFileOutput());
    assertEquals(2, proto.getPartitionColsCount());
    assertEquals("region", proto.getPartitionCols(0));
    assertEquals("year", proto.getPartitionCols(1));
    assertFalse(proto.getHasHeader());
    assertEquals((int) '|', proto.getDelimiter());
    assertEquals((int) '\'', proto.getQuote());
    assertEquals((int) '\\', proto.getEscape());
    assertEquals("\\N", proto.getNullValue());
    assertEquals(FileCompressionType.FILE_COMPRESSION_TYPE_GZIP, proto.getFileCompressionType());
  }

  /** A second partitionCols(...) call replaces, rather than appends to, the first. */
  @Test
  void partitionColsResetOnSubsequentCalls() throws Exception {
    CsvWriteOptions options =
        new CsvWriteOptions().partitionCols("a", "b", "c").partitionCols("only");

    CsvWriteOptionsProto proto = CsvWriteOptionsProto.parseFrom(options.toBytes());
    assertEquals(1, proto.getPartitionColsCount());
    assertEquals("only", proto.getPartitionCols(0));
  }

  /** A byte above 0x7F must serialize as its unsigned value (194), not sign-extended (-62). */
  @Test
  void delimiterAcceptsHighByteWithoutSignExtension() throws Exception {
    CsvWriteOptions options = new CsvWriteOptions().delimiter((byte) 0xC2);
    CsvWriteOptionsProto proto = CsvWriteOptionsProto.parseFrom(options.toBytes());
    assertEquals(0xC2, proto.getDelimiter());
  }
}
Loading