Skip to content

Commit

Permalink
fix: support trailing slashes in listener URLs
Browse files Browse the repository at this point in the history
Co-authored-by: Steven Zhang <stevenz@confluent.io>
  • Loading branch information
agavra and stevenpyzhang committed Apr 16, 2020
1 parent 2b7e79f commit f4f78e0
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 8 deletions.
2 changes: 1 addition & 1 deletion config/ksql-server.properties
Expand Up @@ -19,7 +19,7 @@
# The URL the KSQL server will listen on:
# The default is any IPv4 interface on the machine.
# NOTE: If set to wildcard or loopback set 'advertised.listener' to enable pull queries across machines
listeners=http://0.0.0.0:8088
listeners=http://0.0.0.0:8088/

# Use the 'listeners' line below for any IPv6 interface on the machine.
# listeners=http://[::]:8088
Expand Down
Expand Up @@ -272,14 +272,20 @@ private URL getInterNodeListenerFromExplicitConfig(final Logger logger) {
}
});

final URL sanitized = sanitizeInterNodeListener(
listener,
foo -> listener.getPort(),
false
);

logInterNodeListener(
logger,
listener,
sanitized,
address,
"'" + ADVERTISED_LISTENER_CONFIG + "'"
);

return listener;
return sanitized;
}

private static void logInterNodeListener(
Expand Down Expand Up @@ -329,10 +335,6 @@ private static URL sanitizeInterNodeListener(
final Function<URL, Integer> portResolver,
final boolean replaceHost
) {
if (!replaceHost && listener.getPort() > 0) {
return listener;
}

final String host = replaceHost
? getLocalHostName()
: listener.getHost();
Expand All @@ -342,7 +344,7 @@ private static URL sanitizeInterNodeListener(
: listener.getPort();

try {
return new URL(listener.getProtocol(), host, port, listener.getFile());
return new URL(listener.getProtocol(), host, port, "");
} catch (final MalformedURLException e) {
throw new KsqlServerException("Resolved first listener to malformed URL", e);
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -249,6 +250,27 @@ public void shouldUseExplicitInterNodeListenerIfSetToIpv6Loopback() {
verifyNoMoreInteractions(logger);
}

@Test
public void shouldSanitizeInterNodeListenerWithTrailingSlash() {
// Given:
final URL expected = url("https://example.com:12345");
final URL configured = url("https://example.com:12345/");

final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.<String, Object>builder()
.putAll(MIN_VALID_CONFIGS)
.put(ADVERTISED_LISTENER_CONFIG, configured.toString())
.build()
);

// When:
final URL actual = config.getInterNodeListener(portResolver, logger);

// Then:
assertThat(actual, is(expected));
verifyLogsInterNodeListener(expected, QUOTED_INTER_NODE_LISTENER_CONFIG);
verifyNoMoreInteractions(logger);
}

@Test
public void shouldThrowIfExplicitInterNodeListenerHasAutoPortAssignment() {
// Given:
Expand Down Expand Up @@ -451,6 +473,30 @@ public void shouldResolveInterNodeListenerToFirstListenerWithAutoPortAssignment(
verifyNoMoreInteractions(logger);
}

@Test
public void shouldResolveInterNodeListenerToFirstListenerWithAutoPortAssignmentAndTrailingSlash() {
// Given:
final URL autoPort = url("https://example.com:0/");

when(portResolver.apply(any())).thenReturn(2222);

final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.<String, Object>builder()
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
.put(LISTENERS_CONFIG, autoPort.toString() + ",http://localhost:2589/")
.build()
);

// When:
final URL actual = config.getInterNodeListener(portResolver, logger);

// Then:
final URL expected = url("https://example.com:2222");

assertThat(actual, is(expected));
verifyLogsInterNodeListener(expected, QUOTED_FIRST_LISTENER_CONFIG);
verifyNoMoreInteractions(logger);
}

@Test
public void shouldResolveInterNodeListenerToFirstListenerWithIpv4WildcardAddress() {
// Given:
Expand Down

0 comments on commit f4f78e0

Please sign in to comment.