Skip to content

Commit

Permalink
tuning ssh keepalive settings (#33727)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <gisripa@gmail.com>
  • Loading branch information
gisripa committed Jan 5, 2024
1 parent 64f07e4 commit 8524b32
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 12 deletions.
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ MavenLocal debugging steps:
### Java CDK

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.11.1 | 2024-01-04 | [\#33727](https://github.com/airbytehq/airbyte/pull/33727) | SSH bastion heartbeats for Destinations |
| 0.11.0 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | DV2 T+D uses Sql struct to represent transactions; other T+D-related changes |
| 0.10.4 | 2023-12-20 | [\#33071](https://github.com/airbytehq/airbyte/pull/33071) | Add the ability to parse JDBC parameters with another delimiter than '&' |
| 0.10.3 | 2024-01-03 | [\#33312](https://github.com/airbytehq/airbyte/pull/33312) | Send out count in AirbyteStateMessage |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.session.SessionHeartbeatController;
import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.common.util.security.SecurityUtils;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,12 +49,24 @@ public class SshTunnel implements AutoCloseable {
public static final String SSH_TIMEOUT_DISPLAY_MESSAGE =
"Timed out while opening a SSH Tunnel. Please double check the given SSH configurations and try again.";

public static final String CONNECTION_OPTIONS_KEY = "ssh_connection_options";
public static final String SESSION_HEARTBEAT_INTERVAL_KEY = "session_heartbeat_interval";
public static final long SESSION_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS = 1000;
public static final String GLOBAL_HEARTBEAT_INTERVAL_KEY = "global_heartbeat_interval";
public static final long GLOBAL_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS = 2000;
public static final String IDLE_TIMEOUT_KEY = "idle_timeout";
public static final long IDLE_TIMEOUT_DEFAULT_INFINITE = 0;

public enum TunnelMethod {
NO_TUNNEL,
SSH_PASSWORD_AUTH,
SSH_KEY_AUTH
}

public record SshConnectionOptions(Duration sessionHeartbeatInterval,
Duration globalHeartbeatInterval,
Duration idleTimeout) {}

public static final int TIMEOUT_MILLIS = 15000; // 15 seconds

private final JsonNode config;
Expand Down Expand Up @@ -99,6 +114,7 @@ public enum TunnelMethod {
* tunnel host).
* @param remoteServicePort - the actual port of the remote service (as it is known to the tunnel
* host).
* @param connectionOptions - optional connection options for ssh client.
*/
public SshTunnel(final JsonNode config,
final List<String> hostKey,
Expand All @@ -112,7 +128,8 @@ public SshTunnel(final JsonNode config,
final String sshKey,
final String tunnelUserPassword,
final String remoteServiceHost,
final int remoteServicePort) {
final int remoteServicePort,
final Optional<SshConnectionOptions> connectionOptions) {
this.config = config;
this.hostKey = hostKey;
this.portKey = portKey;
Expand Down Expand Up @@ -168,11 +185,42 @@ public SshTunnel(final JsonNode config,
this.tunnelUser = tunnelUser;
this.sshKey = sshKey;
this.tunnelUserPassword = tunnelUserPassword;
this.sshclient = createClient();
this.sshclient = connectionOptions.map(sshConnectionOptions -> createClient(sshConnectionOptions.sessionHeartbeatInterval(),
sshConnectionOptions.globalHeartbeatInterval(),
sshConnectionOptions.idleTimeout())).orElseGet(this::createClient);
this.tunnelSession = openTunnel(sshclient);
}
}

public SshTunnel(final JsonNode config,
final List<String> hostKey,
final List<String> portKey,
final String endPointKey,
final String remoteServiceUrl,
final TunnelMethod tunnelMethod,
final String tunnelHost,
final int tunnelPort,
final String tunnelUser,
final String sshKey,
final String tunnelUserPassword,
final String remoteServiceHost,
final int remoteServicePort) {
this(config,
hostKey,
portKey,
endPointKey,
remoteServiceUrl,
tunnelMethod,
tunnelHost,
tunnelPort,
tunnelUser,
sshKey,
tunnelUserPassword,
remoteServiceHost,
remoteServicePort,
Optional.empty());
}

public JsonNode getOriginalConfig() {
return config;
}
Expand Down Expand Up @@ -216,7 +264,32 @@ public static SshTunnel getInstance(final JsonNode config, final List<String> ho
Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "ssh_key")),
Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_user_password")),
Strings.safeTrim(Jsons.getStringOrNull(config, hostKey)),
Jsons.getIntOrZero(config, portKey));
Jsons.getIntOrZero(config, portKey),
getSshConnectionOptions(config));
}

@NotNull
private static Optional<SshConnectionOptions> getSshConnectionOptions(JsonNode config) {
// piggybacking on JsonNode config to make it configurable at connector level.
Optional<JsonNode> connectionOptionConfig = Jsons.getOptional(config, CONNECTION_OPTIONS_KEY);
final Optional<SshConnectionOptions> connectionOptions;
if (connectionOptionConfig.isPresent()) {
JsonNode connectionOptionsNode = connectionOptionConfig.get();
Duration sessionHeartbeatInterval = Jsons.getOptional(connectionOptionsNode, SESSION_HEARTBEAT_INTERVAL_KEY)
.map(interval -> Duration.ofMillis(interval.asLong()))
.orElse(Duration.ofSeconds(1));
Duration globalHeartbeatInterval = Jsons.getOptional(connectionOptionsNode, GLOBAL_HEARTBEAT_INTERVAL_KEY)
.map(interval -> Duration.ofMillis(interval.asLong()))
.orElse(Duration.ofSeconds(2));
Duration idleTimeout = Jsons.getOptional(connectionOptionsNode, IDLE_TIMEOUT_KEY)
.map(interval -> Duration.ofMillis(interval.asLong()))
.orElse(Duration.ZERO);
connectionOptions = Optional.of(
new SshConnectionOptions(sessionHeartbeatInterval, globalHeartbeatInterval, idleTimeout));
} else {
connectionOptions = Optional.empty();
}
return connectionOptions;
}

public static SshTunnel getInstance(final JsonNode config, final String endPointKey) throws Exception {
Expand All @@ -237,7 +310,8 @@ public static SshTunnel getInstance(final JsonNode config, final String endPoint
Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_user")),
Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "ssh_key")),
Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_user_password")),
null, 0);
null, 0,
getSshConnectionOptions(config));
}

