Skip to content

Commit

Permalink
#432: Add "transactions" and "is_unique" columns to system_schema.ind…
Browse files Browse the repository at this point in the history
…exes

Summary:
- Add "transactions" and "is_unique" columns to system_schema.indexes.
- Add generic system table schema upgrade mechanism in CatalogManager. It now validates schemas of existing YCQL system tables and update them as needed.

Test Plan:
- TestIndex.testCreateIndex
- Create a cluster with old system_schema.indexes schema. Restart cluster with new yb-master and verify the schema is updated.

Reviewers: bogdan, mihnea

Reviewed By: mihnea

Subscribers: yql, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D5315
  • Loading branch information
robertpang committed Aug 14, 2018
1 parent 898e432 commit 2a4fb09
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 45 deletions.
44 changes: 31 additions & 13 deletions java/yb-cql/src/test/java/org/yb/cql/TestIndex.java
Expand Up @@ -28,6 +28,7 @@
import org.yb.minicluster.MiniYBCluster;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -82,10 +83,8 @@ public void testCreateIndex() throws Exception {
table.getIndex("i2").asCQLQuery());

// Verify the covering columns.
assertIndexOptions("test_create_index", "i1",
"Row[{target=r1, r2, h1, h2, include=c1, c4}]");
assertIndexOptions("test_create_index", "i2",
"Row[{target=c4, h1, h2, r1, r2, include=c1, c2}]");
assertIndexOptions("test_create_index", "i1", "r1, r2, h1, h2", "c1, c4");
assertIndexOptions("test_create_index", "i2", "c4, h1, h2, r1, r2", "c1, c2");

