Skip to content

Commit

Permalink
Version checker improvement (#2020) (#2240)
Browse files Browse the repository at this point in the history
* Added isActive field to the ksql support metrics.

* Added tests.

* Updated the active status detection interval to 24 hours.

* Applied review feedback.

* Some refactoring.

* Fix tests

* Added to standalone.

* Test fix

* Refactoring based on Andy's feedabck.

* Refactoring with Andy!

* Change frequency to hour.

* CHange the frequency back to 24 hours.

* More changes addressing feedback.

* Added a comment.

* Temp commit

* Applied the feedback.

* Applied more feedback

* more feedback!

* Final feedback from Andy!

* Feedback from Victoria.
  • Loading branch information
hjafarpour committed Dec 7, 2018
1 parent 765a9e6 commit 4336b2f
Show file tree
Hide file tree
Showing 24 changed files with 614 additions and 196 deletions.
2 changes: 1 addition & 1 deletion ksql-cli/src/main/java/io/confluent/ksql/Ksql.java
Expand Up @@ -55,7 +55,7 @@ public static void main(final String[] args) throws IOException {
creds -> restClient.setupAuthenticationCredentials(creds.left, creds.right)
);

final KsqlVersionCheckerAgent versionChecker = new KsqlVersionCheckerAgent();
final KsqlVersionCheckerAgent versionChecker = new KsqlVersionCheckerAgent(() -> false);
versionChecker.start(KsqlModuleType.CLI, properties);

try (Cli cli = new Cli(options.getStreamedQueryRowLimit(),
Expand Down
Expand Up @@ -76,6 +76,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerEndpoint;
Expand Down Expand Up @@ -262,7 +264,8 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {
JsonMapper.INSTANCE.mapper,
statementParser,
ksqlEngine,
exec
exec,
versionCheckerAgent::updateLastRequestTime
);
}

Expand All @@ -276,9 +279,8 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {

public static KsqlRestApplication buildApplication(
final KsqlRestConfig restConfig,
final VersionCheckerAgent versionCheckerAgent
)
throws Exception {
final Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory
) {

final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);

Expand Down Expand Up @@ -361,18 +363,22 @@ public static KsqlRestApplication buildApplication(
final RootDocument rootDocument = new RootDocument();

final StatusResource statusResource = new StatusResource(statementExecutor);
final VersionCheckerAgent versionChecker = versionCheckerFactory
.apply(() -> !ksqlEngine.getLivePersistentQueries().isEmpty());
final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlConfig,
ksqlEngine,
statementParser,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG))
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
versionChecker::updateLastRequestTime
);
final KsqlResource ksqlResource = new KsqlResource(
ksqlConfig,
ksqlEngine,
commandStore,
restConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)
restConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG),
versionChecker::updateLastRequestTime
);