public static void sshWrap(final JsonNode config,
Expand Down Expand Up @@ -332,7 +406,22 @@ private SshClient createClient() {
final SshClient client = SshClient.setUpDefaultClient();
client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE);
client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE);
CoreModuleProperties.IDLE_TIMEOUT.set(client, Duration.ZERO);
return client;
}

private SshClient createClient(Duration sessionHeartbeatInterval, Duration globalHeartbeatInterval, Duration idleTimeout) {
LOGGER.info("Creating SSH client with Heartbeat and Keepalive enabled");
final SshClient client = createClient();
// Session level heartbeat using SSH_MSG_IGNORE every second.
client.setSessionHeartbeat(SessionHeartbeatController.HeartbeatType.IGNORE, sessionHeartbeatInterval);
// idle-timeout zero indicates NoTimeout.
CoreModuleProperties.IDLE_TIMEOUT.set(client, idleTimeout);
// Use tcp keep-alive mechanism.
CoreModuleProperties.SOCKET_KEEPALIVE.set(client, true);
// Additional delay used for ChannelOutputStream to wait for space in the remote socket send buffer.
CoreModuleProperties.WAIT_FOR_SPACE_TIMEOUT.set(client, Duration.ofMinutes(2));
// Global keepalive message sent every 2 seconds. This precedes the session level heartbeat.
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, globalHeartbeatInterval);
return client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.cdk.integrations.base.ssh;

import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
Expand All @@ -18,6 +20,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,8 +67,8 @@ public ConnectorSpecification spec() throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
try {
return (endPointKey != null) ? SshTunnel.sshWrap(config, endPointKey, delegate::check)
: SshTunnel.sshWrap(config, hostKey, portKey, delegate::check);
return (endPointKey != null) ? sshWrap(config, endPointKey, delegate::check)
: sshWrap(config, hostKey, portKey, delegate::check);
} catch (final RuntimeException e) {
final String sshErrorMessage = "Could not connect with provided SSH configuration. Error: " + e.getMessage();
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, sshErrorMessage);
Expand Down Expand Up @@ -98,7 +101,17 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final SshTunnel tunnel = getTunnelInstance(config);
final JsonNode clone = Jsons.clone(config);
Optional<JsonNode> connectionOptionsConfig = Jsons.getOptional(clone, CONNECTION_OPTIONS_KEY);
if (connectionOptionsConfig.isEmpty()) {
LOGGER.info("No SSH connection options found, using defaults");
if (clone instanceof ObjectNode) { // Defensive check, it will always be object node
ObjectNode connectionOptions = ((ObjectNode) clone).putObject(CONNECTION_OPTIONS_KEY);
connectionOptions.put(SESSION_HEARTBEAT_INTERVAL_KEY, SESSION_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS);
connectionOptions.put(GLOBAL_HEARTBEAT_INTERVAL_KEY, GLOBAL_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS);
}
}
final SshTunnel tunnel = getTunnelInstance(clone);
final SerializedAirbyteMessageConsumer delegateConsumer;
try {
delegateConsumer = delegate.getSerializedMessageConsumer(tunnel.getConfigInTunnel(), catalog, outputRecordCollector);
Expand All @@ -112,8 +125,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN

protected SshTunnel getTunnelInstance(final JsonNode config) throws Exception {
return (endPointKey != null)
? SshTunnel.getInstance(config, endPointKey)
: SshTunnel.getInstance(config, hostKey, portKey);
? getInstance(config, endPointKey)
: getInstance(config, hostKey, portKey);
}

}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.11.0
version=0.11.1

0 comments on commit 8524b32

Please sign in to comment.