Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/Java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ jobs:
path: ${{ github.workspace }}/ccache
key: ${{ steps.cache_key.outputs.value }}

- name: Enforce jemalloc
working-directory: src/duckdb
run: |
patch -p1 < ${{ github.workspace }}/data/vendoring/jemalloc_enforce.patch
git diff HEAD

- name: Build
uses: ./.github/composite-actions/linux-build-docker
with:
Expand Down
10 changes: 9 additions & 1 deletion .github/workflows/Vendor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@ jobs:
echo "Checking out engine ref: ${{ inputs.duckdb-sha }} on branch: ${{ github.ref_name }}"
git checkout ${{ inputs.duckdb-sha }}

- name: Vendor sources
- name: Apply local patches
id: apply_local_patches
if: ${{ steps.checkout_engine_rev.outcome == 'success' }}
working-directory: .git/duckdb
run: |
patch -p1 < ${{ github.workspace }}/data/vendoring/jemalloc_packaging.patch
git diff HEAD

- name: Vendor sources
if: ${{ steps.apply_local_patches.outcome == 'success' }}
id: vendor
run: |
REV=$(cd .git/duckdb && git rev-parse --short HEAD && cd ../..)
Expand Down
608 changes: 343 additions & 265 deletions CMakeLists.txt

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion CMakeLists.txt.in
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ endif()
set(DUCKDB_INCLUDE_DIRS
${INCLUDES})

set(JEMALLOC_INCLUDE_DIRS
${JEMALLOC_INCLUDES})

set(DUCKDB_DEFINITIONS
${DEFINES})

set(DUCKDB_SRC_FILES
${SOURCES})

set(JEMALLOC_SRC_FILES
${JEMALLOC_SOURCES})


# a few OS-specific variables

Expand Down Expand Up @@ -96,7 +102,10 @@ add_jar(duckdb_jdbc_tests ${JAVA_TEST_FILES} INCLUDE_JARS duckdb_jdbc_nolib)
# main shared lib compilation

if(MSVC OR ZOS)
list(APPEND DUCKDB_INCLUDE_DIRS src/stubs)
list(APPEND DUCKDB_SRC_FILES duckdb_java.def)
else()
list(APPEND DUCKDB_SRC_FILES ${JEMALLOC_SRC_FILES})
endif()

