MINOR: Replace for loops with for-each; replace if-else-if with match
In some places an index-based for loop was used where a for-each loop is simpler, so those loops were replaced.
In one file an if / else-if / else chain was replaced with a match expression.

Author: Akash Sethi <akash.sethi@knoldus.in>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2435 from akashsethi24/trunk
Akash Sethi authored and guozhangwang committed Jan 27, 2017
1 parent 662c2f8 commit 9213de8
Showing 3 changed files with 26 additions and 25 deletions.
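
Both refactorings in this commit follow a simple pattern. The sketch below is a minimal, self-contained Scala illustration of the two rewrites; all names here (`RefactorSketch`, `Status`, `NoOffset`, `items`) are hypothetical stand-ins, not code from the patch:

object RefactorSketch {
  final case class Status(error: Int)
  val NoOffset = Status(-1) // hypothetical stand-in for a sentinel like OffsetMetadataAndError.NoOffset

  def main(args: Array[String]): Unit = {
    val items = Array("a", "bb", "ccc")

    // Pattern 1: an index-based loop over positions ...
    var total1 = 0
    for (i <- 0 until items.length)
      total1 += items(i).length

    // ... is rewritten to iterate over the elements directly:
    var total2 = 0
    for (item <- items)
      total2 += item.length
    assert(total1 == total2)

    // Pattern 2: an if / else-if / else chain ...
    val status = Status(0)
    val viaIf =
      if (status == NoOffset) "missing"
      else if (status.error == 0) "ok"
      else "error"

    // ... is rewritten as a match: an uppercase stable identifier is
    // compared with ==, and a guard reproduces the else-if test.
    val viaMatch = status match {
      case NoOffset          => "missing"
      case s if s.error == 0 => "ok"
      case _                 => "error"
    }
    assert(viaIf == viaMatch)
    println(s"total=$total1, classification=$viaMatch")
  }
}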
@@ -33,8 +33,8 @@ public ByteBufferReceive(String source, ByteBuffer... buffers) {
         super();
         this.source = source;
         this.buffers = buffers;
-        for (int i = 0; i < buffers.length; i++)
-            remaining += buffers[i].remaining();
+        for (ByteBuffer buffer : buffers)
+            remaining += buffer.remaining();
     }

     @Override
31 changes: 16 additions & 15 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -306,22 +306,23 @@ object ConsumerGroupCommand extends Logging {
       val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())

       offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
-        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
-          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
-          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
-          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
-          try {
-            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
-            offsetMap.put(topicAndPartition, offset)
-          } catch {
-            case z: ZkNoNodeException =>
-              printError(s"Could not fetch offset from zookeeper for group '$group' partition '$topicAndPartition' due to missing offset data in zookeeper.", Some(z))
-          }
+        offsetAndMetadata match {
+          case OffsetMetadataAndError.NoOffset =>
+            val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
+            // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
+            // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
+            try {
+              val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
+              offsetMap.put(topicAndPartition, offset)
+            } catch {
+              case z: ZkNoNodeException =>
+                printError(s"Could not fetch offset from zookeeper for group '$group' partition '$topicAndPartition' due to missing offset data in zookeeper.", Some(z))
+            }
+          case offsetAndMetaData if offsetAndMetaData.error == Errors.NONE.code =>
+            offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
+          case _ =>
+            printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${Errors.forCode(offsetAndMetadata.error).exception}.")
         }
-        else if (offsetAndMetadata.error == Errors.NONE.code)
-          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
-        else
-          printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${Errors.forCode(offsetAndMetadata.error).exception}.")
       }
       channel.disconnect()
       offsetMap.toMap
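A note on the match rewrite above: `case OffsetMetadataAndError.NoOffset =>` is a stable-identifier pattern, which Scala compares with `==`, so it preserves the original `offsetAndMetadata == OffsetMetadataAndError.NoOffset` test. The guard case binds a fresh name (`offsetAndMetaData`, capital `D`) for its condition while the body still reads the outer `offsetAndMetadata`; both refer to the same matched value, so behavior is unchanged. A minimal sketch under a hypothetical `Meta` type:

object GuardCaseSketch {
  final case class Meta(error: Int, offset: Long) // hypothetical stand-in type

  def main(args: Array[String]): Unit = {
    val offsetAndMetadata = Meta(error = 0, offset = 42L)
    val stored = offsetAndMetadata match {
      // The pattern binds a fresh lowercase name for use in the guard;
      // the body may still read the outer `offsetAndMetadata`, which is
      // the same matched value, so either spelling behaves identically.
      case offsetAndMetaData if offsetAndMetaData.error == 0 =>
        offsetAndMetadata.offset
      case _ =>
        -1L
    }
    println(stored) // prints 42
  }
}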
@@ -119,17 +119,17 @@ public void testJoin() throws Exception {

         // push all four items to the primary stream. this should produce two items.

-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "XX" + expectedKey);
         }
         driver.flushState();

         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
         checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1"));

         // push all items to the other stream. this should produce four items.
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic2, expectedKey, "YY" + expectedKey);
         }
         driver.flushState();

@@ -138,8 +138,8 @@ public void testJoin() throws Exception {

         // push all four items to the primary stream. this should produce four items.

-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();

@@ -311,8 +311,8 @@ public void testSendingOldValues() throws Exception {

         // push all four items to the primary stream. this should produce four items.

-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topic1, expectedKey, "X" + expectedKey);
         }
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-XX0+YY0)", "1:(X1+YY1<-XX1+YY1)", "2:(X2+YY2<-XX2+YY2)", "3:(X3+YY3<-XX3+YY3)");
