SSL Support (plus mysql_clear_password plugin for RDS) (#280)
* Added SSL Support again * Issue #265 - _process_auth implementation * Added cleartext plugin test
Showing
2 changed files
with
134 additions
and
10 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from aiomysql import create_pool | ||
|
||
import pytest | ||
|
||
|
||
@pytest.mark.run_loop | ||
async def test_tls_connect(mysql_server, loop): | ||
async with create_pool(**mysql_server['conn_params'], | ||
loop=loop) as pool: | ||
async with pool.get() as conn: | ||
async with conn.cursor() as cur: | ||
# Run simple command | ||
await cur.execute("SHOW DATABASES;") | ||
value = await cur.fetchall() | ||
|
||
values = [item[0] for item in value] | ||
# Spot check the answers, we should at least have mysql | ||
# and information_schema | ||
assert 'mysql' in values, \ | ||
'Could not find the "mysql" table' | ||
assert 'information_schema' in values, \ | ||
'Could not find the "mysql" table' | ||
|
||
# Check TLS variables | ||
await cur.execute("SHOW STATUS LIKE '%Ssl_version%';") | ||
value = await cur.fetchone() | ||
|
||
# The context has TLS | ||
assert value[1].startswith('TLS'), \ | ||
'Not connected to the database with TLS' | ||
|
||
|
||
# MySQL will get you to renegotiate if sent a cleartext password | ||
@pytest.mark.run_loop | ||
async def test_auth_plugin_renegotiation(mysql_server, loop): | ||
async with create_pool(**mysql_server['conn_params'], | ||
auth_plugin='mysql_clear_password', | ||
loop=loop) as pool: | ||
async with pool.get() as conn: | ||
async with conn.cursor() as cur: | ||
# Run simple command | ||
await cur.execute("SHOW DATABASES;") | ||
value = await cur.fetchall() | ||
|
||
assert len(value), 'No databases found' | ||
|
||
assert conn._client_auth_plugin == 'mysql_clear_password', \ | ||
'Client did not try clear password auth' | ||
assert conn._server_auth_plugin == 'mysql_native_password', \ | ||
'Server did not ask for native auth' | ||
assert conn._auth_plugin_used == b'mysql_native_password', \ | ||
'Client did not renegotiate with native auth' |
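Review bot: test_auth_plugin_renegotiation never asserts that the session is encrypted, although with `auth_plugin='mysql_clear_password'` the client offers the password unhashed before the server switches it to native auth, as the comment above the test describes. Whether TLS is active depends on `mysql_server['conn_params']`, which is not visible in this file, so a fixture change could silently turn this into a plaintext-password test. I would repeat the check from test_tls_connect here: `await cur.execute("SHOW STATUS LIKE 'Ssl_version';")` followed by `assert (await cur.fetchone())[1].startswith('TLS'), 'cleartext auth attempted without TLS'`. Separately, the message on the `'information_schema' in values` assertion is a copy of the `mysql` one and should read `'Could not find the "information_schema" database'`.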