Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Compilation OU and Metrics #1634

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions Jenkinsfile-utils.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ void stageModeling() {
'-DCMAKE_BUILD_TYPE=Release -DNOISEPAGE_UNITY_BUILD=ON -DNOISEPAGE_USE_JEMALLOC=ON'
])
buildNoisePageTarget("execution_runners")
buildNoisePageTarget("compilation_runner")

// The forecaster_standalone script runs TPC-C with query trace enabled.
// The forecaster_standalone script uses SET to enable query trace.
Expand Down Expand Up @@ -256,6 +257,11 @@ void stageModeling() {
../benchmark/execution_runners --execution_runner_rows_limit=100 --rerun=0 --warm_num=1
''', label: 'OU model training data generation'

sh script :'''
cd build/bin
../benchmark/compilation_runner
''', label: 'Compilation model training data generation'

// Recompile the noisepage DBMS in Debug mode with code coverage.
buildNoisePage([buildCommand:'ninja noisepage', cmake:
'-DCMAKE_BUILD_TYPE=Debug -DNOISEPAGE_UNITY_BUILD=OFF -DNOISEPAGE_GENERATE_COVERAGE=ON'
Expand Down
68 changes: 68 additions & 0 deletions benchmark/runner/compilation_runner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#include <fstream>
#include <string>
#include <vector>

#include "benchmark/benchmark.h"
#include "benchmark_util/data_table_benchmark_util.h"
#include "common/dedicated_thread_registry.h"
#include "common/scoped_timer.h"
#include "execution/compiler/executable_query.h"
#include "execution/exec/execution_settings.h"
#include "execution/execution_util.h"
#include "execution/vm/module.h"
#include "loggers/execution_logger.h"
#include "metrics/metrics_thread.h"
#include "storage/garbage_collector_thread.h"
#include "storage/write_ahead_log/log_manager.h"

#define LOG_TEST_LOG_FILE_NAME "benchmark.txt"

namespace noisepage::runner {

class CompilationRunner : public benchmark::Fixture {};

// NOLINTNEXTLINE
BENCHMARK_DEFINE_F(CompilationRunner, Compilation)(benchmark::State &state) {
noisepage::LoggersUtil::Initialize();
execution::ExecutionUtil::InitTPL("./bytecode_handlers_ir.bc");

auto *const metrics_manager = new metrics::MetricsManager();
metrics_manager->EnableMetric(metrics::MetricsComponent::COMPILATION);
metrics_manager->RegisterThread();

const std::string &path = "../../sample_tpl/tpl_tests.txt";
std::ifstream tpl_tests(path);

std::string input_line;
size_t identifier = 0;
while (std::getline(tpl_tests, input_line)) {
if (input_line.find(".tpl") != std::string::npos && input_line[0] != '#') {
// We have found a valid test
std::string tpl = input_line.substr(0, input_line.find(','));
std::string target = "../sample_tpl/" + tpl;

std::ifstream input(target);
std::string contents((std::istreambuf_iterator<char>(input)), (std::istreambuf_iterator<char>()));
EXECUTION_LOG_INFO("Running compilation on {}", target);

execution::exec::ExecutionSettings exec_settings;
auto exec_query = execution::compiler::ExecutableQuery(contents, nullptr, false, 16, exec_settings,
transaction::timestamp_t(0));
for (const auto &fragment : exec_query.GetFragments()) {
fragment->GetModule()->CompileToMachineCode(execution::query_id_t(identifier));
}

identifier++;
}
}

metrics_manager->Aggregate();
metrics_manager->ToOutput(nullptr);
metrics_manager->UnregisterThread();
delete metrics_manager;
noisepage::LoggersUtil::ShutDown();
}

BENCHMARK_REGISTER_F(CompilationRunner, Compilation)->Unit(benchmark::kMillisecond)->UseManualTime()->Iterations(1);

} // namespace noisepage::runner
2 changes: 1 addition & 1 deletion benchmark/runner/execution_runners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include "runner/execution_runners_argument_generator.h"
#include "runner/execution_runners_data_config.h"
#include "runner/execution_runners_settings.h"
#include "self_driving/modeling/operating_unit.h"
#include "self_driving/modeling/execution_operating_unit.h"
#include "self_driving/modeling/operating_unit_defs.h"
#include "self_driving/modeling/operating_unit_recorder.h"
#include "storage/index/bplustree.h"
Expand Down
14 changes: 14 additions & 0 deletions script/self_driving/modeling/data/opunit_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,27 @@ def get_ou_runner_data(filename, model_results_path, txn_sample_rate, model_map=
if "execution" in filename:
# Handle the execution data
return _execution_get_ou_runner_data(filename, model_map, predict_cache, trim)
if "compilation" in filename:
return _compilation_get_data(filename)
if "gc" in filename or "log" in filename:
# Handle of the gc or log data with interval-based conversion
return _interval_get_ou_runner_data(filename, model_results_path)

return _default_get_ou_runner_data(filename)


def _compilation_get_data(filename):
df = pd.read_csv(filename, skipinitialspace=True)
headers = list(df.columns.values)
data_info.instance.parse_csv_header(headers, False)
file_name = os.path.splitext(os.path.basename(filename))[0]

# Remove the first two (query_id and the name)
x = df.iloc[:, 2:-data_info.instance.METRICS_OUTPUT_NUM].values
y = df.iloc[:, -data_info.instance.OU_MODEL_TARGET_NUM:].values
return [OpUnitData(OpUnit[file_name.upper()], x, y)]


def _default_get_ou_runner_data(filename):
# In the default case, the data does not need any pre-processing and the file name indicates the opunit
df = pd.read_csv(filename, skipinitialspace=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def _cardinality_linear_predict_transform(x, y):
OpUnit.DISK_LOG_CONSUMER_TASK: None,
OpUnit.TXN_BEGIN: None,
OpUnit.TXN_COMMIT: None,
OpUnit.COMPILATION: None,

# Execution engine opunits
OpUnit.SEQ_SCAN: _num_rows_linear_transformer,
Expand Down Expand Up @@ -184,6 +185,7 @@ def _num_rows_cardinality_linear_train_transform(x):
OpUnit.DISK_LOG_CONSUMER_TASK: None,
OpUnit.TXN_BEGIN: None,
OpUnit.TXN_COMMIT: None,
OpUnit.COMPILATION: None,

# Execution engine opunits
OpUnit.SEQ_SCAN: None,
Expand Down
10 changes: 10 additions & 0 deletions script/self_driving/modeling/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class OpUnit(enum.IntEnum):
BIND_COMMAND = 40,
EXECUTE_COMMAND = 41

# Compilation
COMPILATION = 42


class ExecutionFeature(enum.IntEnum):
# Debugging information
Expand Down Expand Up @@ -100,6 +103,13 @@ class ExecutionFeature(enum.IntEnum):
READONLY_UNLINKED = 18,
INTERVAL = 19,

# Compilation input features
NAME = 20,
CODE_SIZE = 21,
DATA_SIZE = 22,
FUNCTIONS_SIZE = 23,
STATIC_LOCALS_SIZE = 24


class ConcurrentCountingMode(enum.Enum):
"""How to identify the concurrent running operations (for a GroupedOpUnitData)
Expand Down
2 changes: 1 addition & 1 deletion src/execution/ast/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "execution/sql/table_vector_iterator.h"
#include "execution/sql/thread_state_container.h"
#include "execution/sql/value.h"
#include "self_driving/modeling/operating_unit.h"
#include "self_driving/modeling/execution_operating_unit.h"
// #include "execution/util/csv_reader.h" Fix later.
#include "execution/util/execution_common.h"

Expand Down
2 changes: 1 addition & 1 deletion src/execution/ast/type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include "execution/sql/thread_state_container.h"
#include "execution/sql/value.h"
#include "execution/sql/vector_projection_iterator.h"
#include "self_driving/modeling/operating_unit.h"
#include "self_driving/modeling/execution_operating_unit.h"
// #include "execution/util/csv_reader.h" Fix later.

namespace noisepage::execution::ast {
Expand Down
17 changes: 16 additions & 1 deletion src/execution/compiler/compilation_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "execution/compiler/operator/update_translator.h"
#include "execution/compiler/pipeline.h"
#include "execution/exec/execution_settings.h"
#include "execution/vm/module.h"
#include "parser/expression/abstract_expression.h"
#include "parser/expression/column_value_expression.h"
#include "parser/expression/comparison_expression.h"
Expand Down Expand Up @@ -73,6 +74,7 @@
#include "planner/plannodes/seq_scan_plan_node.h"
#include "planner/plannodes/set_op_plan_node.h"
#include "planner/plannodes/update_plan_node.h"
#include "self_driving/modeling/compilation_operating_unit.h"
#include "self_driving/modeling/operating_unit_recorder.h"
#include "spdlog/fmt/fmt.h"

Expand Down Expand Up @@ -124,6 +126,17 @@ ast::FunctionDecl *CompilationContext::GenerateTearDownFunction() {
return builder.Finish();
}

std::unique_ptr<selfdriving::CompilationOperatingUnits> CompilationContext::GenerateCompilationOperatingUnits(
const std::vector<std::unique_ptr<ExecutableQuery::Fragment>> &fragments) {
auto units = std::make_unique<selfdriving::CompilationOperatingUnits>();
for (const auto &fragment : fragments) {
auto module = fragment->GetModule();
auto bytecode_module = module->GetBytecodeModule();
units->RecordCompilationModule(bytecode_module);
}
return units;
}

void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan,
common::ManagedPointer<planner::PlanMetaData> plan_meta_data) {
exec_ctx_ =
Expand Down Expand Up @@ -194,7 +207,9 @@ void CompilationContext::GeneratePlan(const planner::AbstractPlanNode &plan,

// Compile and finish.
fragments.emplace_back(main_builder.Compile(query_->GetExecutionSettings().GetCompilerSettings()));
query_->Setup(std::move(fragments), query_state_.GetSize(), codegen_.ReleasePipelineOperatingUnits());
auto units = GenerateCompilationOperatingUnits(fragments);
query_->Setup(std::move(fragments), query_state_.GetSize(), codegen_.ReleasePipelineOperatingUnits(),
std::move(units));
}

// static
Expand Down
32 changes: 25 additions & 7 deletions src/execution/compiler/executable_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
#include "execution/sema/error_reporter.h"
#include "execution/vm/module.h"
#include "loggers/execution_logger.h"
#include "self_driving/modeling/operating_unit.h"
#include "self_driving/modeling/compilation_operating_unit.h"
#include "self_driving/modeling/execution_operating_unit.h"
#include "transaction/transaction_context.h"

namespace noisepage::execution::compiler {
Expand All @@ -27,7 +28,9 @@ ExecutableQuery::Fragment::Fragment(std::vector<std::string> &&functions, std::v

ExecutableQuery::Fragment::~Fragment() = default;

void ExecutableQuery::Fragment::Run(byte query_state[], vm::ExecutionMode mode) const {
void ExecutableQuery::Fragment::ResetCompiledModule() { module_->ResetCompiledModule(); }

void ExecutableQuery::Fragment::Run(execution::query_id_t query_id, byte query_state[], vm::ExecutionMode mode) const {
using Function = std::function<void(void *)>;

auto exec_ctx = *reinterpret_cast<exec::ExecutionContext **>(query_state);
Expand All @@ -36,15 +39,15 @@ void ExecutableQuery::Fragment::Run(byte query_state[], vm::ExecutionMode mode)
}
for (const auto &func_name : functions_) {
Function func;
if (!module_->GetFunction(func_name, mode, &func)) {
if (!module_->GetFunction(query_id, func_name, mode, &func)) {
throw EXECUTION_EXCEPTION(fmt::format("Could not find function '{}' in query fragment.", func_name),
common::ErrorCode::ERRCODE_INTERNAL_ERROR);
}
try {
func(query_state);
} catch (const AbortException &e) {
for (const auto &teardown_name : teardown_fn_) {
if (!module_->GetFunction(teardown_name, mode, &func)) {
if (!module_->GetFunction(query_id, teardown_name, mode, &func)) {
throw EXECUTION_EXCEPTION(fmt::format("Could not find teardown function '{}' in query fragment.", func_name),
common::ErrorCode::ERRCODE_INTERNAL_ERROR);
}
Expand Down Expand Up @@ -130,7 +133,7 @@ ExecutableQuery::ExecutableQuery(const std::string &contents,
std::vector<std::unique_ptr<Fragment>> fragments;
fragments.emplace_back(std::move(fragment));

Setup(std::move(fragments), query_state_size, nullptr);
Setup(std::move(fragments), query_state_size, nullptr, nullptr);

if (is_file) {
// acquire the output format
Expand All @@ -142,7 +145,8 @@ ExecutableQuery::ExecutableQuery(const std::string &contents,
ExecutableQuery::~ExecutableQuery() = default;

void ExecutableQuery::Setup(std::vector<std::unique_ptr<Fragment>> &&fragments, const std::size_t query_state_size,
std::unique_ptr<selfdriving::PipelineOperatingUnits> pipeline_operating_units) {
std::unique_ptr<selfdriving::PipelineOperatingUnits> pipeline_operating_units,
std::unique_ptr<selfdriving::CompilationOperatingUnits> compilation_operating_units) {
NOISEPAGE_ASSERT(
std::all_of(fragments.begin(), fragments.end(), [](const auto &fragment) { return fragment->IsCompiled(); }),
"All query fragments are not compiled!");
Expand All @@ -152,6 +156,7 @@ void ExecutableQuery::Setup(std::vector<std::unique_ptr<Fragment>> &&fragments,
fragments_ = std::move(fragments);
query_state_size_ = query_state_size;
pipeline_operating_units_ = std::move(pipeline_operating_units);
compilation_operating_units_ = std::move(compilation_operating_units);

EXECUTION_LOG_TRACE("Query has {} fragment{} with {}-byte query state.", fragments_.size(),
fragments_.size() > 1 ? "s" : "", query_state_size_);
Expand All @@ -167,9 +172,22 @@ void ExecutableQuery::Run(common::ManagedPointer<exec::ExecutionContext> exec_ct
exec_ctx->SetPipelineOperatingUnits(GetPipelineOperatingUnits());
exec_ctx->SetQueryId(query_id_);

if (!exec_ctx->GetExecutionSettings().GetIsCompilationCacheEnabled()) {
// This model assumes that an ExecutableQuery is tied to the lifetime of a specific
// connection (via the ProtocolInterpreter). If at any point in the future, this
// assumption proves to be incorrect, this would need to be revisited.
//
// Particularly, to reliably bypass the compilation cache, module and/or invocation
// state (i.e., CompiledModule) would need to be moved to thread-local or
// per-execution state (i.e., into the ExecutionContext).
for (const auto &fragment : fragments_) {
fragment->ResetCompiledModule();
}
}

// Now run through fragments.
for (const auto &fragment : fragments_) {
fragment->Run(query_state.get(), mode);
fragment->Run(query_id_, query_state.get(), mode);
}

// We do not currently re-use ExecutionContexts. However, this is unset to help ensure
Expand Down
2 changes: 1 addition & 1 deletion src/execution/exec/execution_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "metrics/metrics_store.h"
#include "parser/expression/constant_value_expression.h"
#include "replication/primary_replication_manager.h"
#include "self_driving/modeling/operating_unit.h"
#include "self_driving/modeling/execution_operating_unit.h"
#include "self_driving/modeling/operating_unit_util.h"
#include "storage/recovery/recovery_manager.h"
#include "transaction/transaction_context.h"
Expand Down
1 change: 1 addition & 0 deletions src/execution/exec/execution_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ void ExecutionSettings::UpdateFromSettingsManager(common::ManagedPointer<setting
number_of_parallel_execution_threads_ = settings->GetInt(settings::Param::num_parallel_execution_threads);
is_counters_enabled_ = settings->GetBool(settings::Param::counters_enable);
is_pipeline_metrics_enabled_ = settings->GetBool(settings::Param::pipeline_metrics_enable);
is_compilation_cache_enabled_ = settings->GetBool(settings::Param::enable_compilation_cache);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/sql/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "execution/util/stage_timer.h"
#include "ips4o/ips4o.hpp"
#include "loggers/execution_logger.h"
#include "self_driving/modeling/operating_unit.h"
#include "self_driving/modeling/execution_operating_unit.h"
#include "self_driving/modeling/operating_unit_defs.h"

namespace noisepage::execution::sql {
Expand Down
Loading