-
Notifications
You must be signed in to change notification settings - Fork 128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix MySQL reset packet with prepared statements #611
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -380,8 +380,11 @@ func (handler *Handler) ProxyClientConnection(ctx context.Context, errCh chan<- | |
|
||
handler.setQueryHandler(handler.QueryResponseHandler) | ||
break | ||
case CommandStatementClose, CommandStatementSendLongData, CommandStatementReset: | ||
case CommandStatementClose, CommandStatementSendLongData: | ||
clientLog.Debugln("Close|SendLongData|Reset command") | ||
case CommandStatementReset: | ||
clientLog.Debugln("Reset Request Statement") | ||
handler.setQueryHandler(handler.ResetStatementResponseHandler) | ||
default: | ||
clientLog.Debugf("Command %d not supported now", cmd) | ||
} | ||
|
@@ -406,32 +409,36 @@ func (handler *Handler) handleStatementExecute(ctx context.Context, packet *Pack | |
return nil | ||
} | ||
|
||
parameters, err := packet.GetBindParameters(statement.ParamsNum()) | ||
if err != nil { | ||
log.WithError(err).Error("Can't parse OnBind parameters") | ||
return nil | ||
} | ||
|
||
newParameters, changed, err := handler.queryObserverManager.OnBind(ctx, statement.Query(), parameters) | ||
if err != nil { | ||
// Security: here we should interrupt proxying in case of any keys read related errors | ||
// in other cases we just stop the processing to let db protocol handle the error. | ||
if filesystem.IsKeyReadError(err) { | ||
return err | ||
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute.html | ||
// we expect list of parameters if the paramsNum > 0 | ||
if paramsNum := statement.ParamsNum(); paramsNum > 0 { | ||
parameters, err := packet.GetBindParameters(paramsNum) | ||
if err != nil { | ||
log.WithError(err).Error("Can't parse OnBind parameters") | ||
return nil | ||
} | ||
|
||
log.WithError(err).Error("Failed to handle Bind packet") | ||
return nil | ||
} | ||
|
||
// Finally, if the parameter values have been changed, update the packet. | ||
// If that fails, send the packet unchanged, as usual. | ||
if changed { | ||
err := packet.SetParameters(newParameters) | ||
newParameters, changed, err := handler.queryObserverManager.OnBind(ctx, statement.Query(), parameters) | ||
if err != nil { | ||
log.WithError(err).Error("Failed to update Bind packet") | ||
// Security: here we should interrupt proxying in case of any keys read related errors | ||
// in other cases we just stop the processing to let db protocol handle the error. | ||
if filesystem.IsKeyReadError(err) { | ||
return err | ||
} | ||
|
||
log.WithError(err).Error("Failed to handle Bind packet") | ||
return nil | ||
} | ||
|
||
// Finally, if the parameter values have been changed, update the packet. | ||
// If that fails, send the packet unchanged, as usual. | ||
if changed { | ||
err := packet.SetParameters(newParameters) | ||
if err != nil { | ||
log.WithError(err).Error("Failed to update Bind packet") | ||
return nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should not send it as is because it is dangerous behavior. application expects that when it sent data out it will be protected and in safe in the database but acra will pass it as is if something fails on its side. Acra should not pass unprotected data to the database if it marked to be protected. |
||
} | ||
} | ||
} | ||
|
||
return nil | ||
|
@@ -582,6 +589,23 @@ func (handler *Handler) isPreparedStatementResult() bool { | |
return handler.currentCommand == CommandStatementExecute | ||
} | ||
|
||
// ResetStatementResponseHandler handle response for Reset Request Statement | ||
func (handler *Handler) ResetStatementResponseHandler(ctx context.Context, packet *Packet, dbConnection, clientConnection net.Conn) (err error) { | ||
if packet.IsOK() { | ||
handler.logger.Debugln("OK Packet on Reset Request Statement") | ||
} else { | ||
handler.logger.Debugln("Err Packet on Reset Request Statement") | ||
} | ||
|
||
handler.resetQueryHandler() | ||
|
||
if _, err := clientConnection.Write(packet.Dump()); err != nil { | ||
handler.logger.WithError(err).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorNetworkWrite). | ||
Debugln("Can't proxy output") | ||
} | ||
return nil | ||
} | ||
|
||
// QueryResponseHandler parses data from database response | ||
func (handler *Handler) QueryResponseHandler(ctx context.Context, packet *Packet, dbConnection, clientConnection net.Conn) (err error) { | ||
handler.resetQueryHandler() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -905,6 +905,76 @@ def execute_prepared_statement(self, query, args=None): | |
return cursor.fetchall() | ||
|
||
|
||
class MysqlConnectorCExecutor(QueryExecutor): | ||
def _result_to_dict(self, description, data): | ||
"""convert list of tuples of rows to list of dicts""" | ||
columns_name = [bytes(i[0], encoding='utf8').decode('utf8') for i in description] | ||
result = [] | ||
for row in data: | ||
row_data = {column_name: value | ||
for column_name, value in zip(columns_name, row)} | ||
result.append(row_data) | ||
return result | ||
|
||
def execute(self, query, args=None): | ||
if args is None: | ||
args = [] | ||
with contextlib.closing(mysql.connector.connect( | ||
use_unicode=False, raw=self.connection_args.raw, charset='ascii', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why we don't use unicode and use charset ascii instead of utf8? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we can use Unicode. It affects how the non-binary string will be returned. So then in
but can just do (column will be passed in Unicode):
Regarding the
|
||
host=self.connection_args.host, port=self.connection_args.port, | ||
user=self.connection_args.user, | ||
password=self.connection_args.password, | ||
database=self.connection_args.dbname, | ||
ssl_ca=self.connection_args.ssl_ca, | ||
ssl_cert=self.connection_args.ssl_cert, | ||
ssl_key=self.connection_args.ssl_key, | ||
ssl_disabled=not TEST_WITH_TLS)) as connection: | ||
|
||
with contextlib.closing(connection.cursor()) as cursor: | ||
cursor.execute(query, args) | ||
data = cursor.fetchall() | ||
result = self._result_to_dict(cursor.description, data) | ||
return result | ||
|
||
def execute_prepared_statement(self, query, args=None): | ||
if args is None: | ||
args = [] | ||
with contextlib.closing(mysql.connector.connect( | ||
use_unicode=False, charset='ascii', | ||
host=self.connection_args.host, port=self.connection_args.port, | ||
user=self.connection_args.user, | ||
password=self.connection_args.password, | ||
database=self.connection_args.dbname, | ||
ssl_ca=self.connection_args.ssl_ca, | ||
ssl_cert=self.connection_args.ssl_cert, | ||
ssl_key=self.connection_args.ssl_key, | ||
ssl_disabled=not TEST_WITH_TLS)) as connection: | ||
|
||
with contextlib.closing(connection.cursor(prepared=True)) as cursor: | ||
cursor.execute(query, args) | ||
data = cursor.fetchall() | ||
result = self._result_to_dict(cursor.description, data) | ||
return result | ||
|
||
def execute_prepared_statement_no_result(self, query, args=None): | ||
if args is None: | ||
args = [] | ||
with contextlib.closing(mysql.connector.connect( | ||
use_unicode=False, charset='ascii', | ||
host=self.connection_args.host, port=self.connection_args.port, | ||
user=self.connection_args.user, | ||
password=self.connection_args.password, | ||
database=self.connection_args.dbname, | ||
ssl_ca=self.connection_args.ssl_ca, | ||
ssl_cert=self.connection_args.ssl_cert, | ||
ssl_key=self.connection_args.ssl_key, | ||
ssl_disabled=not TEST_WITH_TLS)) as connection: | ||
|
||
with contextlib.closing(connection.cursor(prepared=True)) as cursor: | ||
cursor.execute(query, args) | ||
connection.commit() | ||
|
||
|
||
class MysqlExecutor(QueryExecutor): | ||
def _result_to_dict(self, description, data): | ||
"""convert list of tuples of rows to list of dicts""" | ||
|
@@ -919,7 +989,7 @@ def _result_to_dict(self, description, data): | |
def execute(self, query, args=None): | ||
if args is None: | ||
args = [] | ||
with contextlib.closing(mysql.connector.Connect( | ||
with contextlib.closing(mysql.connector.connection.MySQLConnection( | ||
use_unicode=False, raw=self.connection_args.raw, charset='ascii', | ||
host=self.connection_args.host, port=self.connection_args.port, | ||
user=self.connection_args.user, | ||
|
@@ -939,7 +1009,7 @@ def execute(self, query, args=None): | |
def execute_prepared_statement(self, query, args=None): | ||
if args is None: | ||
args = [] | ||
with contextlib.closing(mysql.connector.Connect( | ||
with contextlib.closing(mysql.connector.connection.MySQLConnection( | ||
use_unicode=False, charset='ascii', | ||
host=self.connection_args.host, port=self.connection_args.port, | ||
user=self.connection_args.user, | ||
|
@@ -959,7 +1029,7 @@ def execute_prepared_statement(self, query, args=None): | |
def execute_prepared_statement_no_result(self, query, args=None): | ||
if args is None: | ||
args = [] | ||
with contextlib.closing(mysql.connector.Connect( | ||
with contextlib.closing(mysql.connector.connection.MySQLConnection( | ||
use_unicode=False, charset='ascii', | ||
host=self.connection_args.host, port=self.connection_args.port, | ||
user=self.connection_args.user, | ||
|
@@ -4689,6 +4759,24 @@ def executePreparedStatement(self, query, args=None): | |
).execute_prepared_statement(query, args=args) | ||
|
||
|
||
class TestMysqlConnectorCBinaryPreparedStatement(BasePrepareStatementMixin, BaseTestCase): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add some comment with description what we expected this connector will do? it will help to understand at the future if this test will fail after updating driver or something else. and explain what we expected and should reproduce. because after a year if this test will fail, we will not remember what behavior we expected and what to do to back it again There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure |
||
def checkSkip(self): | ||
if not TEST_MYSQL: | ||
self.skipTest("run test only for mysql") | ||
elif not TEST_WITH_TLS: | ||
self.skipTest("running tests only with TLS") | ||
|
||
def executePreparedStatement(self, query, args=None): | ||
return MysqlConnectorCExecutor( | ||
ConnectionArgs(host='localhost', port=self.ACRASERVER_PORT, | ||
user=DB_USER, password=DB_USER_PASSWORD, | ||
dbname=DB_NAME, ssl_ca=TEST_TLS_CA, | ||
ssl_key=TEST_TLS_CLIENT_KEY, | ||
raw=True, | ||
ssl_cert=TEST_TLS_CLIENT_CERT) | ||
).execute_prepared_statement(query, args=args) | ||
|
||
|
||
class TestMysqlBinaryPreparedStatementWholeCell(TestMysqlBinaryPreparedStatement): | ||
WHOLECELL_MODE = True | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.