Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring version checker improvements to 5.1.x #2240

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ksql-cli/src/main/java/io/confluent/ksql/Ksql.java
Expand Up @@ -55,7 +55,7 @@ public static void main(final String[] args) throws IOException {
creds -> restClient.setupAuthenticationCredentials(creds.left, creds.right)
);

final KsqlVersionCheckerAgent versionChecker = new KsqlVersionCheckerAgent();
final KsqlVersionCheckerAgent versionChecker = new KsqlVersionCheckerAgent(() -> false);
versionChecker.start(KsqlModuleType.CLI, properties);

try (Cli cli = new Cli(options.getStreamedQueryRowLimit(),
Expand Down
Expand Up @@ -76,6 +76,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerEndpoint;
Expand Down Expand Up @@ -262,7 +264,8 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {
JsonMapper.INSTANCE.mapper,
statementParser,
ksqlEngine,
exec
exec,
versionCheckerAgent::updateLastRequestTime
);
}

Expand All @@ -276,9 +279,8 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {

public static KsqlRestApplication buildApplication(
final KsqlRestConfig restConfig,
final VersionCheckerAgent versionCheckerAgent
)
throws Exception {
final Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory
) {

final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);

Expand Down Expand Up @@ -361,18 +363,22 @@ public static KsqlRestApplication buildApplication(
final RootDocument rootDocument = new RootDocument();

final StatusResource statusResource = new StatusResource(statementExecutor);
final VersionCheckerAgent versionChecker = versionCheckerFactory
.apply(() -> !ksqlEngine.getLivePersistentQueries().isEmpty());
final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlConfig,
ksqlEngine,
statementParser,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG))
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
versionChecker::updateLastRequestTime
);
final KsqlResource ksqlResource = new KsqlResource(
ksqlConfig,
ksqlEngine,
commandStore,
restConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)
restConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG),
versionChecker::updateLastRequestTime
);

