Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion external/storm-kafka-monitor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>87</maxAllowedViolations>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.json.simple.JSONAware;

/**
* Class representing the log head offsets, spout offsets and the lag for a topic
* Class representing the log head offsets, spout offsets and the lag for a topic.
*/
public class KafkaOffsetLagResult implements JSONAware {
private String topic;
Expand Down Expand Up @@ -60,26 +60,38 @@ public long getLag() {

@Override
public String toString() {
return "KafkaOffsetLagResult{" +
"topic='" + topic + '\'' +
", partition=" + partition +
", consumerCommittedOffset=" + consumerCommittedOffset +
", logHeadOffset=" + logHeadOffset +
", lag=" + lag +
'}';
return "KafkaOffsetLagResult{"
+ "topic='" + topic + '\''
+ ", partition=" + partition
+ ", consumerCommittedOffset=" + consumerCommittedOffset
+ ", logHeadOffset=" + logHeadOffset
+ ", lag=" + lag
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

KafkaOffsetLagResult that = (KafkaOffsetLagResult) o;

if (partition != that.partition) return false;
if (consumerCommittedOffset != that.consumerCommittedOffset) return false;
if (logHeadOffset != that.logHeadOffset) return false;
if (lag != that.lag) return false;
if (partition != that.partition) {
return false;
}
if (consumerCommittedOffset != that.consumerCommittedOffset) {
return false;
}
if (logHeadOffset != that.logHeadOffset) {
return false;
}
if (lag != that.lag) {
return false;
}
return !(topic != null ? !topic.equals(that.topic) : that.topic != null);

}
Expand All @@ -96,7 +108,11 @@ public int hashCode() {

@Override
public String toJSONString() {
return "{\"topic\":\"" + topic + "\",\"partition\":" + partition + ",\"consumerCommittedOffset\":" + consumerCommittedOffset + "," +
"\"logHeadOffset\":" + logHeadOffset + ",\"lag\":" + lag + "}";
return "{\"topic\":\"" + topic
+ "\",\"partition\":" + partition
+ ",\"consumerCommittedOffset\":" + consumerCommittedOffset
+ ",\"logHeadOffset\":" + logHeadOffset
+ ",\"lag\":" + lag
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.json.simple.JSONValue;

/**
* Utility class for querying offset lag for kafka spout
* Utility class for querying offset lag for kafka spout.
*/
public class KafkaOffsetLagUtil {
private static final String OPTION_TOPIC_SHORT = "t";
Expand All @@ -51,7 +51,7 @@ public class KafkaOffsetLagUtil {
private static final String OPTION_CONSUMER_CONFIG_SHORT = "c";
private static final String OPTION_CONSUMER_CONFIG_LONG = "consumer-config";

public static void main(String args[]) {
public static void main(String[] args) {
try {
Options options = buildOptions();
CommandLineParser parser = new DefaultParser();
Expand Down Expand Up @@ -106,20 +106,28 @@ private static void printUsageAndExit(Options options, String message) {

private static Options buildOptions() {
Options options = new Options();
options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true,
"REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
"offset");
options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true,
"Comma separated list of bootstrap broker hosts for new " +
"consumer/spout e.g. hostname1:9092,hostname2:9092");
options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG,
true,
"REQUIRED Topics (comma separated list) for fetching log head and spout committed "
+ "offset");
options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG,
true,
"Comma separated list of bootstrap broker hosts for new "
+ "consumer/spout e.g. hostname1:9092,hostname2:9092");
options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer");
options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " +
"Kafka consumer properties");
options.addOption(OPTION_SECURITY_PROTOCOL_SHORT,
OPTION_SECURITY_PROTOCOL_LONG,
true,
"Security protocol to connect to kafka");
options.addOption(OPTION_CONSUMER_CONFIG_SHORT,
OPTION_CONSUMER_CONFIG_LONG,
true,
"Properties file with additional Kafka consumer properties");
return options;
}

/**
* Get offset lags.
* @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
* @return log head offset, spout offset and lag for each partition
*/
Expand Down Expand Up @@ -168,9 +176,11 @@ public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery
}

private static Collection<TopicPartition> toArrayList(final TopicPartition tp) {
return new ArrayList<TopicPartition>(1) {{
add(tp);
}};
return new ArrayList<TopicPartition>(1) {
{
add(tp);
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,21 @@ public long getLag() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof KafkaPartitionOffsetLag)) return false;
if (this == o) {
return true;
}
if (!(o instanceof KafkaPartitionOffsetLag)) {
return false;
}

KafkaPartitionOffsetLag that = (KafkaPartitionOffsetLag) o;

if (getConsumerCommittedOffset() != that.getConsumerCommittedOffset()) return false;
if (getLogHeadOffset() != that.getLogHeadOffset()) return false;
if (getConsumerCommittedOffset() != that.getConsumerCommittedOffset()) {
return false;
}
if (getLogHeadOffset() != that.getLogHeadOffset()) {
return false;
}
return getLag() == that.getLag();

}
Expand All @@ -65,8 +73,8 @@ public int hashCode() {
@Override
public String toString() {
// JSONAware not working for nested element on Map so write JSON format from here
return "{\"consumerCommittedOffset\": " + consumerCommittedOffset + ", " +
"\"logHeadOffset\": " + logHeadOffset + ", " +
"\"lag\": " + lag + "}";
return "{\"consumerCommittedOffset\": " + consumerCommittedOffset + ", "
+ "\"logHeadOffset\": " + logHeadOffset + ", "
+ "\"lag\": " + lag + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.storm.kafka.monitor;

/**
* Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new
* consumer api
* Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new
* kafka spout using new consumer api.
*/
public class NewKafkaSpoutOffsetQuery {
private final String topics; // comma separated list of topics
Expand Down Expand Up @@ -60,13 +60,13 @@ public String getConsumerPropertiesFileName() {

@Override
public String toString() {
return "NewKafkaSpoutOffsetQuery{" +
"topics='" + topics + '\'' +
", consumerGroupId='" + consumerGroupId + '\'' +
", bootStrapBrokers='" + bootStrapBrokers + '\'' +
", securityProtocol='" + securityProtocol + '\'' +
", consumerPropertiesFileName='" + consumerPropertiesFileName + '\'' +
'}';
return "NewKafkaSpoutOffsetQuery{"
+ "topics='" + topics + '\''
+ ", consumerGroupId='" + consumerGroupId + '\''
+ ", bootStrapBrokers='" + bootStrapBrokers + '\''
+ ", securityProtocol='" + securityProtocol + '\''
+ ", consumerPropertiesFileName='" + consumerPropertiesFileName + '\''
+ '}';
}

@Override
Expand Down