Skip to content
Browse files

Update to Cassandra 1.1

  • Loading branch information...
1 parent db7c006 commit d570aad39704425996e8c731872c2c4c9fc70252 @Vijay2win Vijay2win committed Jun 25, 2012
View
2 pom.xml
@@ -138,7 +138,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
- <version>1.0.9</version>
+ <version>1.1.1</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
View
2 src/main/java/com/netflix/priam/backup/SnapshotBackup.java
@@ -92,7 +92,7 @@ private void takeSnapshot(final String snapshotName) throws Exception
public Void retriableCall() throws Exception
{
JMXNodeTool nodetool = JMXNodeTool.instance(config);
- nodetool.takeSnapshot(snapshotName);
+ nodetool.takeSnapshot(snapshotName, null, new String[0]);
return null;
}
}.call();
View
28 src/main/java/com/netflix/priam/resources/CassandraAdmin.java
@@ -13,6 +13,7 @@
import javax.ws.rs.GET;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
@@ -23,6 +24,7 @@
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.commons.lang.StringUtils;
@@ -97,12 +99,12 @@ public Response cassInfo() throws IOException, InterruptedException, JSONExcepti
}
@GET
- @Path("/ring")
- public Response cassRing() throws IOException, InterruptedException, JSONException
+ @Path("/ring/{id}")
+ public Response cassRing(@PathParam("id") String keyspace) throws IOException, InterruptedException, JSONException
{
JMXNodeTool nodetool = JMXNodeTool.instance(config);
logger.info("node tool ring being called");
- return Response.ok(nodetool.ring(), MediaType.APPLICATION_JSON).build();
+ return Response.ok(nodetool.ring(keyspace), MediaType.APPLICATION_JSON).build();
}
@GET
@@ -137,11 +139,11 @@ public Response cassCleanup() throws IOException, ExecutionException, Interrupte
@GET
@Path("/repair")
- public Response cassRepair() throws IOException, ExecutionException, InterruptedException
+ public Response cassRepair(@QueryParam("sequential") boolean isSequential) throws IOException, ExecutionException, InterruptedException
{
JMXNodeTool nodetool = JMXNodeTool.instance(config);
logger.info("node tool repair being called");
- nodetool.repair();
+ nodetool.repair(isSequential);
return Response.ok(REST_SUCCESS, MediaType.APPLICATION_JSON).build();
}
@@ -195,16 +197,16 @@ public Response compactionStats() throws IOException, ExecutionException, Interr
CompactionManagerMBean cm = nodetool.getCompactionManagerProxy();
rootObj.put("pending tasks", cm.getPendingTasks());
JSONArray compStats = new JSONArray();
- for (CompactionInfo c : cm.getCompactions())
+ for (Map<String, String> c : cm.getCompactions())
{
JSONObject cObj = new JSONObject();
- cObj.put("compaction type", c.getTaskType());
- cObj.put("keyspace", c.getKeyspace());
- cObj.put("column family", c.getColumnFamily());
- cObj.put("bytes compacted", c.getBytesComplete());
- cObj.put("bytes total", c.getTotalBytes());
-
- String percentComplete = c.getTotalBytes() == 0 ? "n/a" : new DecimalFormat("0.00").format((double) c.getBytesComplete() / c.getTotalBytes() * 100) + "%";
+ cObj.put("id", c.get("id"));
+ cObj.put("keyspace", c.get("keyspace"));
+ cObj.put("columnfamily", c.get("columnfamily"));
+ cObj.put("bytesComplete", c.get("bytesComplete"));
+ cObj.put("totalBytes", c.get("totalBytes"));
+ cObj.put("taskType", c.get("taskType"));
+ String percentComplete = new Long(c.get("totalBytes")) == 0 ? "n/a" : new DecimalFormat("0.00").format((double) new Long(c.get("bytesComplete")) / new Long(c.get("totalBytes")) * 100) + "%";
cObj.put("progress", percentComplete);
compStats.put(cObj);
}
View
124 src/main/java/com/netflix/priam/utils/JMXNodeTool.java
@@ -6,10 +6,8 @@
import java.lang.reflect.Field;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
-import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -18,12 +16,10 @@
import javax.management.JMX;
import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import org.apache.cassandra.cache.InstrumentingCacheMBean;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.tools.NodeProbe;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
@@ -44,9 +40,6 @@
public class JMXNodeTool extends NodeProbe
{
private static final Logger logger = LoggerFactory.getLogger(JMXNodeTool.class);
- private static String keyCacheObjFmt = "org.apache.cassandra.db:type=Caches,keyspace=%s,cache=%sKeyCache";
- private static String rowCacheObjFmt = "org.apache.cassandra.db:type=Caches,keyspace=%s,cache=%sRowCache";
-
private static volatile JMXNodeTool tool = null;
private MBeanServerConnection mbeanServerConn = null;
@@ -180,24 +173,34 @@ public JSONObject info() throws JSONException
}
@SuppressWarnings("unchecked")
- public JSONArray ring() throws JSONException
+ public JSONArray ring(String keyspace) throws JSONException
{
logger.info("JMX ring being called");
JSONArray ring = new JSONArray();
- Map<Token, String> tokenToEndpoint = getTokenToEndpointMap();
- List<Token> sortedTokens = new ArrayList<Token>(tokenToEndpoint.keySet());
- Collections.sort(sortedTokens);
+ Map<String, String> tokenToEndpoint = getTokenToEndpointMap();
+ List<String> sortedTokens = new ArrayList<String>(tokenToEndpoint.keySet());
Collection<String> liveNodes = getLiveNodes();
Collection<String> deadNodes = getUnreachableNodes();
Collection<String> joiningNodes = getJoiningNodes();
Collection<String> leavingNodes = getLeavingNodes();
Collection<String> movingNodes = getMovingNodes();
Map<String, String> loadMap = getLoadMap();
+
+ String format = "%-16s%-12s%-12s%-7s%-8s%-16s%-20s%-44s%n";
+
// Calculate per-token ownership of the ring
- Map<Token, Float> ownerships = getOwnership();
+ Map<String, Float> ownerships;
+ try
+ {
+ ownerships = effectiveOwnership(keyspace);
+ }
+ catch (ConfigurationException ex)
+ {
+ ownerships = getOwnership();
+ }
- for (Token token : sortedTokens)
+ for (String token : sortedTokens)
{
String primaryEndpoint = tokenToEndpoint.get(token);
String dataCenter;
@@ -218,7 +221,11 @@ public JSONArray ring() throws JSONException
{
rack = "Unknown";
}
- String status = liveNodes.contains(primaryEndpoint) ? "Up" : deadNodes.contains(primaryEndpoint) ? "Down" : "?";
+ String status = liveNodes.contains(primaryEndpoint)
+ ? "Up"
+ : deadNodes.contains(primaryEndpoint)
+ ? "Down"
+ : "?";
String state = "Normal";
@@ -229,15 +236,17 @@ else if (leavingNodes.contains(primaryEndpoint))
else if (movingNodes.contains(primaryEndpoint))
state = "Moving";
- String load = loadMap.containsKey(primaryEndpoint) ? loadMap.get(primaryEndpoint) : "?";
- String owns = new DecimalFormat("##0.00%").format(ownerships.get(token));
+ String load = loadMap.containsKey(primaryEndpoint)
+ ? loadMap.get(primaryEndpoint)
+ : "?";
+ String owns = new DecimalFormat("##0.00%").format(ownerships.get(token) == null ? 0.0F : ownerships.get(token));
ring.put(createJson(primaryEndpoint, dataCenter, rack, status, state, load, owns, token));
}
logger.info(ring.toString());
return ring;
}
- private JSONObject createJson(String primaryEndpoint, String dataCenter, String rack, String status, String state, String load, String owns, Token token) throws JSONException
+ private JSONObject createJson(String primaryEndpoint, String dataCenter, String rack, String status, String state, String load, String owns, String token) throws JSONException
{
JSONObject object = new JSONObject();
object.put("endpoint", primaryEndpoint);
@@ -257,10 +266,10 @@ public void compact() throws IOException, ExecutionException, InterruptedExcepti
forceTableCompaction(keyspace, new String[0]);
}
- public void repair() throws IOException, ExecutionException, InterruptedException
+ public void repair(boolean isSequential) throws IOException, ExecutionException, InterruptedException
{
for (String keyspace : getKeyspaces())
- forceTableRepair(keyspace, new String[0]);
+ forceTableRepair(keyspace, isSequential, new String[0]);
}
public void cleanup() throws IOException, ExecutionException, InterruptedException
@@ -298,79 +307,4 @@ public void close() throws IOException
super.close();
}
}
-
- public Iterator<Map.Entry<String, InstrumentingCacheMBean>> getKeyCacheMBeanProxies(IConfiguration config)
- {
- try
- {
- return new CacheMBeanIterator(mbeanServerConn, keyCacheObjFmt);
- }
- catch (MalformedObjectNameException e)
- {
- throw new RuntimeException("Invalid ObjectName", e);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Could not retrieve list of stat mbeans.", e);
- }
- }
-
- public Iterator<Map.Entry<String, InstrumentingCacheMBean>> getRowCacheMBeanProxies(IConfiguration config)
- {
- try
- {
- return new CacheMBeanIterator(mbeanServerConn, rowCacheObjFmt);
- }
- catch (MalformedObjectNameException e)
- {
- throw new RuntimeException("Invalid ObjectName.", e);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Could not retrieve list of stat mbeans.", e);
- }
- }
-
- class CacheMBeanIterator implements Iterator<Map.Entry<String, InstrumentingCacheMBean>>
- {
- private Iterator<ObjectName> resIter;
- private MBeanServerConnection mbeanServerConn;
- private String cachePath;
-
- public CacheMBeanIterator(MBeanServerConnection mbeanServerConn, String cachePath) throws MalformedObjectNameException, NullPointerException, IOException
- {
- ObjectName query = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,*");
- resIter = mbeanServerConn.queryNames(query, null).iterator();
- this.mbeanServerConn = mbeanServerConn;
- this.cachePath = cachePath;
- }
-
- public boolean hasNext()
- {
- return resIter.hasNext();
- }
-
- public Entry<String, InstrumentingCacheMBean> next()
- {
- ObjectName objectName = resIter.next();
- String tableName = objectName.getKeyProperty("keyspace");
- String cfName = objectName.getKeyProperty("columnfamily");
- String keyCachePath = String.format(cachePath, tableName, cfName);
- InstrumentingCacheMBean cacheProxy = null;
- try
- {
- cacheProxy = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), InstrumentingCacheMBean.class);
- }
- catch (Exception e)
- {
- logger.error("Cannot get cache MBean", e);
- }
- return new AbstractMap.SimpleImmutableEntry<String, InstrumentingCacheMBean>(tableName + "_" + cfName, cacheProxy);
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
}
View
2 src/test/java/com/netflix/priam/backup/TestBackup.java
@@ -143,7 +143,7 @@ public static void cleanup(File dir)
}
@Mock
- public void takeSnapshot(String snapshotName, String... keyspaces) throws IOException
+ public void takeSnapshot(String snapshotName, String columnFamily, String... keyspaces) throws IOException
{
File tmp = new File("cass/data/");
if (tmp.exists())

0 comments on commit d570aad

Please sign in to comment.
Something went wrong with that request. Please try again.