Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/duckdb/extension/json/include/json_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ class JSONReader : public BaseFileReader {

void Initialize(Allocator &allocator, idx_t buffer_size);
bool InitializeScan(JSONReaderScanState &state, JSONFileReadType file_read_type);
void ParseJSON(JSONReaderScanState &scan_state, char *const json_start, const idx_t json_size,
bool ParseJSON(JSONReaderScanState &scan_state, char *const json_start, const idx_t json_size,
const idx_t remaining);
void ParseNextChunk(JSONReaderScanState &scan_state);
bool ParseNextChunk(JSONReaderScanState &scan_state);
idx_t Scan(JSONReaderScanState &scan_state);
bool ReadNextBuffer(JSONReaderScanState &scan_state);
bool PrepareBufferForRead(JSONReaderScanState &scan_state);
Expand Down
25 changes: 16 additions & 9 deletions src/duckdb/extension/json/json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ static pair<JSONFormat, JSONRecordType> DetectFormatAndRecordType(char *const bu
return make_pair(JSONFormat::ARRAY, JSONRecordType::VALUES);
}

void JSONReader::ParseJSON(JSONReaderScanState &scan_state, char *const json_start, const idx_t json_size,
bool JSONReader::ParseJSON(JSONReaderScanState &scan_state, char *const json_start, const idx_t json_size,
const idx_t remaining) {
yyjson_doc *doc;
yyjson_read_err err;
Expand All @@ -640,7 +640,7 @@ void JSONReader::ParseJSON(JSONReaderScanState &scan_state, char *const json_sta
}
if (!can_ignore_this_error) {
AddParseError(scan_state, scan_state.lines_or_objects_in_buffer, err, extra);
return;
return false;
}
}

Expand All @@ -652,7 +652,7 @@ void JSONReader::ParseJSON(JSONReaderScanState &scan_state, char *const json_sta
err.msg = "unexpected end of data";
err.pos = json_size;
AddParseError(scan_state, scan_state.lines_or_objects_in_buffer, err, "Try auto-detecting the JSON format");
return;
return false;
} else if (!options.ignore_errors && read_size < json_size) {
idx_t off = read_size;
idx_t rem = json_size;
Expand All @@ -662,20 +662,21 @@ void JSONReader::ParseJSON(JSONReaderScanState &scan_state, char *const json_sta
err.msg = "unexpected content after document";
err.pos = read_size;
AddParseError(scan_state, scan_state.lines_or_objects_in_buffer, err, "Try auto-detecting the JSON format");
return;
return false;
}
}

scan_state.lines_or_objects_in_buffer++;
if (!doc) {
scan_state.values[scan_state.scan_count] = nullptr;
return;
return true;
}

// Set the JSONLine and trim
scan_state.units[scan_state.scan_count] = JSONString(json_start, json_size);
TrimWhitespace(scan_state.units[scan_state.scan_count]);
scan_state.values[scan_state.scan_count] = doc->root;
return true;
}

void JSONReader::AutoDetect(Allocator &allocator, idx_t buffer_capacity) {
Expand Down Expand Up @@ -762,7 +763,7 @@ bool JSONReader::CopyRemainderFromPreviousBuffer(JSONReaderScanState &scan_state
return true;
}

void JSONReader::ParseNextChunk(JSONReaderScanState &scan_state) {
bool JSONReader::ParseNextChunk(JSONReaderScanState &scan_state) {
const auto format = GetFormat();
auto &buffer_ptr = scan_state.buffer_ptr;
auto &buffer_offset = scan_state.buffer_offset;
Expand Down Expand Up @@ -796,7 +797,9 @@ void JSONReader::ParseNextChunk(JSONReaderScanState &scan_state) {
}

idx_t json_size = json_end - json_start;
ParseJSON(scan_state, json_start, json_size, remaining);
if (!ParseJSON(scan_state, json_start, json_size, remaining)) {
return false;
}
buffer_offset += json_size;

if (format == JSONFormat::ARRAY) {
Expand All @@ -809,11 +812,12 @@ void JSONReader::ParseNextChunk(JSONReaderScanState &scan_state) {
err.msg = "unexpected character";
err.pos = json_size;
AddParseError(scan_state, scan_state.lines_or_objects_in_buffer, err);
return;
return false;
}
}
SkipWhitespace(buffer_ptr, buffer_offset, buffer_size);
}
return true;
}

void JSONReader::Initialize(Allocator &allocator, idx_t buffer_size) {
Expand Down Expand Up @@ -868,7 +872,10 @@ idx_t JSONReader::Scan(JSONReaderScanState &scan_state) {
return 0;
}
}
ParseNextChunk(scan_state);
if (!ParseNextChunk(scan_state)) {
// found an error but we can't handle it - return
return 0;
}
}
return scan_state.scan_count;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class StringColumnReader : public ColumnReader {
const StringColumnType string_column_type;

public:
static void VerifyString(const char *str_data, uint32_t str_len, const bool isVarchar);
static bool IsValid(const char *str_data, uint32_t str_len, bool is_varchar);
static bool IsValid(const string &str, bool is_varchar);
static void VerifyString(const char *str_data, uint32_t str_len, bool is_varchar);
void VerifyString(const char *str_data, uint32_t str_len) const;

static void ReferenceBlock(Vector &result, shared_ptr<ResizeableBuffer> &block);
Expand Down
12 changes: 4 additions & 8 deletions src/duckdb/extension/parquet/parquet_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,18 +395,14 @@ unique_ptr<BaseStatistics> ParquetStatisticsUtils::TransformColumnStatistics(con
break;
case LogicalTypeId::VARCHAR: {
auto string_stats = StringStats::CreateUnknown(type);
if (parquet_stats.__isset.min_value) {
StringColumnReader::VerifyString(parquet_stats.min_value.c_str(), parquet_stats.min_value.size(), true);
if (parquet_stats.__isset.min_value && StringColumnReader::IsValid(parquet_stats.min_value, true)) {
StringStats::SetMin(string_stats, parquet_stats.min_value);
} else if (parquet_stats.__isset.min) {
StringColumnReader::VerifyString(parquet_stats.min.c_str(), parquet_stats.min.size(), true);
} else if (parquet_stats.__isset.min && StringColumnReader::IsValid(parquet_stats.min, true)) {
StringStats::SetMin(string_stats, parquet_stats.min);
}
if (parquet_stats.__isset.max_value) {
StringColumnReader::VerifyString(parquet_stats.max_value.c_str(), parquet_stats.max_value.size(), true);
if (parquet_stats.__isset.max_value && StringColumnReader::IsValid(parquet_stats.max_value, true)) {
StringStats::SetMax(string_stats, parquet_stats.max_value);
} else if (parquet_stats.__isset.max) {
StringColumnReader::VerifyString(parquet_stats.max.c_str(), parquet_stats.max.size(), true);
} else if (parquet_stats.__isset.max && StringColumnReader::IsValid(parquet_stats.max, true)) {
StringStats::SetMax(string_stats, parquet_stats.max);
}
row_group_stats = string_stats.ToUnique();
Expand Down
13 changes: 10 additions & 3 deletions src/duckdb/extension/parquet/reader/string_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@ StringColumnReader::StringColumnReader(ParquetReader &reader, const ParquetColum
}
}

void StringColumnReader::VerifyString(const char *str_data, uint32_t str_len, const bool is_varchar) {
bool StringColumnReader::IsValid(const char *str_data, uint32_t str_len, const bool is_varchar) {
if (!is_varchar) {
return;
return true;
}
// verify if a string is actually UTF8, and if there are no null bytes in the middle of the string
// technically Parquet should guarantee this, but reality is often disappointing
UnicodeInvalidReason reason;
size_t pos;
auto utf_type = Utf8Proc::Analyze(str_data, str_len, &reason, &pos);
if (utf_type == UnicodeType::INVALID) {
return utf_type != UnicodeType::INVALID;
}

// Convenience overload: validates an entire string object by handing its
// character buffer and length to the pointer/length variant of IsValid.
bool StringColumnReader::IsValid(const string &str, bool is_varchar) {
	const char *data = str.c_str();
	const auto length = str.size();
	return IsValid(data, length, is_varchar);
}
void StringColumnReader::VerifyString(const char *str_data, uint32_t str_len, const bool is_varchar) {
if (!IsValid(str_data, str_len, is_varchar)) {
throw InvalidInputException("Invalid string encoding found in Parquet file: value \"%s\" is not valid UTF8!",
Blob::ToString(string_t(str_data, str_len)));
}
Expand Down
19 changes: 13 additions & 6 deletions src/duckdb/src/catalog/catalog_search_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,15 @@ void CatalogSearchPath::Set(CatalogSearchEntry new_value, CatalogSetPathType set
Set(std::move(new_paths), set_type);
}

const vector<CatalogSearchEntry> &CatalogSearchPath::Get() const {
return paths;
// Returns a copy of the search path restricted to entries that name a schema;
// entries whose schema string is empty are left out of the result.
vector<CatalogSearchEntry> CatalogSearchPath::Get() const {
	vector<CatalogSearchEntry> entries_with_schema;
	for (const auto &entry : paths) {
		if (!entry.schema.empty()) {
			entries_with_schema.push_back(entry);
		}
	}
	return entries_with_schema;
}
}

string CatalogSearchPath::GetDefaultSchema(const string &catalog) const {
Expand Down Expand Up @@ -250,7 +257,7 @@ vector<string> CatalogSearchPath::GetCatalogsForSchema(const string &schema) con
catalogs.push_back(SYSTEM_CATALOG);
} else {
for (auto &path : paths) {
if (StringUtil::CIEquals(path.schema, schema)) {
if (StringUtil::CIEquals(path.schema, schema) || path.schema.empty()) {
catalogs.push_back(path.catalog);
}
}
Expand All @@ -261,24 +268,24 @@ vector<string> CatalogSearchPath::GetCatalogsForSchema(const string &schema) con
vector<string> CatalogSearchPath::GetSchemasForCatalog(const string &catalog) const {
vector<string> schemas;
for (auto &path : paths) {
if (StringUtil::CIEquals(path.catalog, catalog)) {
if (!path.schema.empty() && StringUtil::CIEquals(path.catalog, catalog)) {
schemas.push_back(path.schema);
}
}
return schemas;
}

const CatalogSearchEntry &CatalogSearchPath::GetDefault() const {
const auto &paths = Get();
D_ASSERT(paths.size() >= 2);
D_ASSERT(!paths[1].schema.empty());
return paths[1];
}

void CatalogSearchPath::SetPathsInternal(vector<CatalogSearchEntry> new_paths) {
this->set_paths = std::move(new_paths);

paths.clear();
paths.reserve(set_paths.size() + 3);
paths.reserve(set_paths.size() + 4);
paths.emplace_back(TEMP_CATALOG, DEFAULT_SCHEMA);
for (auto &path : set_paths) {
paths.push_back(path);
Expand Down
31 changes: 31 additions & 0 deletions src/duckdb/src/common/adbc/adbc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "duckdb/common/adbc/single_batch_array_stream.hpp"
#include "duckdb/function/table/arrow.hpp"
#include "duckdb/common/adbc/wrappers.hpp"
#include <algorithm>
#include <cstring>
#include <stdlib.h>
static void ReleaseError(struct AdbcError *error);

Expand Down Expand Up @@ -1075,6 +1077,22 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr
if (has_stream && to_table) {
return IngestToTableFromBoundStream(wrapper, rows_affected, error);
}

if (!wrapper->statement) {
if (out) {
out->private_data = nullptr;
out->get_schema = nullptr;
out->get_next = nullptr;
out->release = nullptr;
out->get_last_error = nullptr;
}

if (rows_affected) {
*rows_affected = 0;
}
return ADBC_STATUS_OK;
}

auto stream_wrapper = static_cast<DuckDBAdbcStreamWrapper *>(malloc(sizeof(DuckDBAdbcStreamWrapper)));
if (!stream_wrapper) {
SetError(error, "Allocation error");
Expand Down Expand Up @@ -1230,6 +1248,12 @@ AdbcStatusCode StatementSetSqlQuery(struct AdbcStatement *statement, const char
return ADBC_STATUS_INVALID_ARGUMENT;
}

auto query_len = strlen(query);
if (std::all_of(query, query + query_len, duckdb::StringUtil::CharacterIsSpace)) {
SetError(error, "No statements found");
return ADBC_STATUS_INVALID_ARGUMENT;
}

auto wrapper = static_cast<DuckDBAdbcStatementWrapper *>(statement->private_data);
if (wrapper->ingestion_stream.release) {
// Release any resources currently held by the ingestion stream before we overwrite it
Expand All @@ -1249,6 +1273,13 @@ AdbcStatusCode StatementSetSqlQuery(struct AdbcStatement *statement, const char
duckdb_destroy_extracted(&extracted_statements);
return ADBC_STATUS_INTERNAL;
}

if (extract_statements_size == 0) {
// Query is non-empty, but there are no actual statements.
duckdb_destroy_extracted(&extracted_statements);
return ADBC_STATUS_OK;
}

// Now lets loop over the statements, and execute every one
for (idx_t i = 0; i < extract_statements_size - 1; i++) {
duckdb_prepared_statement statement_internal = nullptr;
Expand Down
22 changes: 0 additions & 22 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
#include "duckdb/common/enums/profiler_format.hpp"
#include "duckdb/common/enums/quantile_enum.hpp"
#include "duckdb/common/enums/relation_type.hpp"
#include "duckdb/common/enums/scan_options.hpp"
#include "duckdb/common/enums/set_operation_type.hpp"
#include "duckdb/common/enums/set_scope.hpp"
#include "duckdb/common/enums/set_type.hpp"
Expand Down Expand Up @@ -4845,27 +4844,6 @@ TableReferenceType EnumUtil::FromString<TableReferenceType>(const char *value) {
return static_cast<TableReferenceType>(StringUtil::StringToEnum(GetTableReferenceTypeValues(), 13, "TableReferenceType", value));
}

const StringUtil::EnumStringLiteral *GetTableScanTypeValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(TableScanType::TABLE_SCAN_REGULAR), "TABLE_SCAN_REGULAR" },
{ static_cast<uint32_t>(TableScanType::TABLE_SCAN_COMMITTED_ROWS), "TABLE_SCAN_COMMITTED_ROWS" },
{ static_cast<uint32_t>(TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES), "TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES" },
{ static_cast<uint32_t>(TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED), "TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED" },
{ static_cast<uint32_t>(TableScanType::TABLE_SCAN_LATEST_COMMITTED_ROWS), "TABLE_SCAN_LATEST_COMMITTED_ROWS" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<TableScanType>(TableScanType value) {
return StringUtil::EnumToString(GetTableScanTypeValues(), 5, "TableScanType", static_cast<uint32_t>(value));
}

template<>
TableScanType EnumUtil::FromString<TableScanType>(const char *value) {
return static_cast<TableScanType>(StringUtil::StringToEnum(GetTableScanTypeValues(), 5, "TableScanType", value));
}

const StringUtil::EnumStringLiteral *GetTaskExecutionModeValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(TaskExecutionMode::PROCESS_ALL), "PROCESS_ALL" },
Expand Down
8 changes: 8 additions & 0 deletions src/duckdb/src/common/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,14 @@ int64_t FileHandle::Write(void *buffer, idx_t nr_bytes) {
return file_system.Write(*this, buffer, UnsafeNumericCast<int64_t>(nr_bytes));
}

// Writes nr_bytes from buffer through the owning file system and, when the
// query context carries a client context, records the byte count in that
// client's profiler.
//
// The count is attributed to the bytes-written metric: this is the write path,
// so adding it to TOTAL_BYTES_READ would inflate the read statistic and leave
// written bytes unreported.
//
// Returns whatever file_system.Write returns for this handle.
int64_t FileHandle::Write(QueryContext context, void *buffer, idx_t nr_bytes) {
	auto client_context = context.GetClientContext();
	if (client_context != nullptr) {
		client_context->client_data->profiler->AddToCounter(MetricType::TOTAL_BYTES_WRITTEN, nr_bytes);
	}

	return file_system.Write(*this, buffer, UnsafeNumericCast<int64_t>(nr_bytes));
}

void FileHandle::Read(void *buffer, idx_t nr_bytes, idx_t location) {
file_system.Read(*this, buffer, UnsafeNumericCast<int64_t>(nr_bytes), location);
}
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/gzip_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void MiniZStreamWrapper::Initialize(QueryContext context, CompressedFile &file,
total_size = 0;

MiniZStream::InitializeGZIPHeader(gzip_hdr);
file.child_handle->Write(gzip_hdr, GZIP_HEADER_MINSIZE);
file.child_handle->Write(context, gzip_hdr, GZIP_HEADER_MINSIZE);

auto ret = mz_deflateInit2(mz_stream_ptr.get(), duckdb_miniz::MZ_DEFAULT_LEVEL, MZ_DEFLATED,
-MZ_DEFAULT_WINDOW_BITS, 1, 0);
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/pipe_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int64_t PipeFile::ReadChunk(void *buffer, int64_t nr_bytes) {
return child_handle->Read(context, buffer, UnsafeNumericCast<idx_t>(nr_bytes));
}
int64_t PipeFile::WriteChunk(void *buffer, int64_t nr_bytes) {
return child_handle->Write(buffer, UnsafeNumericCast<idx_t>(nr_bytes));
return child_handle->Write(context, buffer, UnsafeNumericCast<idx_t>(nr_bytes));
}

void PipeFileSystem::Reset(FileHandle &handle) {
Expand Down
2 changes: 2 additions & 0 deletions src/duckdb/src/common/radix_partitioning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct ComputePartitionIndicesFunctor {
UnaryExecutor::Execute<hash_t, hash_t>(hashes, partition_indices, append_count,
[&](hash_t hash) { return CONSTANTS::ApplyMask(hash); });
} else {
partition_indices.SetVectorType(VectorType::FLAT_VECTOR);
// We could just slice the "hashes" vector and use the UnaryExecutor
// But slicing a dictionary vector causes SelectionData to be allocated
// Instead, we just directly compute the partition indices using the selection vectors
Expand Down Expand Up @@ -231,6 +232,7 @@ void RadixPartitionedTupleData::ComputePartitionIndices(Vector &row_locations, i
utility_vector = make_uniq<Vector>(LogicalType::HASH);
}
Vector &intermediate = *utility_vector;
intermediate.SetVectorType(VectorType::FLAT_VECTOR);
partitions[0]->Gather(row_locations, *FlatVector::IncrementalSelectionVector(), count, hash_col_idx, intermediate,
*FlatVector::IncrementalSelectionVector(), nullptr);
RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, intermediate, partition_indices, count,
Expand Down
8 changes: 4 additions & 4 deletions src/duckdb/src/execution/aggregate_hashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ const PartitionedTupleData &GroupedAggregateHashTable::GetPartitionedData() cons
}

unique_ptr<PartitionedTupleData> GroupedAggregateHashTable::AcquirePartitionedData() {
// Flush/unpin partitioned data
partitioned_data->FlushAppendState(state.partitioned_append_state);
partitioned_data->Unpin();

if (radix_bits >= UNPARTITIONED_RADIX_BITS_THRESHOLD) {
// Flush/unpin unpartitioned data and append to partitioned data
if (unpartitioned_data) {
Expand All @@ -120,6 +116,10 @@ unique_ptr<PartitionedTupleData> GroupedAggregateHashTable::AcquirePartitionedDa
InitializeUnpartitionedData();
}

// Flush/unpin partitioned data
partitioned_data->FlushAppendState(state.partitioned_append_state);
partitioned_data->Unpin();

// Return and re-initialize
auto result = std::move(partitioned_data);
InitializePartitionedData();
Expand Down
Loading