Skip to content

Commit

Permalink
Merge branch 'V3.7' of https://github.com/goodrain/rainbond
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujunhaowzh committed Oct 12, 2018
2 parents ce2e7eb + 7631beb commit 01e44f7
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 16 deletions.
1 change: 0 additions & 1 deletion db/mysql/dao/event.go
Expand Up @@ -150,7 +150,6 @@ func (c *NotificationEventDaoImpl) AddModel(mo model.Interface) error {
return err
}
} else {
logrus.Infoln("event result is exist")
return c.UpdateModel(mo)
}
return nil
Expand Down
24 changes: 12 additions & 12 deletions grctl/cmd/service.go
Expand Up @@ -20,7 +20,6 @@ package cmd

import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"net/url"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/apcera/termtables"
"github.com/goodrain/rainbond/api/util"
"github.com/goodrain/rainbond/cmd/grctl/option"
eventdb "github.com/goodrain/rainbond/eventlog/db"
"github.com/goodrain/rainbond/grctl/clients"
coreutil "github.com/goodrain/rainbond/util"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -138,7 +138,7 @@ func NewCmdService() cli.Command {
return c
}

func GetEventLogf(eventID, server string) {
func GetEventLogf(eventID, server string) error {

//if c.String("event_log_server") != "" {
// server = c.String("event_log_server")
Expand All @@ -148,7 +148,7 @@ func GetEventLogf(eventID, server string) {
con, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
logrus.Errorf("dial websocket endpoint %s error. %s", u.String(), err.Error())
//return err
return err
}
defer con.Close()

Expand All @@ -158,7 +158,7 @@ func GetEventLogf(eventID, server string) {
_, message, err := con.ReadMessage()
if err != nil {
logrus.Println("read proxy websocket message error: ", err)
return
return err
}
time := gjson.GetBytes(message, "time").String()
m := gjson.GetBytes(message, "message").String()
Expand Down Expand Up @@ -197,15 +197,15 @@ func getEventLog(c *cli.Context) error {
fmt.Printf("[%s](%s) %s \n", strings.ToUpper(level), time, m)
}
} else {
ts := c.Args().Get(1)
tas := strings.Split(ts, "/")
dl, err := clients.RegionClient.Tenants(tas[0]).Services(tas[1]).EventLog(eventID, "debug")
logdb := &eventdb.EventFilePlugin{
HomePath: "/grdata/logs/",
}
list, err := logdb.GetMessages(eventID, "debug")
if err != nil {
return err
}
for _, v := range dl {
aa, _ := json.Marshal(v)
fmt.Println(string(aa))
for _, l := range list {
fmt.Println(l.Time + ":" + l.Message)
}
}
return nil
Expand All @@ -231,7 +231,7 @@ func stopTenantService(c *cli.Context) error {
if c.String("event_log_server") != "" {
server = c.String("event_log_server")
}
GetEventLogf(eventID, server)
return GetEventLogf(eventID, server)
}
if err != nil {
logrus.Error("停止应用失败:" + err.Error())
Expand Down Expand Up @@ -265,7 +265,7 @@ func startService(c *cli.Context) error {
if c.String("event_log_server") != "" {
server = c.String("event_log_server")
}
GetEventLogf(eventID, server)
return GetEventLogf(eventID, server)
}
//err = region.StopService(service["service_id"].(string), service["deploy_version"].(string))
if err != nil {
Expand Down
27 changes: 25 additions & 2 deletions worker/appm/manager.go
Expand Up @@ -102,8 +102,8 @@ func NewManager(conf option.Config, statusManager *client.AppRuntimeSyncClient)
}
cacheManager := NewCacheManager()
return &manager{kubeclient: clientset, conf: conf,
dbmanager: db.GetManager(),
statusCache: cacheManager,
dbmanager: db.GetManager(),
statusCache: cacheManager,
statusManager: statusManager,
}, nil
}
Expand Down Expand Up @@ -266,3 +266,26 @@ func (m *manager) SyncData() {
//step2 :同步tenant_service_pod
//TODO:
}

func DeletePods(m *manager, service *model.TenantServices, logger event.Logger) error {
podList, err := m.kubeclient.Pods(service.ServiceID).List(metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s,creator=%s,version=%s", service.ServiceAlias, "RainBond", service.DeployVersion)})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("get service pods error.", err.Error())
logger.Error("从集群中查询该应用的pod失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}

for _, v := range podList.Items {
err := m.kubeclient.Pods(service.ServiceID).Delete(v.Name, &metav1.DeleteOptions{});
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("delete service pod error.", err.Error())
logger.Error("从集群中删除应用的pod失败", map[string]string{"step": "worker-appm", "status": "error"})
}
}
}
logger.Info("根据资源标签移除残留的pod资源完成", map[string]string{"step": "worker-appm", "status": "starting"})
return nil
}
27 changes: 27 additions & 0 deletions worker/appm/manager_deployment.go
Expand Up @@ -195,6 +195,33 @@ func (m *manager) StopDeployment(serviceID string, logger event.Logger) error {
}
}
}

//清理集群内可能遗留的资源
deletePodsErr := DeletePods(m, service, logger);
if deletePodsErr != nil {
return deletePodsErr
}
rcList, err := m.kubeclient.AppsV1beta1().Deployments(service.ServiceID).List(metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s,creator=%s,version=%s", service.ServiceAlias, "RainBond", service.DeployVersion)})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("get service Deployments error.", err.Error())
logger.Error("从集群中查询该应用的Deployments失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}
for _, v := range rcList.Items {
err := m.kubeclient.AppsV1beta1().Deployments(service.ServiceID).Delete(v.Name, &metav1.DeleteOptions{});
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("delete service Deployments error.", err.Error())
logger.Error("从集群中删除应用的Deployments失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}
}

logger.Info("根据资源标签移除残留的Deployments资源完成", map[string]string{"step": "worker-appm", "status": "starting"})

return nil
}

Expand Down
26 changes: 26 additions & 0 deletions worker/appm/manager_rc.go
Expand Up @@ -207,6 +207,32 @@ func (m *manager) StopReplicationController(serviceID string, logger event.Logge
}
}
logger.Info("移除残留的Pod实例完成", map[string]string{"step": "worker-appm", "status": "starting"})

//清理集群内可能遗留的资源
deletePodsErr := DeletePods(m, service, logger);
if deletePodsErr != nil {
return deletePodsErr
}
rcList, err := m.kubeclient.ReplicationControllers(service.ServiceID).List(metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s,creator=%s,version=%s", service.ServiceAlias, "RainBond", service.DeployVersion)})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("get service ReplicationController error.", err.Error())
logger.Error("从集群中查询该应用的ReplicationController失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}
for _, v := range rcList.Items {
err := m.kubeclient.ReplicationControllers(service.ServiceID).Delete(v.Name, &metav1.DeleteOptions{});
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("delete service ReplicationController error.", err.Error())
logger.Error("从集群中删除应用的ReplicationController失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}
}

logger.Info("根据资源标签移除残留的rc资源完成", map[string]string{"step": "worker-appm", "status": "starting"})
return nil
}

Expand Down
27 changes: 27 additions & 0 deletions worker/appm/manager_statefulset.go
Expand Up @@ -204,6 +204,33 @@ func (m *manager) StopStatefulSet(serviceID string, logger event.Logger) error {
}
}
}

//清理集群内可能遗留的资源
deletePodsErr := DeletePods(m, service, logger);
if deletePodsErr != nil {
return deletePodsErr
}
rcList, err := m.kubeclient.AppsV1beta1().StatefulSets(service.ServiceID).List(metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s,creator=%s,version=%s", service.ServiceAlias, "RainBond", service.DeployVersion)})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("get service StatefulSets error.", err.Error())
logger.Error("从集群中查询该应用的StatefulSets失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}
for _, v := range rcList.Items {
err := m.kubeclient.AppsV1beta1().StatefulSets(service.ServiceID).Delete(v.Name, &metav1.DeleteOptions{});
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("delete service StatefulSets error.", err.Error())
logger.Error("从集群中删除应用的StatefulSets失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}
}

logger.Info("根据资源标签移除残留的StatefulSets资源完成", map[string]string{"step": "worker-appm", "status": "starting"})

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion worker/handle/manager.go
Expand Up @@ -22,8 +22,8 @@ import (
"context"
"time"

"github.com/goodrain/rainbond/cmd/worker/option"
status "github.com/goodrain/rainbond/appruntimesync/client"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/worker/discover/model"
Expand Down Expand Up @@ -151,6 +151,7 @@ func (m *Manager) stopExec(task *model.Task) int {
}
if curStatus == status.CLOSED {
logger.Info("应用已关闭,请勿重复操作", map[string]string{"step": "last", "status": "success"})
db.GetManager().K8sDeployReplicationDao().DeleteK8sDeployReplicationByService(body.ServiceID)
event.GetManager().ReleaseLogger(logger)
return 0
}
Expand Down

0 comments on commit 01e44f7

Please sign in to comment.