Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
...
  • 11 commits
  • 17 files changed
  • 0 commit comments
  • 4 contributors
View
2 bin/omid.sh
@@ -41,7 +41,7 @@ fi
tso() {
export LD_LIBRARY_PATH=`$READLINK -f ../src/main/native`
- exec java -Xmx1024m -cp $CLASSPATH -Djava.library.path=$LD_LIBRARY_PATH -Dlog4j.configuration=log4j.properties com.yahoo.omid.tso.TSOServer 1234 $BATCHSIZE 4 2 localhost:2181
+ exec java -Xmx1024m -cp $CLASSPATH -Domid.maxItems=100000 -Domid.maxCommits=100000 -Djava.library.path=$LD_LIBRARY_PATH -Dlog4j.configuration=log4j.properties com.yahoo.omid.tso.TSOServer 1234 $BATCHSIZE 4 2 localhost:2181
}
tsobench() {
View
45 conf/log4j.properties
@@ -0,0 +1,45 @@
+########################################################################
+#
+# Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+########################################################################
+
+#log4j.rootCategory=TRACE, R, O
+log4j.rootCategory=WARN, R, O
+
+# Stdout
+log4j.appender.O=org.apache.log4j.ConsoleAppender
+#log4j.appender.O.Threshold=WARN
+
+# File
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=logs/log4j.log
+
+# Control the maximum log file size
+log4j.appender.R.MaxFileSize=100MB
+
+# Clear log file each time
+log4j.appender.R.Append=false
+
+# Archive log files (one backup file here)
+log4j.appender.R.MaxBackupIndex=5
+
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.O.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.R.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
+log4j.appender.O.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
+
+log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=TRACE
View
355 src/main/java/com/yahoo/omid/client/TransactionManager.java
@@ -1,174 +1,181 @@
-/**
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package com.yahoo.omid.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
-
-/**
- * Provides the methods necessary to create and commit transactions.
- *
- * @see TransactionalTable
- *
- */
-public class TransactionManager {
- private static final Log LOG = LogFactory.getLog(TSOClient.class);
-
- static TSOClient tsoclient = null;
- private static Object lock = new Object();
- private Configuration conf;
- private HashMap<byte[], HTable> tableCache;
-
- public TransactionManager(Configuration conf) throws TransactionException, IOException {
- this.conf = conf;
- synchronized (lock) {
- if (tsoclient == null) {
- tsoclient = new TSOClient(conf);
- }
- }
- tableCache = new HashMap<byte[], HTable>();
- }
-
- /**
- * Starts a new transaction.
- *
- * This method returns an opaque {@link TransactionState} object, used by {@link TransactionalTable}'s methods
- * for performing operations on a given transaction.
- *
- * @return Opaque object which identifies one transaction.
- * @throws TransactionException
- */
- public TransactionState beginTransaction() throws TransactionException {
- SyncCreateCallback cb = new SyncCreateCallback();
- try {
- tsoclient.getNewTimestamp(cb);
- cb.await();
- } catch (Exception e) {
- throw new TransactionException("Could not get new timestamp", e);
- }
- if (cb.getException() != null) {
- throw new TransactionException("Error retrieving timestamp", cb.getException());
- }
-
- return new TransactionState(cb.getStartTimestamp(), tsoclient);
- }
-
- /**
- * Commits a transaction. If the transaction is aborted it automatically rollbacks the changes and
- * throws a {@link CommitUnsuccessfulException}.
- *
- * @param transactionState Object identifying the transaction to be committed.
- * @throws CommitUnsuccessfulException
- * @throws TransactionException
- */
- public void tryCommit(TransactionState transactionState)
- throws CommitUnsuccessfulException, TransactionException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("tryCommit " + transactionState.getStartTimestamp());
- }
- SyncCommitCallback cb = new SyncCommitCallback();
- try {
- tsoclient.commit(transactionState.getStartTimestamp(),
- transactionState.getRows(), cb);
- cb.await();
- } catch (Exception e) {
- throw new TransactionException("Could not commit", e);
- }
- if (cb.getException() != null) {
- throw new TransactionException("Error committing", cb.getException());
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("doneCommit " + transactionState.getStartTimestamp() +
- " TS_c: " + cb.getCommitTimestamp() +
- " Success: " + (cb.getResult() == TSOClient.Result.OK));
- }
-
- if (cb.getResult() == TSOClient.Result.ABORTED) {
- cleanup(transactionState);
- throw new CommitUnsuccessfulException();
- }
- transactionState.setCommitTimestamp(cb.getCommitTimestamp());
- }
-
- /**
- * Aborts a transaction and automatically rollbacks the changes.
- *
- * @param transactionState Object identifying the transaction to be committed.
- * @throws TransactionException
- */
- public void abort(TransactionState transactionState) throws TransactionException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("abort " + transactionState.getStartTimestamp());
- }
- try {
- tsoclient.abort(transactionState.getStartTimestamp());
- } catch (Exception e) {
- throw new TransactionException("Could not abort", e);
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("doneAbort " + transactionState.getStartTimestamp());
- }
-
- // Make sure its commit timestamp is 0, so the cleanup does the right job
- transactionState.setCommitTimestamp(0);
- cleanup(transactionState);
- }
-
- private void cleanup(final TransactionState transactionState)
- throws TransactionException {
- Map<byte[], List<Delete>> deleteBatches = new HashMap<byte[], List<Delete>>();
- for (final RowKeyFamily rowkey : transactionState.getRows()) {
- List<Delete> batch = deleteBatches.get(rowkey.getTable());
- if (batch == null) {
- batch = new ArrayList<Delete>();
- deleteBatches.put(rowkey.getTable(), batch);
- }
- Delete delete = new Delete(rowkey.getRow());
- for (Entry<byte[], List<KeyValue>> entry : rowkey.getFamilies().entrySet()) {
- for (KeyValue kv : entry.getValue()) {
- delete.deleteColumn(entry.getKey(), kv.getQualifier(), transactionState.getStartTimestamp());
- }
- }
- batch.add(delete);
- }
- for (final Entry<byte[], List<Delete>> entry : deleteBatches.entrySet()) {
- try {
- HTable table = tableCache.get(entry.getKey());
- if (table == null) {
- table = new HTable(conf, entry.getKey());
- tableCache.put(entry.getKey(), table);
- }
- table.delete(entry.getValue());
- } catch (IOException ioe) {
- throw new TransactionException("Could not clean up for table " + entry.getKey(), ioe);
- }
- }
- }
-}
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+
+/**
+ * Provides the methods necessary to create and commit transactions.
+ *
+ * @see TransactionalTable
+ *
+ */
+public class TransactionManager {
+ private static final Log LOG = LogFactory.getLog(TSOClient.class);
+
+ static TSOClient tsoclient = null;
+ private static Object lock = new Object();
+ private Configuration conf;
+ private HashMap<byte[], HTable> tableCache;
+
+ public TransactionManager(Configuration conf) throws TransactionException, IOException {
+ this.conf = conf;
+ synchronized (lock) {
+ if (tsoclient == null) {
+ tsoclient = new TSOClient(conf);
+ }
+ }
+ tableCache = new HashMap<byte[], HTable>();
+ }
+
+ /**
+ * Starts a new transaction.
+ *
+ * This method returns an opaque {@link TransactionState} object, used by {@link TransactionalTable}'s methods
+ * for performing operations on a given transaction.
+ *
+ * @return Opaque object which identifies one transaction.
+ * @throws TransactionException
+ */
+ public TransactionState beginTransaction() throws TransactionException {
+ SyncCreateCallback cb = new SyncCreateCallback();
+ try {
+ tsoclient.getNewTimestamp(cb);
+ cb.await();
+ } catch (Exception e) {
+ throw new TransactionException("Could not get new timestamp", e);
+ }
+ if (cb.getException() != null) {
+ throw new TransactionException("Error retrieving timestamp", cb.getException());
+ }
+
+ return new TransactionState(cb.getStartTimestamp(), tsoclient);
+ }
+
+ /**
+ * Commits a transaction. If the transaction is aborted it automatically rollbacks the changes and
+ * throws a {@link CommitUnsuccessfulException}.
+ *
+ * @param transactionState Object identifying the transaction to be committed.
+ * @throws CommitUnsuccessfulException
+ * @throws TransactionException
+ */
+ public void tryCommit(TransactionState transactionState)
+ throws CommitUnsuccessfulException, TransactionException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("tryCommit " + transactionState.getStartTimestamp());
+ }
+ SyncCommitCallback cb = new SyncCommitCallback();
+ try {
+ tsoclient.commit(transactionState.getStartTimestamp(),
+ transactionState.getRows(), cb);
+ cb.await();
+ } catch (Exception e) {
+ throw new TransactionException("Could not commit", e);
+ }
+ if (cb.getException() != null) {
+ throw new TransactionException("Error committing", cb.getException());
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("doneCommit " + transactionState.getStartTimestamp() +
+ " TS_c: " + cb.getCommitTimestamp() +
+ " Success: " + (cb.getResult() == TSOClient.Result.OK));
+ }
+
+ if (cb.getResult() == TSOClient.Result.ABORTED) {
+ cleanup(transactionState);
+ throw new CommitUnsuccessfulException();
+ }
+ transactionState.setCommitTimestamp(cb.getCommitTimestamp());
+ }
+
+ /**
+ * Aborts a transaction and automatically rollbacks the changes.
+ *
+ * @param transactionState Object identifying the transaction to be committed.
+ * @throws TransactionException
+ */
+ public void abort(TransactionState transactionState) throws TransactionException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("abort " + transactionState.getStartTimestamp());
+ }
+ try {
+ tsoclient.abort(transactionState.getStartTimestamp());
+ } catch (Exception e) {
+ throw new TransactionException("Could not abort", e);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("doneAbort " + transactionState.getStartTimestamp());
+ }
+
+ // Make sure its commit timestamp is 0, so the cleanup does the right job
+ transactionState.setCommitTimestamp(0);
+ cleanup(transactionState);
+ }
+
+ private void cleanup(final TransactionState transactionState)
+ throws TransactionException {
+ Map<byte[], List<Delete>> deleteBatches = new HashMap<byte[], List<Delete>>();
+ for (final RowKeyFamily rowkey : transactionState.getRows()) {
+ List<Delete> batch = deleteBatches.get(rowkey.getTable());
+ if (batch == null) {
+ batch = new ArrayList<Delete>();
+ deleteBatches.put(rowkey.getTable(), batch);
+ }
+ Delete delete = new Delete(rowkey.getRow());
+ for (Entry<byte[], List<KeyValue>> entry : rowkey.getFamilies().entrySet()) {
+ for (KeyValue kv : entry.getValue()) {
+ delete.deleteColumn(entry.getKey(), kv.getQualifier(), transactionState.getStartTimestamp());
+ }
+ }
+ batch.add(delete);
+ }
+ for (final Entry<byte[], List<Delete>> entry : deleteBatches.entrySet()) {
+ try {
+ HTable table = tableCache.get(entry.getKey());
+ if (table == null) {
+ table = new HTable(conf, entry.getKey());
+ tableCache.put(entry.getKey(), table);
+ }
+ table.delete(entry.getValue());
+ } catch (IOException ioe) {
+ throw new TransactionException("Could not clean up for table " + entry.getKey(), ioe);
+ }
+ }
+ AbortCompleteCallback cb = new SyncAbortCompleteCallback();
+ try {
+ tsoclient.completeAbort(transactionState.getStartTimestamp(), cb );
+ } catch (IOException ioe) {
+ throw new TransactionException("Could not notify TSO about cleanup completion for transaction " +
+ transactionState.getStartTimestamp(), ioe);
+ }
+ }
+}
View
12 src/main/java/com/yahoo/omid/client/TransactionalTable.java
@@ -186,29 +186,19 @@ public void delete(TransactionState transactionState, Delete delete) throws IOEx
*/
public void put(TransactionState transactionState, Put put) throws IOException, IllegalArgumentException {
final long startTimestamp = transactionState.getStartTimestamp();
-// byte[] startTSBytes = Bytes.toBytes(startTimestamp);
// create put with correct ts
final Put tsput = new Put(put.getRow(), startTimestamp);
Map<byte[], List<KeyValue>> kvs = put.getFamilyMap();
for (List<KeyValue> kvl : kvs.values()) {
for (KeyValue kv : kvl) {
-// int tsOffset = kv.getTimestampOffset();
-// System.arraycopy(startTSBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG);
tsput.add(new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(), startTimestamp, kv.getValue()));
}
}
// should add the table as well
- transactionState.addRow(new RowKeyFamily(put.getRow(), getTableName(), put.getFamilyMap()));
+ transactionState.addRow(new RowKeyFamily(tsput.getRow(), getTableName(), tsput.getFamilyMap()));
put(tsput);
-// super.getConnection().getRegionServerWithRetries(
-// new ServerCallable<Boolean>(super.getConnection(), super.getTableName(), put.getRow()) {
-// public Boolean call() throws IOException {
-// server.put(location.getRegionInfo().getRegionName(), tsput);
-// return true;
-// }
-// });
}
/**
* Transactional version of {@link HTable#getScanner(Scan)}
View
8 src/main/java/com/yahoo/omid/client/regionserver/Compacter.java
@@ -99,6 +99,7 @@ public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment>
private InternalScanner internalScanner;
private long minTimestamp;
private Set<ByteArray> columnsSeen = new HashSet<ByteArray>();
+ private ByteArray lastRowId = null;
public CompacterScanner(InternalScanner internalScanner, long minTimestamp) {
this.minTimestamp = minTimestamp;
@@ -118,6 +119,13 @@ public boolean next(List<KeyValue> result, int limit) throws IOException {
while (limit == -1 || result.size() < limit) {
int toReceive = limit == -1 ? -1 : limit - result.size();
moreRows = internalScanner.next(raw, toReceive);
+ if (raw.size() > 0) {
+ ByteArray currentRowId = new ByteArray(raw.get(0).getRow());
+ if (!currentRowId.equals(lastRowId)) {
+ columnsSeen.clear();
+ lastRowId = currentRowId;
+ }
+ }
for (KeyValue kv : raw) {
ByteArray column = new ByteArray(kv.getFamily(), kv.getQualifier());
if (columnsSeen.add(column) || kv.getTimestamp() > minTimestamp) {
View
23 src/main/java/com/yahoo/omid/tso/TSOServer.java
@@ -16,22 +16,13 @@
package com.yahoo.omid.tso;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@@ -45,13 +36,7 @@
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
-import com.yahoo.omid.tso.serialization.TSODecoder;
-import com.yahoo.omid.tso.serialization.TSOEncoder;
import com.yahoo.omid.tso.persistence.BookKeeperStateBuilder;
-import com.yahoo.omid.tso.persistence.StateBuilder;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* TSO Server with serialization
@@ -130,7 +115,7 @@ public void run() {
// TODO: make it singleton
//TimestampOracle timestampOracle = new TimestampOracle();
// The wrapper for the shared state of TSO
- state = BookKeeperStateBuilder.getState();
+ state = BookKeeperStateBuilder.getState(this.config);
if(state == null){
LOG.error("Couldn't build state");
@@ -157,7 +142,7 @@ public void run() {
// Create the monitor
ThroughputMonitor monitor = new ThroughputMonitor(state);
// Add the parent channel to the group
- Channel channel = bootstrap.bind(new InetSocketAddress(config.getPort())));
+ Channel channel = bootstrap.bind(new InetSocketAddress(config.getPort()));
channelGroup.add(channel);
// Compacter handler
View
8 src/main/java/com/yahoo/omid/tso/TSOServerConfig.java
@@ -59,10 +59,10 @@ static public TSOServerConfig configFactory(int port, int batch, boolean recover
TSOServerConfig(int port, int batch, boolean recoveryEnabled, int ensemble, int quorum, String zkServers){
this.port = port;
this.batch = batch;
- this.recoveryEnabled = Boolean.parseBoolean(System.getProperty("RECOVERABLE", "false"));
- this.zkServers = System.getProperty("ZKSERVERS");
- this.ensemble = Integer.parseInt(System.getProperty("ENSEMBLE", "3"));
- this.quorum = Integer.parseInt(System.getProperty("QUORUM", "2"));
+ this.recoveryEnabled = recoveryEnabled;
+ this.zkServers = zkServers;
+ this.ensemble = ensemble;
+ this.quorum = quorum;
}
public int getPort(){
View
16 src/main/java/com/yahoo/omid/tso/persistence/BookKeeperStateBuilder.java
@@ -44,6 +44,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import com.yahoo.omid.tso.TSOServerConfig;
import com.yahoo.omid.tso.TSOState;
import com.yahoo.omid.tso.TimestampOracle;
import com.yahoo.omid.tso.persistence.BookKeeperStateLogger.LedgerIdCreateCallback;
@@ -72,13 +74,13 @@
*/
private static final long BKREADBATCHSIZE = 50;
- public static TSOState getState(){
+ public static TSOState getState(TSOServerConfig config){
TSOState returnValue;
- if(System.getProperty("ZKSERVERS") == null){
+ if(config.getZkServers() == null){
LOG.warn("Logger is disabled");
returnValue = new TSOState(new TimestampOracle());
} else {
- BookKeeperStateBuilder builder = new BookKeeperStateBuilder();
+ BookKeeperStateBuilder builder = new BookKeeperStateBuilder(config);
try{
returnValue = builder.buildState();
@@ -97,9 +99,11 @@ public static TSOState getState(){
ZooKeeper zk;
LoggerProtocol lp;
boolean enabled;
+ TSOServerConfig config;
- BookKeeperStateBuilder() {
- this.timestampOracle = new TimestampOracle();
+ BookKeeperStateBuilder(TSOServerConfig config) {
+ this.timestampOracle = new TimestampOracle();
+ this.config = config;
}
/**
@@ -225,7 +229,7 @@ public TSOState buildState()
try{
CountDownLatch latch = new CountDownLatch(1);
- this.zk = new ZooKeeper(System.getProperty("ZKSERVERS"),
+ this.zk = new ZooKeeper(config.getZkServers(),
Integer.parseInt(System.getProperty("SESSIONTIMEOUT", Integer.toString(10000))),
new LoggerWatcher(latch));
View
5 src/test/java/com/yahoo/omid/TestAbortTransaction.java
@@ -4,16 +4,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import com.yahoo.omid.client.TransactionManager;
View
5 src/test/java/com/yahoo/omid/TestBasicTransaction.java
@@ -20,18 +20,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import com.yahoo.omid.client.TransactionManager;
View
100 src/test/java/com/yahoo/omid/TestCompaction.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
@@ -37,14 +39,14 @@
public class TestCompaction extends OmidTestBase {
private static final Log LOG = LogFactory.getLog(TestCompaction.class);
- @Test public void runDeleteOld() throws Exception {
+ @Test public void testDeleteOld() throws Exception {
try {
TransactionManager tm = new TransactionManager(conf);
TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
-
+
TransactionState t1 = tm.beginTransaction();
LOG.info("Transaction created " + t1);
-
+
byte[] row = Bytes.toBytes("test-simple");
byte[] fam = Bytes.toBytes(TEST_FAMILY);
byte[] col = Bytes.toBytes("testdata");
@@ -87,7 +89,7 @@
g.addColumn(fam, col2);
Result r = tt.get(g);
int size = r.getColumn(fam, col2).size();
- System.out.println("Size before compaction : " + size);
+ LOG.info("Size before compaction : " + size);
admin.compact(TEST_TABLE);
@@ -103,11 +105,99 @@
g.setMaxVersions();
g.addColumn(fam, col2);
r = tt.get(g);
- System.out.println("Size after compaction : " + r.getColumn(fam, col2).size());
+ LOG.info("Size after compaction " + r.getColumn(fam, col2).size());
assertThat(r.getColumn(fam, col2).size(), is(lessThan(size)));
} catch (Exception e) {
LOG.error("Exception occurred", e);
throw e;
}
}
+
+ @Test public void testLimitEqualToColumns() throws Exception {
+ try {
+ TransactionManager tm = new TransactionManager(conf);
+ TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
+
+ TransactionState t1 = tm.beginTransaction();
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] row2 = Bytes.toBytes("test-simple2");
+ byte[] row3 = Bytes.toBytes("test-simple3");
+ byte[] row4 = Bytes.toBytes("test-simple4");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] col1 = Bytes.add(col, Bytes.toBytes(1));
+ byte[] col11 = Bytes.add(col, Bytes.toBytes(11));
+ byte[] data = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2verylargedatamuchmoredata than anything ever written to");
+
+ Put p = new Put(row);
+ for (int i = 0; i < 10; ++i) {
+ p.add(fam, Bytes.add(col, Bytes.toBytes(i)), data);
+ }
+ tt.put(t1, p);
+ tm.tryCommit(t1);
+
+ TransactionState t2 = tm.beginTransaction();
+ p = new Put(row2);
+ for (int i = 0; i < 10; ++i) {
+ p.add(fam, Bytes.add(col, Bytes.toBytes(i)), data);
+ }
+ tt.put(t2, p);
+ tm.tryCommit(t2);
+
+ // fill with data
+ for (int i = 0; i < 500; ++i) {
+ t2 = tm.beginTransaction();
+ p = new Put(row4);
+ p.add(fam, col11, data2);
+ tt.put(t2, p);
+ tm.tryCommit(t2);
+ }
+
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.flush(TEST_TABLE);
+
+ TransactionState t3 = tm.beginTransaction();
+ p = new Put(row3);
+ for (int i = 0; i < 10; ++i) {
+ p.add(fam, Bytes.add(col, Bytes.toBytes(i)), data);
+ }
+ tt.put(t3, p);
+ tm.tryCommit(t3);
+
+ // fill with data
+ for (int i = 0; i < 500; ++i) {
+ t2 = tm.beginTransaction();
+ p = new Put(row4);
+ p.add(fam, col11, data2);
+ tt.put(t2, p);
+ tm.tryCommit(t2);
+ }
+
+ Get g = new Get(row);
+ g.setMaxVersions();
+ g.addColumn(fam, col1);
+ Result r = tt.get(g);
+ int size = r.getColumn(fam, col1).size();
+ LOG.info("Size before compaction : " + size);
+
+ admin.compact(TEST_TABLE);
+
+ Thread.sleep(2000);
+
+ Scan s = new Scan(row);
+ s.setMaxVersions();
+ s.addColumn(fam, col1);
+ ResultScanner rs = tt.getScanner(s);
+ int count = 0;
+ while ((r = rs.next()) != null) {
+ count += r.getColumn(fam, col1).size();
+ }
+ assertEquals(3, count);
+ } catch (Exception e) {
+ LOG.error("Exception occurred", e);
+ throw e;
+ }
+ }
}
View
5 src/test/java/com/yahoo/omid/TestSingleColumnFamily.java
@@ -20,16 +20,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import com.yahoo.omid.client.TransactionManager;
View
389 src/test/java/com/yahoo/omid/TestTransactionConflict.java
@@ -31,8 +31,6 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import com.yahoo.omid.client.CommitUnsuccessfulException;
@@ -41,173 +39,240 @@
import com.yahoo.omid.client.TransactionalTable;
public class TestTransactionConflict extends OmidTestBase {
- private static final Log LOG = LogFactory
- .getLog(TestTransactionConflict.class);
+ private static final Log LOG = LogFactory.getLog(TestTransactionConflict.class);
- @Test
- public void runTestWriteWriteConflict() throws Exception {
- TransactionManager tm = new TransactionManager(conf);
- TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
+ @Test
+ public void runTestWriteWriteConflict() throws Exception {
+ TransactionManager tm = new TransactionManager(conf);
+ TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
- TransactionState t1 = tm.beginTransaction();
- LOG.info("Transaction created " + t1);
+ TransactionState t1 = tm.beginTransaction();
+ LOG.info("Transaction created " + t1);
- TransactionState t2 = tm.beginTransaction();
- LOG.info("Transaction created" + t2);
+ TransactionState t2 = tm.beginTransaction();
+ LOG.info("Transaction created" + t2);
- byte[] row = Bytes.toBytes("test-simple");
- byte[] fam = Bytes.toBytes(TEST_FAMILY);
- byte[] col = Bytes.toBytes("testdata");
- byte[] data1 = Bytes.toBytes("testWrite-1");
- byte[] data2 = Bytes.toBytes("testWrite-2");
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
- Put p = new Put(row);
- p.add(fam, col, data1);
- tt.put(t1, p);
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
- Put p2 = new Put(row);
- p2.add(fam, col, data2);
- tt.put(t2, p2);
+ Put p2 = new Put(row);
+ p2.add(fam, col, data2);
+ tt.put(t2, p2);
- tm.tryCommit(t2);
+ tm.tryCommit(t2);
- boolean aborted = false;
- try {
- tm.tryCommit(t1);
- assertTrue("Transaction commited successfully", false);
- } catch (CommitUnsuccessfulException e) {
- aborted = true;
- }
- assertTrue("Transaction didn't raise exception", aborted);
- }
-
- @Test
- public void runTestCleanupAfterConflict() throws Exception {
- TransactionManager tm = new TransactionManager(conf);
- TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
-
- TransactionState t1 = tm.beginTransaction();
- LOG.info("Transaction created " + t1);
-
- TransactionState t2 = tm.beginTransaction();
- LOG.info("Transaction created" + t2);
-
- byte[] row = Bytes.toBytes("test-simple");
- byte[] fam = Bytes.toBytes(TEST_FAMILY);
- byte[] col = Bytes.toBytes("testdata");
- byte[] data1 = Bytes.toBytes("testWrite-1");
- byte[] data2 = Bytes.toBytes("testWrite-2");
-
- Put p = new Put(row);
- p.add(fam, col, data1);
- tt.put(t1, p);
-
- Get g = new Get(row).setMaxVersions();
- Result r = tt.get(g);
- assertEquals("Unexpected size for read.", 1, r.size());
- assertTrue(
- "Unexpected value for read: "
- + Bytes.toString(r.getValue(fam, col)),
- Bytes.equals(data1, r.getValue(fam, col)));
-
- Put p2 = new Put(row);
- p2.add(fam, col, data2);
- tt.put(t2, p2);
-
- r = tt.get(g);
- assertEquals("Unexpected size for read.", 2, r.size());
-
- tm.tryCommit(t2);
-
- boolean aborted = false;
- try {
- tm.tryCommit(t1);
- assertTrue("Transaction commited successfully", false);
- } catch (CommitUnsuccessfulException e) {
+ boolean aborted = false;
+ try {
+ tm.tryCommit(t1);
+ assertTrue("Transaction commited successfully", false);
+ } catch (CommitUnsuccessfulException e) {
+ aborted = true;
+ }
+ assertTrue("Transaction didn't raise exception", aborted);
+ }
+
+ @Test
+ public void runTestMultiTableConflict() throws Exception {
+ TransactionManager tm = new TransactionManager(conf);
+ TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
+ String table2 = TEST_TABLE + 2;
+
+ HBaseAdmin admin = new HBaseAdmin(conf);
+
+ if (!admin.tableExists(table2)) {
+ HTableDescriptor desc = new HTableDescriptor(table2);
+ HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
+ datafam.setMaxVersions(Integer.MAX_VALUE);
+ desc.addFamily(datafam);
+
+ admin.createTable(desc);
+ }
+
+ if (admin.isTableDisabled(table2)) {
+ admin.enableTable(table2);
+ }
+
+ TransactionalTable tt2 = new TransactionalTable(conf, table2);
+
+ TransactionState t1 = tm.beginTransaction();
+ LOG.info("Transaction created " + t1);
+
+ TransactionState t2 = tm.beginTransaction();
+ LOG.info("Transaction created" + t2);
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] row2 = Bytes.toBytes("test-simple2");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
+ tt2.put(t1, p);
+
+ Put p2 = new Put(row);
+ p2.add(fam, col, data2);
+ tt.put(t2, p2);
+ p2 = new Put(row2);
+ p2.add(fam, col, data2);
+ tt2.put(t2, p2);
+
+ tm.tryCommit(t2);
+
+ boolean aborted = false;
+ try {
+ tm.tryCommit(t1);
+ assertTrue("Transaction commited successfully", false);
+ } catch (CommitUnsuccessfulException e) {
+ aborted = true;
+ }
+ assertTrue("Transaction didn't raise exception", aborted);
+
+ ResultScanner rs = tt2.getScanner(Bytes.toBytes(TEST_FAMILY));
+ int count = 0;
+ Result r;
+ while ((r = rs.next()) != null) {
+ count += r.size();
+ }
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void runTestCleanupAfterConflict() throws Exception {
+ TransactionManager tm = new TransactionManager(conf);
+ TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
+
+ TransactionState t1 = tm.beginTransaction();
+ LOG.info("Transaction created " + t1);
+
+ TransactionState t2 = tm.beginTransaction();
+ LOG.info("Transaction created" + t2);
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
+
+ Get g = new Get(row).setMaxVersions();
+ Result r = tt.get(g);
+ assertEquals("Unexpected size for read.", 1, r.size());
+ assertTrue("Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)),
+ Bytes.equals(data1, r.getValue(fam, col)));
+
+ Put p2 = new Put(row);
+ p2.add(fam, col, data2);
+ tt.put(t2, p2);
+
+ r = tt.get(g);
+ assertEquals("Unexpected size for read.", 2, r.size());
+ r = tt.get(t2, g);
+ assertEquals("Unexpected size for read.", 1, r.size());
+ assertTrue("Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)),
+ Bytes.equals(data2, r.getValue(fam, col)));
+
+ tm.tryCommit(t1);
+
+ boolean aborted = false;
+ try {
+ tm.tryCommit(t2);
+ assertTrue("Transaction commited successfully", false);
+ } catch (CommitUnsuccessfulException e) {
+ aborted = true;
+ }
+ assertTrue("Transaction didn't raise exception", aborted);
+
+ r = tt.get(g);
+ assertEquals("Unexpected size for read.", 1, r.size());
+ assertTrue("Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)),
+ Bytes.equals(data1, r.getValue(fam, col)));
+ }
+
+ @Test
+ public void testCleanupWithDeleteRow() throws Exception {
+ try {
+ TransactionManager tm = new TransactionManager(conf);
+ TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
+
+ TransactionState t1 = tm.beginTransaction();
+ LOG.info("Transaction created " + t1);
+
+ int rowcount = 10;
+ int count = 0;
+
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ byte[] modrow = Bytes.toBytes("test-del" + 3);
+ for (int i = 0; i < rowcount; i++) {
+ byte[] row = Bytes.toBytes("test-del" + i);
+
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
+ }
+ tm.tryCommit(t1);
+
+ TransactionState t2 = tm.beginTransaction();
+ LOG.info("Transaction created " + t2);
+ Delete d = new Delete(modrow);
+ tt.delete(t2, d);
+
+ ResultScanner rs = tt.getScanner(t2, new Scan());
+ Result r = rs.next();
+ count = 0;
+ while (r != null) {
+ count++;
+ LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count);
+ r = rs.next();
+ }
+ assertEquals("Wrong count", rowcount - 1, count);
+
+ TransactionState t3 = tm.beginTransaction();
+ LOG.info("Transaction created " + t3);
+ Put p = new Put(modrow);
+ p.add(fam, col, data2);
+ tt.put(t3, p);
+
+ tm.tryCommit(t3);
+
+ boolean aborted = false;
+ try {
+ tm.tryCommit(t2);
+ assertTrue("Didn't abort", false);
+ } catch (CommitUnsuccessfulException e) {
aborted = true;
- }
- assertTrue("Transaction didn't raise exception", aborted);
-
- r = tt.get(g);
- assertEquals("Unexpected size for read.", 1, r.size());
- assertTrue(
- "Unexpected value for read: "
- + Bytes.toString(r.getValue(fam, col)),
- Bytes.equals(data2, r.getValue(fam, col)));
- }
-
-
- @Test public void testCleanupWithDeleteRow() throws Exception {
- try {
- TransactionManager tm = new TransactionManager(conf);
- TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
-
- TransactionState t1 = tm.beginTransaction();
- LOG.info("Transaction created " + t1);
-
- int rowcount = 10;
- int count = 0;
-
- byte[] fam = Bytes.toBytes(TEST_FAMILY);
- byte[] col = Bytes.toBytes("testdata");
- byte[] data1 = Bytes.toBytes("testWrite-1");
- byte[] data2 = Bytes.toBytes("testWrite-2");
-
- byte[] modrow = Bytes.toBytes("test-del" + 3);
- for (int i = 0; i < rowcount; i++) {
- byte[] row = Bytes.toBytes("test-del" + i);
-
- Put p = new Put(row);
- p.add(fam, col, data1);
- tt.put(t1, p);
- }
- tm.tryCommit(t1);
-
- TransactionState t2 = tm.beginTransaction();
- LOG.info("Transaction created " + t2);
- Delete d = new Delete(modrow);
- tt.delete(t2, d);
-
- ResultScanner rs = tt.getScanner(t2, new Scan());
- Result r = rs.next();
- count = 0;
- while (r != null) {
- count++;
- LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count);
- r = rs.next();
- }
- assertEquals("Wrong count", rowcount - 1, count);
-
- TransactionState t3 = tm.beginTransaction();
- LOG.info("Transaction created " + t3);
- Put p = new Put(modrow);
- p.add(fam, col, data2);
- tt.put(t3, p);
-
- tm.tryCommit(t3);
-
- boolean aborted = false;
- try {
- tm.tryCommit(t2);
- assertTrue("Didn't abort", false);
- } catch (CommitUnsuccessfulException e) {
- aborted = true;
- }
- assertTrue("Didn't raise exception", aborted);
-
- TransactionState tscan = tm.beginTransaction();
- rs = tt.getScanner(tscan, new Scan());
- r = rs.next();
- count = 0;
- while (r != null) {
- count++;
- r = rs.next();
- }
- assertEquals("Wrong count", rowcount, count);
-
- } catch (Exception e) {
- LOG.error("Exception occurred", e);
- throw e;
- }
- }
+ }
+ assertTrue("Didn't raise exception", aborted);
+
+ TransactionState tscan = tm.beginTransaction();
+ rs = tt.getScanner(tscan, new Scan());
+ r = rs.next();
+ count = 0;
+ while (r != null) {
+ count++;
+ r = rs.next();
+ }
+ assertEquals("Wrong count", rowcount, count);
+
+ } catch (Exception e) {
+ LOG.error("Exception occurred", e);
+ throw e;
+ }
+ }
}
View
6 src/test/java/com/yahoo/omid/tso/TSOTestBase.java
@@ -98,8 +98,6 @@ public static void teardownClient() {
channelFactory.releaseExternalResources();
}
- static LocalBookKeeper localBK;
-
@BeforeClass
public static void setupBookkeeper() throws Exception {
System.out.println("PATH : "
@@ -113,7 +111,7 @@ public void run() {
String[] args = new String[1];
args[0] = "5";
LOG.info("Starting bk");
- localBK.main(args);
+ LocalBookKeeper.main(args);
} catch (InterruptedException e) {
// go away quietly
} catch (Exception e) {
@@ -151,7 +149,7 @@ public void setupTSO() throws Exception {
*/
Thread.sleep(500);
- tso = new TSOServer(1234, 0, 4, 2, new String("localhost:2181"));
+ tso = new TSOServer(TSOServerConfig.configFactory(1234, 0, false, 4, 2, new String("localhost:2181")));
tsothread = new Thread(tso);
LOG.info("Starting TSO");
View
2 src/test/java/com/yahoo/omid/tso/TestCommitReport.java
@@ -18,11 +18,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.objectweb.asm.Opcodes.NEW;
import org.junit.Test;
-import com.yahoo.omid.tso.RowKey;
import com.yahoo.omid.tso.messages.CommitRequest;
import com.yahoo.omid.tso.messages.CommitResponse;
import com.yahoo.omid.tso.messages.CommittedTransactionReport;
View
8 src/test/java/com/yahoo/omid/tso/TestPersistence.java
@@ -19,20 +19,16 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.junit.Test;
-import com.yahoo.omid.tso.RowKey;
-import com.yahoo.omid.tso.messages.AbortedTransactionReport;
import com.yahoo.omid.tso.messages.CommitRequest;
import com.yahoo.omid.tso.messages.CommitResponse;
import com.yahoo.omid.tso.messages.CommittedTransactionReport;
-import com.yahoo.omid.tso.messages.FullAbortReport;
import com.yahoo.omid.tso.messages.TimestampRequest;
import com.yahoo.omid.tso.messages.TimestampResponse;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
public class TestPersistence extends TSOTestBase {
private static final Log LOG = LogFactory.getLog(TestPersistence.class);
View
1 src/test/java/com/yahoo/omid/tso/TestTimestamps.java
@@ -20,7 +20,6 @@
import org.junit.Test;
-import com.yahoo.omid.tso.messages.CommittedTransactionReport;
import com.yahoo.omid.tso.messages.TimestampRequest;
import com.yahoo.omid.tso.messages.TimestampResponse;

No commit comments for this range

Something went wrong with that request. Please try again.