Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: json creator for QueryStatusCount should match getter (MINOR) #5052

Merged
merged 1 commit into from Apr 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -110,7 +110,7 @@ private static Map<QueryId, RunningQuery> getLocalSimple(
ImmutableSet.of(q.getSinkName().text()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
new QueryStatusCount(
QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(KafkaStreams.State.valueOf(q.getState()), 1)))
));
}
Expand Down
Expand Up @@ -228,7 +228,7 @@ private static List<RunningQuery> getQueries(
ImmutableSet.of(q.getSinkName().text()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
new QueryStatusCount(
QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(
KafkaStreams.State.valueOf(q.getState()), 1)))).collect(Collectors.toList());
}
Expand Down
Expand Up @@ -167,7 +167,7 @@ public void shouldScatterGatherAndMergeShowQueries() {

final List<RunningQuery> remoteRunningQueries = Collections.singletonList(queryMetaDataToRunningQuery(
remoteMetadata,
new QueryStatusCount(Collections.singletonMap(ERROR_QUERY_STATE, 1))));
QueryStatusCount.fromStreamsStateCounts(Collections.singletonMap(ERROR_QUERY_STATE, 1))));
when(remoteQueries.getQueries()).thenReturn(remoteRunningQueries);
when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueries);
when(response.getResponse()).thenReturn(ksqlEntityList);
Expand Down Expand Up @@ -200,7 +200,7 @@ public void shouldNotMergeDifferentRunningQueries() {

final List<RunningQuery> remoteRunningQueries = Collections.singletonList(queryMetaDataToRunningQuery(
remoteMetadata,
new QueryStatusCount(Collections.singletonMap(RUNNING_QUERY_STATE, 1))));
QueryStatusCount.fromStreamsStateCounts(Collections.singletonMap(RUNNING_QUERY_STATE, 1))));
when(remoteQueries.getQueries()).thenReturn(remoteRunningQueries);
when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueries);
when(response.getResponse()).thenReturn(ksqlEntityList);
Expand Down
Expand Up @@ -253,7 +253,7 @@ public void shouldShowColumnsSource() {
).orElseThrow(IllegalStateException::new);

// Then:
final QueryStatusCount queryStatusCount = new QueryStatusCount(
final QueryStatusCount queryStatusCount = QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(KafkaStreams.State.valueOf(metadata.getState()), 1));

assertThat(sourceDescription.getSourceDescription(),
Expand Down
Expand Up @@ -1990,7 +1990,7 @@ private List<RunningQuery> createRunningQueries(
ImmutableSet.of(md.getSinkName().toString(FormatOptions.noEscape())),
ImmutableSet.of(md.getResultTopic().getKafkaTopicName()),
md.getQueryId(),
new QueryStatusCount(
QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(KafkaStreams.State.valueOf(md.getState()), 1)))
).collect(Collectors.toList());
}
Expand Down
Expand Up @@ -39,21 +39,24 @@ public class QueryStatusCount {
// Use a EnumMap so toString() will always return the same string
private final EnumMap<KsqlQueryStatus, Integer> statuses;

public QueryStatusCount() {
this.statuses = returnEnumMap();
}

@SuppressWarnings("unused") // Invoked by reflection
@JsonCreator
public QueryStatusCount(final Map<KafkaStreams.State, Integer> states) {
public static QueryStatusCount fromStreamsStateCounts(
final Map<KafkaStreams.State, Integer> states) {
final Map<KsqlQueryStatus, Integer> ksqlQueryStatus = states.entrySet().stream()
.collect(Collectors.toMap(
e -> KsqlConstants.fromStreamsState(e.getKey()),
Map.Entry::getValue));
this.statuses = states.isEmpty() ? returnEnumMap() : new EnumMap<>(ksqlQueryStatus);
return new QueryStatusCount(ksqlQueryStatus);
}

public QueryStatusCount() {
this(Collections.emptyMap());
}

@JsonCreator
public QueryStatusCount(final Map<KsqlQueryStatus, Integer> states) {
this.statuses = states.isEmpty() ? returnEnumMap() : new EnumMap<>(states);
}


public void updateStatusCount(final String state, final int change) {
updateStatusCount(KafkaStreams.State.valueOf(state), change);
}
Expand Down
Expand Up @@ -79,8 +79,8 @@ public void shouldToString() {

@Test
public void shouldImplementHashCodeAndEqualsCorrectly() {
final QueryStatusCount queryStatusCount1 = new QueryStatusCount(Collections.singletonMap(KafkaStreams.State.ERROR, 2));
final QueryStatusCount queryStatusCount2 = new QueryStatusCount(Collections.singletonMap(KafkaStreams.State.RUNNING, 1));
final QueryStatusCount queryStatusCount1 = new QueryStatusCount(Collections.singletonMap(KsqlQueryStatus.ERROR, 2));
final QueryStatusCount queryStatusCount2 = new QueryStatusCount(Collections.singletonMap(KsqlQueryStatus.RUNNING, 1));
queryStatusCount2.updateStatusCount(KafkaStreams.State.ERROR, 2);
final QueryStatusCount queryStatusCount3 = new QueryStatusCount();
queryStatusCount3.updateStatusCount(KafkaStreams.State.ERROR, 2);
Expand All @@ -106,14 +106,16 @@ public void shouldRoundTripWhenNotEmpty() {
// Given:
queryStatusCount.updateStatusCount(KafkaStreams.State.RUNNING, 2);
queryStatusCount.updateStatusCount(KafkaStreams.State.ERROR, 10);
queryStatusCount.updateStatusCount(KsqlQueryStatus.UNRESPONSIVE, 1);

// When:
final String json = assertDeserializedToSame(queryStatusCount);

// Then:
assertThat(json, is("{"
+ "\"RUNNING\":2,"
+ "\"ERROR\":10"
+ "\"ERROR\":10,"
+ "\"UNRESPONSIVE\":1"
+ "}"));
}

Expand Down