Skip to content

Commit

Permalink
BOOKKEEPER-557: Compiler error showing up badly with jdk 7 (ivank via…
Browse files Browse the repository at this point in the history
… sijie)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1460523 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
sijie committed Mar 25, 2013
1 parent 9868081 commit f2bb560
Show file tree
Hide file tree
Showing 41 changed files with 311 additions and 138 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -50,6 +50,8 @@ Trunk (unreleased changes)

BOOKKEEPER-573: Script to start a bookkeeper cluster (ivank via sijie)

BOOKKEEPER-557: Compiler error showing up badly with jdk 7 (ivank via sijie)

Release 4.2.0 - 2013-01-14

Non-backward compatible changes:
Expand Down
Expand Up @@ -120,7 +120,7 @@ public void processResult(int rc, String path, Object ctx, List<String> children

final HashSet<InetSocketAddress> deadBookies;
synchronized (this) {
deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
deadBookies = new HashSet<InetSocketAddress>(knownBookies);
deadBookies.removeAll(newBookieAddrs);
// No need to close readonly bookie clients.
deadBookies.removeAll(readOnlyBookieWatcher.getReadOnlyBookies());
Expand Down
Expand Up @@ -188,7 +188,7 @@ public ClientConfiguration setClientTcpNoDelay(boolean noDelay) {
* @return zookeeper servers
*/
public String getZkServers() {
List<Object> servers = getList(ZK_SERVERS, null);
List servers = getList(ZK_SERVERS, null);
if (null == servers || 0 == servers.size()) {
return "localhost";
}
Expand Down
Expand Up @@ -387,7 +387,7 @@ public ServerConfiguration setServerTcpNoDelay(boolean noDelay) {
* @return zookeeper servers
*/
public String getZkServers() {
List<Object> servers = getList(ZK_SERVERS, null);
List servers = getList(ZK_SERVERS, null);
if (null == servers || 0 == servers.size()) {
return null;
}
Expand Down
Expand Up @@ -177,7 +177,7 @@ public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metada
new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.BadVersion == rc) {
if (KeeperException.Code.BADVERSION.intValue() == rc) {
cb.operationComplete(BKException.Code.MetadataVersionException, null);
} else if (KeeperException.Code.OK.intValue() == rc) {
// update metadata version
Expand Down
Expand Up @@ -133,16 +133,17 @@ public static LedgerManagerFactory newLedgerManagerFactory(
// handle pre V2 layout
if (layout.getLayoutFormatVersion() <= V1) {
// pre V2 layout we use type of ledger manager
@SuppressWarnings("deprecation")
String lmType = conf.getLedgerManagerType();
if (lmType != null && !layout.getManagerType().equals(lmType)) {
if (lmType != null && !layout.getManagerFactoryClass().equals(lmType)) {
throw new IOException("Configured layout " + lmType
+ " does not match existing layout " + layout.getManagerType());
+ " does not match existing layout " + layout.getManagerFactoryClass());
}

// create the ledger manager
if (FlatLedgerManagerFactory.NAME.equals(layout.getManagerType())) {
if (FlatLedgerManagerFactory.NAME.equals(layout.getManagerFactoryClass())) {
lmFactory = new FlatLedgerManagerFactory();
} else if (HierarchicalLedgerManagerFactory.NAME.equals(layout.getManagerType())) {
} else if (HierarchicalLedgerManagerFactory.NAME.equals(layout.getManagerFactoryClass())) {
lmFactory = new HierarchicalLedgerManagerFactory();
} else {
throw new IOException("Unknown ledger manager type: " + lmType);
Expand Down Expand Up @@ -189,6 +190,7 @@ private static LedgerManagerFactory createNewLMFactory(
// use default ledger manager factory if no one provided
if (factoryClass == null) {
// for backward compatibility, check manager type
@SuppressWarnings("deprecation")
String lmType = conf.getLedgerManagerType();
if (lmType == null) {
factoryClass = FlatLedgerManagerFactory.class;
Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
Expand All @@ -66,8 +65,6 @@
* has reconnect logic if a connection to a bookie fails.
*
*/

@ChannelPipelineCoverage("one")
public class PerChannelBookieClient extends SimpleChannelHandler implements ChannelPipelineFactory {

static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
Expand Down
Expand Up @@ -151,6 +151,7 @@ private synchronized void submitAuditTask() {
return;
}
executor.submit(new Runnable() {
@SuppressWarnings("unchecked")
public void run() {
try {
waitIfLedgerReplicationDisabled();
Expand Down Expand Up @@ -267,6 +268,7 @@ private List<String> getAvailableBookies() throws KeeperException,
return zkc.getChildren(conf.getZkAvailableBookiesPath(), this);
}

@SuppressWarnings("unchecked")
private void auditingBookies(List<String> availableBookies)
throws BKAuditException, KeeperException, InterruptedException {

Expand Down
Expand Up @@ -287,9 +287,9 @@ public void testSpeculativeReadScheduling() throws Exception {
LedgerHandle l = bkspec.openLedger(id, digestType, passwd);

ArrayList<InetSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
Set<InetSocketAddress> allHosts = new HashSet(ensemble);
Set<InetSocketAddress> noHost = new HashSet();
Set<InetSocketAddress> secondHostOnly = new HashSet();
Set<InetSocketAddress> allHosts = new HashSet<InetSocketAddress>(ensemble);
Set<InetSocketAddress> noHost = new HashSet<InetSocketAddress>();
Set<InetSocketAddress> secondHostOnly = new HashSet<InetSocketAddress>();
secondHostOnly.add(ensemble.get(1));
PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
try {
Expand Down Expand Up @@ -339,4 +339,4 @@ public void testSpeculativeReadScheduling() throws Exception {
bkspec.close();
}
}
}
}
Expand Up @@ -56,7 +56,7 @@ public void testLedgerLayout() throws Exception {
layout2.store(zkc, ledgerRootPath);

layout = LedgerLayout.readLayout(zkc, ledgerRootPath);
assertEquals(testName, layout.getManagerType());
assertEquals(testName, layout.getManagerFactoryClass());
assertEquals(testVersion, layout.getManagerVersion());
}

Expand Down Expand Up @@ -141,7 +141,7 @@ public void testReadV1LedgerManagerLayout() throws Exception {

LedgerLayout layout = LedgerLayout.readLayout(zkc, conf.getZkLedgersRootPath());
assertNotNull("Should not be null", layout);
assertEquals(FlatLedgerManagerFactory.NAME, layout.getManagerType());
assertEquals(FlatLedgerManagerFactory.NAME, layout.getManagerFactoryClass());
assertEquals(FlatLedgerManagerFactory.CUR_VERSION, layout.getManagerVersion());
assertEquals(1, layout.getLayoutFormatVersion());
}
Expand Down
Expand Up @@ -111,6 +111,7 @@ public void testBadConf() throws Exception {
/**
* Test bad client configuration
*/
@SuppressWarnings("deprecation")
@Test(timeout=60000)
public void testBadConfV1() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
Expand Down
Expand Up @@ -42,6 +42,7 @@ public BookieZKExpireTest() {
baseClientConf.setZkTimeout(6000);
}

@SuppressWarnings("deprecation")
@Test(timeout=60000)
public void testBookieServerZKExpireBehaviour() throws Exception {
BookieServer server = null;
Expand Down
Expand Up @@ -108,6 +108,7 @@ public void sleepServer(final int seconds, final CountDownLatch l)
for (final Thread t : allthreads) {
if (t.getName().contains("SyncThread:0")) {
Thread sleeper = new Thread() {
@SuppressWarnings("deprecation")
public void run() {
try {
t.suspend();
Expand Down
Expand Up @@ -31,6 +31,8 @@
import org.apache.hedwig.jms.message.MessageImpl;
import org.apache.hedwig.jms.message.MessageUtil;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -357,9 +359,10 @@ public void subscribeToTopic(String topicName, String subscribedId) throws JMSEx
}

try {
SubscriptionOptions opts = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
hedwigClient.getSubscriber().subscribe(ByteString.copyFromUtf8(topicName),
ByteString.copyFromUtf8(subscribedId),
PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH);
ByteString.copyFromUtf8(subscribedId), opts);
} catch (PubSubException.CouldNotConnectException e) {
JMSException je = new JMSException("receive failed, could not connect .. " + e);
je.setLinkedException(e);
Expand Down
Expand Up @@ -40,7 +40,7 @@ public class ConnectionChurnTest extends JmsTestBase {

public void testPerformance() throws Exception {
ConnectionFactory factory = createConnectionFactory();
List<Connection> list = new ArrayList();
List<Connection> list = new ArrayList<Connection>();
for (int i = 0; i < CONNECTION_COUNT; i++) {
Connection connection = factory.createConnection();
connection.start();
Expand Down
Expand Up @@ -42,6 +42,7 @@ public class CompositePublishTest extends JmsSendReceiveTestSupport {
protected MessageConsumer[] consumers;
protected List[] messageLists;

@SuppressWarnings("unchecked")
protected void setUp() throws Exception {
super.setUp();

Expand Down Expand Up @@ -116,6 +117,7 @@ protected String getPrefix() {
return super.getSubject() + ".";
}

@SuppressWarnings("unchecked")
protected void assertMessagesAreReceived() throws JMSException {
waitForMessagesToBeDelivered();
int size = messageLists.length;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback;
import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.util.Callback;

Expand All @@ -50,7 +51,9 @@ public BenchmarkPublisher(int numTopics, int numMessages, int numRegions, int st
public void warmup(int nWarmup) throws Exception {
ByteString topic = ByteString.copyFromUtf8("warmup" + partitionIndex);
ByteString subId = ByteString.copyFromUtf8("sub");
subscriber.subscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH);
SubscriptionOptions opts = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
subscriber.subscribe(topic, subId, opts);

subscriber.startDelivery(topic, subId, new MessageHandler() {
@Override
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.util.Callback;

public class BenchmarkSubscriber extends BenchmarkWorker implements Callable<Void> {
Expand Down Expand Up @@ -73,7 +74,9 @@ public Void call() throws Exception {

final String topic = HedwigBenchmark.TOPIC_PREFIX + i;

subscriber.subscribe(ByteString.copyFromUtf8(topic), subId, CreateOrAttach.CREATE_OR_ATTACH);
SubscriptionOptions opts = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
subscriber.subscribe(ByteString.copyFromUtf8(topic), subId, opts);
subscriber.startDelivery(ByteString.copyFromUtf8(topic), subId, new MessageHandler() {

@Override
Expand Down Expand Up @@ -130,7 +133,10 @@ void multiSub(String label, String topicPrefix, int start, final int npar, final
if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) {
continue;
}
subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i), subId, CreateOrAttach.CREATE_OR_ATTACH,
SubscriptionOptions opts = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i),
subId, opts,
new BenchmarkCallback(agg), null);
}
// Wait till the benchmark test has completed
Expand Down
Expand Up @@ -52,6 +52,7 @@
* This is the Hedwig Netty specific implementation of the Subscriber interface.
*
*/
@SuppressWarnings("deprecation") // so that we can implemented the Deprecated subscribe methods without a warning
public class HedwigSubscriber implements Subscriber {

private static Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
Expand All @@ -48,7 +47,6 @@
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse;
import static org.apache.hedwig.util.VarArgs.va;

@ChannelPipelineCoverage("all")
public class HChannelHandler extends SimpleChannelHandler {

private static Logger logger = LoggerFactory.getLogger(HChannelHandler.class);
Expand Down Expand Up @@ -267,7 +265,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr
// explicitly or the client has been stopped.
if (cfg.isSSLEnabled() && !channelClosedExplicitly && !channelManager.isClosed()) {
logger.debug("Initiating the SSL handshake");
ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel());
ctx.getPipeline().get(SslHandler.class).handshake();
}
}

Expand Down
Expand Up @@ -346,7 +346,9 @@ public boolean runCmd(String[] args) throws Exception {
try {
for (int j=startSub; j<=endSub; j++) {
ByteString sub = ByteString.copyFromUtf8(subPrefix + j);
subscriber.subscribe(topic, sub, CreateOrAttach.CREATE_OR_ATTACH);
SubscriptionOptions opts = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
subscriber.subscribe(topic, sub, opts);
subscriber.unsubscribe(topic, sub);
}
System.out.println("RMSUB " + topic.toStringUtf8() + " DONE");
Expand Down Expand Up @@ -471,7 +473,9 @@ public boolean runCmd(String[] args) throws Exception {
System.out.println("Starting PUBSUB test ...");
try {
// sub the topic
subscriber.subscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH);
SubscriptionOptions opts = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
subscriber.subscribe(topic, subId, opts);
subscribed = true;

System.out.println("Sub topic " + topic.toStringUtf8() + ", subscriber id " + subId.toStringUtf8());
Expand Down
Expand Up @@ -68,21 +68,24 @@ public int complete(String buffer, int cursor, List candidates) {
READTOPIC.equalsIgnoreCase(tokens[0]))) {
return completeTopic(buffer, tokens[1], candidates);
}
List<String> cmds = HedwigCommands.findCandidateCommands(tokens);
List cmds = HedwigCommands.findCandidateCommands(tokens);
return completeCommand(buffer, tokens[tokens.length - 1], cmds, candidates);
}

@SuppressWarnings("unchecked")
private int completeCommand(String buffer, String token,
List<String> commands, List<String> candidates) {
for (String cmd : commands) {
if (cmd.startsWith(token)) {
candidates.add(cmd);
List commands, List candidates) {
for (Object cmdo : commands) {
assert (cmdo instanceof String);
if (((String)cmdo).startsWith(token)) {
candidates.add(cmdo);
}
}
return buffer.lastIndexOf(" ") + 1;
}

private int completeTopic(String buffer, String token, List<String> candidates) {
@SuppressWarnings("unchecked")
private int completeTopic(String buffer, String token, List candidates) {
try {
Iterator<ByteString> children = admin.getTopics();
int i = 0;
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
Expand All @@ -38,7 +38,7 @@
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Log4JLoggerFactory;

@ChannelPipelineCoverage("all")
@Sharable
public class FakeBookie extends SimpleChannelHandler implements
ChannelPipelineFactory {
static final Logger logger = LoggerFactory.getLogger(FakeBookie.class);
Expand Down
Expand Up @@ -264,7 +264,7 @@ public HedwigSocketAddress getServerAddr() {
* @return String
*/
public String getZkHost() {
List<Object> servers = conf.getList(ZK_HOST, null);
List servers = conf.getList(ZK_HOST, null);
if (null == servers || 0 == servers.size()) {
return "localhost";
}
Expand Down Expand Up @@ -447,6 +447,7 @@ public int getBkEnsembleSize() {
* copies of each ledger entry is written).
*
* @return int
* @deprecated please use #getBkWriteQuorumSize() and #getBkAckQuorumSize()
*/
@Deprecated
protected int getBkQuorumSize() {
Expand Down

0 comments on commit f2bb560

Please sign in to comment.