Skip to content

Commit

Permalink
Merge pull request #673 from hexagonkt/develop
Browse files Browse the repository at this point in the history
Minor Netty optimizations
  • Loading branch information
jaguililla committed Nov 4, 2023
2 parents b68d208 + 72a71d0 commit bcb8edb
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 98 deletions.
13 changes: 0 additions & 13 deletions core/api/core.api
Original file line number Diff line number Diff line change
Expand Up @@ -293,19 +293,6 @@ public abstract interface class com/hexagonkt/core/logging/LoggingPort {
public abstract fun setLoggerLevel (Ljava/lang/String;Lcom/hexagonkt/core/logging/LoggingLevel;)V
}

public final class com/hexagonkt/core/logging/SystemLogger : com/hexagonkt/core/logging/LoggerPort {
public fun <init> (Ljava/lang/String;)V
public final fun component1 ()Ljava/lang/String;
public final fun copy (Ljava/lang/String;)Lcom/hexagonkt/core/logging/SystemLogger;
public static synthetic fun copy$default (Lcom/hexagonkt/core/logging/SystemLogger;Ljava/lang/String;ILjava/lang/Object;)Lcom/hexagonkt/core/logging/SystemLogger;
public fun equals (Ljava/lang/Object;)Z
public final fun getName ()Ljava/lang/String;
public fun hashCode ()I
public fun log (Lcom/hexagonkt/core/logging/LoggingLevel;Ljava/lang/Throwable;Lkotlin/jvm/functions/Function1;)V
public fun log (Lcom/hexagonkt/core/logging/LoggingLevel;Lkotlin/jvm/functions/Function0;)V
public fun toString ()Ljava/lang/String;
}

