From 6a158adf344cb525199bee58a5987031e36f7f4f Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Sun, 2 Jun 2024 20:30:50 +0800 Subject: [PATCH] [ISSUE #4847] HTTPS/(m)TLSv1.3 support for HTTP Admin Server (#4848) * Move admin port to http configs * Refactor AdminServer to own independent configuration * Support TLS/SSL for AdminServer * Add default configs * Rename jks file to a shorter name * Add one line of comment * Fetch current version --- .../apache/eventmesh/common/Constants.java | 2 + .../utils/ConfigurationContextUtil.java | 3 +- .../config/samples/eventmesh_v1_runtime.yaml | 1 - eventmesh-runtime/conf/admin-server.jks | Bin 0 -> 2808 bytes eventmesh-runtime/conf/eventmesh.properties | 24 ++++- .../handler/v2/ConfigurationHandler.java | 3 +- .../runtime/boot/AbstractHTTPServer.java | 41 +++----- .../runtime/boot/AbstractRemotingServer.java | 51 +++------- .../runtime/boot/EventMeshAdminBootstrap.java | 22 ++++- .../runtime/boot/EventMeshAdminServer.java | 88 +++++++++++++----- .../runtime/boot/EventMeshGrpcBootstrap.java | 11 +-- .../runtime/boot/EventMeshHTTPServer.java | 66 +------------ .../runtime/boot/EventMeshHttpBootstrap.java | 15 +-- .../runtime/boot/EventMeshServer.java | 81 +++++----------- .../runtime/boot/EventMeshTcpBootstrap.java | 12 +-- .../runtime/boot/HTTPThreadPoolGroup.java | 51 ++-------- .../runtime/boot/SSLContextFactory.java | 25 +++-- .../EventMeshAdminConfiguration.java | 61 ++++++++++++ .../EventMeshGrpcConfiguration.java | 3 - .../EventMeshHTTPConfiguration.java | 4 - .../EventMeshTCPConfiguration.java | 3 - .../http/processor/AdminMetricsProcessor.java | 6 +- .../runtime/boot/EventMeshServerTest.java | 1 - .../EventMeshGrpcConfigurationTest.java | 1 - .../EventMeshHTTPConfigurationTest.java | 1 - .../EventMeshTCPConfigurationTest.java | 1 - .../test/resources/configuration.properties | 2 - 27 files changed, 259 insertions(+), 320 deletions(-) create mode 100644 eventmesh-runtime/conf/admin-server.jks create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java index 97286f0968..2460129e75 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java @@ -200,6 +200,8 @@ public class Constants { public static final String GRPC = "GRPC"; + public static final String ADMIN = "ADMIN"; + public static final String OS_NAME_KEY = "os.name"; public static final String OS_WIN_PREFIX = "win"; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java index 8617888994..fede64d650 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.common.utils; +import static org.apache.eventmesh.common.Constants.ADMIN; import static org.apache.eventmesh.common.Constants.GRPC; import static org.apache.eventmesh.common.Constants.HTTP; import static org.apache.eventmesh.common.Constants.TCP; @@ -36,7 +37,7 @@ public class ConfigurationContextUtil { private static final ConcurrentHashMap CONFIGURATION_MAP = new ConcurrentHashMap<>(); - public static final List KEYS = Lists.newArrayList(HTTP, TCP, GRPC); + public static final List KEYS = Lists.newArrayList(HTTP, TCP, GRPC, ADMIN); /** * Save http, tcp, grpc configuration at startup for global use. diff --git a/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml b/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml index fc640aa639..590928ca97 100644 --- a/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml +++ b/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml @@ -41,7 +41,6 @@ data: # HTTP Admin Server eventMesh.server.admin.http.port=10106 ########################## eventMesh tcp configuration ############################ - eventMesh.server.tcp.enabled=true eventMesh.server.tcp.readerIdleSeconds=120 eventMesh.server.tcp.writerIdleSeconds=120 eventMesh.server.tcp.allIdleSeconds=120 diff --git a/eventmesh-runtime/conf/admin-server.jks b/eventmesh-runtime/conf/admin-server.jks new file mode 100644 index 0000000000000000000000000000000000000000..92deb897a48d9bb9463065fa6e35ae67172b1e6e GIT binary patch literal 2808 zcma);X*d)L7sqF127{Q<*hMjwWyU`Cr7{{qqegbR_QrZIkt`Dibu}pC+PNb_Dk8F0 zcBL`y*tc+_$d;R(*SkF3=l%4)AO6pI&hMQ6`S^bhmddjM0)nwro;)u2nFK<@CIZL} zETHn-hfsO$9$^}m3hn!^5wrzDg*F`Fmq)7R;{A_{j|T`UphAs~pdQxqhXabhUc+kt z-ST12LXexSn0MH_V+8ay-?lSt5NXdwN-qecAP54=VEMRU|Go%33pJF^Th?TgQA2o=IgH*1$%Eu<=%ZG3lG+}iIs`-MKmzUvtypczwf z3pVJxd%1yPFmi0g;Z{_y#%LuBz4pO(dxm|3NCr0lMQU4FrLl67*Sa)*%NjMtPq{|h z?dNNYc#o*=q<*lUuJhbvI2_#Qt&$0u)u(+~et{Bw&*X8Lm^W3YEHBSU-0duGTI$Ub zH7>975?mC=6{;mT_`=EkMFgDrvO&AUx$SE7WI^dj(T*wqmyJPnB;9bw*(E5|J5! zqU!WNX@{cXCnPvJ5k_y6Vxr_O;@&3K>rF2B)@k&r&>x_lX*|{gbGW$t6LEJGu;8m( zqrtTsXdBx~?P&8u`%8!?=P`{Y(Xe(DeM=FgJZyn8OS_V6LVUTRs3Ap<{M1l!U1;)A zY5s#$m+=X^V`TM62tq<@2wM24ti|v2O`1;V{CJoTq}y?wxRFNvNpzISsOkTWgLxB8<+!rQTu7}i0ySSgAS8|aLvUoZ<1mKqZ{%SyYI@@>Hf@tqyAe?P7f z*dV@`x%+27f-Uzw3!POM5A6IJG^{BQtcQ6XHn{3c)fX64e*ausPsmk^|gns#w1z z)w_w8`uNpYq#nU_Cm6(BtYgiP9y`Sv>(wYvM%M@gGQ49wZTK+F?5+Yg<5)}8Vrs8& zK63r0m$J?n=XzCclfYZ+$t*y>cEUpw2Cg)r;V^7vx4iDe9J){1wI))>LeqD6e1_QO zZ(-d79vr#>Y)d3S8ZWU9gjgVBij;!7XM9_+HPb2ZR7#3R#fI4X?4C&2#|<5`tk zrG^(e{5N~XiGPr2CxyMZzW+Zlp3E!~TCuz&n}q{|enp>5dGa*;cnlj7^UzR`Sf|I}FtyqTOk?j&P29Xr@x4_Hvcj~- zNCt1#_LswiWW1A{hDRja-)NMlTlg3ZR~C{gh(%7WBFm|TL-3(f{M%1ucsqSjDPOMV zuI81%|B60T`Ba}>!{r`P9cwZAa4+wGFY%29x++tX{m|PMZ*@?qFfPo|pwR5i%rdU& zviT$SlU9uls+2pw$C(=|WJoe|+LKKnvp+E~F&x@HnNp-|e`U~iSuoKM-3mQ8uJm}q z$#3lv+9XBT zk=*fYW_k{U`a7Qfr8E!}Z00uB#7X8K;shAwtpB3ey+0V89w1ttV}~3{tDpXWJ@hlJ z)*RhDv*vKmoHlyu)-t}NvO>yYWH(c@PWmEVizyW9c;5JmZ$&I@=P+~SaI9uiO@doG zo_&{jQg$mU>Qr!er>@dXO)o9(*!%KE_JE~tF0A;S(tva;-r6VTMk>gxUG9weaB9gO zbcL)v8!Iat$O z_7p+Hc*tleNa^6sfB}1g;M&=__-;7C*9k`I@D7|B$l=!ld-;ey2bq(?3Loc!gxREo|@op+$_x zqiM$GQ6ZZ7ncBNE2jUk{#E4X7Y?|BD#Tn1Z&N^nG`-Zby48yJ7CE=#kBf#*9lke@_ zGkJoNszNuWkV{eP8!zPRY-!f!Kack{`St5l9rQ^&h1S2*i=Jq8k9gY^f_xVdd#92f zYbW;Q<#!mRI#+;g`&VdN7!IQ_K;ht!l&SM{aqmx^9Kn$Lsc@OoWEJjw``P8b;(KL6 zZSICyx>K+8`5(6bB7WgZRPl1HxMfqn`_$f(HA|P$ub+`aty=oi3zH83NUwl%5YS%M zu>t&oin?8MD}4C0X23L-7zp$xeqqAQ+Qsg+yfTV0LC`0bl3&u43CG{{Z9%HFTpKOMg+ZAcB2 zi4HPNR~N6n_>!ySpafa3`Tn;~syfYH&D2~Vu0Yj zfJV5|fViy{bJ@(W&&xt~8soRFvk(9aa~&eK-_4FZyPmuKNl=!q9~p)l>g;x%@-M^e zR}#0zv^0nZ(_o+oNqW4{)VO!v{>U(CcMgZi`KoF>;I(v)z{q-awEX bE! httpHandlerOpt = adminHandlerManager.getHttpHandler(uri.getPath()); - if (httpHandlerOpt.isPresent()) { - try { - httpHandlerOpt.get().handle(httpRequest, ctx); - } catch (Exception e) { - log.error("admin server channelRead error", e); - ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx, - HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE); - } - } else { - ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE); - } + private void initThreadPool() { + adminMetricsExecutor = ThreadPoolFactory.createThreadPoolExecutor( + eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(), + eventMeshAdminConfiguration.getEventMeshServerAdminThreadNum(), + new LinkedBlockingQueue<>(50), "eventMesh-admin-metrics", true); + } + + private void registerAdminRequestProcessor() { + final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this); + registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor); } private class AdminServerInitializer extends ChannelInitializer { + private final transient SSLContext sslContext; + private final transient boolean useTLS; + + public AdminServerInitializer(final SSLContext sslContext, final boolean useTLS) { + this.sslContext = sslContext; + this.useTLS = useTLS; + } + @Override protected void initChannel(final SocketChannel channel) { final ChannelPipeline pipeline = channel.pipeline(); + if (sslContext != null && useTLS) { + final SSLEngine sslEngine = sslContext.createSSLEngine(); + sslEngine.setUseClientMode(false); + pipeline.addFirst(getWorkerGroup(), "ssl", new SslHandler(sslEngine)); + } + pipeline.addLast(getWorkerGroup(), new HttpRequestDecoder(), new HttpResponseEncoder(), @@ -131,5 +160,22 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) { } }); } + + private void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest) { + String uriStr = httpRequest.uri(); + URI uri = URI.create(uriStr); + Optional httpHandlerOpt = adminHandlerManager.getHttpHandler(uri.getPath()); + if (httpHandlerOpt.isPresent()) { + try { + httpHandlerOpt.get().handle(httpRequest, ctx); + } catch (Exception e) { + log.error("admin server channelRead error", e); + ctx.writeAndFlush(HttpResponseUtils.buildHttpResponse(Objects.requireNonNull(e.getMessage()), ctx, + HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE); + } + } else { + ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE); + } + } } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcBootstrap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcBootstrap.java index dc218084b1..b6e493c1bc 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcBootstrap.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcBootstrap.java @@ -23,10 +23,13 @@ import org.apache.eventmesh.common.utils.ConfigurationContextUtil; import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration; +import lombok.Getter; + public class EventMeshGrpcBootstrap implements EventMeshBootstrap { private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration; + @Getter private EventMeshGrpcServer eventMeshGrpcServer; private final EventMeshServer eventMeshServer; @@ -62,12 +65,4 @@ public void shutdown() throws Exception { eventMeshGrpcServer.shutdown(); } } - - public EventMeshGrpcServer getEventMeshGrpcServer() { - return eventMeshGrpcServer; - } - - public void setEventMeshGrpcServer(EventMeshGrpcServer eventMeshGrpcServer) { - this.eventMeshGrpcServer = eventMeshGrpcServer; - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index cc47cc9727..1089a1cafb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -31,7 +31,6 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; -import org.apache.eventmesh.runtime.core.protocol.http.processor.AdminMetricsProcessor; import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageProcessor; import org.apache.eventmesh.runtime.core.protocol.http.processor.BatchSendMessageV2Processor; import org.apache.eventmesh.runtime.core.protocol.http.processor.CreateTopicProcessor; @@ -68,6 +67,7 @@ import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.RateLimiter; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -75,6 +75,7 @@ * Add multiple managers to the underlying server */ @Slf4j +@Getter public class EventMeshHTTPServer extends AbstractHTTPServer { private final EventMeshServer eventMeshServer; @@ -95,7 +96,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { private transient RateLimiter batchRateLimiter; public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) { - super(eventMeshHttpConfiguration.getHttpServerPort(), eventMeshHttpConfiguration.isEventMeshServerUseTls(), eventMeshHttpConfiguration); @@ -103,7 +103,6 @@ public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMes this.eventMeshHttpConfiguration = eventMeshHttpConfiguration; this.metaStorage = eventMeshServer.getMetaStorage(); this.acl = eventMeshServer.getAcl(); - } public void init() throws Exception { @@ -251,9 +250,6 @@ private void registerHTTPRequestProcessor() throws Exception { final SendAsyncRemoteEventProcessor sendAsyncRemoteEventProcessor = new SendAsyncRemoteEventProcessor(this); this.getHandlerService().register(sendAsyncRemoteEventProcessor); - final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this); - registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor); - final HeartBeatProcessor heartProcessor = new HeartBeatProcessor(this); registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), heartProcessor); @@ -299,62 +295,4 @@ private void registerWebhook() throws Exception { this.getHandlerService().register(webHookProcessor, super.getHttpThreadPoolGroup().getWebhookExecutor()); } - - public SubscriptionManager getSubscriptionManager() { - return subscriptionManager; - } - - public ConsumerManager getConsumerManager() { - return consumerManager; - } - - public ProducerManager getProducerManager() { - return producerManager; - } - - public EventMeshHTTPConfiguration getEventMeshHttpConfiguration() { - return eventMeshHttpConfiguration; - } - - public EventBus getEventBus() { - return eventBus; - } - - public HttpRetryer getHttpRetryer() { - return httpRetryer; - } - - public Acl getAcl() { - return acl; - } - - public EventMeshServer getEventMeshServer() { - return eventMeshServer; - } - - public RateLimiter getMsgRateLimiter() { - return msgRateLimiter; - } - - public RateLimiter getBatchRateLimiter() { - return batchRateLimiter; - } - - public FilterEngine getFilterEngine() { - return filterEngine; - } - - public TransformerEngine getTransformerEngine() { - return transformerEngine; - } - - public MetaStorage getMetaStorage() { - return metaStorage; - } - - public HTTPClientPool getHttpClientPool() { - return httpClientPool; - } - - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java index 87ce3c23a4..df3b227e42 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java @@ -23,12 +23,15 @@ import org.apache.eventmesh.common.utils.ConfigurationContextUtil; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; -public class EventMeshHttpBootstrap implements EventMeshBootstrap { +import lombok.Getter; - private final EventMeshHTTPConfiguration eventMeshHttpConfiguration; +public class EventMeshHttpBootstrap implements EventMeshBootstrap { + @Getter public EventMeshHTTPServer eventMeshHttpServer; + private final EventMeshHTTPConfiguration eventMeshHttpConfiguration; + private final EventMeshServer eventMeshServer; public EventMeshHttpBootstrap(final EventMeshServer eventMeshServer) { @@ -64,12 +67,4 @@ public void shutdown() throws Exception { eventMeshHttpServer.shutdown(); } } - - public EventMeshHTTPServer getEventMeshHttpServer() { - return eventMeshHttpServer; - } - - public void setEventMeshHttpServer(EventMeshHTTPServer eventMeshHttpServer) { - this.eventMeshHttpServer = eventMeshHttpServer; - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java index b3250795d6..d61580b9c8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java @@ -43,23 +43,32 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; @Slf4j public class EventMeshServer { + @Getter private final Acl acl; + @Getter + @Setter private MetaStorage metaStorage; + @Getter private static Trace trace; private final StorageResource storageResource; + @Getter private ServiceState serviceState; + @Getter private ProducerTopicManager producerTopicManager; + @Getter private final CommonConfiguration configuration; // private transient ClientManageController clientManageController; @@ -70,13 +79,17 @@ public class EventMeshServer { private static final ConfigService configService = ConfigService.getInstance(); - private EventMeshAdminBootstrap adminBootstrap; - + @Getter private EventMeshTCPServer eventMeshTCPServer = null; + @Getter + private EventMeshHTTPServer eventMeshHTTPServer = null; + + @Getter private EventMeshGrpcServer eventMeshGrpcServer = null; - private EventMeshHTTPServer eventMeshHTTPServer = null; + @Getter + private EventMeshAdminServer eventMeshAdminServer = null; private EventMeshMetricsManager eventMeshMetricsManager; @@ -105,8 +118,7 @@ public EventMeshServer() { case GRPC: BOOTSTRAP_LIST.add(new EventMeshGrpcBootstrap(this)); break; - default: - // nothing to do + default: // nothing to do } } @@ -115,6 +127,9 @@ public EventMeshServer() { BOOTSTRAP_LIST.add(new EventMeshTcpBootstrap(this)); } + // HTTP Admin Server always enabled + BOOTSTRAP_LIST.add(new EventMeshAdminBootstrap(this)); + List metricsPluginTypes = configuration.getEventMeshMetricsPluginType(); if (CollectionUtils.isNotEmpty(metricsPluginTypes)) { List metricsRegistries = metricsPluginTypes.stream().map(metric -> MetricsPluginFactory.getMetricsRegistry(metric)) @@ -147,6 +162,9 @@ public void init() throws Exception { if (eventMeshBootstrap instanceof EventMeshGrpcBootstrap) { eventMeshGrpcServer = ((EventMeshGrpcBootstrap) eventMeshBootstrap).getEventMeshGrpcServer(); } + if (eventMeshBootstrap instanceof EventMeshAdminBootstrap) { + eventMeshAdminServer = ((EventMeshAdminBootstrap) eventMeshBootstrap).getEventMeshAdminServer(); + } } if (Objects.nonNull(eventMeshTCPServer)) { @@ -168,12 +186,6 @@ public void init() throws Exception { eventMeshMetricsManager.init(); } - - if (Objects.nonNull(eventMeshTCPServer) && Objects.nonNull(eventMeshHTTPServer) && Objects.nonNull(eventMeshGrpcServer)) { - adminBootstrap = new EventMeshAdminBootstrap(this); - adminBootstrap.init(); - } - producerTopicManager = new ProducerTopicManager(this); producerTopicManager.init(); @@ -203,13 +215,10 @@ public void start() throws Exception { eventMeshBootstrap.start(); } - if (Objects.nonNull(adminBootstrap)) { - adminBootstrap.start(); - } producerTopicManager.start(); + serviceState = ServiceState.RUNNING; log.info(SERVER_STATE_MSG, serviceState); - } public void shutdown() throws Exception { @@ -235,48 +244,8 @@ public void shutdown() throws Exception { } producerTopicManager.shutdown(); ConfigurationContextUtil.clear(); - serviceState = ServiceState.STOPPED; + serviceState = ServiceState.STOPPED; log.info(SERVER_STATE_MSG, serviceState); } - - public static Trace getTrace() { - return trace; - } - - public ServiceState getServiceState() { - return serviceState; - } - - public MetaStorage getMetaStorage() { - return metaStorage; - } - - public void setMetaStorage(final MetaStorage metaStorage) { - this.metaStorage = metaStorage; - } - - public Acl getAcl() { - return acl; - } - - public ProducerTopicManager getProducerTopicManager() { - return producerTopicManager; - } - - public CommonConfiguration getConfiguration() { - return configuration; - } - - public EventMeshTCPServer getEventMeshTCPServer() { - return eventMeshTCPServer; - } - - public EventMeshGrpcServer getEventMeshGrpcServer() { - return eventMeshGrpcServer; - } - - public EventMeshHTTPServer getEventMeshHTTPServer() { - return eventMeshHTTPServer; - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTcpBootstrap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTcpBootstrap.java index 9cd665d28a..e098b203c4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTcpBootstrap.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTcpBootstrap.java @@ -23,8 +23,11 @@ import org.apache.eventmesh.common.utils.ConfigurationContextUtil; import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration; +import lombok.Getter; + public class EventMeshTcpBootstrap implements EventMeshBootstrap { + @Getter private EventMeshTCPServer eventMeshTcpServer; private final EventMeshTCPConfiguration eventMeshTcpConfiguration; @@ -63,13 +66,4 @@ public void shutdown() throws Exception { eventMeshTcpServer.shutdown(); } } - - public EventMeshTCPServer getEventMeshTcpServer() { - return eventMeshTcpServer; - } - - public void setEventMeshTcpServer(EventMeshTCPServer eventMeshTcpServer) { - this.eventMeshTcpServer = eventMeshTcpServer; - } - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/HTTPThreadPoolGroup.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/HTTPThreadPoolGroup.java index bf6c740b56..aa11788501 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/HTTPThreadPoolGroup.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/HTTPThreadPoolGroup.java @@ -23,17 +23,25 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; +import lombok.Getter; + public class HTTPThreadPoolGroup implements ThreadPoolGroup { private final EventMeshHTTPConfiguration eventMeshHttpConfiguration; + @Getter private ThreadPoolExecutor batchMsgExecutor; + @Getter private ThreadPoolExecutor sendMsgExecutor; + @Getter private ThreadPoolExecutor remoteMsgExecutor; + @Getter private ThreadPoolExecutor replyMsgExecutor; + @Getter private ThreadPoolExecutor pushMsgExecutor; + @Getter private ThreadPoolExecutor clientManageExecutor; - private ThreadPoolExecutor runtimeAdminExecutor; + @Getter private ThreadPoolExecutor webhookExecutor; public HTTPThreadPoolGroup(EventMeshHTTPConfiguration eventMeshHttpConfiguration) { @@ -73,12 +81,6 @@ public void initThreadPool() { new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerClientManageBlockQSize()), "eventMesh-clientManage", true); - // The runtimeAdminExecutor here is for the runtime.admin package. - runtimeAdminExecutor = ThreadPoolFactory.createThreadPoolExecutor( - eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(), - eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(), - new LinkedBlockingQueue<>(50), "eventMesh-runtime-admin", true); - replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor( eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(), eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(), @@ -95,9 +97,6 @@ public void shutdownThreadPool() { if (batchMsgExecutor != null) { batchMsgExecutor.shutdown(); } - if (runtimeAdminExecutor != null) { - runtimeAdminExecutor.shutdown(); - } if (clientManageExecutor != null) { clientManageExecutor.shutdown(); } @@ -114,36 +113,4 @@ public void shutdownThreadPool() { replyMsgExecutor.shutdown(); } } - - public ThreadPoolExecutor getBatchMsgExecutor() { - return batchMsgExecutor; - } - - public ThreadPoolExecutor getSendMsgExecutor() { - return sendMsgExecutor; - } - - public ThreadPoolExecutor getRemoteMsgExecutor() { - return remoteMsgExecutor; - } - - public ThreadPoolExecutor getReplyMsgExecutor() { - return replyMsgExecutor; - } - - public ThreadPoolExecutor getPushMsgExecutor() { - return pushMsgExecutor; - } - - public ThreadPoolExecutor getClientManageExecutor() { - return clientManageExecutor; - } - - public ThreadPoolExecutor getRuntimeAdminExecutor() { - return runtimeAdminExecutor; - } - - public ThreadPoolExecutor getWebhookExecutor() { - return webhookExecutor; - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java index 0f48220a4d..a0736b430a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/SSLContextFactory.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.boot; +import org.apache.eventmesh.runtime.configuration.EventMeshAdminConfiguration; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; @@ -40,23 +41,19 @@ public class SSLContextFactory { - private static String protocol = "TLSv1.1"; - - private static String fileName; - - private static String password; - + /** + * {@link EventMeshAdminConfiguration} will be parsed into {@link EventMeshHTTPConfiguration}. + */ public static SSLContext getSslContext(final EventMeshHTTPConfiguration eventMeshHttpConfiguration) - throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, - UnrecoverableKeyException, KeyManagementException { + throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException { + + String protocol = eventMeshHttpConfiguration.getEventMeshServerSSLProtocol(); + String fileName = eventMeshHttpConfiguration.getEventMeshServerSSLCer(); + String password = eventMeshHttpConfiguration.getEventMeshServerSSLPass(); SSLContext sslContext; - try (InputStream inputStream = Files.newInputStream(Paths.get(EventMeshConstants.EVENTMESH_CONF_HOME - + File.separator - + fileName), StandardOpenOption.READ)) { - protocol = eventMeshHttpConfiguration.getEventMeshServerSSLProtocol(); - fileName = eventMeshHttpConfiguration.getEventMeshServerSSLCer(); - password = eventMeshHttpConfiguration.getEventMeshServerSSLPass(); + try (InputStream inputStream = Files.newInputStream(Paths.get(EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + fileName), + StandardOpenOption.READ)) { char[] filePass = StringUtils.isNotBlank(password) ? password.toCharArray() : new char[0]; final KeyStore keyStore = KeyStore.getInstance("JKS"); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java new file mode 100644 index 0000000000..dff80eaaa2 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshAdminConfiguration.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.configuration; + +import org.apache.eventmesh.common.config.Config; +import org.apache.eventmesh.common.config.ConfigField; + +import java.util.Collections; +import java.util.List; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import inet.ipaddr.IPAddress; + +@Data +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +@Config(prefix = "eventMesh.server") +public class EventMeshAdminConfiguration extends EventMeshHTTPConfiguration { + + @ConfigField(field = "admin.http.port") + private int eventMeshServerAdminPort = 10106; + + @ConfigField(field = "admin.threads.num") + private int eventMeshServerAdminThreadNum = 2; + + @ConfigField(field = "admin.useTls.enabled") + private boolean eventMeshServerUseTls = false; + + @ConfigField(field = "admin.ssl.protocol") + private String eventMeshServerSSLProtocol = "TLSv1.3"; + + @ConfigField(field = "admin.ssl.cer") + private String eventMeshServerSSLCer = "admin-server.jks"; + + @ConfigField(field = "admin.ssl.pass") + private String eventMeshServerSSLPass = "eventmesh-admin-server"; + + @ConfigField(field = "admin.blacklist.ipv4") + private List eventMeshIpv4BlackList = Collections.emptyList(); // TODO implement after merging #4835 + + @ConfigField(field = "admin.blacklist.ipv6") + private List eventMeshIpv6BlackList = Collections.emptyList(); +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java index cf08f27553..924a07ab01 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfiguration.java @@ -59,9 +59,6 @@ public class EventMeshGrpcConfiguration extends CommonConfiguration { @ConfigField(field = "metaStorage.threads.num") private int eventMeshServerMetaStorageThreadNum = 10; - @ConfigField(field = "admin.threads.num") - private int eventMeshServerAdminThreadNum = 2; - @ConfigField(field = "retry.threads.num") private int eventMeshServerRetryThreadNum = 2; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index b0fa18df4c..287c222245 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -63,9 +63,6 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { @ConfigField(field = "metaStorage.threads.num") private int eventMeshServerMetaStorageThreadNum = 10; - @ConfigField(field = "admin.threads.num") - private int eventMeshServerAdminThreadNum = 2; - @ConfigField(field = "retry.threads.num") private int eventMeshServerRetryThreadNum = 2; @@ -131,5 +128,4 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { @ConfigField(field = "blacklist.ipv6") private List eventMeshIpv6BlackList = Collections.emptyList(); - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java index 891f986167..907d80f686 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java @@ -109,9 +109,6 @@ public class EventMeshTCPConfiguration extends CommonConfiguration { @ConfigField(field = "tcp.RebalanceIntervalInMills") private Integer eventMeshTcpRebalanceIntervalInMills = 30 * 1000; - @ConfigField(field = "admin.http.port") - private int eventMeshServerAdminPort = 10106; - @ConfigField(field = "tcp.sendBack.enabled") private boolean eventMeshTcpSendBackEnabled = Boolean.TRUE; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java index b3fbf0d6a8..9a8b369341 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java @@ -18,7 +18,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; import org.apache.eventmesh.common.protocol.http.HttpCommand; -import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; +import org.apache.eventmesh.runtime.boot.EventMeshAdminServer; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import java.util.concurrent.Executor; @@ -31,7 +31,7 @@ @RequiredArgsConstructor public class AdminMetricsProcessor extends AbstractHttpRequestProcessor { - private final EventMeshHTTPServer eventMeshHTTPServer; + private final EventMeshAdminServer eventMeshAdminServer; @Override public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext) throws Exception { @@ -39,6 +39,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext @Override public Executor executor() { - return eventMeshHTTPServer.getHttpThreadPoolGroup().getRuntimeAdminExecutor(); + return eventMeshAdminServer.getAdminMetricsExecutor(); } } diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java index c546d38e3b..83baf20fb9 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java @@ -77,7 +77,6 @@ private void assertTCPConfig(EventMeshTCPConfiguration config) { Assertions.assertEquals(15816, config.getEventMeshTcpMsgRetryAsyncDelayInMills()); Assertions.assertEquals(16816, config.getEventMeshTcpMsgRetryQueueSize()); Assertions.assertEquals(Integer.valueOf(17816), config.getEventMeshTcpRebalanceIntervalInMills()); - Assertions.assertEquals(18816, config.getEventMeshServerAdminPort()); Assertions.assertEquals(Boolean.TRUE, config.isEventMeshTcpSendBackEnabled()); Assertions.assertEquals(3, config.getEventMeshTcpSendBackMaxTimes()); Assertions.assertEquals(21816, config.getEventMeshTcpPushFailIsolateTimeInMills()); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java index 6b206167dc..4dc59a2000 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java @@ -50,7 +50,6 @@ private void assertGrpcConfig(EventMeshGrpcConfiguration config) { Assertions.assertEquals(5816, config.getEventMeshServerReplyMsgThreadNum()); Assertions.assertEquals(6816, config.getEventMeshServerSubscribeMsgThreadNum()); Assertions.assertEquals(7816, config.getEventMeshServerMetaStorageThreadNum()); - Assertions.assertEquals(8816, config.getEventMeshServerAdminThreadNum()); Assertions.assertEquals(9816, config.getEventMeshServerRetryThreadNum()); Assertions.assertEquals(11816, config.getEventMeshServerPullMetaStorageInterval()); Assertions.assertEquals(12816, config.getEventMeshServerAsyncAccumulationThreshold()); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java index d522ff5519..9e99bc511d 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java @@ -53,7 +53,6 @@ private void assertHTTPConfig(EventMeshHTTPConfiguration config) throws AddressS Assertions.assertEquals(5816, config.getEventMeshServerReplyMsgThreadNum()); Assertions.assertEquals(6816, config.getEventMeshServerClientManageThreadNum()); Assertions.assertEquals(7816, config.getEventMeshServerMetaStorageThreadNum()); - Assertions.assertEquals(8816, config.getEventMeshServerAdminThreadNum()); Assertions.assertEquals(9816, config.getEventMeshServerRetryThreadNum()); Assertions.assertEquals(11816, config.getEventMeshServerPullMetaStorageInterval()); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java index 1501cf1b5d..2aee84ef72 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java @@ -58,7 +58,6 @@ private void assertTCPConfig(EventMeshTCPConfiguration config) { Assertions.assertEquals(15816, config.getEventMeshTcpMsgRetryAsyncDelayInMills()); Assertions.assertEquals(16816, config.getEventMeshTcpMsgRetryQueueSize()); Assertions.assertEquals(Integer.valueOf(17816), config.getEventMeshTcpRebalanceIntervalInMills()); - Assertions.assertEquals(18816, config.getEventMeshServerAdminPort()); Assertions.assertEquals(Boolean.TRUE, config.isEventMeshTcpSendBackEnabled()); Assertions.assertEquals(3, config.getEventMeshTcpSendBackMaxTimes()); Assertions.assertEquals(21816, config.getEventMeshTcpPushFailIsolateTimeInMills()); diff --git a/eventmesh-runtime/src/test/resources/configuration.properties b/eventmesh-runtime/src/test/resources/configuration.properties index 70ff82e05c..836fc9c981 100644 --- a/eventmesh-runtime/src/test/resources/configuration.properties +++ b/eventmesh-runtime/src/test/resources/configuration.properties @@ -78,7 +78,6 @@ eventMesh.server.tcp.writerIdleSeconds=2816 eventMesh.server.tcp.readerIdleSeconds=3816 eventMesh.server.tcp.msgReqnumPerSecond=4816 eventMesh.server.tcp.clientMaxNum=5816 -eventMesh.server.tcp.enabled=true eventMesh.server.global.scheduler=6816 eventMesh.server.tcp.taskHandleExecutorPoolSize=7816 eventMesh.server.tcp.msgDownStreamExecutorPoolSize=8816 @@ -89,7 +88,6 @@ eventMesh.server.retry.sync.pushRetryDelayInMills=14816 eventMesh.server.retry.async.pushRetryDelayInMills=15816 eventMesh.server.retry.pushRetryQueueSize=16816 eventMesh.server.tcp.RebalanceIntervalInMills=17816 -eventMesh.server.admin.http.port=18816 eventMesh.server.tcp.sendBack.enabled=true eventMesh.server.tcp.pushFailIsolateTimeInMills=21816 eventMesh.server.gracefulShutdown.sleepIntervalInMills=22816