2727
2828import io .netty .bootstrap .Bootstrap ;
2929import io .netty .buffer .ByteBufAllocator ;
30- import io .netty .channel .*;
30+ import io .netty .channel .Channel ;
31+ import io .netty .channel .ChannelInitializer ;
32+ import io .netty .channel .ChannelOption ;
33+ import io .netty .channel .ChannelPipeline ;
34+ import io .netty .channel .DefaultEventLoopGroup ;
3135import io .netty .channel .unix .PreferredDirectByteBufAllocator ;
32- import io .netty .handler .codec .haproxy .*;
36+ import io .netty .handler .codec .haproxy .HAProxyCommand ;
37+ import io .netty .handler .codec .haproxy .HAProxyMessage ;
38+ import io .netty .handler .codec .haproxy .HAProxyMessageEncoder ;
39+ import io .netty .handler .codec .haproxy .HAProxyProtocolVersion ;
40+ import io .netty .handler .codec .haproxy .HAProxyProxiedProtocol ;
41+ import io .netty .handler .timeout .ReadTimeoutHandler ;
42+ import io .netty .handler .timeout .WriteTimeoutHandler ;
3343import io .netty .util .concurrent .DefaultThreadFactory ;
3444import org .checkerframework .checker .nullness .qual .NonNull ;
3545import org .geysermc .mcprotocollib .network .BuiltinFlags ;
3646import org .geysermc .mcprotocollib .network .codec .PacketCodecHelper ;
3747import org .geysermc .mcprotocollib .network .packet .PacketProtocol ;
48+ import org .geysermc .mcprotocollib .network .tcp .FlushHandler ;
49+ import org .geysermc .mcprotocollib .network .tcp .TcpFlowControlHandler ;
3850import org .geysermc .mcprotocollib .network .tcp .TcpPacketCodec ;
51+ import org .geysermc .mcprotocollib .network .tcp .TcpPacketCompression ;
52+ import org .geysermc .mcprotocollib .network .tcp .TcpPacketEncryptor ;
3953import org .geysermc .mcprotocollib .network .tcp .TcpPacketSizer ;
4054import org .geysermc .mcprotocollib .network .tcp .TcpSession ;
4155import org .geysermc .mcprotocollib .protocol .codec .MinecraftCodecHelper ;
4256
4357import java .net .Inet4Address ;
4458import java .net .InetSocketAddress ;
4559import java .net .SocketAddress ;
60+ import java .util .concurrent .CompletableFuture ;
4661import java .util .concurrent .TimeUnit ;
4762
4863/**
@@ -72,44 +87,53 @@ public void connect(boolean wait, boolean transferring) {
7287 if (DEFAULT_EVENT_LOOP_GROUP == null ) {
7388 DEFAULT_EVENT_LOOP_GROUP = new DefaultEventLoopGroup (new DefaultThreadFactory (this .getClass (), true ));
7489 Runtime .getRuntime ().addShutdownHook (new Thread (
75- () -> DEFAULT_EVENT_LOOP_GROUP .shutdownGracefully (100 , 500 , TimeUnit .MILLISECONDS )));
90+ () -> DEFAULT_EVENT_LOOP_GROUP .shutdownGracefully (100 , 500 , TimeUnit .MILLISECONDS )));
7691 }
7792
78- try {
79- final Bootstrap bootstrap = new Bootstrap ( );
80- bootstrap .channel ( LocalChannelWithRemoteAddress . class );
81- bootstrap . handler ( new ChannelInitializer < LocalChannelWithRemoteAddress >() {
82- @ Override
83- public void initChannel ( @ NonNull LocalChannelWithRemoteAddress channel ) {
84- channel . spoofedRemoteAddress ( new InetSocketAddress ( clientIp , 0 ) );
85- PacketProtocol protocol = getPacketProtocol ( );
86- protocol . newClientSession ( LocalSession . this , transferring );
87-
88- refreshReadTimeoutHandler ( channel );
89- refreshWriteTimeoutHandler (channel );
90-
91- ChannelPipeline pipeline = channel . pipeline ( );
92- pipeline .addLast ("sizer " , new TcpPacketSizer ( LocalSession . this , protocol . getPacketHeader (). getLengthSize ( )));
93- pipeline . addLast ( "codec" , new TcpPacketCodec ( LocalSession . this , true ));
94- pipeline .addLast ("manager " , LocalSession . this );
95-
96- addHAProxySupport ( pipeline );
97- }
98- }). group ( DEFAULT_EVENT_LOOP_GROUP ). option ( ChannelOption . CONNECT_TIMEOUT_MILLIS , getConnectTimeout () * 1000 );
99-
100- if ( PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR != null ) {
101- bootstrap . option ( ChannelOption . ALLOCATOR , PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR );
93+ final Bootstrap bootstrap = new Bootstrap ();
94+ bootstrap . channel ( LocalChannelWithRemoteAddress . class );
95+ bootstrap .handler ( new ChannelInitializer < LocalChannelWithRemoteAddress >() {
96+ @ Override
97+ public void initChannel ( @ NonNull LocalChannelWithRemoteAddress channel ) {
98+ channel . spoofedRemoteAddress ( new InetSocketAddress ( clientIp , 0 ));
99+ PacketProtocol protocol = getPacketProtocol ( );
100+ protocol . newClientSession ( LocalSession . this , transferring );
101+
102+ ChannelPipeline pipeline = channel . pipeline ();
103+
104+ initializeHAProxySupport (channel );
105+
106+ pipeline . addLast ( "read-timeout" , new ReadTimeoutHandler ( getFlag ( BuiltinFlags . READ_TIMEOUT , 30 )) );
107+ pipeline .addLast ("write-timeout " , new WriteTimeoutHandler ( getFlag ( BuiltinFlags . WRITE_TIMEOUT , 0 )));
108+
109+ pipeline .addLast ("encryption " , new TcpPacketEncryptor () );
110+ pipeline . addLast ( "sizer" , new TcpPacketSizer ( protocol . getPacketHeader (), getCodecHelper ()));
111+ pipeline . addLast ( "compression" , new TcpPacketCompression ( getCodecHelper ()) );
112+
113+ pipeline . addLast ( "flow-control" , new TcpFlowControlHandler () );
114+ pipeline . addLast ( "codec" , new TcpPacketCodec ( LocalSession . this , true ));
115+ pipeline . addLast ( "flush-handler" , new FlushHandler ());
116+ pipeline . addLast ( "manager" , LocalSession . this );
102117 }
118+ }).group (DEFAULT_EVENT_LOOP_GROUP ).option (ChannelOption .CONNECT_TIMEOUT_MILLIS , getFlag (BuiltinFlags .CLIENT_CONNECT_TIMEOUT , 30 ) * 1000 );
103119
104- bootstrap .remoteAddress (targetAddress );
120+ if (PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR != null ) {
121+ bootstrap .option (ChannelOption .ALLOCATOR , PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR );
122+ }
105123
106- bootstrap .connect ().addListener ((future ) -> {
107- if (!future .isSuccess ()) {
108- exceptionCaught (null , future .cause ());
109- }
110- });
111- } catch (Throwable t ) {
112- exceptionCaught (null , t );
124+ bootstrap .remoteAddress (targetAddress );
125+
126+ CompletableFuture <Void > handleFuture = new CompletableFuture <>();
127+ bootstrap .connect ().addListener ((futureListener ) -> {
128+ if (!futureListener .isSuccess ()) {
129+ exceptionCaught (null , futureListener .cause ());
130+ }
131+
132+ handleFuture .complete (null );
133+ });
134+
135+ if (wait ) {
136+ handleFuture .join ();
113137 }
114138 }
115139
@@ -118,32 +142,20 @@ public MinecraftCodecHelper getCodecHelper() {
118142 return (MinecraftCodecHelper ) this .codecHelper ;
119143 }
120144
121- // TODO duplicate code
122- private void addHAProxySupport (ChannelPipeline pipeline ) {
145+ private void initializeHAProxySupport (Channel channel ) {
123146 InetSocketAddress clientAddress = getFlag (BuiltinFlags .CLIENT_PROXIED_ADDRESS );
124- if (getFlag (BuiltinFlags .ENABLE_CLIENT_PROXY_PROTOCOL , false ) && clientAddress != null ) {
125- pipeline .addFirst ("proxy-protocol-packet-sender" , new ChannelInboundHandlerAdapter () {
126- @ Override
127- public void channelActive (@ NonNull ChannelHandlerContext ctx ) throws Exception {
128- HAProxyProxiedProtocol proxiedProtocol = clientAddress .getAddress () instanceof Inet4Address ? HAProxyProxiedProtocol .TCP4 : HAProxyProxiedProtocol .TCP6 ;
129- InetSocketAddress remoteAddress ;
130- if (ctx .channel ().remoteAddress () instanceof InetSocketAddress ) {
131- remoteAddress = (InetSocketAddress ) ctx .channel ().remoteAddress ();
132- } else {
133- remoteAddress = new InetSocketAddress (host , port );
134- }
135- ctx .channel ().writeAndFlush (new HAProxyMessage (
136- HAProxyProtocolVersion .V2 , HAProxyCommand .PROXY , proxiedProtocol ,
137- clientAddress .getAddress ().getHostAddress (), remoteAddress .getAddress ().getHostAddress (),
138- clientAddress .getPort (), remoteAddress .getPort ()
139- ));
140- ctx .pipeline ().remove (this );
141- ctx .pipeline ().remove ("proxy-protocol-encoder" );
142- super .channelActive (ctx );
143- }
144- });
145- pipeline .addFirst ("proxy-protocol-encoder" , HAProxyMessageEncoder .INSTANCE );
147+ if (clientAddress == null ) {
148+ return ;
146149 }
150+
151+ channel .pipeline ().addLast ("proxy-protocol-encoder" , HAProxyMessageEncoder .INSTANCE );
152+ HAProxyProxiedProtocol proxiedProtocol = clientAddress .getAddress () instanceof Inet4Address ? HAProxyProxiedProtocol .TCP4 : HAProxyProxiedProtocol .TCP6 ;
153+ InetSocketAddress remoteAddress = (InetSocketAddress ) channel .remoteAddress ();
154+ channel .writeAndFlush (new HAProxyMessage (
155+ HAProxyProtocolVersion .V2 , HAProxyCommand .PROXY , proxiedProtocol ,
156+ clientAddress .getAddress ().getHostAddress (), remoteAddress .getAddress ().getHostAddress (),
157+ clientAddress .getPort (), remoteAddress .getPort ()
158+ )).addListener (future -> channel .pipeline ().remove ("proxy-protocol-encoder" ));
147159 }
148160
149161 /**
0 commit comments