Skip to content
This repository has been archived by the owner on Sep 27, 2019. It is now read-only.

Commit

Permalink
Merge 89161d8 into 5686479
Browse files Browse the repository at this point in the history
  • Loading branch information
nwang57 committed May 18, 2018
2 parents 5686479 + 89161d8 commit a2e0c5b
Show file tree
Hide file tree
Showing 55 changed files with 2,161 additions and 827 deletions.
255 changes: 211 additions & 44 deletions src/catalog/abstract_catalog.cpp
Expand Up @@ -11,7 +11,9 @@
//===----------------------------------------------------------------------===//

#include "catalog/abstract_catalog.h"

#include "executor/plan_executor.h"
#include "codegen/buffering_consumer.h"
#include "common/internal_types.h"
#include "common/statement.h"

#include "catalog/catalog.h"
Expand Down Expand Up @@ -90,11 +92,6 @@ AbstractCatalog::AbstractCatalog(const std::string &catalog_table_ddl,
}
}

/*@brief insert tuple(reord) helper function
* @param tuple tuple to be inserted
* @param txn TransactionContext
* @return Whether insertion is Successful
*/
bool AbstractCatalog::InsertTuple(std::unique_ptr<storage::Tuple> tuple,
concurrency::TransactionContext *txn) {
if (txn == nullptr)
Expand Down Expand Up @@ -129,12 +126,53 @@ bool AbstractCatalog::InsertTuple(std::unique_ptr<storage::Tuple> tuple,
return this_p_status.m_result == peloton::ResultType::SUCCESS;
}

/*@brief Delete a tuple using index scan
* @param index_offset Offset of index for scan
* @param values Values for search
* @param txn TransactionContext
* @return Whether deletion is Successful
*/

bool AbstractCatalog::InsertTupleWithCompiledPlan(const std::vector<std::vector<
std::unique_ptr<expression::AbstractExpression>>> *insert_values,
concurrency::TransactionContext *txn) {
if (txn == nullptr)
throw CatalogException("Insert tuple requires transaction");

std::vector<std::string> columns;
std::shared_ptr<planner::InsertPlan> insert_plan(
new planner::InsertPlan(catalog_table_, &columns, insert_values));

// Bind the plan
planner::BindingContext context;
insert_plan->PerformBinding(context);

// Prepare a consumer to collect the result
codegen::BufferingConsumer buffer{{}, context};


bool cached;

codegen::QueryParameters parameters(*insert_plan, {});
std::unique_ptr<executor::ExecutorContext> executor_context(
new executor::ExecutorContext(txn, std::move(parameters)));

// search for query
codegen::Query *query = codegen::QueryCache::Instance().Find(insert_plan);
std::unique_ptr<codegen::Query> compiled_query(nullptr);
cached = (query != nullptr);

// if not cached, compile the query and save it into cache
executor::ExecutionResult ret;
if (!cached) {
compiled_query = codegen::QueryCompiler().Compile(
*insert_plan, executor_context->GetParams().GetQueryParametersMap(),
buffer);
query = compiled_query.get();
codegen::QueryCache::Instance().Add(insert_plan, std::move(compiled_query));
}

query->Execute(std::move(executor_context), buffer,
[&ret](executor::ExecutionResult result) { ret = result; });

return ret.m_result == peloton::ResultType::SUCCESS;
}


bool AbstractCatalog::DeleteWithIndexScan(
oid_t index_offset, std::vector<type::Value> values,
concurrency::TransactionContext *txn) {
Expand Down Expand Up @@ -178,13 +216,53 @@ bool AbstractCatalog::DeleteWithIndexScan(
return status;
}

/*@brief Index scan helper function
* @param column_offsets Column ids for search (projection)
* @param index_offset Offset of index for scan
* @param values Values for search
* @param txn TransactionContext
* @return Unique pointer of vector of logical tiles
*/
bool AbstractCatalog::DeleteWithCompiledSeqScan(
std::vector<oid_t> column_offsets,
expression::AbstractExpression *predicate,
concurrency::TransactionContext *txn) {
if (txn == nullptr)
throw CatalogException("Delete tuple requires transaction");

std::shared_ptr<planner::DeletePlan> delete_plan{
new planner::DeletePlan(catalog_table_)};

std::unique_ptr<planner::AbstractPlan> scan{new planner::SeqScanPlan(
catalog_table_, predicate, column_offsets)};
delete_plan->AddChild(std::move(scan));

// Do binding
planner::BindingContext context;
delete_plan->PerformBinding(context);

codegen::BufferingConsumer buffer{column_offsets, context};

bool cached;

codegen::QueryParameters parameters(*delete_plan, {});
std::unique_ptr<executor::ExecutorContext> executor_context(
new executor::ExecutorContext(txn, std::move(parameters)));

// search for query
codegen::Query *query = codegen::QueryCache::Instance().Find(delete_plan);;
std::unique_ptr<codegen::Query> compiled_query(nullptr);
cached = (query != nullptr);

// if not cached, compile the query and save it into cache
executor::ExecutionResult ret;
if (!cached) {
compiled_query = codegen::QueryCompiler().Compile(
*delete_plan, executor_context->GetParams().GetQueryParametersMap(),
buffer);
query = compiled_query.get();
codegen::QueryCache::Instance().Add(delete_plan, std::move(compiled_query));
}

query->Execute(std::move(executor_context), buffer,
[&ret](executor::ExecutionResult result) { ret = result; });

return ret.m_result == peloton::ResultType::SUCCESS;
}

std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
AbstractCatalog::GetResultWithIndexScan(
std::vector<oid_t> column_offsets, oid_t index_offset,
Expand Down Expand Up @@ -226,15 +304,7 @@ AbstractCatalog::GetResultWithIndexScan(
return result_tiles;
}

/*@brief Sequential scan helper function
* NOTE: try to use efficient index scan instead of sequential scan, but you
* shouldn't build too many indexes on one catalog table
* @param column_offsets Column ids for search (projection)
* @param predicate predicate for this sequential scan query
* @param txn TransactionContext
*
* @return Unique pointer of vector of logical tiles
*/

std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets,
expression::AbstractExpression *predicate,
Expand All @@ -261,15 +331,48 @@ AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets,
return result_tiles;
}

/*@brief Add index on catalog table
* @param key_attrs indexed column offset(position)
* @param index_oid index id(global unique)
* @param index_name index name(global unique)
* @param index_constraint index constraints
* @return Unique pointer of vector of logical tiles
* Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and
* IndexCatalog should need this
*/

std::vector<codegen::WrappedTuple>
AbstractCatalog::GetResultWithCompiledSeqScan(
std::vector<oid_t> column_offsets,
expression::AbstractExpression *predicate,
concurrency::TransactionContext *txn) const {
if (txn == nullptr) throw CatalogException("Scan table requires transaction");

// Create sequential scan
auto plan_ptr = std::make_shared<planner::SeqScanPlan>(
catalog_table_, predicate, column_offsets);
planner::BindingContext scan_context;
plan_ptr->PerformBinding(scan_context);

// Create consumer
codegen::BufferingConsumer buffer{column_offsets, scan_context};
bool cached;

codegen::QueryParameters parameters(*plan_ptr, {});
std::unique_ptr<executor::ExecutorContext> executor_context(
new executor::ExecutorContext(txn, std::move(parameters)));

// search for query
codegen::Query *query = codegen::QueryCache::Instance().Find(plan_ptr);
std::unique_ptr<codegen::Query> compiled_query(nullptr);
cached = (query != nullptr);

// if not cached, compile the query and save it into cache
if (!cached) {
compiled_query = codegen::QueryCompiler().Compile(
*plan_ptr, executor_context->GetParams().GetQueryParametersMap(),
buffer);
query = compiled_query.get();
codegen::QueryCache::Instance().Add(plan_ptr, std::move(compiled_query));
}

query->Execute(std::move(executor_context), buffer,
[](executor::ExecutionResult result) { return result; });

return buffer.GetOutputTuples();
}

void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs,
oid_t index_oid, const std::string &index_name,
IndexConstraintType index_constraint) {
Expand Down Expand Up @@ -298,13 +401,77 @@ void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs,
index_name.c_str(), (int)catalog_table_->GetOid());
}

