Skip to content

Commit

Permalink
merge from 0.8
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/cassandra/trunk@1152107 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jbellis committed Jul 29, 2011
2 parents 99f0cf7 + 8efd368 commit d505bde
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 120 deletions.
13 changes: 13 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
(CASSANDRA-1951)


0.8.3
* add ability to drop local reads/writes that are going to timeout
(CASSANDRA-2943)
* revamp token removal process, keep gossip states for 3 days (CASSANDRA-2946)
* don't accept extra args for 0-arg nodetool commands (CASSANDRA-2740)
* log unavailableexception details at debug level (CASSANDRA-2856)
* expose data_dir though jmx (CASSANDRA-2770)
* don't include tmp files as sstable when create cfs (CASSANDRA-2929)
* log Java classpath on startup (CASSANDRA-2895)
* keep gossipped version in sync with actual on migration coordinator
(CASSANDRA-2946)


0.8.2
* CQL:
- include only one row per unique key for IN queries (CASSANDRA-2717)
Expand Down
10 changes: 10 additions & 0 deletions NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ Upgrading
sstableloader tool instead.


0.8.3
=====

Upgrading
---------
- Token removal has been revamped. Removing tokens in a mixed cluster with
0.8.3 will not work, so the entire cluster will need to be running 0.8.3
first, except for the dead node.


0.8.2
=====

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";

private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";

private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);

Expand All @@ -79,6 +77,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface
private boolean slice_reverse = false;
private String keyspace;
private String column_family;
private String loadSignature;
private String storeSignature;

private Configuration conf;
private RecordReader reader;
Expand Down Expand Up @@ -113,7 +113,7 @@ public Tuple getNext() throws IOException
if (!reader.nextKeyValue())
return null;

CfDef cfDef = getCfDef();
CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
assert key != null && cf != null;
Expand Down Expand Up @@ -166,11 +166,11 @@ private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IO
return pair;
}

private CfDef getCfDef()
private CfDef getCfDef(String signature)
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
return cfdefFromString(property.getProperty(getSchemaContextKey()));
return cfdefFromString(property.getProperty(signature));
}

private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
Expand Down Expand Up @@ -290,7 +290,7 @@ public void setLocation(String location, Job job) throws IOException
}
ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
initSchema();
initSchema(loadSignature);
}

@Override
Expand All @@ -299,9 +299,16 @@ public String relativeToAbsolutePath(String location, Path curDir) throws IOExce
return location;
}

@Override
public void setUDFContextSignature(String signature)
{
this.loadSignature = signature;
}

/* StoreFunc methods */
public void setStoreFuncUDFContextSignature(String signature)
{
this.storeSignature = signature;
}

public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
Expand All @@ -315,7 +322,7 @@ public void setStoreLocation(String location, Job job) throws IOException
setLocationFromUri(location);
ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
initSchema();
initSchema(storeSignature);
}

public OutputFormat getOutputFormat()
Expand Down Expand Up @@ -347,7 +354,7 @@ public void putNext(Tuple t) throws ExecException, IOException
ByteBuffer key = objToBB(t.get(0));
DefaultDataBag pairs = (DefaultDataBag) t.get(1);
ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
CfDef cfDef = getCfDef();
CfDef cfDef = getCfDef(storeSignature);
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
try
Expand Down Expand Up @@ -412,7 +419,7 @@ public void putNext(Tuple t) throws ExecException, IOException
}
catch (ClassCastException e)
{
throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
}
try
{
Expand All @@ -430,14 +437,13 @@ public void cleanupOnFailure(String failure, Job job)

/* Methods to get the column family schema from Cassandra */

private void initSchema()
private void initSchema(String signature)
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);

String schemaContextKey = getSchemaContextKey();
// Only get the schema if we haven't already gotten it
if (!property.containsKey(schemaContextKey))
if (!property.containsKey(signature))
{
Cassandra.Client client = null;
try
Expand All @@ -455,7 +461,7 @@ private void initSchema()
break;
}
}
property.setProperty(schemaContextKey, cfdefToString(cfDef));
property.setProperty(signature, cfdefToString(cfDef));
}
catch (TException e)
{
Expand Down Expand Up @@ -521,14 +527,4 @@ private static CfDef cfdefFromString(String st)
}
return cfDef;
}

private String getSchemaContextKey()
{
StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
sb.append('.');
sb.append(keyspace);
sb.append('.');
sb.append(column_family);
return sb.toString();
}
}
5 changes: 4 additions & 1 deletion doc/cql/CQL.textile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1. Cassandra Query Language (CQL) v1.0.0
h1. Cassandra Query Language (CQL) v1.1.0

h2. Table of Contents

Expand Down Expand Up @@ -364,5 +364,8 @@ Versioning of the CQL language adheres to the "Semantic Versioning":http://semve
h1. Changes

pre.
Sat, 01 Jun 2011 15:58:00 -0600 - Pavel Yaskevich
* Updated to support ALTER (CASSANDRA-1709)

Tue, 22 Mar 2011 18:10:28 -0700 - Eric Evans <eevans@rackspace.com>
* Initial version, 1.0.0
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/HintedHandOffManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedExcep
}
waited = 0;
// then wait for the correct schema version.
// usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system table.
// here we check the one in gossip instead; this serves as a canary to warn us if we introduce a bug that
// causes the two to diverge (see CASSANDRA-2946)
while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/gms/ApplicationState.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum ApplicationState
DC,
RACK,
RELEASE_VERSION,
REMOVAL_COORDINATOR,
INTERNAL_IP,
// pad to allow adding new states to existing cluster
X1,
Expand Down
Loading

0 comments on commit d505bde

Please sign in to comment.