Make COPY TO align with CREATE EXTERNAL TABLE #9604

Merged Mar 18, 2024 · 8 commits · changes shown from 4 commits
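In short: COPY TO now declares its output format with STORED AS, exactly as CREATE EXTERNAL TABLE does, and format-specific writer options are spelled with a format. prefix. A minimal sketch of the new surface, assuming a default SessionContext and an illustrative output path:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Pre-PR, the format rode along in the options list: (format parquet, ...).
    // Post-PR, it is a first-class STORED AS clause, matching CREATE EXTERNAL TABLE.
    ctx.sql("COPY (VALUES (1, 2)) TO 'out/data.parquet' STORED AS PARQUET")
        .await?
        .collect()
        .await?;
    Ok(())
}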
6 changes: 3 additions & 3 deletions datafusion-cli/src/exec.rs
@@ -412,7 +412,7 @@ mod tests {
             )
         })?;
         for location in locations {
-            let sql = format!("copy (values (1,2)) to '{}';", location);
+            let sql = format!("copy (values (1,2)) to '{}' STORED AS PARQUET;", location);
             let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
             for statement in statements {
                 //Should not fail
@@ -438,8 +438,8 @@
         let location = "s3://bucket/path/file.parquet";

         // Missing region, use object_store defaults
-        let sql = format!("COPY (values (1,2)) TO '{location}'
-            (format parquet, 'aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
+        let sql = format!("COPY (values (1,2)) TO '{location}' STORED AS PARQUET
+            OPTIONS ('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
         copy_to_table_test(location, &sql).await?;

         Ok(())
81 changes: 23 additions & 58 deletions datafusion/common/src/config.rs
@@ -1131,10 +1131,19 @@ impl ConfigField for TableOptions {
     fn set(&mut self, key: &str, value: &str) -> Result<()> {
         // Extensions are handled in the public `ConfigOptions::set`
         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+        let Some(format) = &self.current_format else {
+            return _config_err!("Specify a format for TableOptions");
+        };
         match key {
-            "csv" => self.csv.set(rem, value),
-            "parquet" => self.parquet.set(rem, value),
-            "json" => self.json.set(rem, value),
+            "format" => match format {
+                #[cfg(feature = "parquet")]
+                FileType::PARQUET => self.parquet.set(rem, value),
+                FileType::CSV => self.csv.set(rem, value),
+                FileType::JSON => self.json.set(rem, value),
+                _ => {
+                    _config_err!("Config value \"{key}\" is not supported on {}", format)
+                }
+            },
             _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
         }
     }
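The dispatch above is the core of the change: option keys no longer embed a file type (csv.delimiter); they are spelled format.delimiter and routed through current_format, which must be set first. A self-contained toy sketch of the same pattern; the types here are simplified stand-ins, not the real DataFusion definitions:

// Illustration only: simplified stand-ins for FileType and TableOptions.
#[derive(Clone, Copy)]
enum FileType {
    Csv,
    Json,
    Parquet,
}

struct TableOptions {
    current_format: Option<FileType>,
}

impl TableOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
        let (prefix, rem) = key.split_once('.').unwrap_or((key, ""));
        // Every key must name the generic `format` namespace...
        if prefix != "format" {
            return Err(format!("Config value \"{key}\" not found on TableOptions"));
        }
        // ...and the previously declared format decides where the value lands.
        let ns = match self.current_format {
            Some(FileType::Csv) => "csv",
            Some(FileType::Json) => "json",
            Some(FileType::Parquet) => "parquet",
            None => return Err("Specify a format for TableOptions".to_string()),
        };
        println!("{ns}.{rem} = {value}");
        Ok(())
    }
}

fn main() {
    let mut opts = TableOptions { current_format: Some(FileType::Csv) };
    opts.set("format.delimiter", ";").unwrap();
}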
@@ -1170,28 +1179,7 @@ impl TableOptions {
             ))
         })?;

-        if prefix == "csv" || prefix == "json" || prefix == "parquet" {
-            if let Some(format) = &self.current_format {
-                match format {
-                    FileType::CSV if prefix != "csv" => {
-                        return Err(DataFusionError::Configuration(format!(
-                            "Key \"{key}\" is not applicable for CSV format"
-                        )))
-                    }
-                    #[cfg(feature = "parquet")]
-                    FileType::PARQUET if prefix != "parquet" => {
-                        return Err(DataFusionError::Configuration(format!(
-                            "Key \"{key}\" is not applicable for PARQUET format"
-                        )))
-                    }
-                    FileType::JSON if prefix != "json" => {
-                        return Err(DataFusionError::Configuration(format!(
-                            "Key \"{key}\" is not applicable for JSON format"
-                        )))
-                    }
-                    _ => {}
-                }
-            }
+        if prefix == "format" {
             return ConfigField::set(self, key, value);
         }

@@ -1251,9 +1239,7 @@
         }

         let mut v = Visitor(vec![]);
-        self.visit(&mut v, "csv", "");
-        self.visit(&mut v, "json", "");
-        self.visit(&mut v, "parquet", "");
+        self.visit(&mut v, "format", "");

         v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
         v.0
@@ -1558,6 +1544,7 @@ mod tests {
     use crate::config::{
         ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
     };
+    use crate::FileType;

     #[derive(Default, Debug, Clone)]
     pub struct TestExtensionConfig {
@@ -1611,12 +1598,13 @@
     }

     #[test]
-    fn alter_kafka_config() {
+    fn alter_test_extension_config() {
         let mut extension = Extensions::new();
         extension.insert(TestExtensionConfig::default());
         let mut table_config = TableOptions::new().with_extensions(extension);
-        table_config.set("parquet.write_batch_size", "10").unwrap();
-        assert_eq!(table_config.parquet.global.write_batch_size, 10);
+        table_config.set_file_format(FileType::CSV);
+        table_config.set("format.delimiter", ";").unwrap();
+        assert_eq!(table_config.csv.delimiter, b';');
         table_config.set("test.bootstrap.servers", "asd").unwrap();
         let kafka_config = table_config
             .extensions
@@ -1628,38 +1616,15 @@
         );
     }

-    #[test]
-    fn parquet_table_options() {
-        let mut table_config = TableOptions::new();
-        table_config
-            .set("parquet.bloom_filter_enabled::col1", "true")
-            .unwrap();
-        assert_eq!(
-            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
-            Some(true)
-        );
-    }
-
     #[test]
     fn csv_u8_table_options() {
         let mut table_config = TableOptions::new();
-        table_config.set("csv.delimiter", ";").unwrap();
+        table_config.set_file_format(FileType::CSV);
+        table_config.set("format.delimiter", ";").unwrap();
         assert_eq!(table_config.csv.delimiter as char, ';');
-        table_config.set("csv.escape", "\"").unwrap();
+        table_config.set("format.escape", "\"").unwrap();
         assert_eq!(table_config.csv.escape.unwrap() as char, '"');
-        table_config.set("csv.escape", "\'").unwrap();
+        table_config.set("format.escape", "\'").unwrap();
         assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
     }

-    #[test]
-    fn parquet_table_options_config_entry() {
-        let mut table_config = TableOptions::new();
-        table_config
-            .set("parquet.bloom_filter_enabled::col1", "true")
-            .unwrap();
-        let entries = table_config.entries();
-        assert!(entries
-            .iter()
-            .any(|item| item.key == "parquet.bloom_filter_enabled::col1"))
-    }
 }
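Read together, the tests now follow one flow: create TableOptions, declare the file format, then set format.-prefixed keys. A hedged usage sketch built only from calls exercised above; the module paths (datafusion_common) are assumptions:

use datafusion_common::config::TableOptions;
use datafusion_common::{FileType, Result};

fn main() -> Result<()> {
    let mut table_config = TableOptions::new();
    // Before a format is declared, `format.` keys have nowhere to land and error out.
    assert!(table_config.set("format.delimiter", ";").is_err());

    table_config.set_file_format(FileType::CSV);
    table_config.set("format.delimiter", ";")?;
    assert_eq!(table_config.csv.delimiter, b';');
    Ok(())
}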
85 changes: 43 additions & 42 deletions datafusion/common/src/file_options/mod.rs
@@ -35,7 +35,7 @@ mod tests {
     config::TableOptions,
     file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
     parsers::CompressionTypeVariant,
-    Result,
+    FileType, Result,
 };

 use parquet::{
@@ -47,35 +47,36 @@
     #[test]
     fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
         let mut option_map: HashMap<String, String> = HashMap::new();
-        option_map.insert("parquet.max_row_group_size".to_owned(), "123".to_owned());
-        option_map.insert("parquet.data_pagesize_limit".to_owned(), "123".to_owned());
-        option_map.insert("parquet.write_batch_size".to_owned(), "123".to_owned());
-        option_map.insert("parquet.writer_version".to_owned(), "2.0".to_owned());
+        option_map.insert("format.max_row_group_size".to_owned(), "123".to_owned());
+        option_map.insert("format.data_pagesize_limit".to_owned(), "123".to_owned());
+        option_map.insert("format.write_batch_size".to_owned(), "123".to_owned());
+        option_map.insert("format.writer_version".to_owned(), "2.0".to_owned());
         option_map.insert(
-            "parquet.dictionary_page_size_limit".to_owned(),
+            "format.dictionary_page_size_limit".to_owned(),
             "123".to_owned(),
         );
         option_map.insert(
-            "parquet.created_by".to_owned(),
+            "format.created_by".to_owned(),
             "df write unit test".to_owned(),
         );
         option_map.insert(
-            "parquet.column_index_truncate_length".to_owned(),
+            "format.column_index_truncate_length".to_owned(),
             "123".to_owned(),
         );
         option_map.insert(
-            "parquet.data_page_row_count_limit".to_owned(),
+            "format.data_page_row_count_limit".to_owned(),
             "123".to_owned(),
         );
-        option_map.insert("parquet.bloom_filter_enabled".to_owned(), "true".to_owned());
-        option_map.insert("parquet.encoding".to_owned(), "plain".to_owned());
-        option_map.insert("parquet.dictionary_enabled".to_owned(), "true".to_owned());
-        option_map.insert("parquet.compression".to_owned(), "zstd(4)".to_owned());
-        option_map.insert("parquet.statistics_enabled".to_owned(), "page".to_owned());
-        option_map.insert("parquet.bloom_filter_fpp".to_owned(), "0.123".to_owned());
-        option_map.insert("parquet.bloom_filter_ndv".to_owned(), "123".to_owned());
+        option_map.insert("format.bloom_filter_enabled".to_owned(), "true".to_owned());
+        option_map.insert("format.encoding".to_owned(), "plain".to_owned());
+        option_map.insert("format.dictionary_enabled".to_owned(), "true".to_owned());
+        option_map.insert("format.compression".to_owned(), "zstd(4)".to_owned());
+        option_map.insert("format.statistics_enabled".to_owned(), "page".to_owned());
+        option_map.insert("format.bloom_filter_fpp".to_owned(), "0.123".to_owned());
+        option_map.insert("format.bloom_filter_ndv".to_owned(), "123".to_owned());

         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;

         let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
@@ -131,54 +132,52 @@
         let mut option_map: HashMap<String, String> = HashMap::new();

         option_map.insert(
-            "parquet.bloom_filter_enabled::col1".to_owned(),
+            "format.bloom_filter_enabled::col1".to_owned(),
             "true".to_owned(),
         );
         option_map.insert(
-            "parquet.bloom_filter_enabled::col2.nested".to_owned(),
+            "format.bloom_filter_enabled::col2.nested".to_owned(),
             "true".to_owned(),
         );
-        option_map.insert("parquet.encoding::col1".to_owned(), "plain".to_owned());
-        option_map.insert("parquet.encoding::col2.nested".to_owned(), "rle".to_owned());
+        option_map.insert("format.encoding::col1".to_owned(), "plain".to_owned());
+        option_map.insert("format.encoding::col2.nested".to_owned(), "rle".to_owned());
         option_map.insert(
-            "parquet.dictionary_enabled::col1".to_owned(),
+            "format.dictionary_enabled::col1".to_owned(),
             "true".to_owned(),
         );
         option_map.insert(
-            "parquet.dictionary_enabled::col2.nested".to_owned(),
+            "format.dictionary_enabled::col2.nested".to_owned(),
             "true".to_owned(),
         );
-        option_map.insert("parquet.compression::col1".to_owned(), "zstd(4)".to_owned());
+        option_map.insert("format.compression::col1".to_owned(), "zstd(4)".to_owned());
         option_map.insert(
-            "parquet.compression::col2.nested".to_owned(),
+            "format.compression::col2.nested".to_owned(),
             "zstd(10)".to_owned(),
         );
         option_map.insert(
-            "parquet.statistics_enabled::col1".to_owned(),
+            "format.statistics_enabled::col1".to_owned(),
             "page".to_owned(),
         );
         option_map.insert(
-            "parquet.statistics_enabled::col2.nested".to_owned(),
+            "format.statistics_enabled::col2.nested".to_owned(),
             "none".to_owned(),
         );
         option_map.insert(
-            "parquet.bloom_filter_fpp::col1".to_owned(),
+            "format.bloom_filter_fpp::col1".to_owned(),
             "0.123".to_owned(),
         );
         option_map.insert(
-            "parquet.bloom_filter_fpp::col2.nested".to_owned(),
+            "format.bloom_filter_fpp::col2.nested".to_owned(),
             "0.456".to_owned(),
         );
-        option_map.insert(
-            "parquet.bloom_filter_ndv::col1".to_owned(),
-            "123".to_owned(),
-        );
+        option_map.insert("format.bloom_filter_ndv::col1".to_owned(), "123".to_owned());
         option_map.insert(
-            "parquet.bloom_filter_ndv::col2.nested".to_owned(),
+            "format.bloom_filter_ndv::col2.nested".to_owned(),
             "456".to_owned(),
         );

         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;

         let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
@@ -271,16 +270,17 @@
     // for StatementOptions
     fn test_writeroptions_csv_from_statement_options() -> Result<()> {
         let mut option_map: HashMap<String, String> = HashMap::new();
-        option_map.insert("csv.has_header".to_owned(), "true".to_owned());
-        option_map.insert("csv.date_format".to_owned(), "123".to_owned());
-        option_map.insert("csv.datetime_format".to_owned(), "123".to_owned());
-        option_map.insert("csv.timestamp_format".to_owned(), "2.0".to_owned());
-        option_map.insert("csv.time_format".to_owned(), "123".to_owned());
-        option_map.insert("csv.null_value".to_owned(), "123".to_owned());
-        option_map.insert("csv.compression".to_owned(), "gzip".to_owned());
-        option_map.insert("csv.delimiter".to_owned(), ";".to_owned());
+        option_map.insert("format.has_header".to_owned(), "true".to_owned());
+        option_map.insert("format.date_format".to_owned(), "123".to_owned());
+        option_map.insert("format.datetime_format".to_owned(), "123".to_owned());
+        option_map.insert("format.timestamp_format".to_owned(), "2.0".to_owned());
+        option_map.insert("format.time_format".to_owned(), "123".to_owned());
+        option_map.insert("format.null_value".to_owned(), "123".to_owned());
+        option_map.insert("format.compression".to_owned(), "gzip".to_owned());
+        option_map.insert("format.delimiter".to_owned(), ";".to_owned());

         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::CSV);
         table_config.alter_with_string_hash_map(&option_map)?;

         let csv_options = CsvWriterOptions::try_from(&table_config.csv)?;
@@ -299,9 +299,10 @@
     // for StatementOptions
     fn test_writeroptions_json_from_statement_options() -> Result<()> {
         let mut option_map: HashMap<String, String> = HashMap::new();
-        option_map.insert("json.compression".to_owned(), "gzip".to_owned());
+        option_map.insert("format.compression".to_owned(), "gzip".to_owned());

         let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::JSON);
         table_config.alter_with_string_hash_map(&option_map)?;

         let json_options = JsonWriterOptions::try_from(&table_config.json)?;
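The same flow drives the writer-option conversions above: collect format.-prefixed strings into a map, declare the format, apply the map, then build the writer options. A condensed sketch under the same module-path assumptions as the earlier example:

use std::collections::HashMap;

use datafusion_common::config::TableOptions;
use datafusion_common::{FileType, Result};

fn main() -> Result<()> {
    let mut option_map: HashMap<String, String> = HashMap::new();
    option_map.insert("format.write_batch_size".to_owned(), "10".to_owned());

    let mut table_config = TableOptions::new();
    // Without this line the `format.` key above would be rejected.
    table_config.set_file_format(FileType::PARQUET);
    table_config.alter_with_string_hash_map(&option_map)?;
    assert_eq!(table_config.parquet.global.write_batch_size, 10);
    Ok(())
}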
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
@@ -227,7 +227,7 @@ mod tests {
         let name = OwnedTableReference::bare("foo".to_string());

         let mut options = HashMap::new();
-        options.insert("csv.schema_infer_max_rec".to_owned(), "1000".to_owned());
+        options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned());
         let cmd = CreateExternalTable {
             name,
             location: csv_file.path().to_str().unwrap().to_string(),
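For symmetry, CREATE EXTERNAL TABLE consumes the same format.-prefixed keys through its OPTIONS clause. A hedged SQL-level sketch; the table name, path, and exact clause ordering are illustrative assumptions:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // The key matches the one the test above routes into CreateExternalTable options.
    ctx.sql(
        "CREATE EXTERNAL TABLE foo \
         STORED AS CSV \
         LOCATION 'data/foo.csv' \
         OPTIONS ('format.schema_infer_max_rec' '1000')",
    )
    .await?;
    Ok(())
}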
6 changes: 1 addition & 5 deletions datafusion/core/src/execution/context/mod.rs
@@ -1785,11 +1785,7 @@ impl SessionState {
                     .0
                     .insert(ObjectName(vec![Ident::from(table.name.as_str())]));
             }
-            DFStatement::CopyTo(CopyToStatement {
-                source,
-                target: _,
-                options: _,
-            }) => match source {
+            DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
                 CopyToSource::Relation(table_name) => {
                     visitor.insert(table_name);
                 }
12 changes: 8 additions & 4 deletions datafusion/core/tests/sql/sql_api.rs
@@ -16,6 +16,7 @@
 // under the License.

 use datafusion::prelude::*;
+
 use tempfile::TempDir;

 #[tokio::test]
@@ -27,7 +28,7 @@ async fn unsupported_ddl_returns_error() {
     // disallow ddl
     let options = SQLOptions::new().with_allow_ddl(false);

-    let sql = "create view test_view as select * from test";
+    let sql = "CREATE VIEW test_view AS SELECT * FROM test";
     let df = ctx.sql_with_options(sql, options).await;
     assert_eq!(
         df.unwrap_err().strip_backtrace(),
@@ -46,7 +47,7 @@

     let options = SQLOptions::new().with_allow_dml(false);

-    let sql = "insert into test values (1)";
+    let sql = "INSERT INTO test VALUES (1)";
     let df = ctx.sql_with_options(sql, options).await;
     assert_eq!(
         df.unwrap_err().strip_backtrace(),
@@ -67,7 +68,10 @@

     let options = SQLOptions::new().with_allow_dml(false);

-    let sql = format!("copy (values(1)) to '{}'", tmpfile.to_string_lossy());
+    let sql = format!(
+        "COPY (values(1)) TO '{}' STORED AS parquet",
+        tmpfile.to_string_lossy()
+    );
     let df = ctx.sql_with_options(&sql, options).await;
     assert_eq!(
         df.unwrap_err().strip_backtrace(),
@@ -106,7 +110,7 @@ async fn ddl_can_not_be_planned_by_session_state() {
     let state = ctx.state();

     // can not create a logical plan for catalog DDL
-    let sql = "drop table test";
+    let sql = "DROP TABLE test";
     let plan = state.create_logical_plan(sql).await.unwrap();
     let physical_plan = state.create_physical_plan(&plan).await;
     assert_eq!(
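One behavioral note confirmed by this last test file: COPY is gated as DML by SQLOptions, and the statement must parse with its now-mandatory STORED AS clause even when planning is expected to fail. A hedged end-to-end sketch of that gate, with an illustrative output path:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let options = SQLOptions::new().with_allow_dml(false);

    // COPY counts as DML, so it must be rejected when DML is disallowed.
    let err = ctx
        .sql_with_options(
            "COPY (VALUES (1)) TO 'out.parquet' STORED AS PARQUET",
            options,
        )
        .await
        .unwrap_err();
    println!("{}", err.strip_backtrace());
    Ok(())
}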