Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

新增my2sql工具插件模块 #1224 #1314

Merged
merged 9 commits into from
Jan 5, 2022
5 changes: 5 additions & 0 deletions common/templates/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@
<a href="/binlog2sql/">Binlog2SQL</a>
</li>
{% endif %}
{% if perms.sql.menu_my2sql %}
<li>
<a href="/my2sql/">My2SQL</a>
</li>
{% endif %}
{% if perms.sql.menu_schemasync %}
<li>
<a href="/schemasync/">SchemaSync</a>
Expand Down
11 changes: 11 additions & 0 deletions common/templates/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,17 @@ <h4 style="color: darkgrey"><b>其他配置</b></h4>
placeholder="binlog2sql调用路径,类似/opt/binlog2sql.py">
</div>
</div>
<div class="form-group">
<label for="my2sql"
class="col-sm-4 control-label">MY2SQL</label>
<div class="col-sm-5">
<input type="text" class="form-control"
id="my2sql"
key="my2sql"
value="{{ config.my2sql }}"
placeholder="my2sql调用路径,类似/opt/archery/src/plugins/my2sql">
</div>
</div>
<div class="form-group">
<label for="default_auth_group"
class="col-sm-4 control-label">DEFAULT_AUTH_GROUP</label>
Expand Down
143 changes: 140 additions & 3 deletions sql/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import time
import traceback
import shlex

import simplejson as json
from django.conf import settings
Expand All @@ -15,7 +16,8 @@
from sql.engines import get_engine

from sql.plugins.binglog2sql import Binlog2Sql
from sql.notify import notify_for_binlog2sql
from sql.plugins.my2sql import My2SQL
from sql.notify import notify_for_binlog2sql, notify_for_my2sql
from .models import Instance

