Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql_replication: add channel parameter #63271

Merged
merged 2 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- mysql_replication - add ``channel`` parameter (https://github.com/ansible/ansible/issues/29311).
67 changes: 53 additions & 14 deletions lib/ansible/modules/database/mysql/mysql_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@
- For more information see U(https://mariadb.com/kb/en/library/multi-source-replication/).
type: str
version_added: "2.10"
channel:
description:
- Name of replication channel.
- Multi-source replication is supported from MySQL 5.7.
- For more information see U(https://dev.mysql.com/doc/refman/8.0/en/replication-multi-source.html).
type: str
version_added: "2.10"

extends_documentation_fragment:
- mysql
Expand Down Expand Up @@ -169,6 +176,11 @@
mysql_replication:
mode: startslave
connection_name: master-1

- name: Stop replication in channel master-1
mysql_replication:
mode: stopslave
channel: master-1
'''

RETURN = r'''
Expand Down Expand Up @@ -196,20 +208,29 @@ def get_master_status(cursor):
return masterstatus


def get_slave_status(cursor, connection_name=''):
def get_slave_status(cursor, connection_name='', channel=''):
if connection_name:
cursor.execute("SHOW SLAVE '%s' STATUS" % connection_name)
query = "SHOW SLAVE '%s' STATUS" % connection_name
else:
cursor.execute("SHOW SLAVE STATUS")
query = "SHOW SLAVE STATUS"

if channel:
query += " FOR CHANNEL '%s'" % channel

cursor.execute(query)
slavestatus = cursor.fetchone()
return slavestatus


def stop_slave(cursor, connection_name=''):
def stop_slave(cursor, connection_name='', channel=''):
if connection_name:
query = "STOP SLAVE '%s'" % connection_name
else:
query = 'STOP SLAVE'

if channel:
query += " FOR CHANNEL '%s'" % channel

try:
executed_queries.append(query)
cursor.execute(query)
Expand All @@ -219,11 +240,15 @@ def stop_slave(cursor, connection_name=''):
return stopped


def reset_slave(cursor, connection_name=''):
def reset_slave(cursor, connection_name='', channel=''):
if connection_name:
query = "RESET SLAVE '%s'" % connection_name
else:
query = 'RESET SLAVE'

if channel:
query += " FOR CHANNEL '%s'" % channel

try:
executed_queries.append(query)
cursor.execute(query)
Expand All @@ -233,11 +258,15 @@ def reset_slave(cursor, connection_name=''):
return reset


def reset_slave_all(cursor, connection_name=''):
def reset_slave_all(cursor, connection_name='', channel=''):
if connection_name:
query = "RESET SLAVE '%s' ALL" % connection_name
else:
query = 'RESET SLAVE ALL'

if channel:
query += " FOR CHANNEL '%s'" % channel

try:
executed_queries.append(query)
cursor.execute(query)
Expand All @@ -247,11 +276,15 @@ def reset_slave_all(cursor, connection_name=''):
return reset


def start_slave(cursor, connection_name=''):
def start_slave(cursor, connection_name='', channel=''):
if connection_name:
query = "START SLAVE '%s'" % connection_name
else:
query = 'START SLAVE'

if channel:
query += " FOR CHANNEL '%s'" % channel

try:
executed_queries.append(query)
cursor.execute(query)
Expand All @@ -261,11 +294,15 @@ def start_slave(cursor, connection_name=''):
return started


def changemaster(cursor, chm, connection_name=''):
def changemaster(cursor, chm, connection_name='', channel=''):
if connection_name:
query = "CHANGE MASTER '%s' TO %s" % (connection_name, ','.join(chm))
else:
query = 'CHANGE MASTER TO %s' % ','.join(chm)

if channel:
query += " FOR CHANNEL '%s'" % channel

executed_queries.append(query)
cursor.execute(query)

Expand Down Expand Up @@ -304,6 +341,7 @@ def main():
master_use_gtid=dict(type='str', choices=['current_pos', 'slave_pos', 'disabled']),
master_delay=dict(type='int'),
connection_name=dict(type='str'),
channel=dict(type='str'),
)
)
mode = module.params["mode"]
Expand Down Expand Up @@ -334,6 +372,7 @@ def main():
else:
master_use_gtid = module.params["master_use_gtid"]
connection_name = module.params["connection_name"]
channel = module.params['channel']

if mysql_driver is None:
module.fail_json(msg=mysql_driver_fail_msg)
Expand Down Expand Up @@ -363,7 +402,7 @@ def main():
module.exit_json(queries=executed_queries, **status)

elif mode in "getslave":
status = get_slave_status(cursor, connection_name)
status = get_slave_status(cursor, connection_name, channel)
if not isinstance(status, dict):
status = dict(Is_Slave=False, msg="Server is not configured as mysql slave")
else:
Expand Down Expand Up @@ -410,33 +449,33 @@ def main():
if master_use_gtid is not None:
chm.append("MASTER_USE_GTID=%s" % master_use_gtid)
try:
changemaster(cursor, chm, connection_name)
changemaster(cursor, chm, connection_name, channel)
except mysql_driver.Warning as e:
result['warning'] = to_native(e)
except Exception as e:
module.fail_json(msg='%s. Query == CHANGE MASTER TO %s' % (to_native(e), chm))
result['changed'] = True
module.exit_json(queries=executed_queries, **result)
elif mode in "startslave":
started = start_slave(cursor, connection_name)
started = start_slave(cursor, connection_name, channel)
if started is True:
module.exit_json(msg="Slave started ", changed=True, queries=executed_queries)
else:
module.exit_json(msg="Slave already started (Or cannot be started)", changed=False, queries=executed_queries)
elif mode in "stopslave":
stopped = stop_slave(cursor, connection_name)
stopped = stop_slave(cursor, connection_name, channel)
if stopped is True:
module.exit_json(msg="Slave stopped", changed=True, queries=executed_queries)
else:
module.exit_json(msg="Slave already stopped", changed=False, queries=executed_queries)
elif mode in "resetslave":
reset = reset_slave(cursor, connection_name)
reset = reset_slave(cursor, connection_name, channel)
if reset is True:
module.exit_json(msg="Slave reset", changed=True, queries=executed_queries)
else:
module.exit_json(msg="Slave already reset", changed=False, queries=executed_queries)
elif mode in "resetslaveall":
reset = reset_slave_all(cursor, connection_name)
reset = reset_slave_all(cursor, connection_name, channel)
if reset is True:
module.exit_json(msg="Slave reset", changed=True, queries=executed_queries)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ test_master_delay: 60
replication_user: replication_user
replication_pass: replication_pass
dump_path: /tmp/dump.sql
test_channel: test_channel-1
4 changes: 4 additions & 0 deletions test/integration/targets/mysql_replication/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@
# Tests of master_delay parameter:
- import_tasks: mysql_replication_master_delay.yml
when: ansible_distribution == 'CentOS' and ansible_distribution_major_version >= '7'

# Tests of channel parameter:
- import_tasks: mysql_replication_channel.yml
when: ansible_distribution == 'CentOS' and ansible_distribution_major_version >= '7'
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright: (c) 2019, Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

# Needs for further tests:
- name: Stop slave
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: stopslave

- name: Reset slave all
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: resetslaveall

# Get master log file and log pos:
- name: Get master status
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ master_port }}"
mode: getmaster
register: master_status