commandRunner.processPriorCommands();
Expand All @@ -386,7 +392,7 @@ public static KsqlRestApplication buildApplication(
statusResource,
streamedQueryResource,
ksqlResource,
versionCheckerAgent
versionChecker
);
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ private static Executable createExecutable(
final KsqlRestConfig restConfig = new KsqlRestConfig(properties);
return KsqlRestApplication.buildApplication(
restConfig,
new KsqlVersionCheckerAgent()
KsqlVersionCheckerAgent::new
);
}
}
Expand Up @@ -37,6 +37,9 @@
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import java.io.Console;
import java.io.IOException;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -65,17 +68,21 @@ public class StandaloneExecutor implements Executable {
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final Map<String, Object> configProperties = new HashMap<>();
private final boolean failOnNoQueries;
private final VersionCheckerAgent versionCheckerAgent;

StandaloneExecutor(final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final String queriesFile,
final UdfLoader udfLoader,
final boolean failOnNoQueries) {
final boolean failOnNoQueries,
final VersionCheckerAgent versionCheckerAgent) {
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig can't be null");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine can't be null");
this.queriesFile = Objects.requireNonNull(queriesFile, "queriesFile can't be null");
this.udfLoader = Objects.requireNonNull(udfLoader, "udfLoader can't be null");
this.failOnNoQueries = failOnNoQueries;
this.versionCheckerAgent =
Objects.requireNonNull(versionCheckerAgent, "VersionCheckerAgenr cannot be null.");
}

private interface Handler<T extends Statement> {
Expand Down Expand Up @@ -185,6 +192,9 @@ public void start() {
udfLoader.load();
executeStatements(readQueriesFile(queriesFile));
showWelcomeMessage();
final Properties properties = new Properties();
properties.putAll(configProperties);
versionCheckerAgent.start(KsqlModuleType.SERVER, properties);
} catch (final Exception e) {
log.error("Failed to start KSQL Server with query file: " + queriesFile, e);
stop();
Expand All @@ -206,15 +216,22 @@ public void join() throws InterruptedException {
shutdownLatch.await();
}

public static StandaloneExecutor create(final Properties properties,
final String queriesFile,
final String installDir) {
public static StandaloneExecutor create(
final Properties properties,
final String queriesFile,
final String installDir) {
final KsqlConfig ksqlConfig = new KsqlConfig(properties);
final KsqlEngine ksqlEngine = KsqlEngine.create(ksqlConfig);
final UdfLoader udfLoader = UdfLoader.newInstance(ksqlConfig,
ksqlEngine.getMetaStore(),
installDir);
return new StandaloneExecutor(ksqlConfig, ksqlEngine, queriesFile, udfLoader, true);
return new StandaloneExecutor(
ksqlConfig,
ksqlEngine,
queriesFile,
udfLoader,
true,
new KsqlVersionCheckerAgent(() -> !ksqlEngine.getLivePersistentQueries().isEmpty()));
}

private void showWelcomeMessage() {
Expand Down
Expand Up @@ -108,6 +108,7 @@
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.StatementWithSchema;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -140,25 +141,30 @@ public class KsqlResource {
private final KsqlEngine ksqlEngine;
private final ReplayableCommandQueue replayableCommandQueue;
private final long distributedCommandResponseTimeout;
private final ActivenessRegistrar activenessRegistrar;

public KsqlResource(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final ReplayableCommandQueue replayableCommandQueue,
final long distributedCommandResponseTimeout
final long distributedCommandResponseTimeout,
final ActivenessRegistrar activenessRegistrar
) {
this.ksqlConfig = ksqlConfig;
this.ksqlEngine = ksqlEngine;
this.replayableCommandQueue = replayableCommandQueue;
this.distributedCommandResponseTimeout = distributedCommandResponseTimeout;
this.registerKsqlStatementTasks();
this.activenessRegistrar =
Objects.requireNonNull(activenessRegistrar, "activenessRegistrar cannot be null.");
}

@POST
public Response handleKsqlStatements(final KsqlRequest request) {
final List<PreparedStatement> parsedStatements;
final KsqlEntityList result = new KsqlEntityList();

activenessRegistrar.updateLastRequestTime();
try {
parsedStatements = ksqlEngine.parseStatements(request.getKsql());
} catch (final ParseFailedException e) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.util.JsonMapper;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.time.Duration;
import java.util.Objects;
import javax.ws.rs.Consumes;
Expand All @@ -52,19 +53,23 @@ public class StreamedQueryResource {
private final StatementParser statementParser;
private final Duration disconnectCheckInterval;
private final ObjectMapper objectMapper;
private final ActivenessRegistrar activenessRegistrar;

public StreamedQueryResource(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final StatementParser statementParser,
final Duration disconnectCheckInterval
final Duration disconnectCheckInterval,
final ActivenessRegistrar activenessRegistrar
) {
this.ksqlConfig = ksqlConfig;
this.ksqlEngine = ksqlEngine;
this.statementParser = statementParser;
this.disconnectCheckInterval =
Objects.requireNonNull(disconnectCheckInterval, "disconnectCheckInterval");
this.objectMapper = JsonMapper.INSTANCE.mapper;
this.activenessRegistrar =
Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
}

@POST
Expand All @@ -74,7 +79,7 @@ public Response streamQuery(final KsqlRequest request) throws Exception {
if (ksql.isEmpty()) {
return Errors.badRequest("\"ksql\" field must be populated");
}

activenessRegistrar.updateLastRequestTime();
try {
statement = statementParser.parseSingleStatement(ksql);
} catch (IllegalArgumentException | KsqlException e) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand All @@ -55,6 +56,7 @@ public class WSQueryEndpoint {
private final StatementParser statementParser;
private final KsqlEngine ksqlEngine;
private final ListeningScheduledExecutorService exec;
private final ActivenessRegistrar activenessRegistrar;

private WebSocketSubscriber subscriber;

Expand All @@ -63,19 +65,23 @@ public WSQueryEndpoint(
final ObjectMapper mapper,
final StatementParser statementParser,
final KsqlEngine ksqlEngine,
final ListeningScheduledExecutorService exec
final ListeningScheduledExecutorService exec,
final ActivenessRegistrar activenessRegistrar
) {
this.ksqlConfig = ksqlConfig;
this.mapper = mapper;
this.statementParser = statementParser;
this.ksqlEngine = ksqlEngine;
this.exec = exec;
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.mapper = Objects.requireNonNull(mapper, "mapper");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.exec = Objects.requireNonNull(exec, "exec");
this.activenessRegistrar =
Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
}

@OnOpen
public void onOpen(final Session session, final EndpointConfig endpointConfig) {
log.debug("Opening websocket session {}", session.getId());
final Map<String, List<String>> parameters = session.getRequestParameterMap();
activenessRegistrar.updateLastRequestTime();

final List<String> versionParam = parameters.getOrDefault(
Versions.KSQL_V1_WS_PARAM, Arrays.asList(Versions.KSQL_V1_WS));
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -33,7 +34,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
Expand All @@ -54,7 +54,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.confluent.rest.RestConfig;
import io.confluent.rest.validation.JacksonMessageBodyProvider;

Expand All @@ -74,7 +73,6 @@ public class RestApiTest {

private static String serverAddress;


private Client restClient;

@BeforeClass
Expand All @@ -83,6 +81,7 @@ public static void setUpClass() throws Exception {
config.put(KsqlRestConfig.INSTALL_DIR_CONFIG, TestUtils.tempDirectory().getPath());
config.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "rest_api_test_service");


testHarness.start(config);

startRestServer(testHarness.allConfigs());
Expand Down Expand Up @@ -140,13 +139,6 @@ public static void cleanUpClass() throws Exception {
testHarness.stop();
}

private static class DummyVersionCheckerAgent implements VersionCheckerAgent {
@Override
public void start(final KsqlModuleType moduleType, final Properties ksqlProperties) {
// do nothing;
}
}

private static Client buildClient() {
final ObjectMapper objectMapper = JsonMapper.INSTANCE.mapper;
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand Down Expand Up @@ -175,8 +167,10 @@ private static void startRestServer(Map<String, Object> configs) throws Exceptio
final int port = randomFreeLocalPort();
serverAddress = "http://localhost:" + port;
configs.put(RestConfig.LISTENERS_CONFIG, serverAddress);
restApplication = KsqlRestApplication.buildApplication(new KsqlRestConfig(configs),
new DummyVersionCheckerAgent());
restApplication = KsqlRestApplication.buildApplication(
new KsqlRestConfig(configs),
(booleanSupplier) -> EasyMock.niceMock(VersionCheckerAgent.class)
);
restApplication.start();
return;
} catch (BindException e) {
Expand Down

0 comments on commit 4336b2f

Please sign in to comment.