public final class com/hexagonkt/core/logging/SystemLoggingAdapter : com/hexagonkt/core/logging/LoggingPort {
public fun <init> ()V
public fun <init> (Lcom/hexagonkt/core/logging/LoggingLevel;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.hexagonkt.core.logging

import com.hexagonkt.core.logging.LoggingLevel.*

data class SystemLogger(val name: String) : LoggerPort {
internal data class SystemLogger(val name: String) : LoggerPort {

private val logger: System.Logger = System.getLogger(name)

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ org.gradle.warning.mode=all
org.gradle.console=plain

# Gradle
version=3.4.1
version=3.4.2
group=com.hexagonkt
description=The atoms of your platform

Expand Down
16 changes: 3 additions & 13 deletions http/http_server_netty/api/http_server_netty.api
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public final class com/hexagonkt/http/server/netty/NettyKt {
}

public final class com/hexagonkt/http/server/netty/NettyRequestAdapter : com/hexagonkt/http/model/HttpRequestPort {
public fun <init> (Lio/netty/handler/codec/http/HttpMethod;Lio/netty/handler/codec/http/HttpRequest;Ljava/util/List;Ljava/net/InetSocketAddress;Lio/netty/handler/codec/http/HttpHeaders;)V
public fun <init> (Lio/netty/handler/codec/http/HttpMethod;Lio/netty/handler/codec/http/HttpRequest;Ljava/util/List;Lio/netty/channel/Channel;Lio/netty/handler/codec/http/HttpHeaders;)V
public fun authorization ()Lcom/hexagonkt/http/model/Authorization;
public fun bodyString ()Ljava/lang/String;
public fun certificate ()Ljava/security/cert/X509Certificate;
Expand Down Expand Up @@ -46,8 +46,8 @@ public final class com/hexagonkt/http/server/netty/NettyRequestAdapter : com/hex

public class com/hexagonkt/http/server/netty/NettyServerAdapter : com/hexagonkt/http/server/HttpServerPort {
public fun <init> ()V
public fun <init> (IIIIZZJJ)V
public synthetic fun <init> (IIIIZZJJILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IIIIZZJJZZZZ)V
public synthetic fun <init> (IIIIZZJJZZZZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun groupSupplier (I)Lio/netty/channel/MultithreadEventLoopGroup;
public fun options ()Ljava/util/Map;
public fun runtimePort ()I
Expand All @@ -59,13 +59,3 @@ public class com/hexagonkt/http/server/netty/NettyServerAdapter : com/hexagonkt/
public fun supportedProtocols ()Ljava/util/Set;
}

public final class com/hexagonkt/http/server/netty/NettyServerAdapter$HttpChannelInitializer : io/netty/channel/ChannelInitializer {
public fun <init> (Ljava/util/Map;Lio/netty/util/concurrent/EventExecutorGroup;Lcom/hexagonkt/http/server/HttpServerSettings;)V
public synthetic fun initChannel (Lio/netty/channel/Channel;)V
}

public final class com/hexagonkt/http/server/netty/NettyServerAdapter$HttpsChannelInitializer : io/netty/channel/ChannelInitializer {
public fun <init> (Ljava/util/Map;Lio/netty/handler/ssl/SslContext;Lcom/hexagonkt/http/SslSettings;Lio/netty/util/concurrent/EventExecutorGroup;Lcom/hexagonkt/http/server/HttpServerSettings;)V
public synthetic fun initChannel (Lio/netty/channel/Channel;)V
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.hexagonkt.http.server.netty

import com.hexagonkt.http.handlers.HttpHandler
import com.hexagonkt.http.server.HttpServerSettings
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.*
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.EventExecutorGroup

internal class HttpChannelInitializer(
private val handlers: Map<HttpMethod, HttpHandler>,
private val executorGroup: EventExecutorGroup?,
private val settings: HttpServerSettings,
private val keepAliveHandler: Boolean = true,
private val httpAggregatorHandler: Boolean = true,
private val chunkedHandler: Boolean = true,
private val enableWebsockets: Boolean = true,
) : ChannelInitializer<SocketChannel>() {

override fun initChannel(channel: SocketChannel) {
val pipeline = channel.pipeline()

pipeline.addLast(HttpServerCodec())

if (keepAliveHandler)
pipeline.addLast(HttpServerKeepAliveHandler())
if (httpAggregatorHandler)
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
if (chunkedHandler)
pipeline.addLast(ChunkedWriteHandler())
if (settings.zip)
pipeline.addLast(HttpContentCompressor())

val nettyServerHandler = NettyServerHandler(handlers, null, enableWebsockets)

if (executorGroup == null)
pipeline.addLast(nettyServerHandler)
else
pipeline.addLast(executorGroup, nettyServerHandler)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.hexagonkt.http.server.netty

import com.hexagonkt.http.SslSettings
import com.hexagonkt.http.handlers.HttpHandler
import com.hexagonkt.http.server.HttpServerSettings
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.*
import io.netty.handler.ssl.SslContext
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.EventExecutorGroup

internal class HttpsChannelInitializer(
private val handlers: Map<HttpMethod, HttpHandler>,
private val sslContext: SslContext,
private val sslSettings: SslSettings,
private val executorGroup: EventExecutorGroup?,
private val settings: HttpServerSettings,
private val keepAliveHandler: Boolean = true,
private val httpAggregatorHandler: Boolean = true,
private val chunkedHandler: Boolean = true,
private val enableWebsockets: Boolean = true,
) : ChannelInitializer<SocketChannel>() {

override fun initChannel(channel: SocketChannel) {
val pipeline = channel.pipeline()
val sslHandler = sslContext.newHandler(channel.alloc())
val handlerSsl = if (sslSettings.clientAuth) sslHandler else null

pipeline.addLast(sslHandler)
pipeline.addLast(HttpServerCodec())

if (keepAliveHandler)
pipeline.addLast(HttpServerKeepAliveHandler())
if (httpAggregatorHandler)
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
if (chunkedHandler)
pipeline.addLast(ChunkedWriteHandler())
if (settings.zip)
pipeline.addLast(HttpContentCompressor())

val serverHandler = NettyServerHandler(handlers, handlerSsl, enableWebsockets)

if (executorGroup == null)
pipeline.addLast(serverHandler)
else
pipeline.addLast(executorGroup, serverHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.hexagonkt.http.parseContentType
import io.netty.buffer.ByteBufHolder
import io.netty.buffer.ByteBufUtil
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.handler.codec.http.HttpHeaderNames.*
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.QueryStringDecoder
Expand All @@ -26,10 +27,12 @@ class NettyRequestAdapter(
methodName: NettyHttpMethod,
req: HttpRequest,
override val certificateChain: List<X509Certificate>,
address: InetSocketAddress,
channel: Channel,
nettyHeaders: HttpHeaders,
) : HttpRequestPort {

private val address: InetSocketAddress by lazy { channel.remoteAddress() as InetSocketAddress }

override val accept: List<ContentType> by lazy {
nettyHeaders.getAll(ACCEPT).flatMap { it.split(",") }.map { parseContentType(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,17 @@ import com.hexagonkt.http.handlers.HttpHandler
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.*
import io.netty.handler.ssl.ClientAuth.OPTIONAL
import io.netty.handler.ssl.ClientAuth.REQUIRE
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit.SECONDS
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory
import kotlin.Int.Companion.MAX_VALUE

/**
* Implements [HttpServerPort] using Netty [Channel].
Expand All @@ -43,6 +39,10 @@ open class NettyServerAdapter(
private val soKeepAlive: Boolean = true,
private val shutdownQuietSeconds: Long = 0,
private val shutdownTimeoutSeconds: Long = 0,
private val keepAliveHandler: Boolean = true,
private val httpAggregatorHandler: Boolean = true,
private val chunkedHandler: Boolean = true,
private val enableWebsockets: Boolean = true,
) : HttpServerPort {

private var nettyChannel: Channel? = null
Expand Down Expand Up @@ -124,7 +124,15 @@ open class NettyServerAdapter(
) =
when {
sslSettings != null -> sslInitializer(sslSettings, handlers, group, settings)
else -> HttpChannelInitializer(handlers, group, settings)
else -> HttpChannelInitializer(
handlers,
group,
settings,
keepAliveHandler,
httpAggregatorHandler,
chunkedHandler,
enableWebsockets,
)
}

private fun sslInitializer(
Expand All @@ -133,7 +141,17 @@ open class NettyServerAdapter(
group: DefaultEventExecutorGroup?,
settings: HttpServerSettings
): HttpsChannelInitializer =
HttpsChannelInitializer(handlers, sslContext(sslSettings), sslSettings, group, settings)
HttpsChannelInitializer(
handlers,
sslContext(sslSettings),
sslSettings,
group,
settings,
keepAliveHandler,
httpAggregatorHandler,
chunkedHandler,
enableWebsockets,
)

private fun sslContext(sslSettings: SslSettings): SslContext {
val keyManager = createKeyManagerFactory(sslSettings)
Expand Down Expand Up @@ -195,58 +213,9 @@ open class NettyServerAdapter(
NettyServerAdapter::soKeepAlive to soKeepAlive,
NettyServerAdapter::shutdownQuietSeconds to shutdownQuietSeconds,
NettyServerAdapter::shutdownTimeoutSeconds to shutdownTimeoutSeconds,
NettyServerAdapter::keepAliveHandler to keepAliveHandler,
NettyServerAdapter::httpAggregatorHandler to httpAggregatorHandler,
NettyServerAdapter::chunkedHandler to chunkedHandler,
NettyServerAdapter::enableWebsockets to enableWebsockets,
)

class HttpChannelInitializer(
private val handlers: Map<HttpMethod, HttpHandler>,
private val executorGroup: EventExecutorGroup?,
private val settings: HttpServerSettings,
) : ChannelInitializer<SocketChannel>() {

override fun initChannel(channel: SocketChannel) {
val pipeline = channel.pipeline()

pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpServerKeepAliveHandler())
pipeline.addLast(HttpObjectAggregator(MAX_VALUE))
pipeline.addLast(ChunkedWriteHandler())

if (settings.zip)
pipeline.addLast(HttpContentCompressor())

if (executorGroup == null)
pipeline.addLast(NettyServerHandler(handlers, null))
else
pipeline.addLast(executorGroup, NettyServerHandler(handlers, null))
}
}

class HttpsChannelInitializer(
private val handlers: Map<HttpMethod, HttpHandler>,
private val sslContext: SslContext,
private val sslSettings: SslSettings,
private val executorGroup: EventExecutorGroup?,
private val settings: HttpServerSettings,
) : ChannelInitializer<SocketChannel>() {

override fun initChannel(channel: SocketChannel) {
val pipeline = channel.pipeline()
val sslHandler = sslContext.newHandler(channel.alloc())
val handlerSsl = if (sslSettings.clientAuth) sslHandler else null

pipeline.addLast(sslHandler)
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpServerKeepAliveHandler())
pipeline.addLast(HttpObjectAggregator(MAX_VALUE))
pipeline.addLast(ChunkedWriteHandler())

if (settings.zip)
pipeline.addLast(HttpContentCompressor())

if (executorGroup == null)
pipeline.addLast(NettyServerHandler(handlers, handlerSsl))
else
pipeline.addLast(executorGroup, NettyServerHandler(handlers, handlerSsl))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT as STRICT_E
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent
import java.net.InetSocketAddress
import java.security.cert.X509Certificate
import java.util.concurrent.Flow.*
import com.hexagonkt.http.model.HttpRequest as HexagonHttpRequest

internal class NettyServerHandler(
private val handlers: Map<HttpMethod, HttpHandler>,
private val sslHandler: SslHandler?,
private val enableWebsockets: Boolean = true,
) : ChannelInboundHandlerAdapter() {

private var certificates: List<X509Certificate> = emptyList()
Expand All @@ -53,12 +53,11 @@ internal class NettyServerHandler(
throw IllegalStateException(result.cause())

val channel = context.channel()
val address = channel.remoteAddress() as InetSocketAddress
val method = nettyRequest.method()
val pathHandler = handlers[method]

val headers = nettyRequest.headers()
val request = NettyRequestAdapter(method, nettyRequest, certificates, address, headers)
val request = NettyRequestAdapter(method, nettyRequest, certificates, channel, headers)

if (pathHandler == null) {
writeResponse(context, request, HttpResponse(), HttpUtil.isKeepAlive(nettyRequest))
Expand All @@ -68,15 +67,12 @@ internal class NettyServerHandler(
val resultContext = pathHandler.process(request)
val response = resultContext.event.response

val body = response.body
val connection = headers[CONNECTION]?.lowercase()
val upgrade = headers[UPGRADE]?.lowercase()
val isWebSocket =
if (enableWebsockets) isWebsocket(headers, method, response.status)
else false

val body = response.body
val isSse = body is Publisher<*>
val isWebSocket = connection == "upgrade"
&& upgrade == "websocket"
&& method == GET
&& response.status == ACCEPTED_202

when {
isSse -> handleSse(context, request, response, body)
Expand All @@ -85,6 +81,15 @@ internal class NettyServerHandler(
}
}

private fun isWebsocket(headers: HttpHeaders, method: HttpMethod, status: HttpStatus): Boolean {
val connection = headers[CONNECTION]?.lowercase()
val upgrade = headers[UPGRADE]?.lowercase()
return connection == "upgrade"
&& upgrade == "websocket"
&& method == GET
&& status == ACCEPTED_202
}

@Suppress("UNCHECKED_CAST") // Body not cast to Publisher<HttpServerEvent> due to type erasure
private fun handleSse(
context: ChannelHandlerContext,
Expand Down

0 comments on commit bcb8edb

Please sign in to comment.