# Test changemaster mode:
- name: Run replication with channel
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: changemaster
master_host: 127.0.0.1
master_port: "{{ master_port }}"
master_user: "{{ replication_user }}"
master_password: "{{ replication_pass }}"
master_log_file: "{{ master_status.File }}"
master_log_pos: "{{ master_status.Position }}"
channel: "{{ test_channel }}"
register: result

- assert:
that:
- result is changed
- result.queries == ["CHANGE MASTER TO MASTER_HOST='127.0.0.1',MASTER_USER='replication_user',MASTER_PASSWORD='********',MASTER_PORT=3306,MASTER_LOG_FILE='{{ master_status.File }}',MASTER_LOG_POS={{ master_status.Position }} FOR CHANNEL '{{ test_channel }}'"]

# Test startslave mode:
- name: Start slave with channel
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: startslave
channel: '{{ test_channel }}'
register: result

- assert:
that:
- result is changed
- result.queries == ["START SLAVE FOR CHANNEL '{{ test_channel }}'"]

# Test getslave mode:
- name: Get standby status with channel
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: getslave
channel: '{{ test_channel }}'
register: slave_status

- assert:
that:
- slave_status.Is_Slave == true
- slave_status.Master_Host == '127.0.0.1'
- slave_status.Exec_Master_Log_Pos == master_status.Position
- slave_status.Master_Port == {{ master_port }}
- slave_status.Last_IO_Errno == 0
- slave_status.Last_IO_Error == ''
- slave_status.Channel_Name == '{{ test_channel }}'
- slave_status is not changed

# Test stopslave mode:
- name: Stop slave with channel
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: stopslave
channel: '{{ test_channel }}'
register: result

- assert:
that:
- result is changed
- result.queries == ["STOP SLAVE FOR CHANNEL '{{ test_channel }}'"]

# Test reset
- name: Reset slave with channel
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: resetslave
channel: '{{ test_channel }}'
register: result

- assert:
that:
- result is changed
- result.queries == ["RESET SLAVE FOR CHANNEL '{{ test_channel }}'"]

# Test reset all
- name: Reset slave all with channel
mysql_replication:
login_host: 127.0.0.1
login_port: "{{ standby_port }}"
mode: resetslaveall
channel: '{{ test_channel }}'
register: result

- assert:
that:
- result is changed
- result.queries == ["RESET SLAVE ALL FOR CHANNEL '{{ test_channel }}'"]
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@
master_port: "{{ master_port }}"
master_user: "{{ replication_user }}"
master_password: "{{ replication_pass }}"
master_log_file: mysql-bin.000001
master_log_pos: '{{ master_status.Position }}'
master_log_file: "{{ master_status.File }}"
master_log_pos: "{{ master_status.Position }}"
register: result

- assert:
that:
- result is changed
- result.queries == ["CHANGE MASTER TO MASTER_HOST='127.0.0.1',MASTER_USER='replication_user',MASTER_PASSWORD='********',MASTER_PORT=3306,MASTER_LOG_FILE='mysql-bin.000001',MASTER_LOG_POS={{ master_status.Position }}"]
- result.queries == ["CHANGE MASTER TO MASTER_HOST='127.0.0.1',MASTER_USER='replication_user',MASTER_PASSWORD='********',MASTER_PORT=3306,MASTER_LOG_FILE='{{ master_status.File }}',MASTER_LOG_POS={{ master_status.Position }}"]

# Test startslave mode:
- name: Start slave
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
repo_link: http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm
repo_link: https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
repo_name: mysql-community
mysql_package_name: mysql-community-server

master_port: 3306
standby_port: 3307
master_datadir: /var/lib/mysql_master
master_cnf: /etc/my-1.cnf
standby_cnf: /etc/my-2.cnf
standby_datadir: /var/lib/mysql_standby
standby_logdir: /var/log/mysql_standby
default_logdir: /var/log/mysql
mysql_safe_err_log: /var/log/mysql/mysql_safe-err.log
mysqld_err_log: '{{ default_logdir }}/mysql-err.log'