Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ public TestStreaming() throws Exception {


//1) Start from a clean slate (metastore)
TxnDbUtil.cleanDb();
TxnDbUtil.prepDb();
TxnDbUtil.cleanDb(conf);
TxnDbUtil.prepDb(conf);

//2) obtain metastore clients
msClient = new HiveMetaStoreClient(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public HiveConf newHiveConf(String metaStoreUri) {

public void prepareTransactionDatabase(HiveConf conf) throws Exception {
TxnDbUtil.setConfValues(conf);
TxnDbUtil.cleanDb();
TxnDbUtil.prepDb();
TxnDbUtil.cleanDb(conf);
TxnDbUtil.prepDb(conf);
}

public IMetaStoreClient newMetaStoreClient(HiveConf conf) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,12 @@ public void stringifyValidTxns() throws Exception {

@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(conf);
client = new HiveMetaStoreClient(conf);
}

@After
public void tearDown() throws Exception {
TxnDbUtil.cleanDb();
TxnDbUtil.cleanDb(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public String toString() {

@Before
public void setUp() throws Exception {
tearDown();
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
Expand All @@ -107,7 +106,7 @@ public void setUp() throws Exception {
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(hiveConf);
File f = new File(TEST_WAREHOUSE_DIR);
if (f.exists()) {
FileUtil.fullyDelete(f);
Expand Down Expand Up @@ -152,7 +151,7 @@ public void tearDown() throws Exception {
d.close();
d = null;
}
TxnDbUtil.cleanDb();
TxnDbUtil.cleanDb(hiveConf);
} finally {
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public void setup() throws Exception {
//"org.apache.hadoop.hive.ql.io.HiveInputFormat"

TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.cleanDb();
TxnDbUtil.prepDb();
TxnDbUtil.cleanDb(hiveConf);
TxnDbUtil.prepDb(hiveConf);

conf = hiveConf;
msClient = new HiveMetaStoreClient(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
Expand Down Expand Up @@ -7624,6 +7625,7 @@ public void run() {
+ e.getMessage(), e);
}
}
ThreadPool.shutdown();
}
});

Expand Down Expand Up @@ -8002,6 +8004,14 @@ private static void startHouseKeeperService(HiveConf conf) throws Exception {
startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService"));

ThreadPool.initialize(conf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this better and if it is, why only this service is converted to use the pool?
One concern is that with a pool that has > 1 thread, if some task is taking a long time (for whatever reason) you may end up the pool launching another concurrent one. I don't think any Acid*Service are designed with that in mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just haven't gotten to the others yet.

It's better because it gives you one pool to manage the size of. It is also a simpler design with each thread not being long running.

scheduleAtFixedRate will not result in multiple running instances of a given task. See the javadocs.

RunnableConfigurable rc = new AcidOpenTxnsCounterService();
rc.setConf(conf);
ThreadPool.getPool().scheduleAtFixedRate(rc, 100, MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);

}
private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
//todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,13 @@ public void addDynamicPartitions() throws Exception {

@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(conf);
txnHandler = TxnUtils.getTxnStore(conf);
}

@After
public void tearDown() throws Exception {
TxnDbUtil.cleanDb();
TxnDbUtil.cleanDb(conf);
}

private long openTxn() throws MetaException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1543,13 +1543,13 @@ private void updateLocks(Connection conn) throws SQLException {

@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(conf);
txnHandler = TxnUtils.getTxnStore(conf);
}

@After
public void tearDown() throws Exception {
TxnDbUtil.cleanDb();
TxnDbUtil.cleanDb(conf);
}

private long openTxn() throws MetaException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ public void setUp() throws Exception {
conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE, "None");
TxnDbUtil.setConfValues(conf);
try {
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(conf);
} catch (SQLException e) {
// Usually this means we've already created the tables, so clean them and then try again
tearDown();
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(conf);
}
txnHandler = TxnUtils.getTxnStore(conf);
}

@After
public void tearDown() throws Exception {
TxnDbUtil.cleanDb();
TxnDbUtil.cleanDb(conf);
}

@Test
Expand Down
23 changes: 2 additions & 21 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,6 @@ String getTestDataDir() {
return TEST_DATA_DIR;
}

private void dropTables() throws Exception {
for(Table t : Table.values()) {
runStatementOnDriver("drop table if exists " + t);
}
}
@After
public void tearDown() throws Exception {
try {
if (d != null) {
dropTables();
d.destroy();
d.close();
d = null;
}
} finally {
TxnDbUtil.cleanDb();
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
}
}
@Test//todo: what is this for?
public void testInsertOverwrite() throws Exception {
runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2);
Expand Down Expand Up @@ -388,7 +369,7 @@ public void testTimeOutReaper() throws Exception {
Assert.assertNotNull(txnInfo);
Assert.assertEquals(14, txnInfo.getId());
Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
String s =TxnDbUtil.queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
String[] vals = s.split("\\s+");
Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
long lastHeartbeat = Long.parseLong(vals[1]);
Expand All @@ -412,7 +393,7 @@ public void testTimeOutReaper() throws Exception {
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());

//should've done several heartbeats
s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
s =TxnDbUtil.queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
vals = s.split("\\s+");
Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")",
Expand Down
16 changes: 10 additions & 6 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
import org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService;
import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
Expand All @@ -70,6 +70,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TODO: this should be merged with TestTxnCommands once that is checked in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not. These are long running test suites - combining them will make it worse and PTest is unable to parallelize anything within one suite

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

* specifically the tests; the supporting code here is just a clone of TestTxnCommands
*/
public class TestTxnCommands2 {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
Expand Down Expand Up @@ -118,7 +122,6 @@ public void setUp() throws Exception {
}

protected void setUpWithTableProperties(String tableProperties) throws Exception {
tearDown();
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
Expand All @@ -131,7 +134,7 @@ protected void setUpWithTableProperties(String tableProperties) throws Exception
hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);

TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(hiveConf);
File f = new File(TEST_WAREHOUSE_DIR);
if (f.exists()) {
FileUtil.fullyDelete(f);
Expand Down Expand Up @@ -168,7 +171,7 @@ public void tearDown() throws Exception {
d.close();
d = null;
}
TxnDbUtil.cleanDb();
TxnDbUtil.cleanDb(hiveConf);
} finally {
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
}
Expand Down Expand Up @@ -1284,7 +1287,8 @@ public void testOpenTxnsCounter() throws Exception {
OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));

AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService();
runHouseKeeperService(openTxnsCounterService, hiveConf); // will update current number of open txns to 3
openTxnsCounterService.setConf(hiveConf);
openTxnsCounterService.run(); // will update current number of open txns to 3

MetaException exception = null;
// This should fail once it finds out the threshold has been reached
Expand All @@ -1301,7 +1305,7 @@ public void testOpenTxnsCounter() throws Exception {
for (long txnid : openTxnsResponse.getTxn_ids()) {
txnHandler.commitTxn(new CommitTxnRequest(txnid));
}
runHouseKeeperService(openTxnsCounterService, hiveConf); // will update current number of open txns back to 0
openTxnsCounterService.run(); // will update current number of open txns back to 0
exception = null;
try {
txnHandler.openTxns(new OpenTxnRequest(1, "him", "localhost"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public void setUp() throws Exception {
setUpInternal();
}
void setUpInternal() throws Exception {
tearDown();
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
Expand All @@ -65,7 +64,7 @@ void setUpInternal() throws Exception {
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.prepDb();
TxnDbUtil.prepDb(hiveConf);
File f = new File(getWarehouseDir());
if (f.exists()) {
FileUtil.fullyDelete(f);
Expand Down Expand Up @@ -99,7 +98,7 @@ public void tearDown() throws Exception {
d = null;
}
} finally {
TxnDbUtil.cleanDb();
TxnDbUtil.cleanDb(hiveConf);
FileUtils.deleteDirectory(new File(getTestDataDir()));
}
}
Expand Down
Loading