Skip to content
This repository has been archived by the owner on Apr 5, 2019. It is now read-only.

Commit

Permalink
Fixed various bugs in CassandraStorage related to super columns
Browse files Browse the repository at this point in the history
1) The marshaller for the super column name was being used for the
   sub column names as well, code was added to use the sub column
   comparator to determine the marshaller for the sub column names.
2) The IntegerType in cassandra returns a "BigInteger".  Pig does
   not support this type.  The CassandraStorage was changed to
   handle this type specially.  If the value will fit in an int, an
   Integer will be returned.  If the value will fit in a long, a
   Long will be returned.  Otherwise a byte array will be returned.
3) The version of Pig was upgraded to 0.9.0.  This did not cause
   any issues I can see but allowed me to discover the cause of #2
   above.
  • Loading branch information
danapsimer committed Oct 19, 2011
1 parent a3b7160 commit 0225213
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.properties.default
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ artifact.remoteRepository.datastax-release: http://mvn.riptano.com/content/re
hadoop-core.build.version: 0.20.203.1-brisk1-beta2
hadoop.build.version: 0.20.203.1-brisk1-beta2
hive.build.version: 0.7.0-brisk1-SNAPSHOT
pig.build.version: 0.8.3
pig.build.version: 0.9.0
cassandra.build.version: 0.8.1-brisk1-beta2
cassandra-thrift.build.version: 0.8.1-brisk1-beta2
cassandra-cql.build.version: 1.0.1-SNAPSHOT
Expand Down
50 changes: 36 additions & 14 deletions src/java/src/org/apache/cassandra/pig/CassandraStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.cassandra.hadoop.pig;

import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.*;

Expand All @@ -32,6 +33,7 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.Deletion;
Expand Down Expand Up @@ -136,33 +138,50 @@ public Tuple getNext() throws IOException
}
}

private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException {
return columnToTuple(name,col,cfDef,0);
}

private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef, int level) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);

pair.set(0, marshallers.get(2*level).compose(name));
if (col instanceof Column)
{
// standard
pair.set(0, marshallers.get(0).compose(name));
if (validators.get(name) == null)
if (validators.get(name) == null) {
// Have to special case BytesType because compose returns a ByteBuffer
if (marshallers.get(1) instanceof BytesType)
if (marshallers.get(1) instanceof BytesType) {
pair.set(1, new DataByteArray(ByteBufferUtil.getArray(col.value())));
else
} else if (marshallers.get(1) instanceof IntegerType) {
// Handle IntegerType specially because Pig cannot handle BigIntegers
BigInteger value = ((BigInteger)marshallers.get(1).compose(col.value()));
if ( value.bitLength() < 32 ) {
pair.set(1, value.intValue() );
} else if ( value.bitLength() < 64 ) {
pair.set(1,value.longValue());
} else {
pair.set(1, new DataByteArray(value.toByteArray()));
}
} else {
pair.set(1, marshallers.get(1).compose(col.value()));
else
}
} else {
pair.set(1, validators.get(name).compose(col.value()));
return pair;
}

// super
ArrayList<Tuple> subcols = new ArrayList<Tuple>();
for (IColumn subcol : col.getSubColumns())
subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
}
} else if ( col instanceof SuperColumn ) {
// super
ArrayList<Tuple> subcols = new ArrayList<Tuple>();
for (IColumn subcol : col.getSubColumns())
subcols.add(columnToTuple(subcol.name(), subcol, cfDef, 1));

pair.set(1, new DefaultDataBag(subcols));
pair.set(1, new DefaultDataBag(subcols));
} else {
throw new IOException("unknown column type: "+col.getClass().getName());
}
return pair;
}

Expand All @@ -177,10 +196,12 @@ private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
{
ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
AbstractType comparator = null;
AbstractType subcomparator = null;
AbstractType default_validator = null;
try
{
comparator = TypeParser.parse(cfDef.comparator_type);
subcomparator = TypeParser.parse(cfDef.subcomparator_type);
default_validator = TypeParser.parse(cfDef.default_validation_class);
}
catch (ConfigurationException e)
Expand All @@ -190,6 +211,7 @@ private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException

marshallers.add(comparator);
marshallers.add(default_validator);
marshallers.add(subcomparator);
return marshallers;
}

Expand Down

0 comments on commit 0225213

Please sign in to comment.