Skip to content

Commit

Permalink
reduce max batch size and revert other changes based on PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Aug 31, 2015
1 parent e123390 commit 51d8cc3
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.hawkular.metrics.core.impl.transformers;

import static com.datastax.driver.core.BatchStatement.Type.UNLOGGED;
import static com.google.common.base.Preconditions.checkArgument;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Statement;
Expand All @@ -31,8 +32,7 @@
* @author Thomas Segismont
*/
public class BatchStatementTransformer implements Transformer<Statement, BatchStatement> {
// Max batch size is 0xFFFF (greatest unsigned short)
// public static final int MAX_BATCH_SIZE = 0xFFFF;

public static final int MAX_BATCH_SIZE = 1024;

/**
Expand All @@ -51,20 +51,19 @@ public BatchStatementTransformer() {
}

/**
* @param batchStatementFactory function used to initiliaze a new {@link BatchStatement}
* @param batchStatementFactory function used to initialize a new {@link BatchStatement}
* @param batchSize maximum number of statements in the batch
*/
public BatchStatementTransformer(Func0<BatchStatement> batchStatementFactory, int batchSize) {
this.batchSize = batchSize;
// checkArgument(batchSize <= MAX_BATCH_SIZE, "batchSize exceeds limit");
checkArgument(batchSize <= MAX_BATCH_SIZE, "batchSize exceeds limit");
this.batchStatementFactory = batchStatementFactory;
}

@Override
public Observable<BatchStatement> call(Observable<Statement> statements) {
// return statements
// .window(batchSize)
// .flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add));
return statements.collect(batchStatementFactory, BatchStatement::add);
return statements
.window(batchSize)
.flatMap(window -> window.collect(batchStatementFactory, BatchStatement::add));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,16 @@ public void setUp() throws Exception {
@Test
public void testCall() throws Exception {
int expected = 6;
int numStatements = (expected - 1) * batchSize + 1;
// Emit enough statements to get expected count of batches, with the last batch holding just one
List<BatchStatement> result = Observable.range(0, numStatements)
List<BatchStatement> result = Observable.range(0, (expected - 1) * batchSize + 1)
.map(i -> mock(Statement.class))
.compose(batchStatementTransformer)
.toList()
.toBlocking()
.single();
assertEquals(1, result.size());
assertEquals(numStatements, result.get(0).size());
assertEquals(expected, result.size());
for (int i = 0; i < result.size(); i++) {
assertEquals(i < (result.size() - 1) ? batchSize : 1, result.get(i).size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class GaugeMetricStatisticsITest extends RESTTest {
String metric = "test"
int nbOfBuckets = 10
long bucketSize = Duration.standardDays(1).millis
int interval = Duration.standardMinutes(10).millis
int interval = Duration.standardMinutes(1).millis
int sampleSize = (bucketSize / interval) - 1

def start = new DateTimeService().currentHour().minus(3 * nbOfBuckets * bucketSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.junit.BeforeClass

import java.util.concurrent.atomic.AtomicInteger

import static org.hawkular.metrics.core.impl.transformers.BatchStatementTransformer.MAX_BATCH_SIZE
import static org.junit.Assert.assertEquals

class RESTTest {
Expand All @@ -31,7 +32,7 @@ class RESTTest {
static final double DELTA = 0.001
static final String TENANT_PREFIX = UUID.randomUUID().toString()
static final AtomicInteger TENANT_ID_COUNTER = new AtomicInteger(0)
static final int LARGE_PAYLOAD_SIZE = 1024
static final int LARGE_PAYLOAD_SIZE = MAX_BATCH_SIZE
static String tenantHeaderName = "Hawkular-Tenant";
static RESTClient hawkularMetrics
static defaultFailureHandler
Expand Down

0 comments on commit 51d8cc3

Please sign in to comment.