Skip to content

Commit

Permalink
DL-114: Add namespace watch tool
Browse files Browse the repository at this point in the history
Port simple namespace watch tool from downstream

Author: Leigh Stewart <agrodellic@gmail.com>
Author: Leigh Stewart <lstewart@twitter.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #80 from leighst/lstewart/dlog/watch_tool
  • Loading branch information
leighst authored and Sijie Guo committed Dec 29, 2016
1 parent dd8ee62 commit 0ed8723
Showing 1 changed file with 52 additions and 0 deletions.
Expand Up @@ -55,6 +55,7 @@

import com.twitter.distributedlog.BKDistributedLogNamespace;
import com.twitter.distributedlog.Entry;
import com.twitter.distributedlog.callback.NamespaceListener;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.util.Utils;
Expand Down Expand Up @@ -489,6 +490,56 @@ protected void printStreams(com.twitter.distributedlog.DistributedLogManagerFact
}
}

public static class WatchNamespaceCommand extends PerDLCommand implements NamespaceListener {
private Set<String> currentSet = Sets.<String>newHashSet();
private CountDownLatch doneLatch = new CountDownLatch(1);

WatchNamespaceCommand() {
super("watch", "watch and report changes for a dl namespace");
}

@Override
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
super.parseCommandLine(cmdline);
}

@Override
protected String getUsage() {
return "watch [options]";
}

@Override
protected int runCmd() throws Exception {
watchAndReportChanges(getNamespace());
doneLatch.await();
return 0;
}

@Override
public synchronized void onStreamsChanged(Iterator<String> streams) {
Set<String> updatedSet = Sets.newHashSet(streams);
Set<String> oldStreams = Sets.difference(currentSet, updatedSet);
Set<String> newStreams = Sets.difference(updatedSet, currentSet);
currentSet = updatedSet;

System.out.println("Old streams : ");
for (String stream : oldStreams) {
System.out.println(stream);
}

System.out.println("New streams : ");
for (String stream : newStreams) {
System.out.println(stream);
}

System.out.println("");
}

protected void watchAndReportChanges(DistributedLogNamespace namespace) throws Exception {
namespace.registerNamespaceListener(this);
}
}

protected static class InspectCommand extends PerDLCommand {

int numThreads = 1;
Expand Down Expand Up @@ -2696,6 +2747,7 @@ public DistributedLogTool() {
addCommand(new TruncateStreamCommand());
addCommand(new DeserializeDLSNCommand());
addCommand(new SerializeDLSNCommand());
addCommand(new WatchNamespaceCommand());
}

@Override
Expand Down

0 comments on commit 0ed8723

Please sign in to comment.