Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions core/src/main/java/org/apache/datafusion/DataFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,38 @@ public void writeCsv(String path, CsvWriteOptions options) {
writeCsvWithOptions(nativeHandle, path, options.toBytes());
}

/**
* Materialize this DataFrame as newline-delimited JSON at {@code path}. The path is treated as a
* directory unless overridden via {@link JsonWriteOptions#singleFileOutput(boolean)}. The
* receiver remains usable and must still be closed independently.
*
* @throws RuntimeException if the write fails.
*/
public void writeJson(String path) {
writeJson(path, new JsonWriteOptions());
}

/**
* Materialize this DataFrame as newline-delimited JSON at {@code path} with the supplied {@link
* JsonWriteOptions}. The receiver remains usable and must still be closed independently.
*
* @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}.
* @throws RuntimeException if the write fails (path inaccessible, invalid compression spec,
* etc.).
*/
public void writeJson(String path, JsonWriteOptions options) {
if (nativeHandle == 0) {
throw new IllegalStateException("DataFrame is closed or already collected");
}
if (path == null) {
throw new IllegalArgumentException("writeJson path must be non-null");
}
if (options == null) {
throw new IllegalArgumentException("writeJson options must be non-null");
}
writeJsonWithOptions(nativeHandle, path, options.toBytes());
}

@Override
public void close() {
if (nativeHandle != 0) {
Expand Down Expand Up @@ -362,4 +394,6 @@ private static native void writeParquetWithOptions(
boolean singleFileOutputValue);

private static native void writeCsvWithOptions(long handle, String path, byte[] optionsBytes);

private static native void writeJsonWithOptions(long handle, String path, byte[] optionsBytes);
}
91 changes: 91 additions & 0 deletions core/src/main/java/org/apache/datafusion/JsonWriteOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import java.util.ArrayList;
import java.util.List;

/**
* Configuration knobs for writing JSON, passed to {@link DataFrame#writeJson(String,
* JsonWriteOptions)}.
*
* <p>Mirrors a subset of DataFusion's {@code DataFrameWriteOptions} and the writer-side {@code
* JsonOptions}. All setters return {@code this} for fluent chaining. Defaults: every field {@code
* null} or empty (meaning the DataFusion default is used).
*
* <p>Path semantics: when {@link #singleFileOutput(boolean)} is {@code true}, the path passed to
* {@code writeJson} is the literal output filename. When left unset (the default) and there are no
* partition columns, the path is treated as a directory that DataFusion populates with one or more
* part-files.
*
* <p>The output is always newline-delimited JSON (NDJSON). DataFusion's JSON writer does not emit
* the bracketed array form, so there is no toggle for it here.
*
* <p>Compression reuses {@link FileCompressionType} -- the same codec set ({@code UNCOMPRESSED},
* {@code GZIP}, {@code BZIP2}, {@code XZ}, {@code ZSTD}) the read side and the CSV writer accept.
*/
public final class JsonWriteOptions {

private Boolean singleFileOutput;
private final List<String> partitionCols = new ArrayList<>();
private FileCompressionType fileCompressionType;

/**
* When {@code true}, write to a single file at the supplied path. When left unset (the default)
* and no partition columns are configured, the path is treated as a directory and DataFusion
* writes one or more part-files.
*/
public JsonWriteOptions singleFileOutput(boolean v) {
this.singleFileOutput = v;
return this;
}

/**
* Hive-style partition columns. Each column listed here is removed from the data rows and encoded
* into the directory layout (one subdirectory per distinct value). Mutually exclusive with {@link
* #singleFileOutput(boolean)} -- DataFusion rejects the combination at write time.
*/
public JsonWriteOptions partitionCols(String... cols) {
this.partitionCols.clear();
for (String c : cols) {
this.partitionCols.add(c);
}
return this;
}

/** Output compression codec. Defaults to uncompressed. */
public JsonWriteOptions fileCompressionType(FileCompressionType t) {
this.fileCompressionType = t;
return this;
}

byte[] toBytes() {
org.apache.datafusion.protobuf.JsonWriteOptionsProto.Builder b =
org.apache.datafusion.protobuf.JsonWriteOptionsProto.newBuilder();
if (singleFileOutput != null) {
b.setSingleFileOutput(singleFileOutput);
}
b.addAllPartitionCols(partitionCols);
if (fileCompressionType != null) {
b.setFileCompressionType(FileCompressionTypes.toProto(fileCompressionType));
}
return b.build().toByteArray();
}
}
162 changes: 162 additions & 0 deletions core/src/test/java/org/apache/datafusion/DataFrameWriteJsonTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class DataFrameWriteJsonTest {

/**
* Build a 3-row DataFrame {@code (id BIGINT, name VARCHAR)}, write it as JSON via the SUT, then
* read the result back through {@link SessionContext#registerJson} and run {@code COUNT(*)}.
* Returns the row count.
*/
private static long countRowsAt(Path dirOrFile, NdJsonReadOptions readOpts) throws Exception {
try (BufferAllocator allocator = new RootAllocator();
SessionContext ctx = new SessionContext()) {
ctx.registerJson("t", dirOrFile.toAbsolutePath().toString(), readOpts);
try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM t");
ArrowReader reader = df.collect(allocator)) {
assertTrue(reader.loadNextBatch());
VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertEquals(1, root.getRowCount());
return ((BigIntVector) root.getVector(0)).get(0);
}
}
}

/** Materialise a 3-row table the test can hand to {@code writeJson}. */
private static String threeRowSql() {
return "SELECT * FROM (VALUES (CAST(1 AS BIGINT), 'alice'),"
+ " (CAST(2 AS BIGINT), 'bob'), (CAST(3 AS BIGINT), 'carol')) AS t(id, name)";
}

@Test
void writeJsonRoundTripsRowCount(@TempDir Path tempDir) throws Exception {
Path out = tempDir.resolve("out");

try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql(threeRowSql())) {
df.writeJson(out.toString());
}

assertEquals(3L, countRowsAt(out, new NdJsonReadOptions()));
}

