Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Cleaning up vertical whitespace
Browse files Browse the repository at this point in the history
  • Loading branch information
nickwallen committed Jun 8, 2018
1 parent 9e99bd5 commit 5f18927
Showing 1 changed file with 0 additions and 8 deletions.
Expand Up @@ -546,7 +546,6 @@ public static class KafkaFind implements StellarFunction {

@Override
public Object apply(List<Object> args, Context context) throws ParseException {

// required - name of the topic to retrieve messages from
String topic = getArg("topic", 0, String.class, args);

Expand Down Expand Up @@ -581,7 +580,6 @@ public Object apply(List<Object> args, Context context) throws ParseException {
* @return A list of messages that satisfy the filter expression.
*/
private List<Object> findMessages(String topic, LambdaExpression filter, int count, Properties properties) {

final int pollTimeout = getPollTimeout(properties);
final int maxWait = getMaxWait(properties);

Expand All @@ -603,7 +601,6 @@ private List<Object> findMessages(String topic, LambdaExpression filter, int cou

// only keep the message if the filter expression is satisfied
if(isSatisfied(filter, record.value())) {

messages.add(record.value());

// do we have enough messages already?
Expand Down Expand Up @@ -637,7 +634,6 @@ public boolean isSatisfied(LambdaExpression expr, String message) {
boolean result = false;
Map<String, Object> messageAsMap;
try {

// transform the message to a map of fields
messageAsMap = JSONUtils.INSTANCE.load(message, JSONUtils.MAP_SUPPLIER);

Expand Down Expand Up @@ -678,7 +674,6 @@ public boolean isInitialized() {
* @return A set of topic-partitions that were manually assigned to the consumer.
*/
private static Set<TopicPartition> manualPartitionAssignment(String topic, KafkaConsumer<String, String> consumer) {

// find all partitions for the topic
Set<TopicPartition> partitions = new HashSet<>();
for(PartitionInfo partition : consumer.partitionsFor(topic)) {
Expand Down Expand Up @@ -706,7 +701,6 @@ private static Set<TopicPartition> manualPartitionAssignment(String topic, Kafka
* @param context The Stellar context.
*/
private static Properties buildKafkaProperties(Map<String, String> overrides, Context context) {

// start with minimal set of default properties
Properties properties = new Properties();
properties.putAll(defaultProperties);
Expand Down Expand Up @@ -766,7 +760,6 @@ private static int getPollTimeout(Properties properties) {
* via the global properties.
*/
private static Properties defaultKafkaProperties() {

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "kafka-functions-stellar");
Expand Down Expand Up @@ -812,7 +805,6 @@ private static Properties defaultKafkaProperties() {
* @param <T> The type of the argument expected.
*/
public static <T> T getArg(String argName, int index, Class<T> clazz, List<Object> args) {

if(index >= args.size()) {
throw new IllegalArgumentException(format("missing '%s'; expected at least %d argument(s), found %d",
argName, index+1, args.size()));
Expand Down

0 comments on commit 5f18927

Please sign in to comment.