diff --git a/core/src/main/java/org/apache/datafusion/DataFrame.java b/core/src/main/java/org/apache/datafusion/DataFrame.java index c19ce39..86dd523 100644 --- a/core/src/main/java/org/apache/datafusion/DataFrame.java +++ b/core/src/main/java/org/apache/datafusion/DataFrame.java @@ -318,6 +318,38 @@ public void writeCsv(String path, CsvWriteOptions options) { writeCsvWithOptions(nativeHandle, path, options.toBytes()); } + /** + * Materialize this DataFrame as newline-delimited JSON at {@code path}. The path is treated as a + * directory unless overridden via {@link JsonWriteOptions#singleFileOutput(boolean)}. The + * receiver remains usable and must still be closed independently. + * + * @throws RuntimeException if the write fails. + */ + public void writeJson(String path) { + writeJson(path, new JsonWriteOptions()); + } + + /** + * Materialize this DataFrame as newline-delimited JSON at {@code path} with the supplied {@link + * JsonWriteOptions}. The receiver remains usable and must still be closed independently. + * + * @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}. + * @throws RuntimeException if the write fails (path inaccessible, invalid compression spec, + * etc.). + */ + public void writeJson(String path, JsonWriteOptions options) { + if (nativeHandle == 0) { + throw new IllegalStateException("DataFrame is closed or already collected"); + } + if (path == null) { + throw new IllegalArgumentException("writeJson path must be non-null"); + } + if (options == null) { + throw new IllegalArgumentException("writeJson options must be non-null"); + } + writeJsonWithOptions(nativeHandle, path, options.toBytes()); + } + @Override public void close() { if (nativeHandle != 0) { @@ -362,4 +394,6 @@ private static native void writeParquetWithOptions( boolean singleFileOutputValue); private static native void writeCsvWithOptions(long handle, String path, byte[] optionsBytes); + + private static native void writeJsonWithOptions(long handle, String path, byte[] optionsBytes); } diff --git a/core/src/main/java/org/apache/datafusion/JsonWriteOptions.java b/core/src/main/java/org/apache/datafusion/JsonWriteOptions.java new file mode 100644 index 0000000..ae3c520 --- /dev/null +++ b/core/src/main/java/org/apache/datafusion/JsonWriteOptions.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import java.util.ArrayList; +import java.util.List; + +/** + * Configuration knobs for writing JSON, passed to {@link DataFrame#writeJson(String, + * JsonWriteOptions)}. + * + *

Mirrors a subset of DataFusion's {@code DataFrameWriteOptions} and the writer-side {@code + * JsonOptions}. All setters return {@code this} for fluent chaining. Defaults: every field {@code + * null} or empty (meaning the DataFusion default is used). + * + *

Path semantics: when {@link #singleFileOutput(boolean)} is {@code true}, the path passed to + * {@code writeJson} is the literal output filename. When left unset (the default) and there are no + * partition columns, the path is treated as a directory that DataFusion populates with one or more + * part-files. + * + *

The output is always newline-delimited JSON (NDJSON). DataFusion's JSON writer does not emit + * the bracketed array form, so there is no toggle for it here. + * + *

Compression reuses {@link FileCompressionType} -- the same codec set ({@code UNCOMPRESSED}, + * {@code GZIP}, {@code BZIP2}, {@code XZ}, {@code ZSTD}) the read side and the CSV writer accept. + */ +public final class JsonWriteOptions { + + private Boolean singleFileOutput; + private final List partitionCols = new ArrayList<>(); + private FileCompressionType fileCompressionType; + + /** + * When {@code true}, write to a single file at the supplied path. When left unset (the default) + * and no partition columns are configured, the path is treated as a directory and DataFusion + * writes one or more part-files. + */ + public JsonWriteOptions singleFileOutput(boolean v) { + this.singleFileOutput = v; + return this; + } + + /** + * Hive-style partition columns. Each column listed here is removed from the data rows and encoded + * into the directory layout (one subdirectory per distinct value). Mutually exclusive with {@link + * #singleFileOutput(boolean)} -- DataFusion rejects the combination at write time. + */ + public JsonWriteOptions partitionCols(String... cols) { + this.partitionCols.clear(); + for (String c : cols) { + this.partitionCols.add(c); + } + return this; + } + + /** Output compression codec. Defaults to uncompressed. */ + public JsonWriteOptions fileCompressionType(FileCompressionType t) { + this.fileCompressionType = t; + return this; + } + + byte[] toBytes() { + org.apache.datafusion.protobuf.JsonWriteOptionsProto.Builder b = + org.apache.datafusion.protobuf.JsonWriteOptionsProto.newBuilder(); + if (singleFileOutput != null) { + b.setSingleFileOutput(singleFileOutput); + } + b.addAllPartitionCols(partitionCols); + if (fileCompressionType != null) { + b.setFileCompressionType(FileCompressionTypes.toProto(fileCompressionType)); + } + return b.build().toByteArray(); + } +} diff --git a/core/src/test/java/org/apache/datafusion/DataFrameWriteJsonTest.java b/core/src/test/java/org/apache/datafusion/DataFrameWriteJsonTest.java new file mode 100644 index 0000000..34e82bb --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/DataFrameWriteJsonTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class DataFrameWriteJsonTest { + + /** + * Build a 3-row DataFrame {@code (id BIGINT, name VARCHAR)}, write it as JSON via the SUT, then + * read the result back through {@link SessionContext#registerJson} and run {@code COUNT(*)}. + * Returns the row count. + */ + private static long countRowsAt(Path dirOrFile, NdJsonReadOptions readOpts) throws Exception { + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + ctx.registerJson("t", dirOrFile.toAbsolutePath().toString(), readOpts); + try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM t"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertEquals(1, root.getRowCount()); + return ((BigIntVector) root.getVector(0)).get(0); + } + } + } + + /** Materialise a 3-row table the test can hand to {@code writeJson}. */ + private static String threeRowSql() { + return "SELECT * FROM (VALUES (CAST(1 AS BIGINT), 'alice')," + + " (CAST(2 AS BIGINT), 'bob'), (CAST(3 AS BIGINT), 'carol')) AS t(id, name)"; + } + + @Test + void writeJsonRoundTripsRowCount(@TempDir Path tempDir) throws Exception { + Path out = tempDir.resolve("out"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql(threeRowSql())) { + df.writeJson(out.toString()); + } + + assertEquals(3L, countRowsAt(out, new NdJsonReadOptions())); + } + + @Test + void writeJsonSingleFileProducesOneFile(@TempDir Path tempDir) throws Exception { + Path out = tempDir.resolve("out.json"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql(threeRowSql())) { + df.writeJson(out.toString(), new JsonWriteOptions().singleFileOutput(true)); + } + + assertTrue(Files.isRegularFile(out), "expected single file at " + out); + assertEquals(3L, countRowsAt(out, new NdJsonReadOptions())); + } + + @Test + void writeJsonWithGzipCompressionRoundTrips(@TempDir Path tempDir) throws Exception { + Path out = tempDir.resolve("gz-out"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql(threeRowSql())) { + df.writeJson( + out.toString(), new JsonWriteOptions().fileCompressionType(FileCompressionType.GZIP)); + } + + try (Stream stream = Files.walk(out)) { + List files = stream.filter(Files::isRegularFile).toList(); + assertTrue(!files.isEmpty(), "expected at least one part-file under " + out); + for (Path p : files) { + assertTrue( + p.getFileName().toString().endsWith(".gz"), + "expected .gz suffix on " + p.getFileName()); + } + } + + NdJsonReadOptions readOpts = + new NdJsonReadOptions() + .fileCompressionType(FileCompressionType.GZIP) + .fileExtension(".json.gz"); + assertEquals(3L, countRowsAt(out, readOpts)); + } + + @Test + void writeJsonDefaultsToDirectoryEvenWithExtensionInPath(@TempDir Path tempDir) throws Exception { + // The Javadoc promises "directory unless overridden via singleFileOutput(true)". DataFusion's + // own DataFrameWriteOptions defaults to Automatic mode, where an extension in the path + // (".json" here) silently flips the output to a single file. The native handler explicitly + // pins the default to directory mode so this contract holds regardless of path shape. + Path out = tempDir.resolve("out.json"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql(threeRowSql())) { + df.writeJson(out.toString()); + } + + assertTrue(Files.isDirectory(out), "expected directory output at " + out + ", got a file"); + assertEquals(3L, countRowsAt(out, new NdJsonReadOptions())); + } + + @Test + void writeJsonRetainsDataFrame(@TempDir Path tempDir) throws Exception { + Path out = tempDir.resolve("retained"); + + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql(threeRowSql())) { + df.writeJson(out.toString()); + assertEquals(3L, df.count()); + } + } + + @Test + void writeJsonRejectsNullPath() { + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql(threeRowSql())) { + assertThrows(IllegalArgumentException.class, () -> df.writeJson(null)); + } + } + + @Test + void writeJsonRejectsNullOptions(@TempDir Path tempDir) { + Path out = tempDir.resolve("out"); + try (SessionContext ctx = new SessionContext(); + DataFrame df = ctx.sql(threeRowSql())) { + assertThrows(IllegalArgumentException.class, () -> df.writeJson(out.toString(), null)); + } + } +} diff --git a/core/src/test/java/org/apache/datafusion/JsonWriteOptionsTest.java b/core/src/test/java/org/apache/datafusion/JsonWriteOptionsTest.java new file mode 100644 index 0000000..a3bb021 --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/JsonWriteOptionsTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.datafusion.protobuf.JsonWriteOptionsProto; +import org.junit.jupiter.api.Test; + +class JsonWriteOptionsTest { + + @Test + void defaultsLeaveEverythingUnset() throws Exception { + JsonWriteOptionsProto p = JsonWriteOptionsProto.parseFrom(new JsonWriteOptions().toBytes()); + + assertFalse(p.hasSingleFileOutput()); + assertEquals(0, p.getPartitionColsCount()); + assertFalse(p.hasFileCompressionType()); + } + + @Test + void fluentSettersRoundTripThroughProto() throws Exception { + JsonWriteOptions opts = + new JsonWriteOptions() + .singleFileOutput(true) + .partitionCols("region", "year") + .fileCompressionType(FileCompressionType.GZIP); + + JsonWriteOptionsProto p = JsonWriteOptionsProto.parseFrom(opts.toBytes()); + + assertTrue(p.getSingleFileOutput()); + assertEquals(2, p.getPartitionColsCount()); + assertEquals("region", p.getPartitionCols(0)); + assertEquals("year", p.getPartitionCols(1)); + assertEquals( + org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_GZIP, + p.getFileCompressionType()); + } + + @Test + void partitionColsResetOnSubsequentCalls() throws Exception { + JsonWriteOptions opts = + new JsonWriteOptions().partitionCols("a", "b", "c").partitionCols("only"); + + JsonWriteOptionsProto p = JsonWriteOptionsProto.parseFrom(opts.toBytes()); + assertEquals(1, p.getPartitionColsCount()); + assertEquals("only", p.getPartitionCols(0)); + } +} diff --git a/native/build.rs b/native/build.rs index 52b1127..0c5361a 100644 --- a/native/build.rs +++ b/native/build.rs @@ -23,6 +23,7 @@ fn main() { "../proto/csv_read_options.proto", "../proto/csv_write_options.proto", "../proto/json_read_options.proto", + "../proto/json_write_options.proto", "../proto/parquet_read_options.proto", ]; for p in PROTOS { diff --git a/native/src/json.rs b/native/src/json.rs index 43f0055..8eea32f 100644 --- a/native/src/json.rs +++ b/native/src/json.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use datafusion::common::config::JsonOptions; +use datafusion::common::parsers::CompressionTypeVariant; +use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::error::DataFusionError; use datafusion::prelude::JsonReadOptions; @@ -25,7 +28,9 @@ use jni::JNIEnv; use prost::Message; use crate::errors::{try_unwrap_or_throw, JniResult}; -use crate::proto_gen::{FileCompressionType as ProtoFileCompressionType, NdJsonReadOptionsProto}; +use crate::proto_gen::{ + FileCompressionType as ProtoFileCompressionType, JsonWriteOptionsProto, NdJsonReadOptionsProto, +}; use crate::runtime; use crate::schema::decode_optional_schema; @@ -114,3 +119,67 @@ pub extern "system" fn Java_org_apache_datafusion_SessionContext_readJsonWithOpt }) }) } + +fn proto_compression_to_variant(p: ProtoFileCompressionType) -> JniResult { + match p { + ProtoFileCompressionType::Unspecified => { + Err("JsonWriteOptionsProto.file_compression_type is UNSPECIFIED".into()) + } + ProtoFileCompressionType::Uncompressed => Ok(CompressionTypeVariant::UNCOMPRESSED), + ProtoFileCompressionType::Gzip => Ok(CompressionTypeVariant::GZIP), + ProtoFileCompressionType::Bzip2 => Ok(CompressionTypeVariant::BZIP2), + ProtoFileCompressionType::Xz => Ok(CompressionTypeVariant::XZ), + ProtoFileCompressionType::Zstd => Ok(CompressionTypeVariant::ZSTD), + } +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeJsonWithOptions<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + path: JString<'local>, + options_bytes: JByteArray<'local>, +) { + try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> { + if handle == 0 { + return Err("DataFrame handle is null".into()); + } + let df = unsafe { &*(handle as *const DataFrame) }.clone(); + let path: String = env.get_string(&path)?.into(); + let bytes: Vec = env.convert_byte_array(&options_bytes)?; + let p = JsonWriteOptionsProto::decode(bytes.as_slice())?; + + // Decode the file_compression_type field eagerly so an unknown wire + // value surfaces as a clear error rather than a silent default. + let compression = if p.file_compression_type.is_some() { + Some(proto_compression_to_variant(p.file_compression_type())?) + } else { + None + }; + + // When the caller left `singleFileOutput` unset, force directory output (`false`) + // rather than leaving DataFusion in `Automatic` mode. Automatic mode treats paths + // with an extension (e.g. `out.json`) as single-file targets, which would silently + // contradict the documented "directory unless overridden" default and surprise any + // caller that hands writeJson a `.json` path. + let mut write_opts = DataFrameWriteOptions::new() + .with_single_file_output(p.single_file_output.unwrap_or(false)); + if !p.partition_cols.is_empty() { + write_opts = write_opts.with_partition_by(p.partition_cols.clone()); + } + + // Build JsonOptions only when a writer-side knob is set, so the + // DataFusion default is preserved when the caller passes + // `new JsonWriteOptions()`. JsonOptions has no fluent setters -- + // the fields are public, so use struct-update syntax (same + // idiom we use for ArrowReadOptions / AvroReadOptions). + let writer_opts: Option = compression.map(|c| JsonOptions { + compression: c, + ..JsonOptions::default() + }); + + runtime().block_on(df.write_json(&path, write_opts, writer_opts))?; + Ok(()) + }) +} diff --git a/proto/json_write_options.proto b/proto/json_write_options.proto new file mode 100644 index 0000000..ff31b4c --- /dev/null +++ b/proto/json_write_options.proto @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package datafusion_java; + +import "file_compression_type.proto"; + +option java_package = "org.apache.datafusion.protobuf"; +option java_multiple_files = true; + +// Options used to write JSON files. Fields with non-null Java defaults are +// always sent; fields marked `optional` preserve unset-ness so the Rust side +// can leave a DataFusion default in place. `FileCompressionType` is shared +// with the read side and the CSV writer since the codec set is identical. +// `partition_cols` is sent as a repeated string so an empty list (the +// default) round-trips unambiguously. +// +// The output is always newline-delimited JSON (NDJSON) -- DataFusion's JSON +// writer only emits NDJSON, so the read-side `newline_delimited` toggle has +// no equivalent here. +message JsonWriteOptionsProto { + optional bool single_file_output = 1; + repeated string partition_cols = 2; + optional FileCompressionType file_compression_type = 3; +}