Commit
Merge remote-tracking branch 'upstream/main' into python_disallow_cross_connection_replacement_scans
Tishj committed May 21, 2024
2 parents cfdece7 + 6187c4c commit 04c93e3
Showing 31 changed files with 649 additions and 114 deletions.
10 changes: 8 additions & 2 deletions extension/parquet/column_writer.cpp
@@ -687,18 +687,24 @@ void BasicColumnWriter::FinalizeWrite(ColumnWriterState &state_p) {
if (HasDictionary(state)) {
column_chunk.meta_data.statistics.distinct_count = DictionarySize(state);
column_chunk.meta_data.statistics.__isset.distinct_count = true;
column_chunk.meta_data.dictionary_page_offset = start_offset;
column_chunk.meta_data.dictionary_page_offset = column_writer.GetTotalWritten();
column_chunk.meta_data.__isset.dictionary_page_offset = true;
FlushDictionary(state, state.stats_state.get());
}

// record the start position of the pages for this column
column_chunk.meta_data.data_page_offset = column_writer.GetTotalWritten();
column_chunk.meta_data.data_page_offset = 0;
SetParquetStatistics(state, column_chunk);

// write the individual pages to disk
idx_t total_uncompressed_size = 0;
for (auto &write_info : state.write_info) {
// set the data page offset whenever we see the *first* data page
if (column_chunk.meta_data.data_page_offset == 0 && (write_info.page_header.type == PageType::DATA_PAGE ||
write_info.page_header.type == PageType::DATA_PAGE_V2)) {
column_chunk.meta_data.data_page_offset = column_writer.GetTotalWritten();
}
D_ASSERT(write_info.page_header.uncompressed_page_size > 0);
auto header_start_offset = column_writer.GetTotalWritten();
writer.Write(write_info.page_header);
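Net effect of this change: both page offsets in the column chunk metadata now point at real byte positions in the file. The dictionary page offset is captured from column_writer.GetTotalWritten() right before the dictionary is flushed, and the data page offset is left at 0 until the first DATA_PAGE or DATA_PAGE_V2 is actually written. Below is a toy, self-contained sketch of the resulting invariant (hypothetical byte positions, not DuckDB code) that the ValidateColumnOffsets helper added below in parquet_writer.cpp then enforces:

#include <cassert>
#include <cstdint>

int main() {
	// Hypothetical column chunk: dictionary page written at byte 100,
	// first data page written right after it at byte 200.
	uint64_t dictionary_page_offset = 100;
	uint64_t data_page_offset = 200;
	uint64_t total_compressed_size = 350; // dictionary page + all data pages
	uint64_t file_length = 1000;          // bytes written to the file so far

	// Invariants checked by ValidateColumnOffsets: every offset lies inside the
	// file, the dictionary page precedes the first data page, and the whole
	// column chunk ends before the end of the file.
	assert(data_page_offset < file_length);
	assert(dictionary_page_offset < data_page_offset);
	assert(dictionary_page_offset + total_compressed_size <= file_length);
	return 0;
}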
38 changes: 38 additions & 0 deletions extension/parquet/parquet_writer.cpp
@@ -483,6 +483,42 @@ void ParquetWriter::PrepareRowGroup(ColumnDataCollection &buffer, PreparedRowGro
result.heaps = buffer.GetHeapReferences();
}

// Validation code adapted from Impala
static void ValidateOffsetInFile(const string &filename, idx_t col_idx, idx_t file_length, idx_t offset,
const string &offset_name) {
if (offset < 0 || offset >= file_length) {
throw IOException("File '%s': metadata is corrupt. Column %d has invalid "
"%s (offset=%llu file_size=%llu).",
filename, col_idx, offset_name, offset, file_length);
}
}

static void ValidateColumnOffsets(const string &filename, idx_t file_length, const ParquetRowGroup &row_group) {
for (idx_t i = 0; i < row_group.columns.size(); ++i) {
const auto &col_chunk = row_group.columns[i];
ValidateOffsetInFile(filename, i, file_length, col_chunk.meta_data.data_page_offset, "data page offset");
auto col_start = NumericCast<idx_t>(col_chunk.meta_data.data_page_offset);
// The file format requires that if a dictionary page exists, it be before data pages.
if (col_chunk.meta_data.__isset.dictionary_page_offset) {
ValidateOffsetInFile(filename, i, file_length, col_chunk.meta_data.dictionary_page_offset,
"dictionary page offset");
if (NumericCast<idx_t>(col_chunk.meta_data.dictionary_page_offset) >= col_start) {
throw IOException("Parquet file '%s': metadata is corrupt. Dictionary "
"page (offset=%llu) must come before any data pages (offset=%llu).",
filename, col_chunk.meta_data.dictionary_page_offset, col_start);
}
col_start = col_chunk.meta_data.dictionary_page_offset;
}
auto col_len = NumericCast<idx_t>(col_chunk.meta_data.total_compressed_size);
auto col_end = col_start + col_len;
if (col_end <= 0 || col_end > file_length) {
throw IOException("Parquet file '%s': metadata is corrupt. Column %llu has "
"invalid column offsets (offset=%llu, size=%llu, file_size=%llu).",
filename, i, col_start, col_len, file_length);
}
}
}

void ParquetWriter::FlushRowGroup(PreparedRowGroup &prepared) {
lock_guard<mutex> glock(lock);
auto &row_group = prepared.row_group;
@@ -496,6 +532,8 @@ void ParquetWriter::FlushRowGroup(PreparedRowGroup &prepared) {
auto write_state = std::move(states[col_idx]);
col_writer->FinalizeWrite(*write_state);
}
// make sure all page offsets in this row group are valid
ValidateColumnOffsets(file_name, writer->GetTotalWritten(), row_group);

// append the row group to the file meta data
file_meta_data.row_groups.push_back(row_group);
2 changes: 1 addition & 1 deletion scripts/coverage_check.sh
@@ -25,4 +25,4 @@ lcov --config-file .github/workflows/lcovrc --remove coverage.info $(< .github/w
genhtml -o coverage_html lcov.info

# check that coverage passes threshold
python3 scripts/check_coverage.py
# python3 scripts/check_coverage.py
42 changes: 42 additions & 0 deletions scripts/regression_test_python.py
@@ -6,6 +6,7 @@
import time
import argparse
from typing import Dict, List, Any
import numpy as np

TPCH_QUERIES = []
res = duckdb.execute(
@@ -317,6 +318,39 @@ def benchmark(self, benchmark_name) -> BenchmarkResult:
return result


class PandasAnalyzerBenchmark:
def __init__(self):
self.initialize_connection()
self.generate()

def initialize_connection(self):
self.con = duckdb.connect()
if not threads:
return
print_msg(f'Limiting threads to {threads}')
self.con.execute(f"SET threads={threads}")

def generate(self):
return

def benchmark(self, benchmark_name) -> BenchmarkResult:
result = BenchmarkResult(benchmark_name)
data = [None] * 9999999 + [1] # Last element is 1, others are None

# Create the DataFrame with the specified data and column type as object
pandas_df = pd.DataFrame(data, columns=['Column'], dtype=object)
for _ in range(nruns):
duration = 0.0
start = time.time()
for _ in range(30):
res = self.con.execute("""select * from pandas_df""").df()
end = time.time()
duration = float(end - start)
del res
result.add(duration)
return result


def test_arrow_dictionaries_scan():
DICT_SIZE = 26 * 1000
print_msg(f"Generating a unique dictionary of size {DICT_SIZE}")
@@ -336,6 +370,13 @@ def test_loading_pandas_df_many_times():
result.write()


def test_pandas_analyze():
test = PandasAnalyzerBenchmark()
benchmark_name = f"pandas_analyze"
result = test.benchmark(benchmark_name)
result.write()


def test_call_and_select_statements():
test = SelectAndCallBenchmark()
queries = {
@@ -351,6 +392,7 @@ def main():
test_tpch()
test_arrow_dictionaries_scan()
test_loading_pandas_df_many_times()
test_pandas_analyze()
test_call_and_select_statements()

close_result()
17 changes: 11 additions & 6 deletions scripts/run_extension_medata_tests.sh
@@ -59,6 +59,10 @@ EOL
# Build the extensions using the first config
LOCAL_EXTENSION_REPO=$LOCAL_EXTENSION_REPO_UPDATED EXTENSION_CONFIGS=$TEST_DIR/extension_config_before.cmake make debug

# Set the version and platform now that we have a build
DUCKDB_VERSION=`$DUCKDB_BUILD_DIR/duckdb -csv -noheader -c 'select source_id from pragma_version()'`
DUCKDB_PLATFORM=`cat $DUCKDB_BUILD_DIR/duckdb_platform_out`

# Install the extension from the initial config
$DUCKDB_BUILD_DIR/duckdb -unsigned -c "set extension_directory='$LOCAL_EXTENSION_DIR'; set custom_extension_repository='$LOCAL_EXTENSION_REPO_UPDATED'; install tpch; install json; install inet;"

@@ -67,8 +71,6 @@ EOL
$DUCKDB_BUILD_DIR/duckdb -unsigned -c "set extension_directory='$LOCAL_EXTENSION_DIR'; install '$DIRECT_INSTALL_DIR/tpcds.duckdb_extension';"

# Delete the info file from the inet extension
DUCKDB_VERSION=`$DUCKDB_BUILD_DIR/duckdb -csv -noheader -c 'select source_id from pragma_version()'`
DUCKDB_PLATFORM=`cat $DUCKDB_BUILD_DIR/duckdb_platform_out`
rm $LOCAL_EXTENSION_DIR/$DUCKDB_VERSION/$DUCKDB_PLATFORM/inet.duckdb_extension.info

# Set updated extension config where we update the tpch and inet extension but not the json extension
@@ -143,14 +145,10 @@ EOL
$DUCKDB_BUILD_DIR/duckdb -unsigned -c "set allow_extensions_metadata_mismatch=true; set extension_directory='$LOCAL_EXTENSION_REPO_VERSION_AND_PLATFORM_INCORRECT'; install '$DIRECT_INSTALL_DIR/json_incorrect_version_and_platform.duckdb_extension'"

# Create dir with malformed info file
DUCKDB_VERSION=`$DUCKDB_BUILD_DIR/duckdb -csv -noheader -c 'select source_id from pragma_version()'`
DUCKDB_PLATFORM=`cat $DUCKDB_BUILD_DIR/duckdb_platform_out`
$DUCKDB_BUILD_DIR/duckdb -unsigned -c "set extension_directory='$LOCAL_EXTENSION_DIR_MALFORMED_INFO'; install '$DIRECT_INSTALL_DIR/tpcds.duckdb_extension';"
echo blablablab > $LOCAL_EXTENSION_DIR_MALFORMED_INFO/$DUCKDB_VERSION/$DUCKDB_PLATFORM/tpcds.duckdb_extension.info

# Create dir with malformed info file: we install a new version from LOCAL_EXTENSION_REPO_UPDATED but preserve the old info file
DUCKDB_VERSION=`$DUCKDB_BUILD_DIR/duckdb -csv -noheader -c 'select source_id from pragma_version()'`
DUCKDB_PLATFORM=`cat $DUCKDB_BUILD_DIR/duckdb_platform_out`
$DUCKDB_BUILD_DIR/duckdb -unsigned -c "set extension_directory='$LOCAL_EXTENSION_DIR_INFO_INCORRECT_VERSION'; install 'tpch' from '$LOCAL_EXTENSION_REPO_UPDATED'"
cp $LOCAL_EXTENSION_DIR/$DUCKDB_VERSION/$DUCKDB_PLATFORM/tpch.duckdb_extension.info $LOCAL_EXTENSION_DIR_INFO_INCORRECT_VERSION/$DUCKDB_VERSION/$DUCKDB_PLATFORM/tpch.duckdb_extension.info

@@ -160,11 +158,18 @@ EOL
cp -R $TEST_DIR $TEST_DIR_COPY
fi

###########################
### Set version and platform
###########################
DUCKDB_VERSION=`$DUCKDB_BUILD_DIR/duckdb -csv -noheader -c 'select source_id from pragma_version()'`
DUCKDB_PLATFORM=`cat $DUCKDB_BUILD_DIR/duckdb_platform_out`

###########################
### Populate the minio repositories
###########################
AWS_DEFAULT_REGION=eu-west-1 AWS_ACCESS_KEY_ID=minio_duckdb_user AWS_SECRET_ACCESS_KEY=minio_duckdb_user_password aws --endpoint-url http://duckdb-minio.com:9000 s3 sync $LOCAL_EXTENSION_REPO_UPDATED s3://test-bucket-public/ci-test-repo
export REMOTE_EXTENSION_REPO_UPDATED=http://duckdb-minio.com:9000/test-bucket-public/ci-test-repo
export REMOTE_EXTENSION_REPO_DIRECT_PATH=http://duckdb-minio.com:9000/test-bucket-public/ci-test-repo/$DUCKDB_VERSION/$DUCKDB_PLATFORM

################
### Run test
28 changes: 28 additions & 0 deletions scripts/run_tests_one_by_one.py
@@ -1,6 +1,7 @@
import sys
import subprocess
import time
import threading

import argparse

@@ -22,6 +23,9 @@ def valid_timeout(value):
parser.add_argument('--no-assertions', action='store_false', help='Disable assertions')
parser.add_argument('--time_execution', action='store_true', help='Measure and print the execution time of each test')
parser.add_argument('--list', action='store_true', help='Print the list of tests to run')
parser.add_argument(
'--print-interval', action='store', help='Prints "Still running..." every N seconds', default=300.0, type=float
)
parser.add_argument(
'--timeout',
action='store',
@@ -98,9 +102,29 @@ def parse_assertions(stdout):
return "ERROR"


is_active = False


def print_interval_background(interval):
global is_active
current_ticker = 0.0
while is_active:
time.sleep(0.1)
current_ticker += 0.1
if current_ticker >= interval:
print("Still running...")
current_ticker = 0


for test_number, test_case in enumerate(test_cases):
if not profile:
print(f"[{test_number}/{test_count}]: {test_case}", end="", flush=True)

# start the background thread
is_active = True
background_print_thread = threading.Thread(target=print_interval_background, args=[args.print_interval])
background_print_thread.start()

start = time.time()
try:
res = subprocess.run(
Expand All @@ -115,6 +139,10 @@ def parse_assertions(stdout):
stderr = res.stderr.decode('utf8')
end = time.time()

# join the background print thread
is_active = False
background_print_thread.join()

additional_data = ""
if assertions:
additional_data += " (" + parse_assertions(stdout) + ")"
6 changes: 5 additions & 1 deletion src/common/allocator.cpp
@@ -173,7 +173,11 @@ data_ptr_t Allocator::DefaultAllocate(PrivateAllocatorData *private_data, idx_t
#ifdef USE_JEMALLOC
return JemallocExtension::Allocate(private_data, size);
#else
return data_ptr_cast(malloc(size));
auto default_allocate_result = malloc(size);
if (!default_allocate_result) {
throw std::bad_alloc();
}
return data_ptr_cast(default_allocate_result);
#endif
}

9 changes: 6 additions & 3 deletions src/common/serializer/memory_stream.cpp
@@ -2,8 +2,12 @@

namespace duckdb {

MemoryStream::MemoryStream(idx_t capacity)
: position(0), capacity(capacity), owns_data(true), data(static_cast<data_ptr_t>(malloc(capacity))) {
MemoryStream::MemoryStream(idx_t capacity) : position(0), capacity(capacity), owns_data(true) {
auto data_malloc_result = malloc(capacity);
if (!data_malloc_result) {
throw std::bad_alloc();
}
data = static_cast<data_ptr_t>(data_malloc_result);
}

MemoryStream::MemoryStream(data_ptr_t buffer, idx_t capacity)
@@ -25,7 +29,6 @@ void MemoryStream::WriteData(const_data_ptr_t source, idx_t write_size) {
throw SerializationException("Failed to serialize: not enough space in buffer to fulfill write request");
}
}

memcpy(data + position, source, write_size);
position += write_size;
}
4 changes: 2 additions & 2 deletions src/function/cast/string_cast.cpp
@@ -161,7 +161,7 @@ bool VectorStringToList::StringToNestedTypeCastLoop(const string_t *source_data,
if (!VectorStringToList::SplitStringList(source_data[idx], child_data, total, varchar_vector)) {
string text = "Type VARCHAR with value '" + source_data[idx].GetString() +
"' can't be cast to the destination type LIST";
HandleVectorCastError::Operation<string_t>(text, result_mask, idx, vector_cast_data);
HandleVectorCastError::Operation<string_t>(text, result_mask, i, vector_cast_data);
}
list_data[i].length = total - list_data[i].offset; // length is the amount of parts coming from this string
}
@@ -422,7 +422,7 @@ bool VectorStringToArray::StringToNestedTypeCastLoop(const string_t *source_data
if (!VectorStringToList::SplitStringList(source_data[idx], child_data, total, varchar_vector)) {
auto text = StringUtil::Format("Type VARCHAR with value '%s' can't be cast to the destination type ARRAY",
source_data[idx].GetString());
HandleVectorCastError::Operation<string_t>(text, result_mask, idx, vector_cast_data);
HandleVectorCastError::Operation<string_t>(text, result_mask, i, vector_cast_data);
}
}
D_ASSERT(total == child_count);
12 changes: 11 additions & 1 deletion src/include/duckdb/main/client_context_state.hpp
@@ -17,9 +17,19 @@ class ErrorData;
class MetaTransaction;
class PreparedStatementData;
class SQLStatement;
struct PendingQueryParameters;

enum class RebindQueryInfo { DO_NOT_REBIND, ATTEMPT_TO_REBIND };

struct PreparedStatementCallbackInfo {
PreparedStatementCallbackInfo(PreparedStatementData &prepared_statement, const PendingQueryParameters &parameters)
: prepared_statement(prepared_statement), parameters(parameters) {
}

PreparedStatementData &prepared_statement;
const PendingQueryParameters &parameters;
};

//! ClientContextState is virtual base class for ClientContext-local (or Query-Local, using QueryEnd callback) state
//! e.g. caches that need to live as long as a ClientContext or Query.
class ClientContextState {
@@ -48,7 +58,7 @@ class ClientContextState {
PreparedStatementMode mode) {
return RebindQueryInfo::DO_NOT_REBIND;
}
virtual RebindQueryInfo OnExecutePrepared(ClientContext &context, PreparedStatementData &prepared_statement,
virtual RebindQueryInfo OnExecutePrepared(ClientContext &context, PreparedStatementCallbackInfo &info,
RebindQueryInfo current_rebind) {
return RebindQueryInfo::DO_NOT_REBIND;
}
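For code that hooks into prepared statement execution, the prepared statement and the pending-query parameters now arrive bundled in a PreparedStatementCallbackInfo instead of a bare PreparedStatementData reference. A minimal sketch of a ClientContextState subclass adapting to the new signature; the class name and its no-op behavior are hypothetical, only the override signature comes from the header above:

#include "duckdb/main/client_context_state.hpp"

namespace duckdb {

// Hypothetical per-context state registered by an extension.
class NoopPreparedStatementObserver : public ClientContextState {
public:
	RebindQueryInfo OnExecutePrepared(ClientContext &context, PreparedStatementCallbackInfo &info,
	                                  RebindQueryInfo current_rebind) override {
		// The prepared statement and its parameters are now accessed together
		// through info.prepared_statement and info.parameters.
		return RebindQueryInfo::DO_NOT_REBIND;
	}
};

} // namespace duckdb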
2 changes: 2 additions & 0 deletions src/include/duckdb/main/config.hpp
@@ -199,6 +199,8 @@ struct DBConfigOptions {
string extension_directory;
//! Whether unsigned extensions should be loaded
bool allow_unsigned_extensions = false;
//! Whether community extensions should be loaded
bool allow_community_extensions = true;
//! Whether extensions with missing metadata should be loaded
bool allow_extensions_metadata_mismatch = false;
//! Enable emitting FSST Vectors
5 changes: 3 additions & 2 deletions src/include/duckdb/main/extension_helper.hpp
@@ -115,7 +115,8 @@ class ExtensionHelper {
static string ExtensionDirectory(ClientContext &context);
static string ExtensionDirectory(DBConfig &config, FileSystem &fs);

static bool CheckExtensionSignature(FileHandle &handle, ParsedExtensionMetaData &parsed_metadata);
static bool CheckExtensionSignature(FileHandle &handle, ParsedExtensionMetaData &parsed_metadata,
const bool allow_community_extensions);
static ParsedExtensionMetaData ParseExtensionMetaData(const char *metadata);
static ParsedExtensionMetaData ParseExtensionMetaData(FileHandle &handle);

@@ -135,7 +136,7 @@
static ExtensionAlias GetExtensionAlias(idx_t index);

//! Get public signing keys for extension signing
static const vector<string> GetPublicKeys();
static const vector<string> GetPublicKeys(bool allow_community_extension = false);

// Returns extension name, or empty string if not a replacement open path
static string ExtractExtensionPrefixFromPath(const string &path);
9 changes: 9 additions & 0 deletions src/include/duckdb/main/settings.hpp
@@ -225,6 +225,15 @@ struct AllowUnsignedExtensionsSetting {
static Value GetSetting(const ClientContext &context);
};

struct AllowCommunityExtensionsSetting {
static constexpr const char *Name = "allow_community_extensions";
static constexpr const char *Description = "Allow to load community built extensions";
static constexpr const LogicalTypeId InputType = LogicalTypeId::BOOLEAN;
static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &parameter);
static void ResetGlobal(DatabaseInstance *db, DBConfig &config);
static Value GetSetting(const ClientContext &context);
};

struct AllowExtensionsMetadataMismatchSetting {
static constexpr const char *Name = "allow_extensions_metadata_mismatch";
static constexpr const char *Description = "Allow to load extensions with not compatible metadata";
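The DBConfigOptions field added in config.hpp above and this setting expose the same switch: community-built extensions load by default, and the option can be turned off either programmatically or through SQL. A hedged sketch of the programmatic C++ usage (the in-memory database and overall flow are illustrative, only the option name comes from this diff):

#include "duckdb.hpp"

int main() {
	duckdb::DBConfig config;
	// New option: defaults to true; set to false to refuse loading
	// community-built extensions in this database instance.
	config.options.allow_community_extensions = false;

	duckdb::DuckDB db(nullptr, &config); // in-memory database
	duckdb::Connection con(db);
	// The equivalent SQL setting is named "allow_community_extensions"
	// (see the Name constant above), e.g. SET allow_community_extensions = false;
	return 0;
}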