Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* Packet payload.
*/
public interface PacketPayload extends AutoCloseable {
public interface PacketPayload {

/**
* Get byte buf.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,4 @@ public void skipReserved(final int length) {
public void writeReserved(final int length) {
byteBuf.writeZero(length);
}

@Override
public void close() {
byteBuf.release();
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.db.protocol.postgresql.payload;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
Expand Down Expand Up @@ -176,16 +175,4 @@ public void skipReserved(final int length) {
public boolean hasCompletePacket() {
return byteBuf.readableBytes() >= 5 && byteBuf.readableBytes() - 1 >= byteBuf.getInt(byteBuf.readerIndex() + 1);
}

@Override
public void close() {
if (byteBuf instanceof CompositeByteBuf) {
int remainBytes = byteBuf.readableBytes();
if (remainBytes > 0) {
byteBuf.skipBytes(remainBytes);
}
((CompositeByteBuf) byteBuf).discardReadComponents();
}
byteBuf.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,5 @@ void assertReadWrite() {
assertThat(payload.bytesBeforeZero(), is(expectedString.length()));
assertThat(payload.readStringNul(), is(expectedString));
assertThat(payload.getByteBuf(), is(byteBuf));
payload.close();
}
}
2 changes: 1 addition & 1 deletion infra/util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.proxy.frontend.command;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -70,11 +71,12 @@ public final class CommandExecutorTask implements Runnable {
public void run() {
boolean isNeedFlush = false;
boolean sqlShowEnabled = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.SQL_SHOW);
try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message, context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get())) {
try {
if (sqlShowEnabled) {
fillLogMDC();
}
isNeedFlush = executeCommand(context, payload);
isNeedFlush = executeCommand(context,
databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message, context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
Expand All @@ -99,9 +101,18 @@ public void run() {
if (sqlShowEnabled) {
clearLogMDC();
}
if (message instanceof CompositeByteBuf) {
releaseCompositeByteBuf((CompositeByteBuf) message);
}
((ByteBuf) message).release();
}
}

private void fillLogMDC() {
MDC.put(LogMDCConstants.DATABASE_KEY, connectionSession.getDatabaseName());
MDC.put(LogMDCConstants.USER_KEY, connectionSession.getGrantee().toString());
}

private boolean executeCommand(final ChannelHandlerContext context, final PacketPayload payload) throws SQLException {
CommandExecuteEngine commandExecuteEngine = databaseProtocolFrontendEngine.getCommandExecuteEngine();
CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
Expand Down Expand Up @@ -151,12 +162,15 @@ private void processClosedExceptions(final Collection<SQLException> exceptions)
processException(ex);
}

private void fillLogMDC() {
MDC.put(LogMDCConstants.DATABASE_KEY, connectionSession.getDatabaseName());
MDC.put(LogMDCConstants.USER_KEY, connectionSession.getGrantee().toString());
}

private void clearLogMDC() {
MDC.clear();
}

private void releaseCompositeByteBuf(final CompositeByteBuf compositeByteBuf) {
int remainBytes = compositeByteBuf.readableBytes();
if (remainBytes > 0) {
compositeByteBuf.skipBytes(remainBytes);
}
compositeByteBuf.discardReadComponents();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
Expand Down Expand Up @@ -77,8 +76,9 @@ public void channelRead(final ChannelHandlerContext context, final Object messag
}

private boolean authenticate(final ChannelHandlerContext context, final ByteBuf message) {
try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload(message, context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get())) {
AuthenticationResult authResult = databaseProtocolFrontendEngine.getAuthenticationEngine().authenticate(context, payload);
try {
AuthenticationResult authResult = databaseProtocolFrontendEngine.getAuthenticationEngine().authenticate(context,
databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload(message, context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()));
if (authResult.isFinished()) {
connectionSession.setGrantee(new Grantee(authResult.getUsername(), authResult.getHostname()));
connectionSession.setCurrentDatabase(authResult.getDatabase());
Expand All @@ -95,6 +95,8 @@ private boolean authenticate(final ChannelHandlerContext context, final ByteBuf
}
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex));
context.close();
} finally {
message.release();
}
return false;
}
Expand Down