commandRunner.processPriorCommands();
Expand All @@ -386,7 +392,7 @@ public static KsqlRestApplication buildApplication(
statusResource,
streamedQueryResource,
ksqlResource,
versionCheckerAgent
versionChecker
);
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ private static Executable createExecutable(
final KsqlRestConfig restConfig = new KsqlRestConfig(properties);
return KsqlRestApplication.buildApplication(
restConfig,
new KsqlVersionCheckerAgent()
KsqlVersionCheckerAgent::new
);
}
}
Expand Up @@ -37,6 +37,9 @@
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import java.io.Console;
import java.io.IOException;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -65,17 +68,21 @@ public class StandaloneExecutor implements Executable {
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final Map<String, Object> configProperties = new HashMap<>();
private final boolean failOnNoQueries;
private final VersionCheckerAgent versionCheckerAgent;

StandaloneExecutor(final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final String queriesFile,
final UdfLoader udfLoader,
final boolean failOnNoQueries) {
final boolean failOnNoQueries,
final VersionCheckerAgent versionCheckerAgent) {
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig can't be null");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine can't be null");
this.queriesFile = Objects.requireNonNull(queriesFile, "queriesFile can't be null");
this.udfLoader = Objects.requireNonNull(udfLoader, "udfLoader can't be null");
this.failOnNoQueries = failOnNoQueries;
this.versionCheckerAgent =
Objects.requireNonNull(versionCheckerAgent, "VersionCheckerAgenr cannot be null.");
}

private interface Handler<T extends Statement> {
Expand Down Expand Up @@ -185,6 +192,9 @@ public void start() {
udfLoader.load();
executeStatements(readQueriesFile(queriesFile));
showWelcomeMessage();
final Properties properties = new Properties();
properties.putAll(configProperties);
versionCheckerAgent.start(KsqlModuleType.SERVER, properties);
} catch (final Exception e) {
log.error("Failed to start KSQL Server with query file: " + queriesFile, e);
stop();
Expand All @@ -206,15 +216,22 @@ public void join() throws InterruptedException {
shutdownLatch.await();
}

public static StandaloneExecutor create(final Properties properties,
final String queriesFile,
final String installDir) {
public static StandaloneExecutor create(
final Properties properties,
final String queriesFile,
final String installDir) {
final KsqlConfig ksqlConfig = new KsqlConfig(properties);
final KsqlEngine ksqlEngine = KsqlEngine.create(ksqlConfig);
final UdfLoader udfLoader = UdfLoader.newInstance(ksqlConfig,
ksqlEngine.getMetaStore(),
installDir);
return new StandaloneExecutor(ksqlConfig, ksqlEngine, queriesFile, udfLoader, true);
return new StandaloneExecutor(
ksqlConfig,
ksqlEngine,
queriesFile,
udfLoader,
true,
new KsqlVersionCheckerAgent(() -> !ksqlEngine.getLivePersistentQueries().isEmpty()));
}

private void showWelcomeMessage() {
Expand Down
Expand Up @@ -108,6 +108,7 @@
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.StatementWithSchema;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -140,25 +141,30 @@ public class KsqlResource {
private final KsqlEngine ksqlEngine;
private final ReplayableCommandQueue replayableCommandQueue;
private final long distributedCommandResponseTimeout;
private final ActivenessRegistrar activenessRegistrar;

public KsqlResource(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final ReplayableCommandQueue replayableCommandQueue,
final long distributedCommandResponseTimeout
final long distributedCommandResponseTimeout,
final ActivenessRegistrar activenessRegistrar
) {
this.ksqlConfig = ksqlConfig;
this.ksqlEngine = ksqlEngine;
this.replayableCommandQueue = replayableCommandQueue;
this.distributedCommandResponseTimeout = distributedCommandResponseTimeout;
this.registerKsqlStatementTasks();
this.activenessRegistrar =
Objects.requireNonNull(activenessRegistrar, "activenessRegistrar cannot be null.");
}

@POST
public Response handleKsqlStatements(final KsqlRequest request) {
final List<PreparedStatement> parsedStatements;
final KsqlEntityList result = new KsqlEntityList();

activenessRegistrar.updateLastRequestTime();
try {
parsedStatements = ksqlEngine.parseStatements(request.getKsql());
} catch (final ParseFailedException e) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.util.JsonMapper;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.time.Duration;
import java.util.Objects;
import javax.ws.rs.Consumes;
Expand All @@ -52,19 +53,23 @@ public class StreamedQueryResource {
private final StatementParser statementParser;
private final Duration disconnectCheckInterval;
private final ObjectMapper objectMapper;
private final ActivenessRegistrar activenessRegistrar;

public StreamedQueryResource(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final StatementParser statementParser,
final Duration disconnectCheckInterval
final Duration disconnectCheckInterval,
final ActivenessRegistrar activenessRegistrar
) {
this.ksqlConfig = ksqlConfig;
this.ksqlEngine = ksqlEngine;
this.statementParser = statementParser;
this.disconnectCheckInterval =
Objects.requireNonNull(disconnectCheckInterval, "disconnectCheckInterval");
this.objectMapper = JsonMapper.INSTANCE.mapper;
this.activenessRegistrar =
Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
}

@POST
Expand All @@ -74,7 +79,7 @@ public Response streamQuery(final KsqlRequest request) throws Exception {
if (ksql.isEmpty()) {
return Errors.badRequest("\"ksql\" field must be populated");
}

activenessRegistrar.updateLastRequestTime();
try {
statement = statementParser.parseSingleStatement(ksql);
} catch (IllegalArgumentException | KsqlException e) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand All @@ -55,6 +56,7 @@ public class WSQueryEndpoint {
private final StatementParser statementParser;
private final KsqlEngine ksqlEngine;
private final ListeningScheduledExecutorService exec;
private final ActivenessRegistrar activenessRegistrar;

private WebSocketSubscriber subscriber;

Expand All @@ -63,19 +65,23 @@ public WSQueryEndpoint(
final ObjectMapper mapper,
final StatementParser statementParser,
final KsqlEngine ksqlEngine,
final ListeningScheduledExecutorService exec
final ListeningScheduledExecutorService exec,
final ActivenessRegistrar activenessRegistrar
) {
this.ksqlConfig = ksqlConfig;
this.mapper = mapper;
this.statementParser = statementParser;
this.ksqlEngine = ksqlEngine;
this.exec = exec;
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.mapper = Objects.requireNonNull(mapper, "mapper");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.exec = Objects.requireNonNull(exec, "exec");
this.activenessRegistrar =
Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
}

@OnOpen
public void onOpen(final Session session, final EndpointConfig endpointConfig) {
log.debug("Opening websocket session {}", session.getId());
final Map<String, List<String>> parameters = session.getRequestParameterMap();
activenessRegistrar.updateLastRequestTime();

final List<String> versionParam = parameters.getOrDefault(
Versions.KSQL_V1_WS_PARAM, Arrays.asList(Versions.KSQL_V1_WS));
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -33,7 +34,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
Expand All @@ -54,7 +54,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.confluent.rest.RestConfig;
import io.confluent.rest.validation.JacksonMessageBodyProvider;

Expand All @@ -74,7 +73,6 @@ public class RestApiTest {

private static String serverAddress;


private Client restClient;

@BeforeClass
Expand All @@ -83,6 +81,7 @@ public static void setUpClass() throws Exception {
config.put(KsqlRestConfig.INSTALL_DIR_CONFIG, TestUtils.tempDirectory().getPath());
config.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "rest_api_test_service");


testHarness.start(config);

startRestServer(testHarness.allConfigs());
Expand Down Expand Up @@ -140,13 +139,6 @@ public static void cleanUpClass() throws Exception {
testHarness.stop();
}

private static class DummyVersionCheckerAgent implements VersionCheckerAgent {
@Override
public void start(final KsqlModuleType moduleType, final Properties ksqlProperties) {
// do nothing;
}
}

private static Client buildClient() {
final ObjectMapper objectMapper = JsonMapper.INSTANCE.mapper;
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand Down Expand Up @@ -175,8 +167,10 @@ private static void startRestServer(Map<String, Object> configs) throws Exceptio
final int port = randomFreeLocalPort();
serverAddress = "http://localhost:" + port;
configs.put(RestConfig.LISTENERS_CONFIG, serverAddress);
restApplication = KsqlRestApplication.buildApplication(new KsqlRestConfig(configs),
new DummyVersionCheckerAgent());
restApplication = KsqlRestApplication.buildApplication(
new KsqlRestConfig(configs),
(booleanSupplier) -> EasyMock.niceMock(VersionCheckerAgent.class)
);
restApplication.start();
return;
} catch (BindException e) {
Expand Down