Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public final class KuduConstants {
public static final String CAMEL_KUDU_SCHEMA = "CamelKuduSchema";
@Metadata(description = "The create table options", javaType = "org.apache.kudu.client.CreateTableOptions")
public static final String CAMEL_KUDU_TABLE_OPTIONS = "CamelKuduTableOptions";
@Metadata(description = "The predicate for scan operation", javaType = "org.apache.kudu.client.KuduPredicate")
public static final String CAMEL_KUDU_SCAN_PREDICATE = "CamelKuduScanPredicate";

private KuduConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Update;
Expand Down Expand Up @@ -175,6 +176,7 @@ private void doCreateTable(Exchange exchange, String tableName) throws KuduExcep
}

private void doScan(Exchange exchange, String tableName) throws KuduException {
exchange.getIn().setBody(KuduUtils.doScan(tableName, endpoint.getKuduClient()));
KuduPredicate predicate = (KuduPredicate) exchange.getIn().getHeader(KuduConstants.CAMEL_KUDU_SCAN_PREDICATE);
exchange.getIn().setBody(KuduUtils.doScan(tableName, endpoint.getKuduClient(), predicate));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduScannerIterator;
import org.apache.kudu.client.KuduTable;
Expand Down Expand Up @@ -57,6 +58,11 @@ public static List<Map<String, Object>> scannerToList(KuduTable table, KuduScann
}

public static List<Map<String, Object>> doScan(String tableName, KuduClient connection) throws KuduException {
return doScan(tableName, connection, null);
}

public static List<Map<String, Object>> doScan(String tableName, KuduClient connection, KuduPredicate predicate)
throws KuduException {
LOG.trace("Scanning table {}", tableName);
KuduTable table = connection.openTable(tableName);

Expand All @@ -66,9 +72,11 @@ public static List<Map<String, Object>> doScan(String tableName, KuduClient conn
projectColumns.add(columnSchema.getName());
}

KuduScanner scanner = connection.newScannerBuilder(table)
.setProjectedColumnNames(projectColumns)
.build();
KuduScanner.KuduScannerBuilder builder = connection.newScannerBuilder(table);
if (predicate != null) {
builder.addPredicate(predicate);
}
KuduScanner scanner = builder.setProjectedColumnNames(projectColumns).build();
return KuduUtils.scannerToList(table, scanner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
*/
package org.apache.camel.component.kudu;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduPredicate;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -107,6 +111,32 @@ public void scan() throws InterruptedException {

}

@Test
public void scanWithPredicate() throws InterruptedException {
errorEndpoint.expectedMessageCount(0);
successEndpoint.expectedMessageCount(2);

// without predicate
Map<String, Object> headers = new HashMap<>();
headers.put(KuduConstants.CAMEL_KUDU_SCAN_PREDICATE, null);
sendBody("direct:scan", null, headers);
List<Map<String, Object>> results = (List<Map<String, Object>>) successEndpoint.getReceivedExchanges()
.get(0).getIn().getBody(List.class);
assertEquals(2, results.size(), "two records with id=1 and id=2 are expected to be returned");

// with predicate
ColumnSchema schema = new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).build();
KuduPredicate predicate = KuduPredicate.newComparisonPredicate(schema, KuduPredicate.ComparisonOp.EQUAL, 2);
headers.put(KuduConstants.CAMEL_KUDU_SCAN_PREDICATE, predicate);
sendBody("direct:scan", null, headers);
results = (List<Map<String, Object>>) successEndpoint.getReceivedExchanges()
.get(1).getIn().getBody(List.class);
assertEquals(1, results.size(), "only one record with id=2 is expected to be returned");

errorEndpoint.assertIsSatisfied();
successEndpoint.assertIsSatisfied();
}

@Test
public void scanTable() throws InterruptedException {
createTestTable("TestTable");
Expand Down