diff --git a/.github/workflows/emulate.yml b/.github/workflows/emulate.yml index 76f0842e0..3efd24324 100644 --- a/.github/workflows/emulate.yml +++ b/.github/workflows/emulate.yml @@ -16,6 +16,8 @@ jobs: excludeModules: true steps: - uses: actions/checkout@v3 + with: + submodules: recursive # Gradle 7 requires Java 11 to run - uses: actions/setup-java@v3 @@ -23,6 +25,9 @@ jobs: distribution: zulu java-version: '19' + - name: Start SDK test proxy server + run: cd external/sdk-test-proxy && ./start-service + - uses: reactivecircus/android-emulator-runner@v2 with: api-level: ${{ matrix.api-level }} diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..e702685c1 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "external/sdk-test-proxy"] + path = external/sdk-test-proxy + url = git@github.com:ably/sdk-test-proxy.git diff --git a/android-test-common/build.gradle b/android-test-common/build.gradle index 5e94517eb..3d554d527 100644 --- a/android-test-common/build.gradle +++ b/android-test-common/build.gradle @@ -6,6 +6,8 @@ plugins { id 'com.android.library' id 'kotlin-android' id 'org.jlleitschuh.gradle.ktlint' + // TODO should this be here? + id 'org.jetbrains.kotlin.plugin.serialization' version "$kotlin_version" } dependencies { @@ -21,6 +23,8 @@ dependencies { implementation "io.ktor:ktor-client-logging:$ktor_version" implementation "uk.uuid.slf4j:slf4j-android:1.7.36-0" + implementation 'org.jetbrains.kotlinx:kotlinx-serialization-json:1.4.1' + implementation 'org.msgpack:msgpack-core:0.9.3' } diff --git a/android-test-common/src/main/java/com/ably/tracking/test/android/common/AblyProxy.kt b/android-test-common/src/main/java/com/ably/tracking/test/android/common/AblyProxy.kt index a1ab869aa..a6354038a 100644 --- a/android-test-common/src/main/java/com/ably/tracking/test/android/common/AblyProxy.kt +++ b/android-test-common/src/main/java/com/ably/tracking/test/android/common/AblyProxy.kt @@ -2,93 +2,25 @@ package com.ably.tracking.test.android.common import io.ably.lib.types.ClientOptions import io.ably.lib.util.Log -import io.ktor.client.HttpClient -import io.ktor.client.plugins.logging.LogLevel -import io.ktor.client.plugins.logging.Logger -import io.ktor.client.plugins.logging.Logging -import io.ktor.client.plugins.websocket.ClientWebSocketSession -import io.ktor.client.plugins.websocket.cio.wsRaw -import io.ktor.http.Parameters -import io.ktor.http.ParametersBuilder -import io.ktor.http.Url -import io.ktor.server.application.install -import io.ktor.server.engine.ApplicationEngine -import io.ktor.server.engine.embeddedServer -import io.ktor.server.plugins.callloging.CallLogging -import io.ktor.server.request.httpMethod -import io.ktor.server.request.path -import io.ktor.server.routing.Route -import io.ktor.server.routing.routing -import io.ktor.server.websocket.WebSocketServerSession -import io.ktor.server.websocket.WebSockets -import io.ktor.server.websocket.webSocketRaw -import io.ktor.websocket.Frame -import io.ktor.websocket.FrameType -import io.ktor.websocket.close -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch -import org.slf4j.event.Level -import java.net.ServerSocket -import java.net.Socket -import java.net.SocketException -import javax.net.ssl.SSLSocketFactory -import io.ktor.client.engine.cio.CIO as ClientCIO -import io.ktor.server.cio.CIO as ServerCIO private const val AGENT_HEADER_NAME = "ably-asset-tracking-android-publisher-tests" -private const val PROXY_HOST = "localhost" -private const val PROXY_PORT = 13579 -private const val REALTIME_HOST = "realtime.ably.io" -private const val REALTIME_PORT = 443 - const val PUBLISHER_CLIENT_ID = "AatNetworkConnectivityTests_Publisher" const val SUBSCRIBER_CLIENT_ID = "AatNetworkConnectivityTests_Subscriber" /** * A local proxy that can be used to intercept Realtime traffic for testing */ -interface RealtimeProxy { - /** - * Start the proxy listening for connections - */ - fun start() - - /** - * Stop the proxy and close any active connetions - */ - fun stop() - +class RealtimeProxy( + private val dto: ProxyDto, + private val host: String, + private val apiKey: String +) { /** * Ably ClientOptions that have been configured to direct traffic * through this proxy service */ - fun clientOptions(): ClientOptions -} - -/** - * Common base class for proxies to provide ClientOptions generation - */ -abstract class AatProxy( - private val apiKey: String -) : RealtimeProxy { - - /** - * The host address the proxy will listen on - */ - abstract val listenHost: String - - /** - * The port the proxy will be listening on - */ - abstract val listenPort: Int - - /** - * Pre-configured client options to configure AblyRealtime to send traffic locally through - * this proxy. Note that TLS is disabled, so that the proxy can act as a man in the middle. - */ - override fun clientOptions() = ClientOptions().apply { + fun clientOptions() = ClientOptions().apply { this.clientId = PUBLISHER_CLIENT_ID this.agents = mapOf(AGENT_HEADER_NAME to BuildConfig.VERSION_NAME) this.idempotentRestPublishing = true @@ -98,466 +30,9 @@ abstract class AatProxy( testLogD("${msg!!} - $tr", tr) } this.logLevel = Log.VERBOSE - this.realtimeHost = listenHost - this.port = listenPort + this.realtimeHost = host + this.port = dto.listenPort + // Note that TLS is disabled, so that the proxy can act as a man in the middle. this.tls = false } } - -/** - * A TCP Proxy, which can run locally and intercept traffic to Ably realtime. - * - * This proxy is only capable of simulating faults at the transport layer, such - * as connections being interrupted or packets being dropped entirely. - */ -class Layer4Proxy( - apiKey: String, - override val listenHost: String = PROXY_HOST, - override val listenPort: Int = PROXY_PORT, - private val targetAddress: String = REALTIME_HOST, - private val targetPort: Int = REALTIME_PORT, -) : AatProxy(apiKey) { - - private val loggingTag = "Layer4Proxy" - - private var server: ServerSocket? = null - private val sslSocketFactory = SSLSocketFactory.getDefault() - private val connections: MutableList = mutableListOf() - - /** - * Flag mutated by fault implementations to hang the TCP connection - */ - var isForwarding = true - - /** - * Block current thread and wait for a new incoming client connection on the server socket. - * Returns a connection object when a client has connected. - */ - private fun accept(): Layer4ProxyConnection { - val clientSock = server?.accept() - testLogD("$loggingTag: accepted connection") - - val serverSock = sslSocketFactory.createSocket(targetAddress, targetPort) - val conn = Layer4ProxyConnection(serverSock, clientSock!!, targetAddress, parentProxy = this) - connections.add(conn) - return conn - } - - /** - * Close open connections and stop listening for new local connections - */ - override fun stop() { - server?.close() - server = null - - connections.forEach { - it.stop() - } - connections.clear() - } - - /** - * Begin a background thread listening for local Realtime connections - */ - override fun start() { - if (server != null) { - testLogD("$loggingTag: start() called while already running") - return - } - - server = ServerSocket(listenPort) - Thread { - while (true) { - testLogD("$loggingTag: proxy trying to accept") - try { - val conn = this.accept() - testLogD("$loggingTag: proxy starting to run") - conn.run() - } catch (e: Exception) { - testLogD("$loggingTag: proxy shutting down: " + e.message) - break - } - } - }.start() - } -} - -/** - * A TCP Proxy connection between a local client and the remote Ably service. - */ -internal class Layer4ProxyConnection( - private val server: Socket, - private val client: Socket, - private val targetHost: String, - private val parentProxy: Layer4Proxy -) { - - private val loggingTag = "Layer4ProxyConnection" - - /** - * Starts two threads, one forwarding traffic in each direction between - * the local client and the Ably Realtime service. - */ - fun run() { - Thread { proxy(server, client, true) }.start() - Thread { proxy(client, server) }.start() - } - - /** - * Close socket connections, causing proxy threads to exit. - */ - fun stop() { - try { - server.close() - } catch (e: Exception) { - testLogD("$loggingTag: stop() server: $e", e) - } - - try { - client.close() - } catch (e: Exception) { - testLogD("$loggingTag: stop() client: $e", e) - } - } - - /** - * Copies traffic between source and destination sockets, rewriting the - * HTTP host header if requested to remove the proxy host details. - */ - private fun proxy(dstSock: Socket, srcSock: Socket, rewriteHost: Boolean = false) { - try { - val dst = dstSock.getOutputStream() - val src = srcSock.getInputStream() - val buff = ByteArray(4096) - var bytesRead: Int - - // deal with the initial HTTP upgrade packet - bytesRead = src.read(buff) - if (bytesRead < 0) { - return - } - - // HTTP is plaintext so we can just read it - val msg = String(buff, 0, bytesRead) - testLogD("$loggingTag: ${String(buff.copyOfRange(0, bytesRead))}") - if (rewriteHost) { - val newMsg = msg.replace( - oldValue = "${parentProxy.listenHost}:${parentProxy.listenPort}", - newValue = targetHost - ) - val newBuff = newMsg.toByteArray() - dst.write(newBuff, 0, newBuff.size) - } else { - dst.write(buff, 0, bytesRead) - } - - while (-1 != src.read(buff).also { bytesRead = it }) { - if (parentProxy.isForwarding) { - dst.write(buff, 0, bytesRead) - } - } - } catch (ignored: SocketException) { - } catch (e: Exception) { - testLogD("$loggingTag: $e", e) - } finally { - try { - srcSock.close() - } catch (ignored: Exception) { - } - } - } -} - -/** - * A WebSocket proxy for realtime connections, to allow interventions at - * the Ably protocol level. - */ -class Layer7Proxy( - apiKey: String, - override val listenHost: String = PROXY_HOST, - override val listenPort: Int = PROXY_PORT, - private val targetHost: String = REALTIME_HOST, - private val targetPort: Int = REALTIME_PORT -) : AatProxy(apiKey) { - - companion object { - const val tag = "Layer7Proxy" - } - - private var server: ApplicationEngine? = null - var interceptor: Layer7Interceptor = PassThroughInterceptor() - - override fun start() { - testLogD("$tag: starting...") - server = embeddedServer( - ServerCIO, - port = listenPort, - host = listenHost - ) { - install(CallLogging) { - level = Level.TRACE - } - install(WebSockets) - routing { - wsProxy( - path = "/", - target = Url("wss://$targetHost:$targetPort/"), - parent = this@Layer7Proxy - ) - } - }.start(wait = false) - } - - override fun stop() { - testLogD("$tag: stopping...") - server?.stop() - testLogD("$tag: done stopping...") - } - - /** - * Receives frames from incoming channel and forwards to receiver as appropriate, - * calling an intercept to see if any interventions are required. - */ - suspend fun forwardFrames( - direction: FrameDirection, - incoming: ReceiveChannel, - clientSession: ClientWebSocketSession, - serverSession: WebSocketServerSession, - ) { - for (received in incoming) { - testLogD("$tag: (raw) [$direction] ${logFrame(received)}") - try { - for (action in interceptor.interceptFrame(direction, received)) { - testLogD("$tag: (forwarding) [${action.direction}]: ${logFrame(action.frame)}") - when (action.direction) { - FrameDirection.ClientToServer -> { - clientSession.send(action.frame) - if (action.sendAndClose) { - clientSession.close() - } - } - FrameDirection.ServerToClient -> { - serverSession.send(action.frame) - if (action.sendAndClose) { - serverSession.close() - } - } - } - } - } catch (e: Exception) { - testLogD("$tag: forwardFrames error: $e", e) - throw(e) - } - } - } -} - -/** - * Proxy a WebSocket connection to the remote URL, setting up coroutines - * to send a receive frames in both directions - */ -fun Route.wsProxy(path: String, target: Url, parent: Layer7Proxy) { - webSocketRaw(path) { - testLogD("${Layer7Proxy.tag}: Client connected to $path") - - val serverSession = this - val client = configureWsClient() - - val params = parent.interceptor.interceptConnection( - ConnectionParams.fromRequestParameters(call.request.queryParameters) - ) - - client.wsRaw( - method = call.request.httpMethod, - host = target.host, - port = target.port, - path = call.request.path(), - request = { - url.protocol = target.protocol - url.port = target.port - - // Forward connection parameters and rewrite the Host header, as - // it will be the proxy host by default - params.applyToBuilder(url.parameters) - headers["Host"] = target.host - } - ) { - val clientSession = this - - val serverJob = launch { - testLogD("${Layer7Proxy.tag}: ==> (started)") - parent.forwardFrames( - FrameDirection.ClientToServer, - serverSession.incoming, - clientSession, - serverSession - ) - } - - val clientJob = launch { - testLogD("${Layer7Proxy.tag}: <== (started)") - parent.forwardFrames( - FrameDirection.ServerToClient, - clientSession.incoming, - clientSession, - serverSession - ) - } - - listOf(serverJob, clientJob).joinAll() - } - } -} - -/** - * Return a Ktor HTTP Client configured for WebSockets and with logging - * we can see in logcat - */ -fun configureWsClient() = - HttpClient(ClientCIO).config { - install(io.ktor.client.plugins.websocket.WebSockets) { - } - install(Logging) { - logger = object : Logger { - override fun log(message: String) { - testLogD("${Layer7Proxy.tag}: ktor client: $message") - } - } - level = LogLevel.ALL - } - } - -/** - * Return a string representation of a WS Frame for logging purposes - */ -fun logFrame(frame: Frame) = - if (frame.frameType == FrameType.BINARY) { - (frame.data.unpack()).toString() - } else { - frame.toString() - } - -interface Layer7Interceptor { - - /** - * Handle a new incoming connection with provided parameters. - * Return (potentially) altered connection parameters and apply any - * fault-specific side-effects internally. - */ - fun interceptConnection(params: ConnectionParams): ConnectionParams - - /** - * Intercept a Frame being passed through the proxy, returning a list - * of Actions to be performed in response. Note that doing nothing - * (i.e. passing through), is an Action in itself - */ - fun interceptFrame(direction: FrameDirection, frame: Frame): List -} - -/** - * Ably WebSocket connection parameters. - * Enables faults to make alterations to incoming request parameters, before - * the corresponding outgoing connection is made. - */ -data class ConnectionParams( - val clientId: String?, - val connectionSerial: String?, - val resume: String?, - val key: String?, - val heartbeats: String?, - val v: String?, - val format: String?, - val agent: String? -) { - companion object { - - /** - * Construct ConnectionParams from an incoming WebSocket connection request - */ - fun fromRequestParameters(params: Parameters) = - ConnectionParams( - clientId = params["clientId"], - connectionSerial = params["connectionSerial"], - resume = params["resume"], - key = params["key"], - heartbeats = params["heartbeats"], - v = params["v"], - format = params["format"], - agent = params["agent"] - ) - } - - /** - * Apply the (potentially altered) connection parameters in this instance - * to an outgoing connection WebSocket connection request - */ - fun applyToBuilder(paramsBuilder: ParametersBuilder) { - if (clientId != null) { - paramsBuilder["clientId"] = clientId - } - if (connectionSerial != null) { - paramsBuilder["connectionSerial"] = connectionSerial - } - if (resume != null) { - paramsBuilder["resume"] = resume - } - if (key != null) { - paramsBuilder["key"] = key - } - if (heartbeats != null) { - paramsBuilder["heartbeats"] = heartbeats - } - if (v != null) { - paramsBuilder["v"] = v - } - if (format != null) { - paramsBuilder["format"] = format - } - if (agent != null) { - paramsBuilder["agent"] = agent - } - } -} - -/** - * Direction of a WebSocket Frame being intercepted by the proxy - */ -enum class FrameDirection { - ClientToServer, - ServerToClient, -} - -/** - * Action an interception wants to perform in response to an observed - * message, or potentially a sequence of messages if it's retaining state. - */ -data class Action( - /** - * Direction to send the frame in - */ - val direction: FrameDirection, - - /** - * Websocket frame to be sent - */ - val frame: Frame, - - /** - * Flag to instruct proxy to close connection after performing - * this action - */ - val sendAndClose: Boolean = frame.frameType == FrameType.CLOSE -) - -/** - * An interceptor implementation that passes all data through normally - */ -class PassThroughInterceptor : Layer7Interceptor { - - override fun interceptConnection(params: ConnectionParams) = params - - override fun interceptFrame( - direction: FrameDirection, - frame: Frame - ) = listOf(Action(direction, frame)) -} diff --git a/android-test-common/src/main/java/com/ably/tracking/test/android/common/FaultProxyClient.kt b/android-test-common/src/main/java/com/ably/tracking/test/android/common/FaultProxyClient.kt new file mode 100644 index 000000000..f2c04a6ed --- /dev/null +++ b/android-test-common/src/main/java/com/ably/tracking/test/android/common/FaultProxyClient.kt @@ -0,0 +1,73 @@ +package com.ably.tracking.test.android.common + +import java.net.URL +import kotlinx.serialization.Serializable +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.logging.Logging +import io.ktor.client.request.get +import io.ktor.client.request.post +import io.ktor.http.Url +import io.ktor.http.URLBuilder +import io.ktor.http.appendPathSegments +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.json.Json +import io.ktor.client.call.body + +// TODO don't duplicate these, make a library instead +@Serializable +data class ProxyDto(val listenPort: Int) + +@Serializable +data class FaultSimulationDto(val id: String, val name: String, val type: FaultType, val proxy: ProxyDto) + +class FaultProxyClient( + private val baseUrl: URL +) { + private val client = HttpClient(CIO) { + install(Logging) + expectSuccess = true + } + + private fun urlForPathComponents(vararg pathComponents: String): Url { + return URLBuilder(baseUrl.toString()) + .appendPathSegments(*pathComponents) + .build() + } + + suspend fun getAllFaults(): List { + val url = urlForPathComponents("faults") + + val response = client.get(url) + val faultNames = Json.decodeFromString>(response.body()) + + return faultNames.map { Fault(it, this) } + } + + suspend fun createFaultSimulation(faultName: String, apiKey: String): FaultSimulation { + val url = urlForPathComponents("faults", faultName, "simulation") + + val response = client.post(url) + val dto = Json.decodeFromString(response.body()) + + return FaultSimulation(dto, baseUrl.host, apiKey, this) + } + + suspend fun enableFaultSimulation(id: String) { + val url = urlForPathComponents("fault-simulations", id, "enable") + + client.post(url) + } + + suspend fun resolveFaultSimulation(id: String) { + val url = urlForPathComponents("fault-simulations", id, "resolve") + + client.post(url) + } + + suspend fun cleanUpFaultSimulation(id: String) { + val url = urlForPathComponents("fault-simulations", id, "clean-up") + + client.post(url) + } +} diff --git a/android-test-common/src/main/java/com/ably/tracking/test/android/common/Faults.kt b/android-test-common/src/main/java/com/ably/tracking/test/android/common/Faults.kt index d4385ef34..1e8811a3f 100644 --- a/android-test-common/src/main/java/com/ably/tracking/test/android/common/Faults.kt +++ b/android-test-common/src/main/java/com/ably/tracking/test/android/common/Faults.kt @@ -1,26 +1,27 @@ package com.ably.tracking.test.android.common -import io.ktor.websocket.Frame -import io.ktor.websocket.FrameType -import java.util.Timer -import kotlin.concurrent.timerTask +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable /** * A simple factory interface to build new instances of a specific [FaultSimulation] * This is needed because JUnit parameterized tests won't construct fresh instances * of data parameters for each test - they wouldn't know how, without a factory. */ -abstract class Fault { - +class Fault( /** - * Create a fresh simulation of this fault type, using provided Ably credentials + * A human-readable name for this type of fault */ - abstract fun simulate(apiKey: String): FaultSimulation + val name: String, + private val client: FaultProxyClient +) { /** - * A human-readable name for this type of fault + * Create a fresh simulation of this fault type, using provided Ably credentials */ - abstract val name: String + suspend fun simulate(apiKey: String): FaultSimulation { + return client.createFaultSimulation(name, apiKey) + } override fun toString() = name } @@ -34,47 +35,45 @@ abstract class Fault { * and after it has been resolved, so that assertions can be made while testing * common use-cases. */ -abstract class FaultSimulation { +class FaultSimulation( + private val dto: FaultSimulationDto, + proxyHost: String, + apiKey: String, + private val client: FaultProxyClient +) { + val proxy: RealtimeProxy + val type: FaultType - /** - * The type of fault this simulates - used to validate the state of trackables - * and channel activity during and after the fault. - */ - abstract val type: FaultType + init { + proxy = RealtimeProxy(this.dto.proxy, proxyHost, apiKey) + type = dto.type + } - /** - * Subclasses can override this value to `true` in order to skip test that use this fault. - * We're using this in order to allow us to write tests which are known to fail, then allow them to pass in the CI - * environment temporarily until we subsequently raise a pull request to fix them. - * The advantage of this approach is that the test code remains active and continually compiled as - * a first class citizen of the codebase, while we work on other things to get it passing. - */ - open val skipPublisherTest: Boolean = false + // TODO decide how to address this – we don't have any skipped tests right now + val skipPublisherTest = false - // Causes the same behaviour as skipPublisherTest for the subscriber NetworkConnectivityTests - open val skipSubscriberTest: Boolean = false - - /** - * A RealtimeProxy instance that will be manipulated by this fault - */ - abstract val proxy: RealtimeProxy + val skipSubscriberTest = false /** * Break the proxy using the fault-specific failure conditions */ - abstract fun enable() + suspend fun enable() { + client.enableFaultSimulation(dto.id) + } /** * Restore the proxy to normal functionality */ - abstract fun resolve() + suspend fun resolve() { + client.resolveFaultSimulation(dto.id) + } /** * Called at start of test tearDown function to ensure fault doesn't interefere with test * tearDown of open resources. */ - open fun cleanUp() { - proxy.stop() + suspend fun cleanUp() { + client.cleanUpFaultSimulation(dto.id) } } @@ -82,6 +81,7 @@ abstract class FaultSimulation { * Describes the nature of a given fault simulation, and specifically the impact that it * should have on any Trackables or channel activity during and after resolution. */ +@kotlinx.serialization.Serializable sealed class FaultType { /** * AAT and/or Ably Java should handle this fault seamlessly Trackable state should be @@ -89,6 +89,8 @@ sealed class FaultType { * the fault will cause a brief Offline blip, but tests should expect to see Trackables * Online again before [resolvedWithinMillis] expires regardless. */ + @Serializable + @SerialName("nonfatal") data class Nonfatal( val resolvedWithinMillis: Long, ) : FaultType() @@ -100,6 +102,8 @@ sealed class FaultType { * online within [onlineWithinMillis] maximum. * */ + @Serializable + @SerialName("nonfatalWhenResolved") data class NonfatalWhenResolved( val offlineWithinMillis: Long, val onlineWithinMillis: Long, @@ -111,751 +115,9 @@ sealed class FaultType { * further location updates will be published. Tests should check that Trackables reach * the Failed state within [failedWithinMillis] */ + @Serializable + @SerialName("fatal") data class Fatal( val failedWithinMillis: Long, ) : FaultType() } - -/** - * Base class for faults requiring a Layer 4 proxy for simulation. - */ -abstract class TransportLayerFault(apiKey: String) : FaultSimulation() { - val tcpProxy = Layer4Proxy(apiKey = apiKey) - override val proxy = tcpProxy -} - -/** - * A Transport-layer fault implementation that breaks nothing, useful for ensuring the - * test code works under normal proxy functionality. - */ -class NullTransportFault(apiKey: String) : TransportLayerFault(apiKey) { - - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = NullTransportFault(apiKey) - override val name = "NullTransportFault" - } - } - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 10_000L - ) - - override fun enable() { - } - - override fun resolve() { - } -} - -/** - * A fault implementation that will prevent the proxy from accepting TCP connections when active - */ -class TcpConnectionRefused(apiKey: String) : TransportLayerFault(apiKey) { - - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = TcpConnectionRefused(apiKey) - override val name = "TcpConnectionRefused" - } - } - - override val type = FaultType.NonfatalWhenResolved( - offlineWithinMillis = 30_000, - onlineWithinMillis = 60_000 - ) - - // May be able to be removed once the issues surrounding skipTest are resolved - override val skipSubscriberTest = true - - override fun enable() { - tcpProxy.stop() - } - - override fun resolve() { - tcpProxy.start() - } -} - -/** - * A fault implementation that hangs the TCP connection by preventing the Layer 4 - * proxy from forwarding packets in both directions - */ -class TcpConnectionUnresponsive(apiKey: String) : TransportLayerFault(apiKey) { - - override val skipSubscriberTest = true - - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = TcpConnectionUnresponsive(apiKey) - override val name = "TcpConnectionUnresponsive" - } - } - - override val type = FaultType.NonfatalWhenResolved( - offlineWithinMillis = 120_000, - onlineWithinMillis = 60_000 - ) - - override fun enable() { - tcpProxy.isForwarding = false - } - - override fun resolve() { - tcpProxy.isForwarding = true - } -} - -/** - * Fault implementation that causes the proxy to reject incoming connections entirely - * for two minutes, then comes back online. This should force client side - */ -class DisconnectAndSuspend(apiKey: String) : TransportLayerFault(apiKey) { - - companion object { - const val SUSPEND_DELAY_MILLIS: Long = 2 * 60 * 1000 - val fault = object : Fault() { - override fun simulate(apiKey: String) = DisconnectAndSuspend(apiKey) - override val name = "DisconnectAndSuspend" - } - } - - // May be able to be removed once the issues surrounding skipTest are resolved - override val skipSubscriberTest = true - - private val timer = Timer() - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 180_000L - ) - - override fun enable() { - tcpProxy.stop() - timer.schedule( - timerTask { - tcpProxy.start() - }, - SUSPEND_DELAY_MILLIS - ) - } - - override fun resolve() { - timer.cancel() - tcpProxy.start() - } - - override fun cleanUp() { - timer.cancel() - super.cleanUp() - } -} - -/** - * Base class for Application layer faults, which will need access to the Ably - * WebSockets protocol, and therefore a Layer 7 proxy. - */ -abstract class ApplicationLayerFault(apiKey: String) : FaultSimulation() { - val applicationProxy = Layer7Proxy(apiKey) - override val proxy = applicationProxy -} - -/** - * An empty fault implementation for the Layer 7 proxy to ensure that normal - * functionality is working with no interventions - */ -class NullApplicationLayerFault(apiKey: String) : ApplicationLayerFault(apiKey) { - - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = NullApplicationLayerFault(apiKey) - override val name = "NullApplicationLayerFault" - } - } - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 10_000L - ) - - override fun enable() { - } - - override fun resolve() { - } -} - -/** - * Base class for all faults that simply drop messages with a specific action - * type in a specified direction - */ -abstract class DropAction( - apiKey: String, - private val direction: FrameDirection, - private val action: Message.Action, - private val dropLimit: Int, -) : ApplicationLayerFault(apiKey) { - - private var nDropped = 0 - - companion object { - private const val tag = "DropAction" - } - - private var initialConnection = true - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 120_000L - ) - - override fun enable() { - applicationProxy.interceptor = object : Layer7Interceptor { - - override fun interceptConnection(params: ConnectionParams): ConnectionParams { - if (initialConnection) { - initialConnection = false - } else { - testLogD("$tag: second connection -- resolving fault") - resolve() - } - - return params - } - - override fun interceptFrame(direction: FrameDirection, frame: Frame) = - if (shouldFilter(direction, frame)) { - testLogD("$tag: dropping: $direction - ${logFrame(frame)}") - nDropped += 1 - listOf() - } else { - testLogD("$tag: keeping: $direction - ${logFrame(frame)}") - listOf(Action(direction, frame)) - } - } - } - - override fun resolve() { - applicationProxy.interceptor = PassThroughInterceptor() - initialConnection = true - } - - /** - * Check whether this frame and direction messages the fault specification - */ - private fun shouldFilter(direction: FrameDirection, frame: Frame) = - nDropped < dropLimit && - frame.frameType == FrameType.BINARY && - direction == this.direction && - frame.data.unpack().isAction(action) -} - -/** - * A DropAction fault implementation to drop ATTACH messages, - * simulating the Ably server failing to respond to channel attachment - */ -class AttachUnresponsive(apiKey: String) : DropAction( - apiKey = apiKey, - direction = FrameDirection.ClientToServer, - action = Message.Action.ATTACH, - dropLimit = 1, -) { - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = AttachUnresponsive(apiKey) - override val name = "AttachUnresponsive" - } - } -} - -/** - * A DropAction fault implementation to drop DETACH messages, - * simulating the Ably server failing to detach a client from a channel. - */ -class DetachUnresponsive(apiKey: String) : DropAction( - apiKey = apiKey, - direction = FrameDirection.ClientToServer, - action = Message.Action.DETACH, - dropLimit = 1, -) { - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = DetachUnresponsive(apiKey) - override val name = "DetachUnresponsive" - } - } -} - -/** - * Abstract fault implementation to trigger the proxy to go unresponsive - * (i.e. stop forwarding messages in a specific direction) once a particular - * action has been seen in the given direction. - */ -abstract class UnresponsiveAfterAction( - apiKey: String, - private val direction: FrameDirection, - private val action: Message.Action -) : ApplicationLayerFault(apiKey) { - - companion object { - private const val tag = "UnresponsiveAfterAction" - private const val restoreFunctionalityAfterConnections = 2 - } - - private var nConnections = 0 - private var isTriggered = false - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 150_000L - ) - - override fun enable() { - applicationProxy.interceptor = object : Layer7Interceptor { - - override fun interceptConnection(params: ConnectionParams): ConnectionParams { - nConnections += 1 - if (nConnections >= restoreFunctionalityAfterConnections) { - testLogD("$tag: resolved after $restoreFunctionalityAfterConnections connections") - resolve() - } - - return params - } - - override fun interceptFrame(direction: FrameDirection, frame: Frame): List { - if (shouldActivate(direction, frame)) { - testLogD("$tag/$action: - connection going unresponsive") - isTriggered = true - } - - return if (isTriggered) { - testLogD("$tag/$action: unresponsive: dropping ${logFrame(frame)}") - listOf() - } else { - listOf(Action(direction, frame)) - } - } - } - } - - override fun resolve() { - applicationProxy.interceptor = PassThroughInterceptor() - isTriggered = false - nConnections = 0 - } - - private fun shouldActivate(direction: FrameDirection, frame: Frame) = - frame.frameType == FrameType.BINARY && - direction == this.direction && - frame.data.unpack().isAction(action) -} - -/** - * A fault implementation makling the connection unresponsive - * after observing an out-going Presence message - */ -class EnterUnresponsive(apiKey: String) : UnresponsiveAfterAction( - apiKey = apiKey, - direction = FrameDirection.ClientToServer, - action = Message.Action.PRESENCE -) { - override val skipSubscriberTest = true - - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = EnterUnresponsive(apiKey) - override val name = "EnterUnresponsive" - } - } -} - -/** - * Fault to simulate a resume failure after a reconnection. The fault implementation - * first interrupts a connection once it sees a CONNECTED response returned from the server - * by injecting a WebSocket CLOSE frame, disconnecting the client. When the client reconnects, - * it intercepts the resume= parameter in the connection URL to substitute it for a fake - * connectionId, causing the resume attempt to fail. This should not be a fatal error, the - * Publisher should continue regardless. - */ -class DisconnectWithFailedResume(apiKey: String) : ApplicationLayerFault(apiKey) { - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = DisconnectWithFailedResume(apiKey) - override val name = "DisconnectWithFailedResume" - } - } - - /** - * State of the fault, used to control whether we're intercepting - * the connection or looking to inject a WebSocket CLOSE at an appropriate time - */ - private enum class State { - AwaitingInitialConnection, - AwaitingDisconnect, - Reconnected - } - private var state = State.AwaitingInitialConnection - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 30_000 - ) - - override fun enable() { - applicationProxy.interceptor = object : Layer7Interceptor { - - override fun interceptConnection(params: ConnectionParams): ConnectionParams { - return when (state) { - State.AwaitingInitialConnection -> { - state = State.AwaitingDisconnect - testLogD("${fault.name}: transitioning to $state, connection params: $params") - params - } - State.AwaitingDisconnect -> { - state = State.Reconnected - params.copy(resume = modifyResumeParam(params.resume)).also { - testLogD("${fault.name}: transitioning to $state, connection params: $it") - } - } - State.Reconnected -> params - } - } - - override fun interceptFrame(direction: FrameDirection, frame: Frame): List { - return when (state) { - State.AwaitingDisconnect -> - if (shouldDisconnect(direction, frame)) { - // Inject a CLOSE frame to kill the client connection now - listOf(Action(direction, frame), Action(direction, Frame.Close(), sendAndClose = true)) - } else { - // Pass through - listOf(Action(direction, frame)) - } - State.AwaitingInitialConnection, - State.Reconnected -> - // Always pass through in these states - listOf(Action(direction, frame)) - } - } - } - } - - override fun resolve() { - state = State.AwaitingInitialConnection - applicationProxy.interceptor = PassThroughInterceptor() - } - - /** - * Replace the connectionId component of a connectionKey with a fake - */ - private fun modifyResumeParam(resume: String?) = - resume?.replace("^(.*!).*(-.*$)".toRegex()) { match -> - "${match.groups[1]?.value}FakeFakeFakeFake${match.groups[2]?.value}" - } - - /** - * Check to see if the incoming message should trigger a disconnection - */ - private fun shouldDisconnect(direction: FrameDirection, frame: Frame) = - direction == FrameDirection.ServerToClient && - frame.frameType == FrameType.BINARY && - frame.data.unpack().isAction(Message.Action.ATTACHED) -} - -/** - * Abstract fault implementation to intercept outgoing presence messages, preventing - * them from reaching Ably, and injecting NACK responses as replies to the client. - */ -abstract class PresenceNackFault( - apiKey: String, - private val nackedPresenceAction: Message.PresenceAction, - private val response: (msgSerial: Int) -> Map, - private val nackLimit: Int = 3 -) : ApplicationLayerFault(apiKey) { - - private var nacksSent = 0 - - override fun enable() { - applicationProxy.interceptor = object : Layer7Interceptor { - - override fun interceptConnection(params: ConnectionParams) = params - - override fun interceptFrame(direction: FrameDirection, frame: Frame): List { - return if (shouldNack(direction, frame)) { - val msg = frame.data.unpack() - testLogD("PresenceNackFault: will nack ($nacksSent): $msg") - - val msgSerial = msg["msgSerial"] as Int - val nackFrame = Frame.Binary(true, response(msgSerial).pack()) - testLogD("PresenceNackFault: sending nack: ${nackFrame.data.unpack()}") - nacksSent += 1 - - listOf( - // note: not sending this presence message to server - Action(FrameDirection.ServerToClient, nackFrame) - ) - } else { - listOf(Action(direction, frame)) - } - } - } - } - - override fun resolve() { - applicationProxy.interceptor = PassThroughInterceptor() - } - - /** - * Check whether this frame (and direction) is the kind we're trying to - * simulate a server NACK response for - */ - private fun shouldNack(direction: FrameDirection, frame: Frame) = - nacksSent < nackLimit && - frame.frameType == FrameType.BINARY && - direction == FrameDirection.ClientToServer && - frame.data.unpack().let { - it.isAction(Message.Action.PRESENCE) && - it.isPresenceAction(nackedPresenceAction) - } -} - -/** - * Simulates retryable presence.enter() failure. Will stop - * nacking after 3 failures - */ -class EnterFailedWithNonfatalNack(apiKey: String) : PresenceNackFault( - apiKey = apiKey, - nackedPresenceAction = Message.PresenceAction.ENTER, - response = ::nonFatalNack, - nackLimit = 3 -) { - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = EnterFailedWithNonfatalNack(apiKey) - override val name = "EnterFailedWithNonfatalNack" - } - } - - // Can probably be removed once skipTest issues are resolved - override val skipSubscriberTest = true - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 60_000L - ) -} - -/** - * Simulates a retryable presence.update() failure. Will stop - * nacking after 3 failures - */ -class UpdateFailedWithNonfatalNack(apiKey: String) : PresenceNackFault( - apiKey = apiKey, - nackedPresenceAction = Message.PresenceAction.UPDATE, - response = ::nonFatalNack, - nackLimit = 3 -) { - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = UpdateFailedWithNonfatalNack(apiKey) - override val name = "UpdateFailedWithNonfatalNack" - } - } - - override val skipSubscriberTest = true - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 60_000L - ) -} - -/** - * Simulates a client being disconnected while present on a channel, - * reconnecting with a successful resume, but the presence re-enter - * failing. Client should handle this by re-entering presence when - * it sees that re-enter has failed. - */ -class ReenterOnResumeFailed(apiKey: String) : ApplicationLayerFault(apiKey) { - - override val skipSubscriberTest = true - - companion object { - val fault = object : Fault() { - override fun simulate(apiKey: String) = ReenterOnResumeFailed(apiKey) - override val name = "ReenterOnResumeFailed" - } - } - - private var state = State.DisconnectAfterPresence - private var presenceEnterSerial: Int? = null - - private enum class State { - // Waiting for client presence enter before forcing disconnect - DisconnectAfterPresence, - - // Wait for server SYNC message, remove client from presence member - InterceptingServerSync, - - // Wait for client to respond by re-entering, note the msgSerial - InterceptingClientEnter, - - // Wait for server to ACK the enter, swap it for a NACK - InterceptingServerAck, - - // Finished - return to normal pass-through - WorkingNormally - } - - override val type = FaultType.Nonfatal( - resolvedWithinMillis = 60_000L - ) - - override fun enable() { - applicationProxy.interceptor = object : Layer7Interceptor { - - override fun interceptConnection(params: ConnectionParams): ConnectionParams { - testLogD("${fault.name}: [$state] new connection: $params") - if (state == State.DisconnectAfterPresence) { - state = State.InterceptingServerSync - } - return params - } - - override fun interceptFrame(direction: FrameDirection, frame: Frame): List { - return when (state) { - State.DisconnectAfterPresence -> disconnectAfterPresence(direction, frame) - State.InterceptingServerSync -> interceptServerSync(direction, frame) - State.InterceptingClientEnter -> interceptClientEnter(direction, frame) - State.InterceptingServerAck -> interceptServerAck(direction, frame) - State.WorkingNormally -> listOf(Action(direction, frame)) - } - } - } - } - - override fun resolve() { - state = State.DisconnectAfterPresence - applicationProxy.interceptor = PassThroughInterceptor() - } - - /** - * Step 1: disconnect the client to cause it to attempt a resume, - * but wait until we know the client has attempted to enter presence - * beforehand. - */ - private fun disconnectAfterPresence( - direction: FrameDirection, - frame: Frame - ): List { - return if ( - direction == FrameDirection.ClientToServer && - frame.frameType == FrameType.BINARY && - frame.data.unpack().isAction(Message.Action.PRESENCE) - ) { - testLogD("${fault.name}: [$state] forcing disconnect") - // Note: state will advance in interceptConnection - listOf( - Action(direction, frame), - Action(FrameDirection.ServerToClient, Frame.Close(), true) - ) - } else { - listOf(Action(direction, frame)) - } - } - - /** - * Step 2: Now that the client has reconnected, tamper with the - * incoming server SYNC message to remove this client's presence - * data, causing it to think that re-enter on resume has failed. - */ - private fun interceptServerSync( - direction: FrameDirection, - frame: Frame - ): List { - return if ( - direction == FrameDirection.ServerToClient && - frame.frameType == FrameType.BINARY && - frame.data.unpack().isAction(Message.Action.SYNC) - ) { - testLogD("${fault.name}: [$state] intercepting sync") - state = State.InterceptingClientEnter - listOf( - Action(direction, removePresenceFromSync(frame)) - ) - } else { - listOf(Action(direction, frame)) - } - } - - /** - * Step 3: The client should respond to the failed presence re-enter - * on resume by attempting to enter presence again. Note the msgSerial - * of the outgoing presence message so that we can intercept the ACK - */ - private fun interceptClientEnter( - direction: FrameDirection, - frame: Frame - ): List { - if (direction == FrameDirection.ClientToServer && - frame.frameType == FrameType.BINARY - ) { - val msg = frame.data.unpack() - if (msg.isAction(Message.Action.PRESENCE) && - msg.isPresenceAction(Message.PresenceAction.ENTER) - ) { - presenceEnterSerial = msg["msgSerial"] as Int - testLogD("${fault.name}: [$state] presence enter serial: $presenceEnterSerial") - state = State.InterceptingServerAck - } - } - - return listOf(Action(direction, frame)) - } - - /** - * Step 4: Replace server ACK response with a NACK for the same msgSerial - */ - private fun interceptServerAck( - direction: FrameDirection, - frame: Frame - ): List { - return if ( - direction == FrameDirection.ServerToClient && - frame.frameType == FrameType.BINARY && - frame.data.unpack().let { - it.isAction(Message.Action.ACK) && - (it["msgSerial"] as Int) == presenceEnterSerial - } - ) { - val nack = Message.nack( - msgSerial = presenceEnterSerial!!, - count = 1, - errorCode = 50000, - errorStatusCode = 500, - errorMessage = "injected by proxy" - ) - testLogD("${fault.name}: [$state] sending nack: $nack") - state = State.WorkingNormally - listOf(Action(direction, Frame.Binary(true, nack.pack()))) - } else { - listOf(Action(direction, frame)) - } - } - - /** - * Remove presence data, causing client to believe re-enter - * on resume has failed - */ - private fun removePresenceFromSync(frame: Frame): Frame { - val syncMsg = frame.data.unpack().toMutableMap() - syncMsg["presence"] = listOf() - return Frame.Binary(true, syncMsg.pack()) - } -} - -/** - * A non-fatal nack response for given message serial - */ -internal fun nonFatalNack(msgSerial: Int) = - Message.nack( - msgSerial = msgSerial, - count = 1, - errorCode = 50000, - errorStatusCode = 500, - errorMessage = "injected by proxy" - ) diff --git a/external/sdk-test-proxy b/external/sdk-test-proxy new file mode 160000 index 000000000..4a1d3a7aa --- /dev/null +++ b/external/sdk-test-proxy @@ -0,0 +1 @@ +Subproject commit 4a1d3a7aa565b83f2e7b0de08110cb9771839920 diff --git a/publishing-sdk/src/androidTest/java/com/ably/tracking/publisher/NetworkConnectivityTests.kt b/publishing-sdk/src/androidTest/java/com/ably/tracking/publisher/NetworkConnectivityTests.kt index c78807a35..394483cbd 100644 --- a/publishing-sdk/src/androidTest/java/com/ably/tracking/publisher/NetworkConnectivityTests.kt +++ b/publishing-sdk/src/androidTest/java/com/ably/tracking/publisher/NetworkConnectivityTests.kt @@ -27,25 +27,14 @@ import com.ably.tracking.connection.Authentication import com.ably.tracking.connection.ConnectionConfiguration import com.ably.tracking.logging.LogHandler import com.ably.tracking.logging.LogLevel -import com.ably.tracking.test.android.common.AttachUnresponsive import com.ably.tracking.test.android.common.BooleanExpectation -import com.ably.tracking.test.android.common.DetachUnresponsive -import com.ably.tracking.test.android.common.DisconnectAndSuspend -import com.ably.tracking.test.android.common.DisconnectWithFailedResume -import com.ably.tracking.test.android.common.EnterFailedWithNonfatalNack -import com.ably.tracking.test.android.common.EnterUnresponsive import com.ably.tracking.test.android.common.Fault +import com.ably.tracking.test.android.common.FaultProxyClient import com.ably.tracking.test.android.common.FaultSimulation import com.ably.tracking.test.android.common.FaultType import com.ably.tracking.test.android.common.NOTIFICATION_CHANNEL_ID -import com.ably.tracking.test.android.common.NullApplicationLayerFault -import com.ably.tracking.test.android.common.NullTransportFault import com.ably.tracking.test.android.common.PUBLISHER_CLIENT_ID -import com.ably.tracking.test.android.common.ReenterOnResumeFailed -import com.ably.tracking.test.android.common.TcpConnectionRefused -import com.ably.tracking.test.android.common.TcpConnectionUnresponsive import com.ably.tracking.test.android.common.UnitExpectation -import com.ably.tracking.test.android.common.UpdateFailedWithNonfatalNack import com.ably.tracking.test.android.common.createNotificationChannel import com.ably.tracking.test.android.common.testLogD import com.google.gson.Gson @@ -77,6 +66,7 @@ import org.junit.Before import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized +import java.net.URL import java.util.Date import java.util.UUID import java.util.concurrent.TimeoutException @@ -90,27 +80,25 @@ class NetworkConnectivityTests(private val testFault: Fault) { private var testResources: TestResources? = null companion object { + // 10.0.2.2 is the loopback interface of the host machine the emulator is running on + // https://developer.android.com/studio/run/emulator-networking.html#networkaddresses + private val client = FaultProxyClient(baseUrl = URL("http://10.0.2.2:8080")) + @JvmStatic @Parameterized.Parameters(name = "{0}") - fun data() = listOf( - arrayOf(NullTransportFault.fault), - arrayOf(NullApplicationLayerFault.fault), - arrayOf(TcpConnectionRefused.fault), - arrayOf(TcpConnectionUnresponsive.fault), - arrayOf(AttachUnresponsive.fault), - arrayOf(DetachUnresponsive.fault), - arrayOf(DisconnectWithFailedResume.fault), - arrayOf(EnterFailedWithNonfatalNack.fault), - arrayOf(UpdateFailedWithNonfatalNack.fault), - arrayOf(DisconnectAndSuspend.fault), - arrayOf(ReenterOnResumeFailed.fault), - arrayOf(EnterUnresponsive.fault), - ) + fun data() = runBlocking { + client.getAllFaults() + } } @Before fun setUp() { - val simulation = testFault.simulate(BuildConfig.ABLY_API_KEY) + val simulation = runBlocking { + testFault.simulate(BuildConfig.ABLY_API_KEY) + } + + // TODO log the simulation here so we can see what’s going on + Assume.assumeFalse(simulation.skipPublisherTest) // We cannot use ktor on API Level 21 (Lollipop) because of: @@ -335,7 +323,6 @@ class TestResources( fun setUp(faultParam: FaultSimulation): TestResources { val context = InstrumentationRegistry.getInstrumentation().targetContext val locationHelper = LocationHelper() - faultParam.proxy.start() val publisher = createPublisher( context, faultParam.proxy.clientOptions(), @@ -456,7 +443,10 @@ class TestResources( fun tearDown() { shutdownPublisher(publisher).assertSuccess() locationHelper.close() - fault.cleanUp() + + runBlocking { + fault.cleanUp() + } } /**