Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

插件执行命令采用传参替代字符串拼接 #1840

Merged
merged 2 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions sql/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def archive(archive_id):
pt_archiver = PtArchiver()
# 准备参数
source = (
rf"h={s_ins.host},u={s_ins.user},p='{s_ins.password}',"
rf"h={s_ins.host},u={s_ins.user},p={s_ins.password},"
rf"P={s_ins.port},D={src_db_name},t={src_table_name},A={s_charset}"
)
args = {
Expand Down Expand Up @@ -387,13 +387,13 @@ def archive(archive_id):
if args_check_result["status"] == 1:
return JsonResponse(args_check_result)
# 参数转换
cmd_args = pt_archiver.generate_args2cmd(args, shell=True)
cmd_args = pt_archiver.generate_args2cmd(args)
# 执行命令,获取结果
select_cnt = 0
insert_cnt = 0
delete_cnt = 0
with FuncTimer() as t:
p = pt_archiver.execute_cmd(cmd_args, shell=True)
p = pt_archiver.execute_cmd(cmd_args)
stdout = ""
for line in iter(p.stdout.readline, ""):
if re.match(r"^SELECT\s(\d+)$", line, re.I):
Expand Down Expand Up @@ -442,11 +442,12 @@ def archive(archive_id):
update_fields=["last_archive_time"]
)
# 替换密码信息后保存
shell_cmd = " ".join(cmd_args)
ArchiveLog.objects.create(
archive=archive_info,
cmd=cmd_args.replace(s_ins.password, "***").replace(d_ins.password, "***")
cmd=shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
if mode == "dest"
else cmd_args.replace(s_ins.password, "***"),
else shell_cmd.replace(s_ins.password, "***"),
condition=condition,
mode=mode,
no_delete=no_delete,
Expand Down
37 changes: 21 additions & 16 deletions sql/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,21 @@ def my2sql(request):
my2sql = My2SQL()

# 准备参数
instance_password = shlex.quote(str(instance.password))
args = {
"conn_options": rf"-host {shlex.quote(str(instance.host))} -user {shlex.quote(str(instance.user))} \
-password ''{instance_password}'' -port {shlex.quote(str(instance.port))} ",
"host": instance.host,
"user": instance.user,
"password": instance.password,
"port": instance.port,
"work-type": work_type,
"start-file": start_file,
"start-pos": start_pos,
"stop-file": end_file,
"stop-pos": end_pos,
"start-datetime": '"' + start_time + '"',
"stop-datetime": '"' + stop_time + '"',
"start-datetime": start_time,
"stop-datetime": stop_time,
"databases": " ".join(only_schemas),
"tables": ",".join(only_tables),
"sql": ",".join(sql_type),
"instance": instance,
"threads": threads,
"add-extraInfo": extra_info,
"ignore-primaryKey-forInsert": ignore_primary_key,
Expand All @@ -169,11 +169,11 @@ def my2sql(request):
json.dumps(args_check_result), content_type="application/json"
)
# 参数转换
cmd_args = my2sql.generate_args2cmd(args, shell=True)
cmd_args = my2sql.generate_args2cmd(args)

# 执行命令
try:
p = my2sql.execute_cmd(cmd_args, shell=True)
p = my2sql.execute_cmd(cmd_args)
# 读取前num行后结束
rows = []
n = 1
Expand Down Expand Up @@ -207,7 +207,8 @@ def my2sql(request):

# 异步保存到文件
if save_sql:
args.pop("conn_options")
args.update({"instance": instance})
args.pop("password")
args.pop("output-toScreen")
async_task(
my2sql_file,
Expand All @@ -233,17 +234,21 @@ def my2sql_file(args, user):
:return:
"""
my2sql = My2SQL()
instance = args.get("instance")
instance_password = shlex.quote(f'"{str(instance.password)}"')
conn_options = rf"-host {shlex.quote(str(instance.host))} -user {shlex.quote(str(instance.user))} \
-password '{instance_password}' -port {shlex.quote(str(instance.port))} "
args["conn_options"] = conn_options
instance = args.pop("instance")
args.update(
{
"host": instance.host,
"user": instance.user,
"password": instance.password,
"port": instance.port,
}
)
path = os.path.join(settings.BASE_DIR, "downloads/my2sql/")
os.makedirs(path, exist_ok=True)

# 参数转换
args["output-dir"] = path
cmd_args = my2sql.generate_args2cmd(args, shell=True)
cmd_args = my2sql.generate_args2cmd(args)
# 使用output-dir参数执行命令保存sql
my2sql.execute_cmd(cmd_args, shell=True)
my2sql.execute_cmd(cmd_args)
return user, path
28 changes: 6 additions & 22 deletions sql/instance.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: UTF-8 -*-
import shlex

import MySQLdb
import os
Expand All @@ -11,7 +10,6 @@
from django.http import HttpResponse
from django.views.decorators.cache import cache_page

from common.config import SysConfig
from common.utils.extend_json_encoder import ExtendJSONEncoder
from common.utils.convert import Convert
from sql.engines import get_engine
Expand Down Expand Up @@ -225,36 +223,22 @@ def schemasync(request):
target_db_name = "*"

# 取出该实例的连接方式
instance_info = Instance.objects.get(instance_name=instance_name)
target_instance_info = Instance.objects.get(instance_name=target_instance_name)
instance = Instance.objects.get(instance_name=instance_name)
target_instance = Instance.objects.get(instance_name=target_instance_name)

# 提交给SchemaSync获取对比结果
schema_sync = SchemaSync()
# 准备参数
tag = int(time.time())
output_directory = os.path.join(settings.BASE_DIR, "downloads/schemasync/")
os.makedirs(output_directory, exist_ok=True)
db_name = shlex.quote(db_name)
target_db_name = shlex.quote(target_db_name)
args = {
"sync-auto-inc": sync_auto_inc,
"sync-comments": sync_comments,
"tag": tag,
"output-directory": output_directory,
"source": r"mysql://{user}:{pwd}@{host}:{port}/{database}".format(
user=shlex.quote(str(instance_info.user)),
pwd=shlex.quote(str(instance_info.password)),
host=shlex.quote(str(instance_info.host)),
port=shlex.quote(str(instance_info.port)),
database=db_name,
),
"target": r"mysql://{user}:{pwd}@{host}:{port}/{database}".format(
user=shlex.quote(str(target_instance_info.user)),
pwd=shlex.quote(str(target_instance_info.password)),
host=shlex.quote(str(target_instance_info.host)),
port=shlex.quote(str(target_instance_info.port)),
database=target_db_name,
),
"source": f"mysql://{instance.user}:{instance.password}@{instance.host}:{instance.port}/{db_name}",
"target": f"mysql://{target_instance.user}:{target_instance.password}@{target_instance.host}:{target_instance.port}/{target_db_name}",
}
# 参数检查
args_check_result = schema_sync.check_args(args)
Expand All @@ -263,10 +247,10 @@ def schemasync(request):
json.dumps(args_check_result), content_type="application/json"
)
# 参数转换
cmd_args = schema_sync.generate_args2cmd(args, shell=True)
cmd_args = schema_sync.generate_args2cmd(args)
# 执行命令
try:
stdout, stderr = schema_sync.execute_cmd(cmd_args, shell=True).communicate()
stdout, stderr = schema_sync.execute_cmd(cmd_args).communicate()
diff_stdout = f"{stdout}{stderr}"
except RuntimeError as e:
diff_stdout = str(e)
Expand Down
56 changes: 0 additions & 56 deletions sql/plugins/my2sql.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# -*- coding: UTF-8 -*-
from common.config import SysConfig
from sql.plugins.plugin import Plugin
import shlex


class My2SQL(Plugin):
Expand All @@ -10,58 +9,3 @@ def __init__(self):
self.required_args = []
self.disable_args = []
super(Plugin, self).__init__()

def generate_args2cmd(self, args, shell):
"""
转换请求参数为命令行
:param args:
:param shell:
:return:
"""
conn_options = ["conn_options"]
args_options = [
"work-type",
"threads",
"start-file",
"stop-file",
"start-pos",
"stop-pos",
"databases",
"tables",
"sql",
"output-dir",
]
no_args_options = [
"output-toScreen",
"add-extraInfo",
"ignore-primaryKey-forInsert",
"full-columns",
"do-not-add-prifixDb",
"file-per-table",
]
datetime_options = ["start-datetime", "stop-datetime"]
if shell:
cmd_args = f"{shlex.quote(str(self.path))}" if self.path else ""
for name, value in args.items():
if name in conn_options:
cmd_args += f" {value}"
elif name in args_options and value:
cmd_args += f" -{name} {shlex.quote(str(value))}"
elif name in datetime_options and value:
cmd_args += f" -{name} '{shlex.quote(str(value))}'"
elif name in no_args_options and value:
cmd_args += f" -{name}"
else:
cmd_args = [self.path]
for name, value in args.items():
if name in conn_options:
cmd_args.append(f"{value}")
elif name in args_options:
cmd_args.append(f"-{name}")
cmd_args.append(f"{value}")
elif name in datetime_options:
cmd_args.append(f"-{name}")
cmd_args.append(f"'{value}'")
elif name in no_args_options:
cmd_args.append(f"-{name}")
return cmd_args
14 changes: 11 additions & 3 deletions sql/plugins/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,30 @@ def check_args(self, args):
}
return args_check_result

def generate_args2cmd(self, args, shell):
def generate_args2cmd(self, args):
"""
将请求参数转换为命令行参数
:return:
"""
cmd_args = [self.path]
for arg, value in args.items():
if not value:
continue
cmd_args.append(f"-{arg}")
if not isinstance(value, bool):
cmd_args.append(f"{value}")
return cmd_args

@staticmethod
def execute_cmd(cmd_args, shell):
def execute_cmd(cmd_args):
"""
执行命令并且返回process
:return:
"""
try:
p = subprocess.Popen(
cmd_args,
shell=shell,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
Expand Down
50 changes: 9 additions & 41 deletions sql/plugins/pt_archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,16 @@ def __init__(self):
self.disable_args = ["analyze"]
super(Plugin, self).__init__()

def generate_args2cmd(self, args, shell):
def generate_args2cmd(self, args):
"""
转换请求参数为命令行
:param args:
:param shell:
将请求参数转换为命令行参数
:return:
"""
k_options = [
"no-version-check",
"statistics",
"bulk-insert",
"bulk-delete",
"purge",
"no-delete",
]
kv_options = [
"source",
"dest",
"file",
"where",
"progress",
"charset",
"limit",
"txn-size",
"sleep",
]
if shell:
cmd_args = self.path if self.path else ""
for name, value in args.items():
if name in k_options and value:
cmd_args += f" --{name}"
elif name in kv_options:
if name == "where":
cmd_args += f' --{name} "{value}"'
else:
cmd_args += f" --{name} {value}"
else:
cmd_args = [self.path]
for name, value in args.items():
if name in k_options and value:
cmd_args.append(f"--{name}")
elif name in kv_options:
cmd_args.append(f"--{name}")
cmd_args.append(f"{value}")
cmd_args = [self.path]
for arg, value in args.items():
if not value:
continue
cmd_args.append(f"--{arg}")
if not isinstance(value, bool):
cmd_args.append(f"{value}")
return cmd_args
Loading