add_library(duckdb_java SHARED
Expand Down Expand Up @@ -126,6 +135,11 @@ target_include_directories(duckdb_java PRIVATE
${JAVA_INCLUDE_PATH2}
${DUCKDB_INCLUDE_DIRS})

if (NOT MSVC AND NOT ZOS)
target_include_directories(duckdb_java PRIVATE
${JEMALLOC_INCLUDE_DIRS})
endif()

target_link_libraries(duckdb_java PRIVATE
duckdb-native
${CMAKE_DL_LIBS})
Expand Down Expand Up @@ -154,7 +168,7 @@ target_compile_definitions(duckdb_java PRIVATE

if(NOT MSVC AND NOT ZOS)
target_compile_definitions(duckdb_java PRIVATE
-DDUCKDB_EXTENSION_JEMALLOC_LINKED)
-DDUCKDB_ENABLE_JEMALLOC)
endif()

if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
Expand Down
12 changes: 12 additions & 0 deletions data/vendoring/jemalloc_enforce.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
diff --git a/src/common/allocator/allocator_standard.cpp b/src/common/allocator/allocator_standard.cpp
index 45e03d46b1..260a6fee8b 100644
--- a/src/common/allocator/allocator_standard.cpp
+++ b/src/common/allocator/allocator_standard.cpp
@@ -1,6 +1,7 @@
#include "duckdb/common/allocator.hpp"

#ifndef DUCKDB_ENABLE_JEMALLOC
+#error "jemalloc not enabled"

#include "duckdb/common/assert.hpp"
#include "duckdb/common/atomic.hpp"
32 changes: 32 additions & 0 deletions data/vendoring/jemalloc_packaging.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
diff --git a/scripts/package_build.py b/scripts/package_build.py
index 2f218a9db1..f6fea2fc34 100644
--- a/scripts/package_build.py
+++ b/scripts/package_build.py
@@ -41,6 +41,7 @@ def third_party_includes():
includes += [os.path.join('third_party', 'vergesort')]
includes += [os.path.join('third_party', 'yyjson', 'include')]
includes += [os.path.join('third_party', 'zstd', 'include')]
+ includes += [os.path.join('third_party', 'jemalloc', 'include')]
return includes


@@ -58,6 +59,7 @@ def third_party_sources():
sources += [os.path.join('third_party', 'mbedtls')]
sources += [os.path.join('third_party', 'yyjson')]
sources += [os.path.join('third_party', 'zstd')]
+ sources += [os.path.join('third_party', 'jemalloc')]
return sources


diff --git a/src/common/allocator/allocator_jemalloc.cpp b/src/common/allocator/allocator_jemalloc.cpp
index 4587add993..b795fa85ba 100644
--- a/src/common/allocator/allocator_jemalloc.cpp
+++ b/src/common/allocator/allocator_jemalloc.cpp
@@ -1,5 +1,6 @@
#include "duckdb/common/allocator.hpp"
#include "duckdb/common/numeric_utils.hpp"
+#include "duckdb/common/string_util.hpp"

#include <thread>
#include <cstdint>

10 changes: 7 additions & 3 deletions src/duckdb/extension/parquet/parquet_column_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,21 @@ unique_ptr<BaseStatistics> ParquetColumnSchema::Stats(const FileMetaData &file_m
return nullptr;
}
if (schema_type == ParquetColumnSchemaType::FILE_ROW_NUMBER) {
auto stats = NumericStats::CreateUnknown(type);
auto &row_groups = file_meta_data.row_groups;
D_ASSERT(row_group_idx_p < row_groups.size());
if (row_groups[row_group_idx_p].num_rows == 0) {
return NumericStats::CreateEmpty(type).ToUnique();
}

idx_t row_group_offset_min = 0;
for (idx_t i = 0; i < row_group_idx_p; i++) {
row_group_offset_min += row_groups[i].num_rows;
}

auto stats = NumericStats::CreateUnknown(type);
NumericStats::SetMin(stats, Value::BIGINT(UnsafeNumericCast<int64_t>(row_group_offset_min)));
NumericStats::SetMax(stats, Value::BIGINT(UnsafeNumericCast<int64_t>(row_group_offset_min +
row_groups[row_group_idx_p].num_rows)));
NumericStats::SetMax(stats, Value::BIGINT(UnsafeNumericCast<int64_t>(
row_group_offset_min + row_groups[row_group_idx_p].num_rows - 1)));
stats.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
return stats.ToUnique();
}
Expand Down
6 changes: 6 additions & 0 deletions src/duckdb/extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,12 @@ struct ParquetPartitionRowGroup : public PartitionRowGroup {
D_ASSERT(metadata.row_groups.size() > row_group_idx);
D_ASSERT(root_schema->children.size() > primary_index);

// Special handle generated columns.
const auto &column_schema = root_schema->children[primary_index];
if (column_schema.schema_type == ParquetColumnSchemaType::FILE_ROW_NUMBER) {
return true;
}

const auto &row_group = metadata.row_groups[row_group_idx];
const auto &column_chunk = row_group.columns[primary_index];

Expand Down
14 changes: 13 additions & 1 deletion src/duckdb/extension/parquet/writer/enum_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@
namespace duckdb {
using duckdb_parquet::Encoding;

namespace {
class EnumStatisticsState : public StringStatisticsState {
public:
bool MinIsExact() override {
return false;
}
bool MaxIsExact() override {
return false;
}
};
} // namespace

class EnumWriterPageState : public ColumnWriterPageState {
public:
explicit EnumWriterPageState(uint32_t bit_width) : encoder(bit_width), written_value(false) {
Expand All @@ -23,7 +35,7 @@ EnumColumnWriter::EnumColumnWriter(ParquetWriter &writer, ParquetColumnSchema &&
}

unique_ptr<ColumnWriterStatistics> EnumColumnWriter::InitializeStatsState() {
return make_uniq<StringStatisticsState>();
return make_uniq<EnumStatisticsState>();
}

template <class T>
Expand Down
11 changes: 10 additions & 1 deletion src/duckdb/src/common/allocator/allocator_jemalloc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckdb/common/allocator.hpp"
#include "duckdb/common/operator/numeric_cast.hpp"
#include "duckdb/common/numeric_utils.hpp"
#include "duckdb/common/string_util.hpp"

#include <thread>
#include <cstdint>
Expand Down Expand Up @@ -32,6 +33,14 @@ static string PurgeArenaString(idx_t arena_idx) {
return StringUtil::Format("arena.%llu.purge", arena_idx);
}

static void JemallocCTL(const char *name, void *old_ptr, size_t *old_len, void *new_ptr, size_t new_len) {
if (duckdb_je_mallctl(name, old_ptr, old_len, new_ptr, new_len) != 0) {
#ifdef DEBUG
throw InternalException("je_mallctl failed for setting \"%s\"", name);
#endif
}
}

template <class T>
static void SetJemallocCTL(const char *name, T &val) {
JemallocCTL(name, nullptr, nullptr, &val, sizeof(T));
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "3-dev353"
#define DUCKDB_PATCH_VERSION "3-dev373"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 5
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.5.3-dev353"
#define DUCKDB_VERSION "v1.5.3-dev373"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "6221d3e3f5"
#define DUCKDB_SOURCE_ID "f1a6e65815"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/src/include/duckdb/main/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ struct DBConfigOptions {
LogConfig log_config = LogConfig();
//! Physical memory that the block allocator is allowed to use (this memory is never freed and cannot be reduced)
idx_t block_allocator_size = 0;
//! The maximum data to buffer in row groups (in bytes) to buffer prior to flushing.
//! When inserting large chunks of data we
optional_idx write_buffer_row_group_memory_limit;

bool operator==(const DBConfigOptions &other) const;
};
Expand Down
13 changes: 13 additions & 0 deletions src/duckdb/src/include/duckdb/main/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1628,6 +1628,19 @@ struct WriteBufferRowGroupCountSetting {
static constexpr idx_t SettingIndex = 94;
};

struct WriteBufferRowGroupMemoryLimitSetting {
using RETURN_TYPE = string;
static constexpr const char *Name = "write_buffer_row_group_memory_limit";
static constexpr const char *Description =
"The maximum data to buffer in row groups (in bytes) to buffer prior to flushing them together. When either "
"this limit is reached, or write_buffer_row_group_count is reached, we flush the data to disk. Defaults to 20% "
"of memory limit divided by thread count.";
static constexpr const char *InputType = "VARCHAR";
static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &parameter);
static void ResetGlobal(DatabaseInstance *db, DBConfig &config);
static Value GetSetting(const ClientContext &context);
};

struct ZstdMinStringLengthSetting {
using RETURN_TYPE = idx_t;
static constexpr const char *Name = "zstd_min_string_length";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ struct OptimisticWriteCollection {

shared_ptr<RowGroupCollection> collection;
set<idx_t> unflushed_row_groups;
idx_t unflushed_data_size = 0;
idx_t prev_allocated_size = 0;
idx_t complete_row_groups = 0;
vector<unique_ptr<PartialBlockManager>> partial_block_managers;

Expand Down
1 change: 1 addition & 0 deletions src/duckdb/src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ static const ConfigurationOption internal_options[] = {
DUCKDB_SETTING(WalAutocheckpointEntriesSetting),
DUCKDB_SETTING_CALLBACK(WarningsAsErrorsSetting),
DUCKDB_SETTING(WriteBufferRowGroupCountSetting),
DUCKDB_GLOBAL(WriteBufferRowGroupMemoryLimitSetting),
DUCKDB_SETTING(ZstdMinStringLengthSetting),
FINAL_SETTING};

Expand Down
26 changes: 26 additions & 0 deletions src/duckdb/src/main/settings/custom_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1699,6 +1699,32 @@ void WarningsAsErrorsSetting::OnSet(SettingCallbackInfo &info, Value &input) {
}
}

//===----------------------------------------------------------------------===//
// Streaming Buffer Size
//===----------------------------------------------------------------------===//
void WriteBufferRowGroupMemoryLimitSetting::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) {
if (input.IsNull() || input.ToString().empty()) {
config.options.write_buffer_row_group_memory_limit = optional_idx();
} else {
config.options.write_buffer_row_group_memory_limit = DBConfig::ParseMemoryLimit(input.ToString());
}
}

void WriteBufferRowGroupMemoryLimitSetting::ResetGlobal(DatabaseInstance *db, DBConfig &config) {
config.options.write_buffer_row_group_memory_limit = optional_idx();
}

Value WriteBufferRowGroupMemoryLimitSetting::GetSetting(const ClientContext &context) {
auto &config = DBConfig::GetConfig(context);
idx_t bytes = 0;
if (config.options.write_buffer_row_group_memory_limit.IsValid()) {
bytes = config.options.write_buffer_row_group_memory_limit.GetIndex();
} else {
bytes = config.options.maximum_memory / 5 / (config.options.maximum_threads + 1);
}
return Value(StringUtil::BytesToHumanReadableString(bytes));
}

void CurrentTransactionInvalidationPolicySetting::OnSet(SettingCallbackInfo &info, Value &input) {
if (!info.context) {
throw InvalidInputException(
Expand Down
25 changes: 24 additions & 1 deletion src/duckdb/src/storage/optimistic_data_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,30 @@ void OptimisticDataWriter::WriteNewRowGroup(OptimisticWriteCollection &row_group

row_groups.unflushed_row_groups.insert(row_groups.complete_row_groups);
row_groups.complete_row_groups++;
auto allocated_size = row_groups.collection->GetAllocationSize();
if (row_groups.prev_allocated_size > allocated_size) {
throw InternalException("Row group prev allocated size is larger than currently allocated size");
}
row_groups.unflushed_data_size += allocated_size - row_groups.prev_allocated_size;
row_groups.prev_allocated_size = allocated_size;
auto unflushed_row_groups = row_groups.unflushed_row_groups.size();
if (unflushed_row_groups >= Settings::Get<WriteBufferRowGroupCountSetting>(context)) {
// check if we should flush the row groups
// first check the amount of row groups
bool need_to_flush = unflushed_row_groups >= Settings::Get<WriteBufferRowGroupCountSetting>(context);
if (!need_to_flush) {
// we don't need to flush based on the amount of row groups - but we still might need to flush based on the
// amount of
auto &config = DBConfig::GetConfig(context);
auto memory_limit = config.options.write_buffer_row_group_memory_limit;
if (!memory_limit.IsValid()) {
memory_limit = config.options.maximum_memory / 5 / (config.options.maximum_threads + 1);
}
if (row_groups.unflushed_data_size >= memory_limit.GetIndex()) {
// we exhausted our memory available for buffering - flush
need_to_flush = true;
}
}
if (need_to_flush) {
// we have crossed our flush threshold - flush any unwritten row groups to disk
vector<const_reference<RowGroup>> to_flush;
vector<int64_t> segment_indexes;
Expand All @@ -79,6 +101,7 @@ void OptimisticDataWriter::WriteNewRowGroup(OptimisticWriteCollection &row_group
}
FlushToDisk(row_groups, to_flush, segment_indexes);
row_groups.unflushed_row_groups.clear();
row_groups.unflushed_data_size = 0;
}
}

Expand Down
22 changes: 22 additions & 0 deletions src/duckdb/third_party/jemalloc/include/duckdb/malloc_ncpus.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// malloc_ncpus.h
//
//
//===----------------------------------------------------------------------===//

#ifndef MALLOC_NCPUS_H
#define MALLOC_NCPUS_H

#ifdef __cplusplus
extern "C" {
#endif

unsigned duckdb_malloc_ncpus();

#ifdef __cplusplus
}
#endif

#endif // MALLOC_NCPUS_H
Loading
Loading