// Test retrieving non-existent index.
assertNull(table.getIndex("i3"));
Expand All @@ -108,7 +107,7 @@ public void testCreateIndex() throws Exception {

// Test create index if not exists. Verify i1 is still the same.
session.execute("create index if not exists i1 on test_create_index (r1) include (c1);");
assertIndexOptions("test_create_index", "i1", "Row[{target=r1, r2, h1, h2, include=c1, c4}]");
assertIndexOptions("test_create_index", "i1", "r1, r2, h1, h2", "c1, c4");

// Create another test table.
session.execute("create table test_create_index_2 " +
Expand Down Expand Up @@ -211,7 +210,7 @@ public void testDropIndex() throws Exception {
.getTable("test_drop_index");
assertEquals("CREATE INDEX i1 ON cql_test_keyspace.test_drop_index (r1, r2, h1, h2);",
table.getIndex("i1").asCQLQuery());
assertIndexOptions("test_drop_index", "i1", "Row[{target=r1, r2, h1, h2, include=c1, c2}]");
assertIndexOptions("test_drop_index", "i1", "r1, r2, h1, h2", "c1, c2");

// Drop test index.
session.execute("drop index i1;");
Expand All @@ -236,8 +235,7 @@ public void testDropIndex() throws Exception {
table = cluster.getMetadata().getKeyspace(DEFAULT_TEST_KEYSPACE).getTable("test_drop_index");
assertEquals("CREATE INDEX i1 ON cql_test_keyspace.test_drop_index (c1, c2, h1, h2, r1, r2);",
table.getIndex("i1").asCQLQuery());
assertIndexOptions("test_drop_index", "i1",
"Row[{target=c1, c2, h1, h2, r1, r2, include=c3, c4}]");
assertIndexOptions("test_drop_index", "i1", "c1, c2, h1, h2, r1, r2", "c3, c4");
}

@Test
Expand All @@ -253,8 +251,8 @@ public void testDropTableCascade() throws Exception {
// Create test index.
session.execute("create index i1 on test_drop_cascade (r1, r2) include (c1, c2);");
session.execute("create index i2 on test_drop_cascade (c4) include (c1);");
assertIndexOptions("test_drop_cascade", "i1", "Row[{target=r1, r2, h1, h2, include=c1, c2}]");
assertIndexOptions("test_drop_cascade", "i2", "Row[{target=c4, h1, h2, r1, r2, include=c1}]");
assertIndexOptions("test_drop_cascade", "i1", "r1, r2, h1, h2", "c1, c2");
assertIndexOptions("test_drop_cascade", "i2", "c4, h1, h2, r1, r2", "c1");

// Drop test table. Verify the index is cascade-deleted.
session.execute("drop table test_drop_cascade;");
Expand All @@ -266,11 +264,18 @@ public void testDropTableCascade() throws Exception {
DEFAULT_TEST_KEYSPACE, "test_drop_cascade", "i2").one());
}

private void assertIndexOptions(String table, String index, String options) throws Exception {
Row row = session.execute("select options from system_schema.indexes " +
private void assertIndexOptions(String table, String index, String target, String include)
throws Exception {
Row row = session.execute("select options, transactions, is_unique " +
"from system_schema.indexes " +
"where keyspace_name = ? and table_name = ? and index_name = ?",
DEFAULT_TEST_KEYSPACE, table, index).one();
assertEquals(options, row.toString());
Map<String, String> options = row.getMap("options", String.class, String.class);
assertEquals(target, options.get("target"));
assertEquals(include, options.get("include"));
Map<String, String> transactions = row.getMap("transactions", String.class, String.class);
assertEquals("true", transactions.get("enabled"));
assertFalse(row.getBool("is_unique"));
}

private void assertIndexColumns(String index, String columns) throws Exception {
Expand Down Expand Up @@ -447,6 +452,13 @@ public void testWeakIndexBatchUpdate() throws Exception {
"with transactions = {'enabled' : false, " +
"'consistency_level' : 'user_enforced'};");

assertQuery(String.format("select options, transactions from system_schema.indexes where "+
"keyspace_name = '%s' and " +
"table_name = 'test_batch' and " +
"index_name = 'test_batch_by_v';",
DEFAULT_TEST_KEYSPACE),
"Row[{target=v, k}, {enabled=false, consistency_level=user_enforced}]");

final int BATCH_SIZE = 20;
final int KEY_COUNT = 1000;

Expand Down Expand Up @@ -665,6 +677,12 @@ public void testUniqueIndex() throws Exception {
session.execute("create unique index test_unique_by_v1 on test_unique (v1);");
session.execute("create unique index test_unique_by_v2_v3 on test_unique (v2, v3);");

assertQuery(String.format("select is_unique from system_schema.indexes where "+
"keyspace_name = '%s' and table_name = 'test_unique';",
DEFAULT_TEST_KEYSPACE),
"Row[true]" +
"Row[true]");

// Test unique constraint on NULL values in v2 and v3.
session.execute(
"insert into test_unique (k, v1) values (1, 1);");
Expand Down
62 changes: 38 additions & 24 deletions src/yb/master/catalog_manager.cc
Expand Up @@ -282,9 +282,11 @@ class TableLoader : public Visitor<PersistentTableInfo> {
catalog_manager_->table_names_map_[{l->data().namespace_id(), l->data().name()}] = table;
}

l->Commit();

LOG(INFO) << "Loaded metadata for table " << table->ToString();
VLOG(1) << "Metadata for table " << table->ToString() << ": " << metadata.ShortDebugString();
l->Commit();

return Status::OK();
}

Expand Down Expand Up @@ -370,7 +372,7 @@ class TabletLoader : public Visitor<PersistentTabletInfo> {

LOG(INFO) << "Loaded metadata for tablet " << tablet_id
<< " (first table " << first_table->ToString() << ")";
VLOG(2) << "Metadata for tablet " << tablet_id << ": " << metadata.ShortDebugString();
VLOG(1) << "Metadata for tablet " << tablet_id << ": " << metadata.ShortDebugString();

return Status::OK();
}
Expand Down Expand Up @@ -404,9 +406,11 @@ class NamespaceLoader : public Visitor<PersistentNamespaceInfo> {
catalog_manager_->namespace_names_map_[l->data().pb.name()] = ns;
}

LOG(INFO) << "Loaded metadata for namespace " << l->data().pb.name() << " (id="
<< ns_id << "): " << ns->ToString() << ": " << metadata.ShortDebugString();
l->Commit();

LOG(INFO) << "Loaded metadata for namespace " << ns->ToString();
VLOG(1) << "Metadata for namespace " << ns->ToString() << ": " << metadata.ShortDebugString();

return Status::OK();
}

Expand Down Expand Up @@ -439,9 +443,11 @@ class UDTypeLoader : public Visitor<PersistentUDTypeInfo> {
catalog_manager_->udtype_names_map_[{l->data().namespace_id(), l->data().name()}] = udtype;
}

l->Commit();

LOG(INFO) << "Loaded metadata for type " << udtype->ToString();
VLOG(1) << "Metadata for type " << udtype->ToString() << ": " << metadata.ShortDebugString();
l->Commit();

return Status::OK();
}

Expand Down Expand Up @@ -529,9 +535,11 @@ class RoleLoader : public Visitor<PersistentRoleInfo> {
l->mutable_data()->pb.CopyFrom(metadata);
catalog_manager_->roles_map_[role_name] = role;

LOG(INFO) << "Loaded metadata for role " << l->data().pb.role()
<< ": " << metadata.ShortDebugString();
l->Commit();

LOG(INFO) << "Loaded metadata for role " << role->id();
VLOG(1) << "Metadata for role " << role->id() << ": " << metadata.ShortDebugString();

return Status::OK();
}

Expand Down Expand Up @@ -1075,35 +1083,43 @@ Status CatalogManager::PrepareSystemTable(const TableName& table_name,
const NamespaceId& namespace_id,
const Schema& schema,
YQLVirtualTable* vtable) {
std::unique_ptr<YQLVirtualTable> yql_storage(vtable);
// Verify we have the catalog manager lock.
if (!lock_.is_locked()) {
return STATUS(IllegalState, "We don't have the catalog manager lock!");
}

std::shared_ptr<SystemTablet> system_tablet;
std::unique_ptr<YQLVirtualTable> yql_storage(vtable);

scoped_refptr<TableInfo> table = FindPtrOrNull(table_names_map_,
std::make_pair(namespace_id, table_name));
bool create_table = true;
if (table != nullptr) {
LOG(INFO) << Substitute("Table $0.$1 already created", namespace_name, table_name);

Schema persisted_schema;
RETURN_NOT_OK(table->GetSchema(&persisted_schema));
if (!persisted_schema.Equals(schema)) {
LOG(INFO) << Substitute("Updating schema of $0.$1 ...", namespace_name, table_name);
auto l = table->LockForWrite();
RETURN_NOT_OK(SchemaToPB(schema, l->mutable_data()->pb.mutable_schema()));
l->mutable_data()->pb.set_version(l->data().pb.version() + 1);

// Update sys-catalog with the new table schema.
RETURN_NOT_OK(sys_catalog_->UpdateItem(table.get()));
l->Commit();
}

// There might have been a failure after writing the table but before writing the tablets. As
// a result, if we don't find any tablets, we try to create the tablets only again.
vector<scoped_refptr<TabletInfo>> tablets;
table->GetAllTablets(&tablets);
if (!tablets.empty()) {
LOG(INFO) << strings::Substitute("Table $0.$1 already created, skipping initialization",
namespace_name, table_name);
// Initialize the appropriate system tablet.
DCHECK_EQ(1, tablets.size());
system_tablet.reset(
new SystemTablet(schema, std::move(yql_storage), tablets[0]->tablet_id()));
return sys_tables_handler_.AddTablet(system_tablet);
return sys_tables_handler_.AddTablet(
std::make_shared<SystemTablet>(schema, std::move(yql_storage), tablets[0]->tablet_id()));
} else {
// Table is already created, only need to create tablets now.
LOG(INFO) << strings::Substitute("Table $0.$1 already created, but tablets have not been "
"created. Creating only the respective tablets...",
namespace_name, table_name);
LOG(INFO) << Substitute("Creating tablets for $0.$1 ...", namespace_name, table_name);
create_table = false;
}
}
Expand All @@ -1126,8 +1142,8 @@ Status CatalogManager::PrepareSystemTable(const TableName& table_name,

RETURN_NOT_OK(CreateTableInMemory(req, schema, partition_schema, false, namespace_id,
partitions, nullptr, &tablets, nullptr, &table));
LOG(INFO) << "Inserted new table info into CatalogManager maps: "
<< namespace_name << "." << table_name;
LOG(INFO) << Substitute("Inserted new $0.$1 table info into CatalogManager maps",
namespace_name, table_name);

// Update the on-disk table state to "running".
table->mutable_metadata()->mutable_dirty()->pb.set_state(SysTablesEntryPB::RUNNING);
Expand Down Expand Up @@ -1161,10 +1177,8 @@ Status CatalogManager::PrepareSystemTable(const TableName& table_name,
}

// Finally create the appropriate tablet object.
system_tablet.reset(new SystemTablet(schema, std::move(yql_storage), tablets[0]->tablet_id()));
RETURN_NOT_OK(sys_tables_handler_.AddTablet(system_tablet));

return Status::OK();
return sys_tables_handler_.AddTablet(
std::make_shared<SystemTablet>(schema, std::move(yql_storage), tablets[0]->tablet_id()));
}

Status CatalogManager::PrepareNamespace(const NamespaceName& name, const NamespaceId& id) {
Expand Down
26 changes: 24 additions & 2 deletions src/yb/master/yql_indexes_vtable.cc
Expand Up @@ -92,8 +92,10 @@ Status YQLIndexesVTable::RetrieveData(const QLReadRequestPB& request,
options.set_map_value();
options.add_map_key()->set_string_value("target");
options.add_map_value()->set_string_value(target);
options.add_map_key()->set_string_value("include");
options.add_map_value()->set_string_value(include);
if (!include.empty()) {
options.add_map_key()->set_string_value("include");
options.add_map_value()->set_string_value(include);
}
RETURN_NOT_OK(SetColumnValue(kOptions, options.value(), &row));

// Create appropriate table uuids.
Expand All @@ -103,6 +105,23 @@ Status YQLIndexesVTable::RetrieveData(const QLReadRequestPB& request,
RETURN_NOT_OK(SetColumnValue(kTableId, uuid, &row));
RETURN_NOT_OK(uuid.FromHexString(table->id()));
RETURN_NOT_OK(SetColumnValue(kIndexId, uuid, &row));

Schema schema;
RETURN_NOT_OK(table->GetSchema(&schema));
const auto & table_properties = schema.table_properties();
QLValue txn;
txn.set_map_value();
txn.add_map_key()->set_string_value("enabled");
txn.add_map_value()->set_string_value(table_properties.is_transactional() ? "true" : "false");
if (table_properties.consistency_level() == YBConsistencyLevel::USER_ENFORCED) {
// If consistency level is user-encorced, show it also. Omit the other consistency levels
// which are not recognized by "CREATE INDEX" syntax.
txn.add_map_key()->set_string_value("consistency_level");
txn.add_map_value()->set_string_value("user_enforced");
}
RETURN_NOT_OK(SetColumnValue(kTransactions, txn.value(), &row));

RETURN_NOT_OK(SetColumnValue(kIsUnique, table->is_unique_index(), &row));
}

return Status::OK();
Expand All @@ -118,6 +137,9 @@ Schema YQLIndexesVTable::CreateSchema() const {
QLType::CreateTypeMap(DataType::STRING, DataType::STRING)));
CHECK_OK(builder.AddColumn(kTableId, QLType::Create(DataType::UUID)));
CHECK_OK(builder.AddColumn(kIndexId, QLType::Create(DataType::UUID)));
CHECK_OK(builder.AddColumn(kTransactions,
QLType::CreateTypeMap(DataType::STRING, DataType::STRING)));
CHECK_OK(builder.AddColumn(kIsUnique, QLType::Create(DataType::BOOL)));

return builder.Build();
}
Expand Down
2 changes: 2 additions & 0 deletions src/yb/master/yql_indexes_vtable.h
Expand Up @@ -36,6 +36,8 @@ class YQLIndexesVTable : public YQLVirtualTable {
static constexpr const char* const kOptions = "options";
static constexpr const char* const kTableId = "table_id";
static constexpr const char* const kIndexId = "index_id";
static constexpr const char* const kTransactions = "transactions";
static constexpr const char* const kIsUnique = "is_unique";
};

} // namespace master
Expand Down
12 changes: 6 additions & 6 deletions src/yb/master/yql_tables_vtable.cc
Expand Up @@ -76,12 +76,12 @@ Status YQLTablesVTable::RetrieveData(const QLReadRequestPB& request,
schema.table_properties().DefaultTimeToLive() / MonoTime::kMillisecondsPerSecond);
RETURN_NOT_OK(SetColumnValue(kDefaultTimeToLive, cql_ttl, &row));

QLValue dtxn;
dtxn.set_map_value();
dtxn.add_map_key()->set_string_value("enabled");
dtxn.add_map_value()->set_string_value(schema.table_properties().is_transactional() ?
"true" : "false");
RETURN_NOT_OK(SetColumnValue(kTransactions, dtxn.value(), &row));
QLValue txn;
txn.set_map_value();
txn.add_map_key()->set_string_value("enabled");
txn.add_map_value()->set_string_value(schema.table_properties().is_transactional() ?
"true" : "false");
RETURN_NOT_OK(SetColumnValue(kTransactions, txn.value(), &row));
}

return Status::OK();
Expand Down

0 comments on commit 2a4fb09

Please sign in to comment.