Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Support BDBJEJournal with prefix #8672

Merged
merged 8 commits into from Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/journal/Journal.java
Expand Up @@ -63,4 +63,6 @@ public interface Journal {

// abort current batch
public void batchWriteAbort() throws InterruptedException, JournalException;

public String getPrefix();
}
Expand Up @@ -21,6 +21,7 @@

package com.starrocks.journal.bdbje;

import com.google.common.base.Strings;
import com.google.common.net.HostAndPort;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
Expand Down Expand Up @@ -52,6 +53,7 @@
import com.starrocks.ha.HAProtocol;
import com.starrocks.journal.JournalException;
import com.starrocks.server.GlobalStateMgr;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -562,10 +564,14 @@ public void removeDatabase(String dbName) {
}
}

public List<Long> getDatabaseNames() {
return getDatabaseNamesWithPrefix("");
}

// get journal db names and sort the names
// let the caller retry from outside.
// return null only if environment is closing
public List<Long> getDatabaseNames() {
public List<Long> getDatabaseNamesWithPrefix(String prefix) {
if (closing) {
return null;
}
Expand All @@ -578,8 +584,26 @@ public List<Long> getDatabaseNames() {
continue;
}

long db = Long.parseLong(name);
ret.add(db);
if (Strings.isNullOrEmpty(prefix)) { // default GlobalStateMgr db
if (StringUtils.isNumeric(name)) {
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
long db = Long.parseLong(name);
ret.add(db);
} else {
// skip non default GlobalStateMgr db
}
} else {
if (name.startsWith(prefix)) {
String dbStr = name.substring(prefix.length());
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
if (StringUtils.isNumeric(dbStr)) {
long db = Long.parseLong(dbStr);
ret.add(db);
} else {
// prefix does not fully match, ignore
}
} else {
// prefix does not match, ignore
}
}
}

Collections.sort(ret);
Expand Down
Expand Up @@ -56,6 +56,8 @@ public class BDBJEJournal implements Journal {
private BDBEnvironment bdbEnvironment = null;
private CloseSafeDatabase currentJournalDB;
protected Transaction currentTrasaction = null;
// used to distinguish different module's db in BDB, must be empty or end with '_'
private final String prefix;

// store uncommitted kv, used for rebuilding txn on commit fails
private List<Pair<DatabaseEntry, DatabaseEntry>> uncommitedDatas = new ArrayList<>();
Expand All @@ -64,10 +66,21 @@ public class BDBJEJournal implements Journal {
public BDBJEJournal(BDBEnvironment bdbEnvironment, CloseSafeDatabase currentJournalDB) {
this.bdbEnvironment = bdbEnvironment;
this.currentJournalDB = currentJournalDB;
this.prefix = "";
}

public BDBJEJournal(BDBEnvironment bdbEnvironment) {
this(bdbEnvironment, "" /* prefix */);
}

public BDBJEJournal(BDBEnvironment bdbEnvironment, String prefix) {
this.bdbEnvironment = bdbEnvironment;
assert prefix.isEmpty() || prefix.charAt(prefix.length() - 1) == '_';
this.prefix = prefix;
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
}

public String getPrefix() {
return prefix;
}

/*
Expand All @@ -84,11 +97,16 @@ public void rollJournal(long newName) throws JournalException {
}

String currentDbName = currentJournalDB.getDb().getDatabaseName();
long currentName = Long.parseLong(currentDbName);
String currentIdStr = currentDbName;
if (!prefix.isEmpty()) { // remove prefix
currentIdStr = currentDbName.substring(prefix.length());
}
long currentName = Long.parseLong(currentIdStr);
long newNameVerify = currentName + currentJournalDB.getDb().count();
if (newName == newNameVerify) {
LOG.info("roll edit log. new db name is {}", newName);
currentJournalDB = bdbEnvironment.openDatabase(Long.toString(newName));
String newDbName = getFullDatabaseName(newName);
LOG.info("roll edit log. new db name is {}", newDbName);
currentJournalDB = bdbEnvironment.openDatabase(newDbName);
} else {
String msg = String.format("roll journal error! journalId and db journal numbers is not match. "
+ "journal id: %d, current db: %s, expected db count: %d",
Expand All @@ -100,7 +118,7 @@ public void rollJournal(long newName) throws JournalException {

@Override
public JournalCursor read(long fromKey, long toKey) throws JournalException {
return BDBJournalCursor.getJournalCursor(bdbEnvironment, fromKey, toKey);
return BDBJournalCursor.getJournalCursor(bdbEnvironment, prefix, fromKey, toKey);
}

@Override
Expand All @@ -109,13 +127,13 @@ public long getMaxJournalId() {
if (bdbEnvironment == null) {
return ret;
}
List<Long> dbNames = bdbEnvironment.getDatabaseNames();
List<Long> dbNames = bdbEnvironment.getDatabaseNamesWithPrefix(prefix);
if (dbNames == null || dbNames.size() == 0) {
return ret;
}

int index = dbNames.size() - 1;
String dbName = dbNames.get(index).toString();
String dbName = getFullDatabaseName(dbNames.get(index));
long dbNumberName = dbNames.get(index);
Database database = bdbEnvironment.openDatabase(dbName).getDb();
ret = dbNumberName + database.count() - 1;
Expand All @@ -129,12 +147,12 @@ public long getMinJournalId() {
if (bdbEnvironment == null) {
return ret;
}
List<Long> dbNames = bdbEnvironment.getDatabaseNames();
List<Long> dbNames = bdbEnvironment.getDatabaseNamesWithPrefix(prefix);
if (dbNames == null || dbNames.size() == 0) {
return ret;
}

String dbName = dbNames.get(0).toString();
String dbName = getFullDatabaseName(dbNames.get(0));
Database database = bdbEnvironment.openDatabase(dbName).getDb();
// The database is empty
if (database.count() == 0) {
Expand Down Expand Up @@ -167,7 +185,7 @@ public synchronized void open() throws InterruptedException, JournalException {
Thread.sleep(SLEEP_INTERVAL_SEC * 1000L);
}

dbNames = bdbEnvironment.getDatabaseNames();
dbNames = bdbEnvironment.getDatabaseNamesWithPrefix(prefix);
if (dbNames == null) { // bdb environment is closing
throw new JournalException("fail to get dbNames while open bdbje journal. will exit");
}
Expand All @@ -180,11 +198,11 @@ public synchronized void open() throws InterruptedException, JournalException {
* here we should open database with name image max journal id + 1.
* (default GlobalStateMgr.getCurrentState().getReplayedJournalId() is 0)
*/
dbName = Long.toString(GlobalStateMgr.getCurrentState().getReplayedJournalId() + 1);
dbName = getFullDatabaseName(GlobalStateMgr.getCurrentState().getReplayedJournalId() + 1);
LOG.info("the very first time to open bdb, dbname is {}", dbName);
} else {
// get last database as current journal database
dbName = dbNames.get(dbNames.size() - 1).toString();
dbName = getFullDatabaseName(dbNames.get(dbNames.size() - 1));
}

currentJournalDB = bdbEnvironment.openDatabase(dbName);
Expand All @@ -207,7 +225,7 @@ public synchronized void open() throws InterruptedException, JournalException {

@Override
public void deleteJournals(long deleteToJournalId) {
List<Long> dbNames = bdbEnvironment.getDatabaseNames();
List<Long> dbNames = bdbEnvironment.getDatabaseNamesWithPrefix(prefix);
if (dbNames == null) {
LOG.info("delete database names is null.");
return;
Expand All @@ -223,9 +241,9 @@ public void deleteJournals(long deleteToJournalId) {
for (int i = 1; i < dbNames.size(); i++) {
if (deleteToJournalId >= dbNames.get(i)) {
long name = dbNames.get(i - 1);
String stringName = Long.toString(name);
LOG.info("delete database name {}", stringName);
bdbEnvironment.removeDatabase(stringName);
String dbName = getFullDatabaseName(name);
LOG.info("delete database name {}", dbName);
bdbEnvironment.removeDatabase(dbName);
} else {
LOG.info("database name {} is larger than deleteToJournalId {}, not delete",
dbNames.get(i), deleteToJournalId);
Expand All @@ -236,7 +254,7 @@ public void deleteJournals(long deleteToJournalId) {

@Override
public long getFinalizedJournalId() {
List<Long> dbNames = bdbEnvironment.getDatabaseNames();
List<Long> dbNames = bdbEnvironment.getDatabaseNamesWithPrefix(prefix);
assert (dbNames != null);

String msg = "database names: ";
Expand All @@ -258,7 +276,7 @@ public List<Long> getDatabaseNames() {
return null;
}

return bdbEnvironment.getDatabaseNames();
return bdbEnvironment.getDatabaseNamesWithPrefix(prefix);
}

public BDBEnvironment getBdbEnvironment() {
Expand Down Expand Up @@ -459,4 +477,7 @@ public void batchWriteAbort() throws InterruptedException, JournalException {
}
}

private String getFullDatabaseName(long dbId) {
return prefix + Long.toString(dbId);
}
}
Expand Up @@ -51,20 +51,31 @@ public class BDBJournalCursor implements JournalCursor {
private List<Long> dbNames;
private CloseSafeDatabase database;
private int nextDbPositionIndex;
private String prefix;

public static BDBJournalCursor getJournalCursor(BDBEnvironment env, long fromKey, long toKey) throws
JournalException {
return new BDBJournalCursor(env, "", fromKey, toKey);
}

public static BDBJournalCursor getJournalCursor(BDBEnvironment env, String prefix, long fromKey, long toKey) throws
JournalException {
if (toKey < fromKey || fromKey < 0) {
throw new JournalException(String.format("Invalid key range! fromKey %s toKey %s", fromKey, toKey));
}
return new BDBJournalCursor(env, fromKey, toKey);
return new BDBJournalCursor(env, prefix, fromKey, toKey);
}

protected BDBJournalCursor(BDBEnvironment env, long fromKey, long toKey) throws JournalException {
this(env, "", fromKey, toKey);
}

protected BDBJournalCursor(BDBEnvironment env, String prefix, long fromKey, long toKey) throws JournalException {
this.environment = env;
this.toKey = toKey;
this.currentKey = fromKey;
this.dbNames = env.getDatabaseNames();
this.prefix = prefix;
this.dbNames = env.getDatabaseNamesWithPrefix(prefix);
if (dbNames == null) {
throw new JournalException("failed to get db names!");
}
Expand Down Expand Up @@ -101,7 +112,7 @@ protected void openDatabaseIfNecessary() throws InterruptedException, JournalExc
return;
}

String dbName = Long.toString(dbNames.get(nextDbPositionIndex));
String dbName = prefix + Long.toString(dbNames.get(nextDbPositionIndex));
JournalException exception = null;
for (int i = 0; i < RETRY_TIME; ++ i) {
try {
Expand Down
Expand Up @@ -20,7 +20,7 @@

public class JournalWriterTest {
@Mocked
private Journal journal = new BDBJEJournal(null, null);
private Journal journal = new BDBJEJournal(null);
private BlockingQueue<JournalTask> journalQueue = new ArrayBlockingQueue<>(100);
private JournalWriter writer = new JournalWriter(journal, journalQueue);
/**
Expand Down
Expand Up @@ -9,6 +9,8 @@
import com.starrocks.common.Config;
import com.starrocks.common.util.NetUtils;
import com.starrocks.journal.JournalException;
import mockit.Mock;
import mockit.MockUp;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -482,4 +484,49 @@ protected void testAddBadFollowerBase(boolean failover) throws Exception {
LOG.warn("==============> getDatabasesNames() {}", masterEnvironment.getDatabaseNames());
}
}

@Test
public void testGetDatabase() throws Exception {
String selfNodeHostPort = findUnbindHostPort();
BDBEnvironment environment = new BDBEnvironment(
createTmpDir(),
"standalone",
selfNodeHostPort,
selfNodeHostPort,
true);
environment.setup();

new MockUp<ReplicatedEnvironment>() {
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
@Mock
public List<String> getDatabaseNames() {
List<String> list = new ArrayList<>();
list.add("1001");
list.add("2001");
list.add("aaa_3001");
list.add("aaa_4001");
list.add("aaa_bbb_");
return list;
}
};

List<Long> l1 = environment.getDatabaseNamesWithPrefix("");
Assert.assertEquals(2, l1.size());
Assert.assertEquals((Long) 1001L, l1.get(0));
Assert.assertEquals((Long) 2001L, l1.get(1));

List<Long> l2 = environment.getDatabaseNamesWithPrefix("aaa_");
Assert.assertEquals(2, l2.size());
Assert.assertEquals((Long) 3001L, l2.get(0));
Assert.assertEquals((Long) 4001L, l2.get(1));

// prefix not fully match
List<Long> l3 = environment.getDatabaseNamesWithPrefix("aaa");
Assert.assertEquals(0, l3.size());

// prefix not match
List<Long> l4 = environment.getDatabaseNamesWithPrefix("bbb_");
Assert.assertEquals(0, l4.size());

environment.close();
}
}