simple query on composite column not working #80

Closed
jimooo opened this Issue · 26 comments

6 participants

@jimooo

I couldn't find a mailing list for this question so I'll ask here. Feel free to point me to a different place to ask this.

I have a simple composite column that consists of string:string:long. When I try to do an allrows query on the first string in the composite, I don't get any results. What's worse, when I insert some nonsense code before my query, then I get the entire row back, which is still not the expected result.

I have checked that my column family exists and has data in it. I have tried a bunch of different things but I still can't get it to work. I was able to use cql3 to produce the exact same column family and the query works just fine in cql3.

I'll include my code here if someone wants to take a look and see if I'm doing something wrong either in my query, or my composite object, or in my column family creation.

I'll include 3 files: my database object which creates the table and makes the query, the composite object, and a simple helper bean.

Here is my database object which creates the table and runs the query. The nonsense code is commented out before my query. Enabling the nonsense code makes the query give different results.

package testing;

import java.util.Date;

import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.serializers.AnnotatedCompositeSerializer;
import com.netflix.astyanax.serializers.CompositeRangeBuilder;
import com.netflix.astyanax.serializers.StringSerializer;

public class CompositeTestDB extends AstyanaxDB
{
   private ColumnFamily<String, MyCompositeColumnType>  columnFamily;

   public CompositeTestDB()
   {
   }

   @Override
   public String getColumnFamilyName()
   {
      return ("CompositeTestCF");
   }

   @Override
   protected void init()
   {
      if (!isInitialized())
      {
         super.init();

         // create the column family object
         columnFamily = ColumnFamily.newColumnFamily(
                  getColumnFamilyName(),        // column family name
                  StringSerializer.get(),       // row key serializer
                  new AnnotatedCompositeSerializer<>(MyCompositeColumnType.class));    // column serializer
      }
   }

   /**
    * Add a new column family with the passed columnFamily Name
    * @param theColumnFamilyName
    * @throws ConnectionException
    */
   @Override
   protected void addColumnFamily(final String theColumnFamilyName) throws ConnectionException
   {
      init();

      if (!columnFamilyExists(theColumnFamilyName))
      {
         final ColumnFamilyDefinition def = getCluster().makeColumnFamilyDefinition();
         getLog().debug("creating column family: " + theColumnFamilyName);

         def.setName(theColumnFamilyName);
         def.setKeyspace(KEYSPACENAME);

         // row type
         def.setKeyValidationClass("UTF8Type");

         // column type
         // this has to match what is in the class that implements the composite column type
         def.setComparatorType("CompositeType(UTF8Type, UTF8Type, LongType)");

         // value type
//         def.setDefaultValidationClass("AsciiType");

         // do it!
         getCluster().addColumnFamily(def);
      }
      else
      {
         getLog().debug("can't create column family: " + theColumnFamilyName + ". it already exists. Skipping...");
      }
   }

   public void save(final TestData testData) throws ConnectionException
   {
      init();
      final MyCompositeColumnType columnName = new MyCompositeColumnType(testData.getStr1(), testData.getStr2(), testData.getTimestamp());

      // Inserting data
      final MutationBatch mutator = getKeyspace().prepareMutationBatch();

      mutator.withRow(columnFamily, testData.getRowKey()).putColumn(columnName, new byte[] {1,2,3});
      mutator.execute();
   }

   @SuppressWarnings("boxing")
   public void fetchAllRowsForStr1(final String theStr1) throws ConnectionException
   {
      init();
      System.out.println("-----------------------------");
      System.out.println(String.format("Fetching all rows for %s", theStr1));

      final AnnotatedCompositeSerializer<MyCompositeColumnType> serializer =
               new AnnotatedCompositeSerializer<>(MyCompositeColumnType.class);

      final CompositeRangeBuilder rangeBuilder = serializer.buildRange()
               .withPrefix(theStr1);

//      final AllRowsQuery<String, MyCompositeColumnType> i = getKeyspace().prepareQuery(columnFamily)
//               .getAllRows()
//               .setRowLimit(10)
//               .withColumnRange(rangeBuilder.build());

      final OperationResult<Rows<String, MyCompositeColumnType>> result = getKeyspace().prepareQuery(columnFamily)
               .getAllRows()
               .setRowLimit(10)
               .withColumnRange(rangeBuilder.build())
               .execute();

      // now process the results
      for (Row<String, MyCompositeColumnType> row : result.getResult())
      {
         System.out.println(String.format("In row %s found %d columns", row.getKey(), row.getColumns().size()));
      }
   }

   @SuppressWarnings("static-method")
   public void createRows(final CompositeTestDB db) throws ConnectionException
   {
      for (int row = 0; row < 8; row++)
      {
         for (int firstNumber = 0; firstNumber < 4; firstNumber++)
         {
            for (int secondNumber = 0; secondNumber < 3; secondNumber++)
            {
               final TestData data = new TestData("RowKey_" + row, "First_" + firstNumber, "Second_" + secondNumber, new Date().getTime());
               db.save(data);
            }
         }
      }
   }

   public static void main(final String[] args)
   {
      try
      {
         final CompositeTestDB testDb = new CompositeTestDB();
         testDb.dropColumnFamily();
         testDb.addColumnFamily();
         testDb.createRows(testDb);
         testDb.fetchAllRowsForStr1("First_2");
      }
      catch (final ConnectionException e)
      {
         e.printStackTrace();
      }
   }
}

Here's my composite type. I don't think I did anything wrong here but I might have missed something. The equals and hash code were generated by eclipse. I did the compareTo myself.

package testing;

import com.netflix.astyanax.annotations.Component;

/**
 * Composite type object for use in astyanax. It looks more complicated than it is. You just need to follow these rules:
 *      each part of the composite column must have the @Component annotation
 *      There should be a default constructor and a constructor that takes all the components
 *      have eclipse generate the getters and setters
 *      have eclipse generate the hashCode and equals
 *      make your class implement Comparable and implement the compareTo method
 *      overriding toString is optional but very helpful in debugging
 *
 */
public class MyCompositeColumnType implements Comparable<MyCompositeColumnType>
{
   private static final int BEFORE = -1;
   private static final int EQUAL = 0;
   private static final int AFTER = 1;

   @Component(ordinal = 0)
   private String str1;

   @Component(ordinal = 1)
   private String str2;

   @Component(ordinal = 2)
   private long timestamp;


   public MyCompositeColumnType()
   {
   }

   public MyCompositeColumnType(final String theStr1, final String theStr2, final long theTimestamp)
   {
      str1 = theStr1;
      str2 = theStr2;
      timestamp = theTimestamp;
   }

   /**
    * @return the str1
    */
   protected String getStr1()
   {
      return str1;
   }

   /**
    * @param str1 the str1 to set
    */
   protected void setStr1(final String theStr1)
   {
      this.str1 = theStr1;
   }

   /**
    * @return the str2
    */
   protected String getStr2()
   {
      return str2;
   }

   /**
    * @param str2 the str2 to set
    */
   protected void setStr2(final String theStr2)
   {
      this.str2 = theStr2;
   }

   public long getTimestamp()
   {
      return timestamp;
   }

   public void setTimestamp(final long theTimestamp)
   {
      this.timestamp = theTimestamp;
   }

   @Override
   public int compareTo(final MyCompositeColumnType other)
   {

      int returnVal = EQUAL;

      //this optimization is usually worthwhile, and can
      //always be added
      if ( this == other )
      {
         returnVal = EQUAL;
      }
      else
      {
         int comparison = this.str1.compareTo(other.getStr1());
         if ( comparison == EQUAL )
         {
            // str1 matches, so str2 decides next
            comparison = this.str2.compareTo(other.getStr2());
         }
         if ( comparison != EQUAL )
         {
            returnVal = comparison;
         }
         else if (this.timestamp < other.getTimestamp())
         {
            returnVal = BEFORE;
         }
         else if (this.timestamp > other.getTimestamp())
         {
            returnVal = AFTER;
         }
      }
      return (returnVal);
   }

   /* (non-Javadoc)
    * @see java.lang.Object#toString()
    */
   @Override
   public String toString()
   {
      return (str1 + ":" + str2 + ":" + timestamp);
   }


   /* (non-Javadoc)
    * @see java.lang.Object#hashCode()
    */
   @Override
   public int hashCode()
   {
      final int prime = 31;
      int result = 1;
      result = prime * result + ((str2 == null) ? 0 : str2.hashCode());
      result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
      result = prime * result + ((str1 == null) ? 0 : str1.hashCode());
      return result;
   }

   /* (non-Javadoc)
    * @see java.lang.Object#equals(java.lang.Object)
    */
   @Override
   public boolean equals(final Object obj)
   {
      if (this == obj)
      {
         return true;
      }
      if (obj == null)
      {
         return false;
      }
      if (getClass() != obj.getClass())
      {
         return false;
      }
      final MyCompositeColumnType other = (MyCompositeColumnType) obj;
      if (str2 == null)
      {
         if (other.str2 != null)
         {
            return false;
         }
      }
      else if (!str2.equals(other.str2))
      {
         return false;
      }
      if (timestamp != other.timestamp)
      {
         return false;
      }
      if (str1 == null)
      {
         if (other.str1 != null)
         {
            return false;
         }
      }
      else if (!str1.equals(other.str1))
      {
         return false;
      }
      return true;
   }
}
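
A compareTo for a three-part name like this one is normally expected to order by the first component, then the second, then the third. The following is a minimal sketch of that ordering, not the code from this issue: the `Name` class is a hypothetical stand-in without the Astyanax annotations, and it assumes Java 8 or later for `Comparator.comparing`.

```java
import java.util.Comparator;

class ColumnNameOrderSketch {

    // Hypothetical stand-in for a three-part column name (no Astyanax annotations).
    static final class Name {
        final String str1;
        final String str2;
        final long timestamp;

        Name(String str1, String str2, long timestamp) {
            this.str1 = str1;
            this.str2 = str2;
            this.timestamp = timestamp;
        }
    }

    // str1 decides first; str2 is consulted only on a tie, timestamp only on a second tie.
    static final Comparator<Name> ORDER =
            Comparator.comparing((Name n) -> n.str1)
                      .thenComparing(n -> n.str2)
                      .thenComparingLong(n -> n.timestamp);

    public static void main(String[] args) {
        Name a = new Name("First_1", "Second_2", 5L);
        Name b = new Name("First_2", "Second_0", 1L);
        Name c = new Name("First_1", "Second_2", 9L);

        System.out.println(ORDER.compare(a, b) < 0);  // str1 differs, so it decides
        System.out.println(ORDER.compare(a, c) < 0);  // str1 and str2 tie, timestamp decides
        System.out.println(ORDER.compare(a, a) == 0); // identical names compare equal
    }
}
```

A compareTo built this way stays consistent with equals as long as both look at the same three fields.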

And now the simple bean to hold the data:

package testing;

public class TestData
{
   private String rowKey;
   private String str1;
   private String str2;
   private long timestamp;

   /**
    *
    */
   public TestData()
   {
   }

   public TestData(final String theRowKey, final String theStr1, final String theStr2, final long theTimestamp)
   {
      rowKey = theRowKey;
      str1 = theStr1;
      str2 = theStr2;
      timestamp = theTimestamp;
   }

   /**
    * @return the rowKey
    */
   protected String getRowKey()
   {
      return rowKey;
   }

   /**
    * @param theRowKey the rowKey to set
    */
   protected void setRowKey(final String theRowKey)
   {
      this.rowKey = theRowKey;
   }

   /**
    * @return the str1
    */
   protected String getStr1()
   {
      return str1;
   }

   /**
    * @param theStr1 the str1 to set
    */
   protected void setStr1(final String theStr1)
   {
      this.str1 = theStr1;
   }

   /**
    * @return the str2
    */
   protected String getStr2()
   {
      return str2;
   }

   /**
    * @param theStr2 the str2 to set
    */
   protected void setStr2(final String theStr2)
   {
      this.str2 = theStr2;
   }

   /**
    * @return the timestamp
    */
   protected long getTimestamp()
   {
      return timestamp;
   }

   /**
    * @param theTimestamp the timestamp to set
    */
   protected void setTimestamp(final long theTimestamp)
   {
      this.timestamp = theTimestamp;
   }
}

@cballock

I have a similar problem. I have a column family with the following composite type:

CompositeType(BytesType, BytesType, BytesType, LongType)

When I try to make an "all rows query" on the first part of the composite column with a range created with the CompositeRangeBuilder, the result is always empty. If I make an equivalent "row query" on the column family, the correct columns are returned.

Please, I need urgent help.

@elandau
Owner

I have confirmed the bug and will need some time to figure out a clean solution. In the meantime you can work around the bug by adding lessThan/lessThanEquals and greaterThan/greaterThanEquals to the range. For example,

final CompositeRangeBuilder rangeBuilder = serializer.buildRange()
.withPrefix(theStr1).greaterThan("part2lower").lessThan("part2upper").build();

@cballock

Hi, Landau.
I've tried the basic idea behind your workaround. Since I need an equality match (the equivalent of the withPrefix() method) and can't define an upper value for the second part of my composite column name (it can be any arbitrary byte array), I used the "greaterThanEquals" and "lessThanEquals" methods like this:

CompositeRangeBuilder range = serializer.buildRange().greaterThanEquals(value).lessThanEquals(value).build();

When I try to run this with Cassandra, I get the following error:

"Invalid bytes remaining after an end-of-component at component0"

Searching for the class that generates this error in Cassandra, I found AbstractCompositeType:

http://svn.apache.org/repos/asf/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java

If you look at the validate(ByteBuffer bytes) method of the above class, you'll find the following logic:

public void validate(ByteBuffer bytes) throws MarshalException
{
    ByteBuffer bb = bytes.duplicate();

    int i = 0;
    while (bb.remaining() > 0)
    {
        AbstractType comparator = validateNextComparator(i, bb);

        if (bb.remaining() < 2)
            throw new MarshalException("Not enough bytes to read value size of component " + i);
        int length = getShortLength(bb);

        if (bb.remaining() < length)
            throw new MarshalException("Not enough bytes to read value of component " + i);
        ByteBuffer value = getBytes(bb, length);

        comparator.validate(value);

        if (bb.remaining() == 0)
            throw new MarshalException("Not enough bytes to read the end-of-component byte of component" + i);
        byte b = bb.get();
        if (b != 0 && bb.remaining() != 0)
            throw new MarshalException("Invalid bytes remaining after an end-of-component at component" + i);
        ++i;
    }
}

It seems that when a range is built with "greaterThanEquals" and "lessThanEquals", Astyanax sends bytes to Cassandra that fail this check.

Please, can you confirm that? Is there any other workaround for my case?

Thank you.
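
The check in the quoted validate method can be reproduced in isolation. The sketch below is a simplified re-implementation for illustration only: it assumes the layout the quoted code reads (a 2-byte length, the value bytes, then one end-of-component byte per component), skips the per-component comparator validation, and returns the message instead of throwing. `CompositeEocSketch` and `writeComponent` are hypothetical names, and nothing here shows what Astyanax actually puts on the wire.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class CompositeEocSketch {

    // Appends one component: 2-byte length, value bytes, 1 end-of-component byte.
    static void writeComponent(ByteArrayOutputStream out, byte[] value, int eoc) {
        out.write((value.length >> 8) & 0xFF);
        out.write(value.length & 0xFF);
        out.write(value, 0, value.length);
        out.write(eoc);
    }

    // Mirrors the structure of the quoted validate(); returns null when the bytes pass.
    static String validate(byte[] bytes) {
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        int i = 0;
        while (bb.remaining() > 0) {
            if (bb.remaining() < 2)
                return "Not enough bytes to read value size of component " + i;
            int length = bb.getShort() & 0xFFFF;
            if (bb.remaining() < length)
                return "Not enough bytes to read value of component " + i;
            bb.position(bb.position() + length); // skip the value bytes
            if (bb.remaining() == 0)
                return "Not enough bytes to read the end-of-component byte of component" + i;
            byte b = bb.get();
            // A non-zero end-of-component byte is only accepted on the last component.
            if (b != 0 && bb.remaining() != 0)
                return "Invalid bytes remaining after an end-of-component at component" + i;
            ++i;
        }
        return null;
    }

    public static void main(String[] args) {
        byte[] part1 = "part1".getBytes(StandardCharsets.UTF_8);
        byte[] part2 = "part2".getBytes(StandardCharsets.UTF_8);

        // Non-zero end-of-component byte only on the last component: passes.
        ByteArrayOutputStream ok = new ByteArrayOutputStream();
        writeComponent(ok, part1, 0);
        writeComponent(ok, part2, 1);
        System.out.println(validate(ok.toByteArray()));

        // Non-zero end-of-component byte on component 0 with more bytes after it: rejected.
        ByteArrayOutputStream bad = new ByteArrayOutputStream();
        writeComponent(bad, part1, 1);
        writeComponent(bad, part2, 1);
        System.out.println(validate(bad.toByteArray()));
    }
}
```

Under these assumptions, the error from the thread appears whenever a component other than the last one carries a non-zero end-of-component byte.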

@elandau
Owner

I ran your code and it works fine. I'm wondering if this may be related to the Cassandra version, although it would be pretty bad if this broke. Which version are you using? Also, since you are using strings, you would probably want to structure your query as follows:

serializer.buildRange().greaterThanEquals(value).lessThanEquals(value + "\uFFFF").build();
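
The idea behind the "\uFFFF" suffix can be shown without Cassandra: every string that starts with a given prefix sorts between the prefix itself and the prefix followed by the highest char value. The sketch below is only an illustration under that assumption. It uses Java's String ordering in a TreeSet as a stand-in for the server-side comparator, which may order some characters differently, and `PrefixRangeSketch` and `namesWithPrefix` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

class PrefixRangeSketch {

    // Returns the names in the closed range [prefix, prefix + "\uFFFF"], in sorted order.
    static List<String> namesWithPrefix(TreeSet<String> names, String prefix) {
        return new ArrayList<>(names.subSet(prefix, true, prefix + "\uFFFF", true));
    }

    public static void main(String[] args) {
        TreeSet<String> names = new TreeSet<>(
                Arrays.asList("First_0", "First_1", "First_10", "First_2"));

        // Both "First_1" and "First_10" fall inside the range for the prefix "First_1".
        System.out.println(namesWithPrefix(names, "First_1"));
    }
}
```

Note that a range like this matches every value that starts with the prefix, so "First_1" also pulls in "First_10". It is a prefix match, not an equality match.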

@cballock

Hi, Landau.
I am using version 1.1.2 of Cassandra and version 1.0.3 of Astyanax.
The first 3 parts of my composite column name are byte arrays, not Strings:

CompositeType(BytesType, BytesType, BytesType, LongType)

With which version of Cassandra/Astyanax did you test?

@cballock

Landau, me again.
Based on your suggestion, and since the first 3 parts of my composite column name are byte arrays, I've tried the following:

byte[] sufixedValue = ArrayUtils.addAll(value, new String("\uFFFF").getBytes()); // ArrayUtils is an Apache Commons utility class;
serializer.buildRange().greaterThanEquals(value).lessThanEquals(sufixedValue).build();

But the same error occurs:

InvalidRequestException(why:Invalid bytes remaining after an end-of-component at component0)
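
Separately from the error itself, a suffix borrowed from the string case does not give a true upper bound for arbitrary byte arrays. The sketch below assumes byte components are ordered by unsigned, byte-by-byte comparison, and it encodes "\uFFFF" explicitly as UTF-8, whereas the snippet above uses the platform default charset. `ByteBoundSketch` and its helper methods are hypothetical names for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ByteBoundSketch {

    // Lexicographic comparison that treats each byte as unsigned (0..255).
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Returns a new array holding a followed by b.
    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] value = {0x01, 0x02};
        // U+FFFF encodes to the three bytes EF BF BF in UTF-8.
        byte[] suffixed = concat(value, "\uFFFF".getBytes(StandardCharsets.UTF_8));
        // A value with the same prefix followed by a single 0xFF byte.
        byte[] higher = concat(value, new byte[] {(byte) 0xFF});

        System.out.println(compareUnsigned(value, suffixed) < 0);  // suffixed sorts after value
        System.out.println(compareUnsigned(suffixed, higher) < 0); // but below prefix + 0xFF
    }
}
```

Under that ordering, any value whose next byte after the prefix is above 0xEF would fall outside a range capped by the suffixed array.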

@elandau
Owner

Can you try this and see if it works?

.withColumnRange(serializer.buildRange().greaterThanEquals("First_0").lessThanEquals("First_0" + "\uFFFF").build())

Can you also try changing your column family definition to use UTF8Type instead of BytesType?

@elandau
Owner

I tried several other combinations and am still not able to reproduce this. Can you write a complete unit test, with code to create the column family on Cassandra as well as the code to populate and query it?

@cballock

Hi, Landau.
Since I cannot upload files here, I will add the 3 Java classes from my test case as 3 separate comments below.
I've put a mark on the line in the Main class test() method where I get the error from Cassandra.

@cballock

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.connectionpool.impl.SmaLatencyScoreStrategyImpl;
import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.AnnotatedCompositeSerializer;
import com.netflix.astyanax.serializers.CompositeRangeBuilder;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class Main {

// configuration parameters

private String clusterName = "test";

private String keyspaceName = "test";

private String strategyClass = "NetworkTopologyStrategy";

private String strategyOptions = "DC1=1;";

private String poolName = "test";

private Integer connectTimeout = 2000;

private Integer socketTimeout = 10000;

private String seeds = "127.0.0.1:9160";

private Integer maximumTimeoutWhenExhausted = 2000;

private Integer clusterPort = 9160;

private Integer maximumConnectionsPerHost = 25;

private Integer initConnectionsPerHost = 5;

private Integer maximumFailoverCount = -1;

private Integer maximumConnections = 100;

private Integer latencyAwareWindowSize = 100;

private Float latencyAwareSentinelCompare = 0.768f;

private Float latencyAwareBadnessThreshold = 0.40f;

private Integer latencyAwareUpdateInterval = 10000;

private Integer latencyAwareResetInterval = 60000;

private Integer connectionLimiterWindowSize = 2000;

private Integer connectionLimiterMaximumPendingCount = 50;

private Integer maximumPendingConnectionsPerHost = 5;

private Integer maximumBlockedThreadsPerHost = 10;

private Integer maximumTimeoutCount = 3;

private Integer timeoutWindow = 10000;

private Integer retrySuspendWindow = 20000;

private Integer retryDelaySlice = 1000;

private Integer retryMaximumDelaySlice = 10;

private Integer maximumOperationsPerConnection = 10000;

private Integer schemaSetupSleepInterval = 2000;

// internal attributes

private AstyanaxContext<Cluster> clusterContext = null;

private AstyanaxContext<Keyspace> keyspaceContext = null;

private MutationBatch mutationBatch  = null;

private AnnotatedCompositeSerializer<CompositeColumnName> compositeSerializer = null;

private ColumnFamily<String, CompositeColumnName> dummyCF = null;

// JUnit methods

@Before
public void before() throws Exception {

    init();

    // clean old data
    deleteColumn("1", new CompositeColumnName("part1", "part2", "part3", 1234L));
    mutationBatch.execute();
    mutationBatch.discardMutations();

    ColumnList<CompositeColumnName> columns = getColumnsSlice("1", null);
    assertTrue(columns!=null);
    assertTrue(columns.isEmpty());

}

@Test
public void test() throws Exception {

    ColumnList<CompositeColumnName> columns = null;

    putColumn("1", new CompositeColumnName("part1", "part2", "part3", 1234L), null, null);
    mutationBatch.execute();
    mutationBatch.discardMutations();

    columns = getColumnsSlice("1", null);
    assertTrue(columns!=null);
    assertTrue(columns.size()==1);
    assertTrue(columns.getColumnByIndex(0).getName().getPart1(String.class).equals("part1"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart2(String.class).equals("part2"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart3(String.class).equals("part3"));
    assertTrue(columns.getColumnByIndex(0).getName().getTimestamp()==1234L);

    List<CompositeColumnName> sliceColumnNames = new ArrayList<CompositeColumnName>();
    sliceColumnNames.add(new CompositeColumnName("part1", "part2", "part3", 1234L));
    columns = getColumnsSlice("1", sliceColumnNames);
    assertTrue(columns!=null);
    assertTrue(columns.size()==1);
    assertTrue(columns.getColumnByIndex(0).getName().getPart1(String.class).equals("part1"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart2(String.class).equals("part2"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart3(String.class).equals("part3"));
    assertTrue(columns.getColumnByIndex(0).getName().getTimestamp()==1234L);

    columns = getColumnsRange("1", new CompositeColumnName("part1", null, null, null));
    assertTrue(columns!=null);
    assertTrue(columns.size()==1);
    assertTrue(columns.getColumnByIndex(0).getName().getPart1(String.class).equals("part1"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart2(String.class).equals("part2"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart3(String.class).equals("part3"));
    assertTrue(columns.getColumnByIndex(0).getName().getTimestamp()==1234L);

    columns = getColumnsRange("1", new CompositeColumnName("part1", "part2", null, null)); // my test case fails here with the "InvalidRequestException(why:Invalid bytes remaining after an end-of-component at component0)";
    assertTrue(columns!=null);
    assertTrue(columns.size()==1);
    assertTrue(columns.getColumnByIndex(0).getName().getPart1(String.class).equals("part1"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart2(String.class).equals("part2"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart3(String.class).equals("part3"));
    assertTrue(columns.getColumnByIndex(0).getName().getTimestamp()==1234L);

    columns = getColumnsRange("1", new CompositeColumnName("part1", "part2", "part3", null));
    assertTrue(columns!=null);
    assertTrue(columns.size()==1);
    assertTrue(columns.getColumnByIndex(0).getName().getPart1(String.class).equals("part1"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart2(String.class).equals("part2"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart3(String.class).equals("part3"));
    assertTrue(columns.getColumnByIndex(0).getName().getTimestamp()==1234L);

    columns = getColumnsRange("1", new CompositeColumnName("part1", "part2", "part3", 1234L));
    assertTrue(columns!=null);
    assertTrue(columns.size()==1);
    assertTrue(columns.getColumnByIndex(0).getName().getPart1(String.class).equals("part1"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart2(String.class).equals("part2"));
    assertTrue(columns.getColumnByIndex(0).getName().getPart3(String.class).equals("part3"));
    assertTrue(columns.getColumnByIndex(0).getName().getTimestamp()==1234L);
}

@After
public void after() {
    shutdown();
}

// internal methods

private void init() {

    try {

        AstyanaxConfigurationImpl astyanaxConfiguration = 
                new AstyanaxConfigurationImpl()
                    .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                    .setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE);

        SmaLatencyScoreStrategyImpl smaLatencyScoreStrategyImpl = 
                new SmaLatencyScoreStrategyImpl(
                        latencyAwareUpdateInterval,
                        latencyAwareResetInterval,
                        latencyAwareWindowSize,
                        latencyAwareBadnessThreshold);

        ConnectionPoolConfigurationImpl poolConfiguration = 
                new ConnectionPoolConfigurationImpl(poolName)
                    .setConnectTimeout(connectTimeout)
                    .setSocketTimeout(socketTimeout)
                    .setSeeds(seeds)
                    .setMaxTimeoutWhenExhausted(maximumTimeoutWhenExhausted)
                    .setPort(clusterPort)
                    .setMaxConnsPerHost(maximumConnectionsPerHost)
                    .setInitConnsPerHost(initConnectionsPerHost)
                    .setMaxFailoverCount(maximumFailoverCount)
                    .setMaxConns(maximumConnections)
                    .setLatencyScoreStrategy(smaLatencyScoreStrategyImpl) // enabled SMA (moving average) latency analysis;
                    .setLatencyAwareWindowSize(latencyAwareWindowSize) // uses last X latency samples;
                    .setLatencyAwareSentinelCompare(latencyAwareSentinelCompare)
                    .setLatencyAwareBadnessThreshold(latencyAwareBadnessThreshold) // will sort hosts if a host has x% of the performance of the best host and always assign connections to the fastest host, otherwise will use round robin;
                    .setLatencyAwareUpdateInterval(latencyAwareUpdateInterval) // will resort hosts per token partition every X seconds;
                    .setLatencyAwareResetInterval(latencyAwareResetInterval) // will clear the latency every X seconds;
                    .setConnectionLimiterWindowSize(connectionLimiterWindowSize)
                    .setConnectionLimiterMaxPendingCount(connectionLimiterMaximumPendingCount)
                    .setMaxPendingConnectionsPerHost(maximumPendingConnectionsPerHost)
                    .setMaxBlockedThreadsPerHost(maximumBlockedThreadsPerHost)
                    .setMaxTimeoutCount(maximumTimeoutCount)
                    .setTimeoutWindow(timeoutWindow)
                    .setRetrySuspendWindow(retrySuspendWindow)
                    .setRetryDelaySlice(retryDelaySlice)
                    .setRetryMaxDelaySlice(retryMaximumDelaySlice)
                    .setMaxOperationsPerConnection(maximumOperationsPerConnection);

        AstyanaxContext.Builder builder =
                new AstyanaxContext.Builder()
                    .forCluster(clusterName)
                    .forKeyspace(keyspaceName)
                    .withAstyanaxConfiguration(astyanaxConfiguration)
                    .withConnectionPoolConfiguration(poolConfiguration)
                    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor());

        clusterContext = builder.buildCluster(ThriftFamilyFactory.getInstance());
        clusterContext.start();

        keyspaceContext = builder.buildKeyspace(ThriftFamilyFactory.getInstance());
        keyspaceContext.start();

        mutationBatch = keyspaceContext.getEntity().prepareMutationBatch().setConsistencyLevel(ConsistencyLevel.CL_QUORUM);
        compositeSerializer = new AnnotatedCompositeSerializer<CompositeColumnName>(CompositeColumnName.class);
        dummyCF = new ColumnFamily<String, CompositeColumnName>("DummyCF", StringSerializer.get(), compositeSerializer);

        addKeyspace();
        addColumnFamily();

        Thread.sleep(schemaSetupSleepInterval);

    } catch (Exception exception) {

        throw new RuntimeException(exception);

    }
}

private void shutdown() {

    if(clusterContext!=null) {
        clusterContext.shutdown();
    }

    if(keyspaceContext!=null) {
        keyspaceContext.shutdown();
    }
}

private void addKeyspace() throws Exception {

    if(clusterContext.getEntity().describeKeyspace(keyspaceName)==null) {

        Map<String, String> options = new HashMap<String, String>();

        if(strategyClass.trim().equals("SimpleStrategy")) {
            options.put("replication_factor", strategyOptions);
        } else if(strategyClass.trim().equals("NetworkTopologyStrategy")) {
            StringTokenizer dataCentersTokenizer = new StringTokenizer(strategyOptions, ";");
            while(dataCentersTokenizer.hasMoreTokens()) {
                String dataCentersToken = dataCentersTokenizer.nextToken().trim();
                StringTokenizer dataCenterTokenizer = new StringTokenizer(dataCentersToken, "=");
                String dataCenter = dataCenterTokenizer.nextToken().trim();
                String dataCenterReplicationFactor = dataCenterTokenizer.nextToken().trim();
                options.put(dataCenter, dataCenterReplicationFactor);
            }
        }

        KeyspaceDefinition keyspaceDefinition = clusterContext.getEntity().makeKeyspaceDefinition();

        keyspaceDefinition.setName(keyspaceName);
        keyspaceDefinition.setStrategyClass(strategyClass);
        keyspaceDefinition.setStrategyOptions(options);

        clusterContext.getEntity().addKeyspace(keyspaceDefinition);

    }
}

private void addColumnFamily() {

    try {

        KeyspaceDefinition keyspaceDefinition = clusterContext.getEntity().describeKeyspace(keyspaceName);

        if(keyspaceDefinition.getColumnFamily(dummyCF.getName())==null) {

            ColumnFamilyDefinition columnFamilyDefinition = clusterContext.getEntity().makeColumnFamilyDefinition();

            columnFamilyDefinition.setKeyspace(keyspaceName);
            columnFamilyDefinition.setName(dummyCF.getName());
            columnFamilyDefinition.setKeyValidationClass("UTF8Type");
            columnFamilyDefinition.setComparatorType("CompositeType(BytesType, BytesType, BytesType, LongType)");

            clusterContext.getEntity().addColumnFamily(columnFamilyDefinition);
        }

    } catch(Exception exception) {

        throw new RuntimeException(exception);

    }
}

private ColumnFamilyQuery<String, CompositeColumnName> prepareQuery() {
    return keyspaceContext.getEntity().prepareQuery(dummyCF).setConsistencyLevel(ConsistencyLevel.CL_QUORUM);
}

private void putColumn(String rowId, CompositeColumnName newColumnName, Object newColumnValue, Integer ttl) {

    if(rowId!=null && newColumnName!=null) {

        try {

            ColumnListMutation<CompositeColumnName> mutation = mutationBatch.withRow(dummyCF, rowId);
            mutation.putColumn(newColumnName, AstyanaxHelper.toBytes(newColumnValue), ttl);

        } catch(Exception exception) {

            throw new RuntimeException(exception);

        }
    }
}

private void deleteColumn(String rowId, CompositeColumnName deletedColumnName) {

    if(rowId!=null && deletedColumnName!=null) {

        try {

            ColumnListMutation<CompositeColumnName> mutation = mutationBatch.withRow(dummyCF, rowId);
            mutation.deleteColumn(deletedColumnName);

        } catch(Exception exception) {

            throw new RuntimeException(exception);

        }
    }
}

private ColumnList<CompositeColumnName> getColumnsSlice(String rowId, List<CompositeColumnName> sliceColumnNames) {

    if(rowId!=null) {

        try {

            ColumnFamilyQuery<String, CompositeColumnName> columnFamilyQuery = prepareQuery();
            RowQuery<String, CompositeColumnName> rowQuery = columnFamilyQuery.getKey(rowId);

            if(sliceColumnNames!=null && !sliceColumnNames.isEmpty()) {
                rowQuery = rowQuery.withColumnSlice(sliceColumnNames);
            }

            return rowQuery.execute().getResult();

        } catch (Exception exception) {

            throw new RuntimeException(exception);

        }
    }

    return null;
}

private ColumnList<CompositeColumnName> getColumnsRange(String rowId, CompositeColumnName filterColumnName) {

    if(rowId!=null && filterColumnName!=null && filterColumnName.getPart1()!=null) {

        try {

            CompositeRangeBuilder range = buildRange(filterColumnName);

            ColumnFamilyQuery<String, CompositeColumnName> columnFamilyQuery = prepareQuery();
            RowQuery<String, CompositeColumnName> rowQuery = columnFamilyQuery.getKey(rowId).withColumnRange(range);

            return rowQuery.execute().getResult();

        } catch (Exception exception) {

            throw new RuntimeException(exception);

        }
    }

    return null;
}

private CompositeRangeBuilder buildRange(CompositeColumnName filterColumnName) {

    // See https://github.com/Netflix/astyanax/issues/80
    CompositeRangeBuilder range = compositeSerializer.buildRange().greaterThanEquals(filterColumnName.getPart1()).lessThanEquals(filterColumnName.getPart1());

    if(filterColumnName.getPart2()!=null && filterColumnName.getPart2().length>0) {

        range = range.greaterThanEquals(filterColumnName.getPart2()).lessThanEquals(filterColumnName.getPart2());

        if(filterColumnName.getPart3()!=null && filterColumnName.getPart3().length>0) {

            range = range.greaterThanEquals(filterColumnName.getPart3()).lessThanEquals(filterColumnName.getPart3());

            if(filterColumnName.getTimestamp()!=null) {

                range = range.greaterThanEquals(filterColumnName.getTimestamp()).lessThanEquals(filterColumnName.getTimestamp());

            }
        }
    }

    return range;
}

}

@cballock

import com.netflix.astyanax.annotations.Component;

public class CompositeColumnName {

@Component(ordinal=0)
private byte[] part1 = null;

@Component(ordinal=1)
private byte[] part2 = null;

@Component(ordinal=2)
private byte[] part3 = null;

@Component(ordinal=3)
private Long timestamp = null;

public CompositeColumnName() {

}

public CompositeColumnName(Object part1Object, Object part2Object, Object part3Object, Long timestamp) {
    this.part1 = AstyanaxHelper.toBytes(part1Object);
    this.part2 = AstyanaxHelper.toBytes(part2Object);
    this.part3 = AstyanaxHelper.toBytes(part3Object);
    this.timestamp = timestamp;
}

public byte[] getPart1() {
    return part1;
}

public void setPart1(byte[] part1) {
    this.part1 = part1;
}

public byte[] getPart2() {
    return part2;
}

public void setPart2(byte[] part2) {
    this.part2 = part2;
}

public byte[] getPart3() {
    return part3;
}

public void setPart3(byte[] part3) {
    this.part3 = part3;
}

public Long getTimestamp() {
    return timestamp;
}

public void setTimestamp(Long timestamp) {
    this.timestamp = timestamp;
}

public <T> T getPart1(Class<T> clazz) {
    return AstyanaxHelper.fromBytes(part1, clazz);
}

public <T> T getPart2(Class<T> clazz) {
    return AstyanaxHelper.fromBytes(part2, clazz);
}

public <T> T getPart3(Class<T> clazz) {
    return AstyanaxHelper.fromBytes(part3, clazz);
}

}

@cballock

import java.util.Date;
import java.util.UUID;

import com.netflix.astyanax.serializers.BooleanSerializer;
import com.netflix.astyanax.serializers.BytesArraySerializer;
import com.netflix.astyanax.serializers.DateSerializer;
import com.netflix.astyanax.serializers.DoubleSerializer;
import com.netflix.astyanax.serializers.FloatSerializer;
import com.netflix.astyanax.serializers.IntegerSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import com.netflix.astyanax.serializers.ObjectSerializer;
import com.netflix.astyanax.serializers.ShortSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.serializers.UUIDSerializer;

public class AstyanaxHelper {

public static byte[] toBytes(Object object) {

    if(object==null) {
        return new byte[0];
    }

    if (object.getClass()==Byte.class) {
        return BytesArraySerializer.get().toBytes(new byte[] {(Byte)object});
    } else if (object.getClass()==UUID.class) {
        return UUIDSerializer.get().toBytes((UUID)object);
    } else if (object.getClass()==Boolean.class) {
        return BooleanSerializer.get().toBytes((Boolean)object);
    } else if (object.getClass()==Short.class) {
        return ShortSerializer.get().toBytes((Short)object);
    } else if (object.getClass()==Integer.class) {
        return IntegerSerializer.get().toBytes((Integer)object);
    } else if (object.getClass()==Long.class) {
        return LongSerializer.get().toBytes((Long)object);
    } else if (object.getClass()==Float.class) {
        return FloatSerializer.get().toBytes((Float)object);
    } else if (object.getClass()==Double.class) {
        return DoubleSerializer.get().toBytes((Double)object);
    } else if (object.getClass()==Date.class) {
        return DateSerializer.get().toBytes((Date)object);
    } else if (object.getClass()==String.class) {
        return StringSerializer.get().toBytes((String)object);
    } else if (object.getClass().isEnum()) {
        return StringSerializer.get().toBytes(((Enum<?>)object).name());
    } else {
        return ObjectSerializer.get().toBytes(object);
    }
}

@SuppressWarnings("unchecked")
public static <I> I fromBytes(byte[] bytes, Class<I> clazz) {

    if(bytes==null || bytes.length==0 || clazz==null) {
        return null;
    }

    if (clazz==Byte.class) {
        return (I)Byte.valueOf(BytesArraySerializer.get().fromBytes(bytes)[0]);
    } else if (clazz==UUID.class) {
        return (I)UUIDSerializer.get().fromBytes(bytes);
    } else if (clazz==Boolean.class) {
        return (I)BooleanSerializer.get().fromBytes(bytes);
    } else if (clazz==Short.class) {
        return (I)ShortSerializer.get().fromBytes(bytes);
    } else if (clazz==Integer.class) {
        return (I)IntegerSerializer.get().fromBytes(bytes);
    } else if (clazz==Long.class) {
        return (I)LongSerializer.get().fromBytes(bytes);
    } else if (clazz==Float.class) {
        return (I)FloatSerializer.get().fromBytes(bytes);
    } else if (clazz==Double.class) {
        return (I)DoubleSerializer.get().fromBytes(bytes);
    } else if (clazz==Date.class) {
        return (I)DateSerializer.get().fromBytes(bytes);
    } else if (clazz==String.class) {
        return (I)StringSerializer.get().fromBytes(bytes);
    } else if (clazz.isEnum()) {
        String enumValueAsString = StringSerializer.get().fromBytes(bytes);
        for(Enum<? extends Enum<?>> enumValue: (Enum[])clazz.getEnumConstants()) {
            if(enumValue.name().equals(enumValueAsString)) {
                return (I)enumValue;
            }
        }
        return null;
    } else {
        return (I)ObjectSerializer.get().fromBytes(bytes);
    }
}

}

@cballock

Hi, Landau.
Were you able to figure out how to fix this problem?
We need this fix urgently.

@elandau
Owner

Now that I see the code I realize that what you really want here is DynamicComposite and not Composite columns. Composite columns are meant to work with a 'fixed' schema of component types. Making the component type dynamic will break sorting of the columns and you'll end up getting incorrect results for your queries.
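One way raw-bytes components can break sorting is sketched below. It assumes a long is encoded as 8 big-endian bytes and that a bytes comparator orders values by unsigned lexicographic comparison. Both are assumptions for illustration, and `BytesOrderSketch` and its methods are hypothetical names, not Astyanax or Cassandra code.

```java
import java.nio.ByteBuffer;

public class BytesOrderSketch {

    // Big-endian 8-byte encoding; an assumed stand-in for what a long serializer produces.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    // Unsigned lexicographic comparison; a simplified stand-in for a raw-bytes comparator.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        // Numerically, -1 sorts before 1.
        System.out.println(Long.compare(-1L, 1L) < 0);                     // true
        // Byte-wise, the encoding of -1 (all 0xFF) sorts after the encoding of 1.
        System.out.println(compareUnsigned(encode(-1L), encode(1L)) > 0);  // true
    }
}
```

Under these assumptions, a range that is well ordered for the original values can come out inverted once the values are stored as opaque bytes.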

In addition, once you've specified a greaterThan or lessThan on a component in the range builder, you cannot refine your query for subsequent components of the Composite column. This simply has to do with how the CompositeComparator was implemented in Cassandra.
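A rough way to picture this restriction is to assume a column range is one contiguous slice over names sorted component by component. The sketch below is plain Java with hypothetical names (`SliceSketch`, `Name`, `slice`); it models that assumption only and is not the Cassandra comparator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class SliceSketch {

    // Two-component column name, ordered by the first component and then by the second.
    static final class Name implements Comparable<Name> {
        final String first;
        final String second;

        Name(String first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compareTo(Name other) {
            int cmp = first.compareTo(other.first);
            return cmp != 0 ? cmp : second.compareTo(other.second);
        }

        @Override
        public String toString() {
            return first + ":" + second;
        }
    }

    // Returns every stored name between start and end (inclusive) as one contiguous slice.
    static List<String> slice(TreeSet<Name> columns, Name start, Name end) {
        List<String> out = new ArrayList<>();
        for (Name name : columns.subSet(start, true, end, true)) {
            out.add(name.toString());
        }
        return out;
    }

    static TreeSet<Name> sampleColumns() {
        TreeSet<Name> columns = new TreeSet<>();
        columns.add(new Name("a", "x"));
        columns.add(new Name("a", "y"));
        columns.add(new Name("b", "w"));
        columns.add(new Name("b", "x"));
        columns.add(new Name("c", "x"));
        return columns;
    }

    public static void main(String[] args) {
        // Intent: first component in [a, c] AND second component == x.
        // The slice from a:x to c:x still returns a:y and b:w.
        System.out.println(slice(sampleColumns(), new Name("a", "x"), new Name("c", "x")));
        // prints [a:x, a:y, b:w, b:x, c:x]
    }
}
```

In this model the later component only narrows the result when every earlier component is pinned to a single value, which is consistent with using withPrefix for the leading components and a range only on the last one.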

You can fix your buildRange() method as follows, but I think you'll find that the overall approach here will have numerous edge cases where it will not do what you expect.

private CompositeRangeBuilder buildRange(CompositeColumnName filterColumnName) {

    // See https://github.com/Netflix/astyanax/issues/80
    CompositeRangeBuilder range;

    if(filterColumnName.getPart2()!=null && filterColumnName.getPart2().length>0) {

        range = compositeSerializer.buildRange()
                .withPrefix(filterColumnName.getPart1());

        if(filterColumnName.getPart3()!=null && filterColumnName.getPart3().length>0) {

            range = range.withPrefix(filterColumnName.getPart2());

            if(filterColumnName.getTimestamp()!=null) {
                range = range.withPrefix(filterColumnName.getPart3())
                        .greaterThanEquals(filterColumnName.getTimestamp())
                        .lessThanEquals(filterColumnName.getTimestamp());

            }
            else {
                range = range
                        .greaterThanEquals(filterColumnName.getPart3())
                        .lessThanEquals(filterColumnName.getPart3());
            }
        }
        else {
            range = range
                    .greaterThanEquals(filterColumnName.getPart2())
                    .lessThanEquals(filterColumnName.getPart2());
        }
    }
    else {
        range = compositeSerializer.buildRange()
                .greaterThanEquals(filterColumnName.getPart1())
                .lessThanEquals(filterColumnName.getPart1());
    }

    return range;
}
@cballock

Hi Landau,
Thank you for your reply. I'll consider your comment about DynamicComposite for my use cases. Thanks again!
As for my "buildRange()" method, it was in fact originally implemented as below:

{code}
private CompositeRangeBuilder buildRange(CompositeColumnName filterColumnName) {

    CompositeRangeBuilder range = compositeSerializer.buildRange().withPrefix(filterColumnName.getPart1());

    if(filterColumnName.getPart2()!=null && filterColumnName.getPart2().length>0) {

        range = range.withPrefix(filterColumnName.getPart2());

        if(filterColumnName.getPart3()!=null && filterColumnName.getPart3().length>0) {

            range = range.withPrefix(filterColumnName.getPart3());

            if(filterColumnName.getTimestamp()!=null) {

                range = range.withPrefix(filterColumnName.getTimestamp());

            }
        }
    }

    return range;
}

{code}

The reason I replaced all the "withPrefix(xxx)" calls with "greaterThanEquals(xxx).lessThanEquals(xxx)" is that I was trying your original workaround suggestion, since I was unable to make the all-rows query work with the "withPrefix()" method (I unknowingly misled you, sorry). With your explanation of the CompositeComparator, I now realize why the "withPrefix()" method of your CompositeRangeBuilder class has the "nextComponent()" call that doesn't appear in the other methods of the same class.

But based on my original code above, is there anything I can do to make composite column queries work?

Regards.

@cballock

Hi Landau,
The following code worked for me:

{code}
private CompositeRangeBuilder buildRange(CompositeColumnName filterColumnName) {

    // See https://github.com/Netflix/astyanax/issues/80
    CompositeRangeBuilder range = compositeSerializer.buildRange();

    if(filterColumnName.getPart1()!=null && filterColumnName.getPart1().length>0) {

        if(filterColumnName.getPart2()!=null && filterColumnName.getPart2().length>0) {
            range = range.withPrefix(filterColumnName.getPart1());
        } else {
            range = range.greaterThanEquals(filterColumnName.getPart1()).lessThanEquals(filterColumnName.getPart1());
        }

        if(filterColumnName.getPart2()!=null && filterColumnName.getPart2().length>0) {

            if(filterColumnName.getPart3()!=null && filterColumnName.getPart3().length>0) {
                range = range.withPrefix(filterColumnName.getPart2());
            } else {
                range = range.greaterThanEquals(filterColumnName.getPart2()).lessThanEquals(filterColumnName.getPart2());
            }

            if(filterColumnName.getPart3()!=null && filterColumnName.getPart3().length>0) {

                if(filterColumnName.getTimestamp()!=null) {
                    range = range.withPrefix(filterColumnName.getPart3());
                } else {
                    range = range.greaterThanEquals(filterColumnName.getPart3()).lessThanEquals(filterColumnName.getPart3());
                }

                if(filterColumnName.getTimestamp()!=null) {

                    range = range.withPrefix(filterColumnName.getTimestamp());

                }
            }
        }
    }

    return range;
}

{code}

@jimooo

Hi Guys,
I appreciate the discussion about the workarounds, but can we get an estimate of when the original problem will be fixed? I really need it. Thanks.

@jimooo

Are there any updates on this?

@jimooo

I'm testing this more now. I'm finding that I can't set the second prefix either. This seems like a major piece of advertised functionality that is broken. Is anyone else having the same problem, or am I doing something wrong?

This rangebuilder doesn't work either:
final CompositeRangeBuilder rangeBuilder = serializer.buildRange().withPrefix(theStr1).withPrefix(theStr2);

I really need to be able to set these prefixes. I can't set greaterThan or lessThan since I don't know what the minimum or maximum string will be.

I would really appreciate help and/or workarounds.

@elandau
Owner

Yes, the API needs some work. For now, try this as a workaround:

final CompositeRangeBuilder rangeBuilder = serializer.buildRange().withPrefix(theStr1).greaterThanEquals(theStr2).lessThanEquals(theStr2);

@mkmainali

Is there any progress on this issue?

I am facing a similar situation, where my composite column is made of "id:value", both strings,

and I am trying to query all the columns matching id. I use the following range for column range

compositeSerializer.buildRange().withPrefix("id")

but it does not return any columns. I tried the workaround mentioned above

compositeSerializer.buildRange().greaterThanEquals("id").lessThanEquals("id")

but this one causes InvalidRequestException(why:range finish must come after start in the order of traversal). I cannot add the second component because I do not know the value.

Is there any other workaround for this? Or am I doing something wrong when building the column range? I can see the columns in the CF, so I know the columns actually exist.

@elandau
Owner

I'm working on a new EntityManager based API that will make using composite columns a lot simpler. It's part of a larger change to one of the recipes and hopefully will be released in about 2 weeks. I'll provide more details and documentation as I get close to releasing.

@mkmainali

Thanks, and looking forward to the new release.

In the meantime, I looked into the CompositeRangeBuilder source and found that inside withPrefix, when appending the start and end, it uses EQUAL for the end too.

    append(start, object, Equality.EQUAL);
    append(end, object, Equality.EQUAL);

However, for composites the end should be added with Equality.GREATER_THAN_EQUALS (the reason is explained at http://www.datastax.com/dev/blog/introduction-to-composite-columns-part-1 ). I made a custom build by changing it to

    append(start, object, Equality.EQUAL);
    append(end, object, Equality.GREATER_THAN_EQUALS);

and it works in the above failure case. By the way, I am working on version 1.56.26.
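To illustrate why the end bound matters, here is a simplified model of composite ordering with an end-of-component marker, as described in the DataStax post linked above. The class and method names (`EocSketch`, `Composite`, `inSlice`) are hypothetical, the comparator is a sketch rather than Cassandra's actual code, and the mapping from Astyanax's Equality constants to marker values is not asserted here and should be checked against the source.

```java
import java.util.Arrays;
import java.util.List;

public class EocSketch {

    // A composite bound or column name: string components plus the end-of-component
    // marker attached to its last component (0 = exact, 1 = after everything sharing the prefix).
    static final class Composite {
        final List<String> parts;
        final int eoc;

        Composite(int eoc, String... parts) {
            this.parts = Arrays.asList(parts);
            this.eoc = eoc;
        }
    }

    // Simplified ordering: compare shared components, then their markers, then length.
    static int compare(Composite a, Composite b) {
        int shared = Math.min(a.parts.size(), b.parts.size());
        for (int i = 0; i < shared; i++) {
            int cmp = a.parts.get(i).compareTo(b.parts.get(i));
            if (cmp != 0) {
                return cmp;
            }
            // Only the last component of each composite carries a non-zero marker in this model.
            int markerA = (i == a.parts.size() - 1) ? a.eoc : 0;
            int markerB = (i == b.parts.size() - 1) ? b.eoc : 0;
            if (markerA != markerB) {
                return markerA - markerB;
            }
        }
        // With equal shared components and markers, the shorter composite sorts first.
        return a.parts.size() - b.parts.size();
    }

    // True when the column falls inside the inclusive slice [start, end].
    static boolean inSlice(Composite column, Composite start, Composite end) {
        return compare(start, column) <= 0 && compare(column, end) <= 0;
    }

    public static void main(String[] args) {
        Composite column = new Composite(0, "id", "value");
        Composite start = new Composite(0, "id");
        // End bound with marker 0 sorts before id:value, so the slice is empty.
        System.out.println(inSlice(column, start, new Composite(0, "id"))); // false
        // End bound with marker 1 sorts after every name that starts with id.
        System.out.println(inSlice(column, start, new Composite(1, "id"))); // true
    }
}
```

In this model, using the exact marker on both bounds matches only a one-component name equal to the prefix, which would explain a prefix query returning no columns.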

I will do more tests to check that it does not break anything. I can create a pull request for the change, if required.

@msfrank

@mkmainali could you post your pull request? i am having the same issue and i'd like to move forward while waiting for @elandau's new API. i tried patching the same line you referenced above (CompositeRangeBuilder.java:39) and building a custom version of the library, but i am getting InvalidRequestException(why:range finish must come after start in the order of traversal). however, i branched off of the astyanax-1.56.37 tag, not 1.56.26.

@mkmainali

@msfrank
Here is my pull request. #339

@sagarl
Collaborator

Seems to be a non-issue. Please reopen if you are still facing any problem.

@sagarl sagarl closed this