Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2348 #2349

Closed
wants to merge 10 commits into from
13 changes: 8 additions & 5 deletions sqle/driver/plugin_adapter_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
sqlDriver "database/sql/driver"
"errors"
"fmt"
"os"
"sync"

driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
Expand All @@ -18,11 +19,12 @@ import (
)

type PluginProcessorV2 struct {
cfg func(cmdBase string, cmdArgs []string) *goPlugin.ClientConfig
cmdBase string
cmdArgs []string
client *goPlugin.Client
meta *driverV2.DriverMetas
cfg func(cmdBase string, cmdArgs []string) *goPlugin.ClientConfig
cmdBase string
cmdArgs []string
client *goPlugin.Client
meta *driverV2.DriverMetas
pluginPidFilePath string
sync.Mutex
}

Expand Down Expand Up @@ -142,6 +144,7 @@ func (d *PluginProcessorV2) Stop() error {
if d.client != nil {
d.client.Kill()
}
os.Remove(d.pluginPidFilePath)
d.Unlock()
return nil
}
Expand Down
111 changes: 108 additions & 3 deletions sqle/driver/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/actiontech/sqle/sqle/config"
Expand Down Expand Up @@ -179,6 +181,19 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi

// register plugin
for _, p := range plugins {

pluginPidFilePath := filepath.Join(pluginDir, "pidfile", p.Name()+".pid")
process, err := GetProcessByPidFile(pluginPidFilePath)
if err != nil {
log.NewEntry().Warnf("get plugin %s process failed, error: %v", pluginPidFilePath, err)
}
if process != nil {
err = KillProcess(process)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

极端情况下如果有多个插件需要kill,每个插件等待2秒,会拖慢sqle的启动速度,建议做成多个插件并发kill。参考使用waitgroup

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已改用waitgroup形式

if err != nil {
log.NewEntry().Warnf("kill plugin process [%v] failed, error: %v", process.Pid, err)
}
}

cmdBase := filepath.Join(pluginDir, p.Name())
cmdArgs := make([]string, 0)

Expand All @@ -197,17 +212,20 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi
}

client := goPlugin.NewClient(getClientConfig(cmdBase, cmdArgs))
_, err := client.Client()
_, err = client.Client()
if err != nil {
return fmt.Errorf("plugin %v failed to start, error: %v Please check the sqled.log for more details", p.Name(), err)
}

err = WritePidFile(pluginPidFilePath, int64(client.ReattachConfig().Pid))
if err != nil {
return fmt.Errorf("write plugin %s pid file failed, error: %v", pluginPidFilePath, err)
}
var pp PluginProcessor
switch client.NegotiatedVersion() {
case driverV1.ProtocolVersion:
pp = &PluginProcessorV1{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client}
case driverV2.ProtocolVersion:
pp = &PluginProcessorV2{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client}
pp = &PluginProcessorV2{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client, pluginPidFilePath: pluginPidFilePath}
}
if err := pm.register(pp); err != nil {
stopErr := pp.Stop()
Expand Down Expand Up @@ -243,3 +261,90 @@ func (pm *pluginManager) OpenPlugin(l *logrus.Entry, pluginName string, cfg *dri
}
return pm.pluginProcessors[pluginName].Open(l, cfg)
}

// 根据pid文件获取进程信息
func GetProcessByPidFile(pluginPidFile string) (*os.Process, error) {
if _, err := os.Stat(pluginPidFile); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else {
pidContent, err := os.ReadFile(pluginPidFile)
if err != nil {
return nil, err
}
if len(pidContent) == 0 {
return nil, nil
}
pid, err := strconv.Atoi(string(pidContent))
if err != nil {
return nil, err
}
// 获取进程
process, err := GetProcessByPid(pid)
if err != nil {
return nil, err
}
return process, nil
}
return nil, nil
}

// 根据pid获取进程信息,若进程已退出则返回nil
func GetProcessByPid(pid int) (*os.Process, error) {
process, err := os.FindProcess(pid)
if err != nil {
return nil, err
}
// 检查进程是否存在的方式
err = process.Signal(syscall.Signal(0))
if err != nil {
if errors.Is(err, os.ErrProcessDone) {
return nil, nil
}
return nil, err
}
return process, nil
}

// 退出进程
func KillProcess(process *os.Process) error {
doneChan := time.NewTicker(2 * time.Second)
defer doneChan.Stop()
for {
select {
case <-doneChan.C:
log.NewEntry().Warnf("stop plugin process [%v] failed, just kill it ", process.Pid)
err := process.Kill()
if err != nil {
return err
}
return nil
default:
err := process.Signal(syscall.SIGTERM)
if errors.Is(err, os.ErrProcessDone) {
return nil
}
}
}
}

func WritePidFile(pidFilePath string, pid int64) error {
_, err := os.Stat(pidFilePath)
if os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(pidFilePath), 0644); err != nil {
return err
}
}
file, err := os.OpenFile(pidFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer file.Close()

_, err = fmt.Fprintf(file, "%d", pid)
if err != nil {
return err
}
return nil
}
Loading