Skip to content

Commit

Permalink
fix: json creator for QueryStatusCount should match getter (MINOR) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Apr 11, 2020
1 parent 0e0c283 commit 988c3d1
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 18 deletions.
Expand Up @@ -110,7 +110,7 @@ private static Map<QueryId, RunningQuery> getLocalSimple(
ImmutableSet.of(q.getSinkName().text()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
new QueryStatusCount(
QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(KafkaStreams.State.valueOf(q.getState()), 1)))
));
}
Expand Down
Expand Up @@ -228,7 +228,7 @@ private static List<RunningQuery> getQueries(
ImmutableSet.of(q.getSinkName().text()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
new QueryStatusCount(
QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(
KafkaStreams.State.valueOf(q.getState()), 1)))).collect(Collectors.toList());
}
Expand Down
Expand Up @@ -167,7 +167,7 @@ public void shouldScatterGatherAndMergeShowQueries() {

final List<RunningQuery> remoteRunningQueries = Collections.singletonList(queryMetaDataToRunningQuery(
remoteMetadata,
new QueryStatusCount(Collections.singletonMap(ERROR_QUERY_STATE, 1))));
QueryStatusCount.fromStreamsStateCounts(Collections.singletonMap(ERROR_QUERY_STATE, 1))));
when(remoteQueries.getQueries()).thenReturn(remoteRunningQueries);
when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueries);
when(response.getResponse()).thenReturn(ksqlEntityList);
Expand Down Expand Up @@ -200,7 +200,7 @@ public void shouldNotMergeDifferentRunningQueries() {

final List<RunningQuery> remoteRunningQueries = Collections.singletonList(queryMetaDataToRunningQuery(
remoteMetadata,
new QueryStatusCount(Collections.singletonMap(RUNNING_QUERY_STATE, 1))));
QueryStatusCount.fromStreamsStateCounts(Collections.singletonMap(RUNNING_QUERY_STATE, 1))));
when(remoteQueries.getQueries()).thenReturn(remoteRunningQueries);
when(ksqlEntityList.get(anyInt())).thenReturn(remoteQueries);
when(response.getResponse()).thenReturn(ksqlEntityList);
Expand Down
Expand Up @@ -253,7 +253,7 @@ public void shouldShowColumnsSource() {
).orElseThrow(IllegalStateException::new);

// Then:
final QueryStatusCount queryStatusCount = new QueryStatusCount(
final QueryStatusCount queryStatusCount = QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(KafkaStreams.State.valueOf(metadata.getState()), 1));

assertThat(sourceDescription.getSourceDescription(),
Expand Down
Expand Up @@ -1990,7 +1990,7 @@ private List<RunningQuery> createRunningQueries(
ImmutableSet.of(md.getSinkName().toString(FormatOptions.noEscape())),
ImmutableSet.of(md.getResultTopic().getKafkaTopicName()),
md.getQueryId(),
new QueryStatusCount(
QueryStatusCount.fromStreamsStateCounts(
Collections.singletonMap(KafkaStreams.State.valueOf(md.getState()), 1)))
).collect(Collectors.toList());
}
Expand Down
Expand Up @@ -39,21 +39,24 @@ public class QueryStatusCount {
// Use a EnumMap so toString() will always return the same string
private final EnumMap<KsqlQueryStatus, Integer> statuses;

public QueryStatusCount() {
this.statuses = returnEnumMap();
}

@SuppressWarnings("unused") // Invoked by reflection
@JsonCreator
public QueryStatusCount(final Map<KafkaStreams.State, Integer> states) {
public static QueryStatusCount fromStreamsStateCounts(
final Map<KafkaStreams.State, Integer> states) {
final Map<KsqlQueryStatus, Integer> ksqlQueryStatus = states.entrySet().stream()
.collect(Collectors.toMap(
e -> KsqlConstants.fromStreamsState(e.getKey()),
Map.Entry::getValue));
this.statuses = states.isEmpty() ? returnEnumMap() : new EnumMap<>(ksqlQueryStatus);
return new QueryStatusCount(ksqlQueryStatus);
}

public QueryStatusCount() {
this(Collections.emptyMap());
}

@JsonCreator
public QueryStatusCount(final Map<KsqlQueryStatus, Integer> states) {
this.statuses = states.isEmpty() ? returnEnumMap() : new EnumMap<>(states);
}


public void updateStatusCount(final String state, final int change) {
updateStatusCount(KafkaStreams.State.valueOf(state), change);
}
Expand Down
Expand Up @@ -79,8 +79,8 @@ public void shouldToString() {

@Test
public void shouldImplementHashCodeAndEqualsCorrectly() {
final QueryStatusCount queryStatusCount1 = new QueryStatusCount(Collections.singletonMap(KafkaStreams.State.ERROR, 2));
final QueryStatusCount queryStatusCount2 = new QueryStatusCount(Collections.singletonMap(KafkaStreams.State.RUNNING, 1));
final QueryStatusCount queryStatusCount1 = new QueryStatusCount(Collections.singletonMap(KsqlQueryStatus.ERROR, 2));
final QueryStatusCount queryStatusCount2 = new QueryStatusCount(Collections.singletonMap(KsqlQueryStatus.RUNNING, 1));
queryStatusCount2.updateStatusCount(KafkaStreams.State.ERROR, 2);
final QueryStatusCount queryStatusCount3 = new QueryStatusCount();
queryStatusCount3.updateStatusCount(KafkaStreams.State.ERROR, 2);
Expand All @@ -106,14 +106,16 @@ public void shouldRoundTripWhenNotEmpty() {
// Given:
queryStatusCount.updateStatusCount(KafkaStreams.State.RUNNING, 2);
queryStatusCount.updateStatusCount(KafkaStreams.State.ERROR, 10);
queryStatusCount.updateStatusCount(KsqlQueryStatus.UNRESPONSIVE, 1);

// When:
final String json = assertDeserializedToSame(queryStatusCount);

// Then:
assertThat(json, is("{"
+ "\"RUNNING\":2,"
+ "\"ERROR\":10"
+ "\"ERROR\":10,"
+ "\"UNRESPONSIVE\":1"
+ "}"));
}

Expand Down

0 comments on commit 988c3d1

Please sign in to comment.