/*@brief Update specific columns using index scan
* @param update_columns Columns to be updated
* @param update_values Values to be updated
* @param scan_values Value to be scaned (used in index scan)
* @param index_offset Offset of index for scan
* @return true if successfully executes
*/
bool AbstractCatalog::UpdateWithCompiledSeqScan(
std::vector<oid_t> update_columns, std::vector<type::Value> update_values,
std::vector<oid_t> column_offsets, expression::AbstractExpression *predicate,
concurrency::TransactionContext *txn) {
if (txn == nullptr) throw CatalogException("Scan table requires transaction");
// Construct update executor
TargetList target_list;
DirectMapList direct_map_list;

size_t column_count = catalog_table_->GetSchema()->GetColumnCount();
for (size_t col_itr = 0; col_itr < column_count; col_itr++) {
// Skip any column for update
if (std::find(std::begin(update_columns), std::end(update_columns),
col_itr) == std::end(update_columns)) {
direct_map_list.emplace_back(col_itr, std::make_pair(0, col_itr));
}
}

PELOTON_ASSERT(update_columns.size() == update_values.size());
for (size_t i = 0; i < update_values.size(); i++) {
planner::DerivedAttribute update_attribute{
new expression::ConstantValueExpression(update_values[i])};
target_list.emplace_back(update_columns[i], update_attribute);
}

std::unique_ptr<const planner::ProjectInfo> project_info(
new planner::ProjectInfo(std::move(target_list),
std::move(direct_map_list)));

std::shared_ptr<planner::UpdatePlan> update_plan{
new planner::UpdatePlan(catalog_table_, std::move(project_info))
};

std::unique_ptr<planner::AbstractPlan> scan{new planner::SeqScanPlan(
catalog_table_, predicate, column_offsets)};
update_plan->AddChild(std::move(scan));

// Do binding
planner::BindingContext context;
update_plan->PerformBinding(context);

codegen::BufferingConsumer buffer{column_offsets, context};

bool cached;

codegen::QueryParameters parameters(*update_plan, {});
std::unique_ptr<executor::ExecutorContext> executor_context(
new executor::ExecutorContext(txn, std::move(parameters)));

// search for query
codegen::Query *query = codegen::QueryCache::Instance().Find(update_plan);;
std::unique_ptr<codegen::Query> compiled_query(nullptr);
cached = (query != nullptr);

// if not cached, compile the query and save it into cache
executor::ExecutionResult ret;
if (!cached) {
compiled_query = codegen::QueryCompiler().Compile(
*update_plan, executor_context->GetParams().GetQueryParametersMap(),
buffer);
query = compiled_query.get();
codegen::QueryCache::Instance().Add(update_plan, std::move(compiled_query));
}

query->Execute(std::move(executor_context), buffer,
[&ret](executor::ExecutionResult result) { ret = result; });

return ret.m_result == peloton::ResultType::SUCCESS;
}


bool AbstractCatalog::UpdateWithIndexScan(
std::vector<oid_t> update_columns, std::vector<type::Value> update_values,
std::vector<type::Value> scan_values, oid_t index_offset,
Expand Down

0 comments on commit a2e0c5b

Please sign in to comment.