logger = logging.getLogger('default')
Expand Down Expand Up @@ -112,7 +114,8 @@ def binlog2sql(request):
# 提交给binlog2sql进行解析
binlog2sql = Binlog2Sql()
# 准备参数
args = {"conn_options": fr"-h{instance.host} -u{instance.user} -p'{instance.password}' -P{instance.port} ",
args = {"conn_options": fr"-h{shlex.quote(str(instance.host))} -u{shlex.quote(str(instance.user))} \
-p'{shlex.quote(str(instance.password))}' -P{shlex.quote(str(instance.port))} ",
"stop_never": False,
"no-primary-key": no_pk,
"flashback": flashback,
Expand Down Expand Up @@ -190,7 +193,8 @@ def binlog2sql_file(args, user):
"""
binlog2sql = Binlog2Sql()
instance = args.get('instance')
conn_options = fr"-h{instance.host} -u{instance.user} -p'{instance.password}' -P{instance.port}"
conn_options = fr"-h{shlex.quote(str(instance.host))} -u{shlex.quote(str(instance.user))} \
-p'{shlex.quote(str(instance.password))}' -P{shlex.quote(str(instance.port))}"
args['conn_options'] = conn_options
timestamp = int(time.time())
path = os.path.join(settings.BASE_DIR, 'downloads/binlog2sql/')
Expand All @@ -208,3 +212,136 @@ def binlog2sql_file(args, user):
for c in iter(p.stdout.readline, ''):
f.write(c)
return user, filename


@permission_required('sql.menu_my2sql', raise_exception=True)
def my2sql(request):
"""
通过解析binlog获取SQL--使用my2sql
:param request:
:return:
"""
instance_name = request.POST.get('instance_name')
save_sql = True if request.POST.get('save_sql') == 'true' else False
instance = Instance.objects.get(instance_name=instance_name)
work_type = 'rollback' if request.POST.get('rollback') == 'true' else '2sql'
num = 30 if request.POST.get('num') == '' else int(request.POST.get('num'))
threads = 4 if request.POST.get('threads') == '' else int(request.POST.get('threads'))
start_file = request.POST.get('start_file')
start_pos = request.POST.get('start_pos') if request.POST.get('start_pos') == '' else int(
request.POST.get('start_pos'))
end_file = request.POST.get('end_file')
end_pos = request.POST.get('end_pos') if request.POST.get('end_pos') == '' else int(request.POST.get('end_pos'))
stop_time = request.POST.get('stop_time')
start_time = request.POST.get('start_time')
only_schemas = request.POST.getlist('only_schemas')
only_tables = request.POST.getlist('only_tables[]')
sql_type = [] if request.POST.getlist('sql_type[]') == [] else request.POST.getlist('sql_type[]')
extra_info = True if request.POST.get('extra_info') == 'true' else False
ignore_primary_key = True if request.POST.get('ignore_primary_key') == 'true' else False
full_columns = True if request.POST.get('full_columns') == 'true' else False
no_db_prefix = True if request.POST.get('no_db_prefix') == 'true' else False
file_per_table = True if request.POST.get('file_per_table') == 'true' else False

result = {'status': 0, 'msg': 'ok', 'data': []}

# 提交给my2sql进行解析
my2sql = My2SQL()

# 准备参数
args = {"conn_options": fr"-host {shlex.quote(str(instance.host))} -user {shlex.quote(str(instance.user))} \
-password '{shlex.quote(str(instance.password))}' -port {shlex.quote(str(instance.port))} ",
"work-type": work_type,
"start-file": start_file,
"start-pos": start_pos,
"stop-file": end_file,
"stop-pos": end_pos,
"start-datetime": start_time,
"stop-datetime": stop_time,
"databases": ' '.join(only_schemas),
"tables": ','.join(only_tables),
"sql": ','.join(sql_type),
"instance": instance,
"threads": threads,
"add-extraInfo": extra_info,
"ignore-primaryKey-forInsert": ignore_primary_key,
"full-columns": full_columns,
"do-not-add-prifixDb": no_db_prefix,
"file-per-table": file_per_table,
"output-toScreen": True
}

# 参数检查
args_check_result = my2sql.check_args(args)
if args_check_result['status'] == 1:
return HttpResponse(json.dumps(args_check_result), content_type='application/json')
# 参数转换
cmd_args = my2sql.generate_args2cmd(args, shell=True)

# 执行命令
try:
p = my2sql.execute_cmd(cmd_args, shell=True)
# 读取前num行后结束
rows = []
n = 1
for line in iter(p.stdout.readline, ''):
if n <= num and isinstance(line, str):
if line[0:6].upper() in ('INSERT', 'DELETE', 'UPDATE'):
n = n + 1
row_info = {}
try:
row_info['sql'] = line + ';'
except IndexError:
row_info['sql'] = line + ';'
rows.append(row_info)
else:
break

if rows.__len__() == 0:
# 判断是否有异常
stderr = p.stderr.read()
if stderr and isinstance(stderr, str):
result['status'] = 1
result['msg'] = stderr
return HttpResponse(json.dumps(result), content_type='application/json')
# 终止子进程
p.kill()
result['data'] = rows
except Exception as e:
logger.error(traceback.format_exc())
result['status'] = 1
result['msg'] = str(e)

# 异步保存到文件
if save_sql:
args.pop('conn_options')
args.pop('output-toScreen')
async_task(my2sql_file, args=args, user=request.user, hook=notify_for_my2sql, timeout=-1,
task_name=f'my2sql-{time.time()}')

# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type='application/json')


def my2sql_file(args, user):
"""
用于异步保存binlog解析的文件
:param args: 参数
:param user: 操作用户对象,用户消息推送
:return:
"""
my2sql = My2SQL()
instance = args.get('instance')
conn_options = fr"-host {shlex.quote(str(instance.host))} -user {shlex.quote(str(instance.user))} \
-password '{shlex.quote(str(instance.password))}' -port {shlex.quote(str(instance.port))} "
args['conn_options'] = conn_options
path = os.path.join(settings.BASE_DIR, 'downloads/my2sql/')
os.makedirs(path, exist_ok=True)

# 参数转换
args["output-dir"] = path
cmd_args = my2sql.generate_args2cmd(args, shell=True)
# 使用output-dir参数执行命令保存sql
my2sql.execute_cmd(cmd_args, shell=True)
return user, path
2 changes: 1 addition & 1 deletion sql/fixtures/auth_group.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ where codename in ('menu_dashboard','menu_sqlcheck','menu_sqlworkflow','menu_sql
insert into auth_group_permissions (group_id, permission_id)
select 3,id
from auth_permission
where codename in ('menu_dashboard','menu_sqlcheck','menu_sqlworkflow','menu_sqlanalyze','menu_query','menu_sqlquery','menu_queryapplylist','menu_sqloptimize','menu_sqladvisor','menu_slowquery','menu_instance','menu_instance_list','menu_dbdiagnostic','menu_database','menu_instance_account','menu_param','menu_data_dictionary','menu_tools','menu_archive','menu_binlog2sql','menu_schemasync','menu_system','menu_document','sql_submit','sql_review','sql_execute_for_resource_group','sql_execute','sql_analyze','optimize_sqladvisor','optimize_sqltuning','optimize_soar','query_applypriv','query_mgtpriv','query_review','query_submit','query_all_instances','query_resource_group_instance','process_view','process_kill','tablespace_view','trx_view','trxandlocks_view','instance_account_manage','param_view','param_edit','data_dictionary_export','archive_apply','archive_review','archive_mgt');
where codename in ('menu_dashboard','menu_sqlcheck','menu_sqlworkflow','menu_sqlanalyze','menu_query','menu_sqlquery','menu_queryapplylist','menu_sqloptimize','menu_sqladvisor','menu_slowquery','menu_instance','menu_instance_list','menu_dbdiagnostic','menu_database','menu_instance_account','menu_param','menu_data_dictionary','menu_tools','menu_archive','menu_binlog2sql','menu_my2sql','menu_schemasync','menu_system','menu_document','sql_submit','sql_review','sql_execute_for_resource_group','sql_execute','sql_analyze','optimize_sqladvisor','optimize_sqltuning','optimize_soar','query_applypriv','query_mgtpriv','query_review','query_submit','query_all_instances','query_resource_group_instance','process_view','process_kill','tablespace_view','trx_view','trxandlocks_view','instance_account_manage','param_view','param_edit','data_dictionary_export','archive_apply','archive_review','archive_mgt');

-- PM
insert into auth_group_permissions (group_id, permission_id)
Expand Down
1 change: 1 addition & 0 deletions sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ class Meta:
('menu_tools', '菜单 工具插件'),
('menu_archive', '菜单 数据归档'),
('menu_binlog2sql', '菜单 Binlog2SQL'),
('menu_my2sql', '菜单 My2SQL'),
('menu_schemasync', '菜单 SchemaSync'),
('menu_system', '菜单 系统管理'),
('menu_document', '菜单 相关文档'),
Expand Down
20 changes: 20 additions & 0 deletions sql/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,23 @@ def notify_for_binlog2sql(task):
# 发送
msg_to = [task.kwargs['user']]
__send(msg_title, msg_content, msg_to)


def notify_for_my2sql(task):
"""
my2sql执行结束的通知
:param task:
:return:
"""
# 判断是否开启消息通知,未开启直接返回
if not __notify_cnf_status():
return None
if task.success:
msg_title = '[Archery 通知]My2SQL执行结束'
msg_content = f'解析的SQL文件在{task.result[1]}目录下,请前往查看'
else:
msg_title = '[Archery 通知]My2SQL执行失败'
msg_content = f'{task.result}'
# 发送
msg_to = [task.kwargs['user']]
__send(msg_title, msg_content, msg_to)
52 changes: 52 additions & 0 deletions sql/plugins/my2sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: UTF-8 -*-
from common.config import SysConfig
from sql.plugins.plugin import Plugin
import shlex


class My2SQL(Plugin):

def __init__(self):
self.path = SysConfig().get('my2sql')
self.required_args = []
self.disable_args = []
super(Plugin, self).__init__()

def generate_args2cmd(self, args, shell):
nick2wang marked this conversation as resolved.
Show resolved Hide resolved
"""
转换请求参数为命令行
:param args:
:param shell:
:return:
"""
conn_options = ['conn_options']
args_options = ['work-type', 'threads', 'start-file', 'stop-file', 'start-pos',
'stop-pos', 'databases', 'tables', 'sql', 'output-dir']
no_args_options = ['output-toScreen', 'add-extraInfo', 'ignore-primaryKey-forInsert',
'full-columns', 'do-not-add-prifixDb', 'file-per-table']
datetime_options = ['start-datetime', 'stop-datetime']
if shell:
cmd_args = f'{shlex.quote(str(self.path))}' if self.path else ''
for name, value in args.items():
if name in conn_options:
cmd_args += f' {value}'
elif name in args_options and value:
cmd_args += f' -{name} {shlex.quote(str(value))}'
elif name in datetime_options and value:
cmd_args += f" -{name} '{shlex.quote(str(value))}'"
elif name in no_args_options and value:
cmd_args += f' -{name}'
else:
cmd_args = [self.path]
for name, value in args.items():
if name in conn_options:
cmd_args.append(f'{value}')
elif name in args_options:
cmd_args.append(f'-{name}')
cmd_args.append(f'{value}')
elif name in datetime_options:
cmd_args.append(f'-{name}')
cmd_args.append(f"'{value}'")
elif name in no_args_options:
cmd_args.append(f'-{name}')
return cmd_args
31 changes: 31 additions & 0 deletions sql/plugins/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.contrib.auth import get_user_model

from sql.plugins.binglog2sql import Binlog2Sql
from sql.plugins.my2sql import My2SQL
from sql.plugins.schemasync import SchemaSync
from sql.plugins.soar import Soar
from sql.plugins.sqladvisor import SQLAdvisor
Expand Down Expand Up @@ -199,6 +200,36 @@ def test_binlog2ql_generate_args2cmd(self):
cmd_args = binlog2sql.generate_args2cmd(args, True)
self.assertIsInstance(cmd_args, str)

def test_my2sql_generate_args2cmd(self):
"""
测试my2sql参数转换
:return:
"""
args = {'conn_options': "-host mysql -user root -password '123456' -port 3306 ",
'work-type': '2sql',
'start-file': 'mysql-bin.000043',
'start-pos': 111,
'stop-file': '',
'stop-pos': '',
'start-datetime': '',
'stop-datetime': '',
'databases': 'account_center',
'tables': 'ac_apps',
'sql': 'update',
"threads": 1,
"add-extraInfo": "false",
"ignore-primaryKey-forInsert": "false",
"full-columns": "false",
"do-not-add-prifixDb": "false",
"file-per-table": "false"}
self.sys_config.set('my2sql', '/opt/archery/src/plugins/my2sql')
self.sys_config.get_all_config()
my2sql = My2SQL()
cmd_args = my2sql.generate_args2cmd(args, False)
self.assertIsInstance(cmd_args, list)
cmd_args = my2sql.generate_args2cmd(args, True)
self.assertIsInstance(cmd_args, str)

def test_pt_archiver_generate_args2cmd(self):
"""
测试pt_archiver参数转换
Expand Down
Loading