-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
bind_copy.cpp
261 lines (233 loc) · 9.87 KB
/
bind_copy.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
#include "duckdb/catalog/catalog.hpp"
#include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp"
#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp"
#include "duckdb/common/bind_helpers.hpp"
#include "duckdb/common/filename_pattern.hpp"
#include "duckdb/common/local_file_system.hpp"
#include "duckdb/function/table/read_csv.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/parser/expression/columnref_expression.hpp"
#include "duckdb/parser/expression/star_expression.hpp"
#include "duckdb/parser/query_node/select_node.hpp"
#include "duckdb/parser/statement/copy_statement.hpp"
#include "duckdb/parser/statement/insert_statement.hpp"
#include "duckdb/parser/tableref/basetableref.hpp"
#include "duckdb/planner/binder.hpp"
#include "duckdb/planner/operator/logical_copy_to_file.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/planner/operator/logical_insert.hpp"
#include <algorithm>
namespace duckdb {
// Interprets the argument list of a COPY option as a boolean flag.
// An option given without any argument counts as "true"; otherwise the first
// argument is cast to BOOLEAN and its value is returned.
static bool GetBooleanArg(ClientContext &context, const vector<Value> &arg) {
	if (arg.empty()) {
		return true;
	}
	auto flag = arg[0].CastAs(context, LogicalType::BOOLEAN);
	return flag.GetValue<bool>();
}
// Binds a "COPY ... TO <file>" statement into a LogicalCopyToFile plan.
//
// Steps performed:
//   1. Reject the statement when external access is disabled in the configuration.
//   2. Look up the copy function for stmt.info->format in the catalog. If that
//      function supplies its own `plan` callback, binding is delegated to it.
//   3. Bind the source query (stmt.info->select_statement).
//   4. Consume the options handled here (USE_TMP_FILE, OVERWRITE_OR_IGNORE,
//      FILENAME_PATTERN, FILE_EXTENSION, PER_THREAD_OUTPUT, FILE_SIZE_BYTES,
//      PARTITION_BY); every other option is written back to stmt.info->options.
//   5. Reject unsupported option combinations.
//   6. Call the copy function's copy_to_bind and assemble the logical operator.
//
// The returned statement has a single BIGINT column named "Count".
//
// Throws PermissionException, NotImplementedException, IOException or
// BinderException as raised by the checks below.
BoundStatement Binder::BindCopyTo(CopyStatement &stmt) {
	// COPY TO a file
	auto &config = DBConfig::GetConfig(context);
	if (!config.options.enable_external_access) {
		throw PermissionException("COPY TO is disabled by configuration");
	}
	BoundStatement result;
	result.types = {LogicalType::BIGINT};
	result.names = {"Count"};

	// lookup the format in the catalog
	auto &copy_function =
	    Catalog::GetEntry<CopyFunctionCatalogEntry>(context, INVALID_CATALOG, DEFAULT_SCHEMA, stmt.info->format);
	if (copy_function.function.plan) {
		// plan rewrite COPY TO
		return copy_function.function.plan(*this, stmt);
	}

	auto &copy_info = *stmt.info;
	// bind the select statement
	auto select_node = Bind(*copy_info.select_statement);

	if (!copy_function.function.copy_to_bind) {
		throw NotImplementedException("COPY TO is not supported for FORMAT \"%s\"", stmt.info->format);
	}

	// defaults for the options that are interpreted by the binder itself
	bool use_tmp_file = true;
	bool overwrite_or_ignore = false;
	FilenamePattern filename_pattern;
	bool user_set_use_tmp_file = false;
	bool per_thread_output = false;
	optional_idx file_size_bytes;
	vector<idx_t> partition_cols;

	CopyFunctionBindInput bind_input(*stmt.info);

	// the extension registered by the copy function is the default; FILE_EXTENSION overrides it
	bind_input.file_extension = copy_function.function.extension;

	// iterate over a copy of the options and rebuild stmt.info->options so that it
	// only contains the options that were NOT consumed here
	auto original_options = stmt.info->options;
	stmt.info->options.clear();
	for (auto &option : original_options) {
		// option names are matched case-insensitively
		auto loption = StringUtil::Lower(option.first);
		if (loption == "use_tmp_file") {
			use_tmp_file = GetBooleanArg(context, option.second);
			user_set_use_tmp_file = true;
		} else if (loption == "overwrite_or_ignore") {
			overwrite_or_ignore = GetBooleanArg(context, option.second);
		} else if (loption == "filename_pattern") {
			if (option.second.empty()) {
				throw IOException("FILENAME_PATTERN cannot be empty");
			}
			filename_pattern.SetFilenamePattern(
			    option.second[0].CastAs(context, LogicalType::VARCHAR).GetValue<string>());
		} else if (loption == "file_extension") {
			if (option.second.empty()) {
				throw IOException("FILE_EXTENSION cannot be empty");
			}
			bind_input.file_extension = option.second[0].CastAs(context, LogicalType::VARCHAR).GetValue<string>();
		} else if (loption == "per_thread_output") {
			per_thread_output = GetBooleanArg(context, option.second);
		} else if (loption == "file_size_bytes") {
			if (option.second.empty()) {
				throw BinderException("FILE_SIZE_BYTES cannot be empty");
			}
			if (!copy_function.function.file_size_bytes) {
				throw NotImplementedException("FILE_SIZE_BYTES not implemented for FORMAT \"%s\"", stmt.info->format);
			}
			// a VARCHAR argument goes through the memory-limit parser,
			// any other argument is read directly as an unsigned 64-bit integer
			if (option.second[0].GetTypeMutable().id() == LogicalTypeId::VARCHAR) {
				file_size_bytes = DBConfig::ParseMemoryLimit(option.second[0].ToString());
			} else {
				file_size_bytes = option.second[0].GetValue<uint64_t>();
			}
		} else if (loption == "partition_by") {
			// resolve the partition columns against the names of the bound query
			auto converted = ConvertVectorToValue(std::move(option.second));
			partition_cols = ParseColumnsOrdered(converted, select_node.names, loption);
		} else {
			// not handled here: keep the option (with its original spelling) in stmt.info
			stmt.info->options[option.first] = option.second;
		}
	}

	// these checks trigger whenever USE_TMP_FILE was specified explicitly, regardless of its value
	if (user_set_use_tmp_file && per_thread_output) {
		throw NotImplementedException("Can't combine USE_TMP_FILE and PER_THREAD_OUTPUT for COPY");
	}
	if (user_set_use_tmp_file && file_size_bytes.IsValid()) {
		throw NotImplementedException("Can't combine USE_TMP_FILE and FILE_SIZE_BYTES for COPY");
	}
	if (user_set_use_tmp_file && !partition_cols.empty()) {
		throw NotImplementedException("Can't combine USE_TMP_FILE and PARTITION_BY for COPY");
	}
	if (per_thread_output && !partition_cols.empty()) {
		throw NotImplementedException("Can't combine PER_THREAD_OUTPUT and PARTITION_BY for COPY");
	}
	if (file_size_bytes.IsValid() && !partition_cols.empty()) {
		throw NotImplementedException("Can't combine FILE_SIZE_BYTES and PARTITION_BY for COPY");
	}

	bool is_remote_file = FileSystem::IsRemoteFile(stmt.info->file_path);
	if (is_remote_file) {
		// remote targets never use a temporary file, even if the user asked for one
		use_tmp_file = false;
	} else {
		auto &fs = FileSystem::GetFileSystem(context);
		bool is_file_and_exists = fs.FileExists(stmt.info->file_path);
		bool is_stdout = stmt.info->file_path == "/dev/stdout";
		if (!user_set_use_tmp_file) {
			// default: only use a temporary file when an existing regular file would be
			// replaced by a single, non-partitioned output that is not stdout
			use_tmp_file = is_file_and_exists && !per_thread_output && partition_cols.empty() && !is_stdout;
		}
	}

	// the copy function is bound with de-duplicated column names
	auto unique_column_names = select_node.names;
	QueryResult::DeduplicateColumns(unique_column_names);

	// copy the path now: stmt.info is moved into the operator below
	auto file_path = stmt.info->file_path;

	auto function_data =
	    copy_function.function.copy_to_bind(context, bind_input, unique_column_names, select_node.types);

	// now create the copy information
	auto copy = make_uniq<LogicalCopyToFile>(copy_function.function, std::move(function_data), std::move(stmt.info));
	copy->file_path = file_path;
	copy->use_tmp_file = use_tmp_file;
	copy->overwrite_or_ignore = overwrite_or_ignore;
	copy->filename_pattern = filename_pattern;
	copy->file_extension = bind_input.file_extension;
	copy->per_thread_output = per_thread_output;
	if (file_size_bytes.IsValid()) {
		copy->file_size_bytes = file_size_bytes;
	}
	copy->partition_output = !partition_cols.empty();
	copy->partition_columns = std::move(partition_cols);

	copy->names = unique_column_names;
	copy->expected_types = select_node.types;

	// the bound query feeds the copy operator
	copy->AddChild(std::move(select_node.plan));

	result.plan = std::move(copy);

	return result;
}
// Binds a "COPY <table> FROM <file>" statement.
//
// The statement is planned as an INSERT into the target table whose child is a
// LogicalGet over the copy function's copy_from_function:
//   1. Reject the statement when external access is disabled in the configuration.
//   2. Build and bind an InsertStatement for the target table / column list.
//   3. Look up the copy function for stmt.info->format in the system catalog.
//   4. Determine the column names the file is expected to provide.
//   5. Call copy_from_bind and attach the resulting scan below the insert.
//
// The returned statement has a single BIGINT column named "Count".
//
// Throws PermissionException, ParserException or NotImplementedException as
// raised by the checks below.
BoundStatement Binder::BindCopyFrom(CopyStatement &stmt) {
	auto &config = DBConfig::GetConfig(context);
	if (!config.options.enable_external_access) {
		throw PermissionException("COPY FROM is disabled by configuration");
	}
	BoundStatement result;
	result.types = {LogicalType::BIGINT};
	result.names = {"Count"};

	if (stmt.info->table.empty()) {
		throw ParserException("COPY FROM requires a table name to be specified");
	}
	// COPY FROM a file
	// generate an insert statement for the to-be-inserted table
	InsertStatement insert;
	insert.table = stmt.info->table;
	insert.schema = stmt.info->schema;
	insert.catalog = stmt.info->catalog;
	insert.columns = stmt.info->select_list;

	// bind the insert statement to the base table
	auto insert_statement = Bind(insert);
	D_ASSERT(insert_statement.plan->type == LogicalOperatorType::LOGICAL_INSERT);

	auto &bound_insert = insert_statement.plan->Cast<LogicalInsert>();

	// lookup the format in the catalog
	auto &catalog = Catalog::GetSystemCatalog(context);
	auto &copy_function = catalog.GetEntry<CopyFunctionCatalogEntry>(context, DEFAULT_SCHEMA, stmt.info->format);
	if (!copy_function.function.copy_from_bind) {
		throw NotImplementedException("COPY FROM is not supported for FORMAT \"%s\"", stmt.info->format);
	}
	// lookup the table to copy into
	BindSchemaOrCatalog(stmt.info->catalog, stmt.info->schema);
	auto &table =
	    Catalog::GetEntry<TableCatalogEntry>(context, stmt.info->catalog, stmt.info->schema, stmt.info->table);
	vector<string> expected_names;
	if (!bound_insert.column_index_map.empty()) {
		// a column mapping is present: place each mapped physical column's name at the
		// position the map assigns to it; unmapped columns (INVALID_INDEX) are skipped
		expected_names.resize(bound_insert.expected_types.size());
		for (auto &col : table.GetColumns().Physical()) {
			auto i = col.Physical();
			if (bound_insert.column_index_map[i] != DConstants::INVALID_INDEX) {
				expected_names[bound_insert.column_index_map[i]] = col.Name();
			}
		}
	} else {
		// no column mapping: expect all physical columns in table order
		expected_names.reserve(bound_insert.expected_types.size());
		for (auto &col : table.GetColumns().Physical()) {
			expected_names.push_back(col.Name());
		}
	}

	auto function_data =
	    copy_function.function.copy_from_bind(context, *stmt.info, expected_names, bound_insert.expected_types);
	auto get = make_uniq<LogicalGet>(GenerateTableIndex(), copy_function.function.copy_from_function,
	                                 std::move(function_data), bound_insert.expected_types, expected_names);
	// the scan projects every expected column, in order
	for (idx_t i = 0; i < bound_insert.expected_types.size(); i++) {
		get->column_ids.push_back(i);
	}
	// the file scan becomes the source of the insert
	insert_statement.plan->children.push_back(std::move(get));
	result.plan = std::move(insert_statement.plan);
	return result;
}
// Entry point for binding a COPY statement.
//
// For "COPY <table> TO <file>" without an explicit query, a query over the table is
// synthesized first: either the listed columns or "*" when no column list was given.
// Afterwards the statement is dispatched to BindCopyFrom or BindCopyTo depending on
// its direction. The statement properties are set so that the result is not streamed
// and the return type is CHANGED_ROWS.
BoundStatement Binder::Bind(CopyStatement &stmt) {
	auto &info = *stmt.info;

	const bool needs_implicit_select = !info.is_from && !info.select_statement;
	if (needs_implicit_select) {
		// copy table into file without a query
		// generate SELECT * FROM table;
		auto table_ref = make_uniq<BaseTableRef>();
		table_ref->catalog_name = info.catalog;
		table_ref->schema_name = info.schema;
		table_ref->table_name = info.table;

		auto select = make_uniq<SelectNode>();
		select->from_table = std::move(table_ref);
		if (info.select_list.empty()) {
			// no explicit column list: select everything
			select->select_list.push_back(make_uniq<StarExpression>());
		} else {
			// explicit column list: one column reference per listed name
			for (auto &column_name : info.select_list) {
				select->select_list.push_back(make_uniq<ColumnRefExpression>(column_name));
			}
		}
		info.select_statement = std::move(select);
	}

	properties.allow_stream_result = false;
	properties.return_type = StatementReturnType::CHANGED_ROWS;

	// the direction is read before dispatching
	return info.is_from ? BindCopyFrom(stmt) : BindCopyTo(stmt);
}
} // namespace duckdb