@Test
void writeJsonSingleFileProducesOneFile(@TempDir Path tempDir) throws Exception {
Path out = tempDir.resolve("out.json");

try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql(threeRowSql())) {
df.writeJson(out.toString(), new JsonWriteOptions().singleFileOutput(true));
}

assertTrue(Files.isRegularFile(out), "expected single file at " + out);
assertEquals(3L, countRowsAt(out, new NdJsonReadOptions()));
}

@Test
void writeJsonWithGzipCompressionRoundTrips(@TempDir Path tempDir) throws Exception {
Path out = tempDir.resolve("gz-out");

try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql(threeRowSql())) {
df.writeJson(
out.toString(), new JsonWriteOptions().fileCompressionType(FileCompressionType.GZIP));
}

try (Stream<Path> stream = Files.walk(out)) {
List<Path> files = stream.filter(Files::isRegularFile).toList();
assertTrue(!files.isEmpty(), "expected at least one part-file under " + out);
for (Path p : files) {
assertTrue(
p.getFileName().toString().endsWith(".gz"),
"expected .gz suffix on " + p.getFileName());
}
}

NdJsonReadOptions readOpts =
new NdJsonReadOptions()
.fileCompressionType(FileCompressionType.GZIP)
.fileExtension(".json.gz");
assertEquals(3L, countRowsAt(out, readOpts));
}

@Test
void writeJsonDefaultsToDirectoryEvenWithExtensionInPath(@TempDir Path tempDir) throws Exception {
// The Javadoc promises "directory unless overridden via singleFileOutput(true)". DataFusion's
// own DataFrameWriteOptions defaults to Automatic mode, where an extension in the path
// (".json" here) silently flips the output to a single file. The native handler explicitly
// pins the default to directory mode so this contract holds regardless of path shape.
Path out = tempDir.resolve("out.json");

try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql(threeRowSql())) {
df.writeJson(out.toString());
}

assertTrue(Files.isDirectory(out), "expected directory output at " + out + ", got a file");
assertEquals(3L, countRowsAt(out, new NdJsonReadOptions()));
}

@Test
void writeJsonRetainsDataFrame(@TempDir Path tempDir) throws Exception {
Path out = tempDir.resolve("retained");

try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql(threeRowSql())) {
df.writeJson(out.toString());
assertEquals(3L, df.count());
}
}

@Test
void writeJsonRejectsNullPath() {
try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql(threeRowSql())) {
assertThrows(IllegalArgumentException.class, () -> df.writeJson(null));
}
}

@Test
void writeJsonRejectsNullOptions(@TempDir Path tempDir) {
Path out = tempDir.resolve("out");
try (SessionContext ctx = new SessionContext();
DataFrame df = ctx.sql(threeRowSql())) {
assertThrows(IllegalArgumentException.class, () -> df.writeJson(out.toString(), null));
}
}
}
68 changes: 68 additions & 0 deletions core/src/test/java/org/apache/datafusion/JsonWriteOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.datafusion.protobuf.JsonWriteOptionsProto;
import org.junit.jupiter.api.Test;

class JsonWriteOptionsTest {

@Test
void defaultsLeaveEverythingUnset() throws Exception {
JsonWriteOptionsProto p = JsonWriteOptionsProto.parseFrom(new JsonWriteOptions().toBytes());

assertFalse(p.hasSingleFileOutput());
assertEquals(0, p.getPartitionColsCount());
assertFalse(p.hasFileCompressionType());
}

@Test
void fluentSettersRoundTripThroughProto() throws Exception {
JsonWriteOptions opts =
new JsonWriteOptions()
.singleFileOutput(true)
.partitionCols("region", "year")
.fileCompressionType(FileCompressionType.GZIP);

JsonWriteOptionsProto p = JsonWriteOptionsProto.parseFrom(opts.toBytes());

assertTrue(p.getSingleFileOutput());
assertEquals(2, p.getPartitionColsCount());
assertEquals("region", p.getPartitionCols(0));
assertEquals("year", p.getPartitionCols(1));
assertEquals(
org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_GZIP,
p.getFileCompressionType());
}

@Test
void partitionColsResetOnSubsequentCalls() throws Exception {
JsonWriteOptions opts =
new JsonWriteOptions().partitionCols("a", "b", "c").partitionCols("only");

JsonWriteOptionsProto p = JsonWriteOptionsProto.parseFrom(opts.toBytes());
assertEquals(1, p.getPartitionColsCount());
assertEquals("only", p.getPartitionCols(0));
}
}
1 change: 1 addition & 0 deletions native/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn main() {
"../proto/csv_read_options.proto",
"../proto/csv_write_options.proto",
"../proto/json_read_options.proto",
"../proto/json_write_options.proto",
"../proto/parquet_read_options.proto",
];
for p in PROTOS {
Expand Down
Loading
Loading