Skip to content

Commit

Permalink
mostly adding debugging statements for help. Still trying to figure o…
Browse files Browse the repository at this point in the history
…ut how to know if parallelizing is sequential or not
  • Loading branch information
Tmonster committed Dec 21, 2022
1 parent f4f5834 commit 0f08574
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
11 changes: 11 additions & 0 deletions src/execution/operator/helper/physical_reservoir_sample.cpp
@@ -1,6 +1,8 @@
#include "duckdb/execution/operator/helper/physical_reservoir_sample.hpp"
#include "duckdb/execution/reservoir_sample.hpp"

#include "iostream"

namespace duckdb {

//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -43,11 +45,20 @@ SinkResultType PhysicalReservoirSample::Sink(ExecutionContext &context, GlobalSi
// we implement reservoir sampling without replacement and exponential jumps here
// the algorithm is adopted from the paper Weighted random sampling with a reservoir by Pavlos S. Efraimidis et al.
// note that the original algorithm is about weighted sampling; this is a simplified approach for uniform sampling
if (input.size() == 0) {
std::cout << "received input of size 0" << std::endl;
}
lock_guard<mutex> glock(gstate.lock);
gstate.sample->AddToReservoir(input);
return SinkResultType::NEED_MORE_INPUT;
}

void PhysicalReservoirSample::Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const {
std::cout << "physical reservoir sample combine called" << std::endl;
}



//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
Expand Down
Expand Up @@ -44,6 +44,8 @@ class PhysicalReservoirSample : public PhysicalOperator {
return true;
}

void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const override;

string ParamsToString() const override;
};

Expand Down
4 changes: 3 additions & 1 deletion src/main/client_context.cpp
Expand Up @@ -45,6 +45,8 @@
#include "duckdb/main/error_manager.hpp"
#include "duckdb/common/http_stats.hpp"

#include "iostream"

namespace duckdb {

struct ActiveQueryContext {
Expand Down Expand Up @@ -784,7 +786,7 @@ unique_ptr<QueryResult> ClientContext::Query(unique_ptr<SQLStatement> statement,

unique_ptr<QueryResult> ClientContext::Query(const string &query, bool allow_stream_result) {
auto lock = LockContext();

std::cout << "running query " << query << std::endl;
PreservedError error;
vector<unique_ptr<SQLStatement>> statements;
if (!ParseStatements(*lock, query, statements, error)) {
Expand Down
10 changes: 6 additions & 4 deletions test/sql/sample/reservoir_testing.test
Expand Up @@ -5,13 +5,15 @@
require tpch

statement ok
CALL dbgen(sf=0.1);
CALL dbgen(sf=0.5);

statement ok
PRAGMA enable_verification;

statement ok
PRAGMA threads=4;

query I
SELECT COUNT(*) FROM lineitem USING SAMPLE 4096 ROWS
SELECT COUNT(*) FROM lineitem USING SAMPLE 1000000 ROWS
----
4096

1000000

0 comments on commit 0f08574

Please sign in to comment.