Skip to content

Commit

Permalink
Merge pull request #58 from xiaomuqiao/fix
Browse files Browse the repository at this point in the history
 Adjust name in resource;Reorganize and optimize log output
  • Loading branch information
Superxi911 committed Nov 17, 2016
2 parents fa33a3d + 1ac7e5b commit d64a57b
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 68 deletions.
2 changes: 1 addition & 1 deletion api/rest/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
var (
// remoteManager is remote api manager.
remoteManager *remote.Manager
resourceManager *resource.ResourceManager
resourceManager *resource.Manager
)

// Initialize initializes rest endpoints and all Cyclone managers. It register REST
Expand Down
4 changes: 2 additions & 2 deletions api/rest/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func generateVersionFromPRData(payload api.WebhookGithub) (name string, descript

// Generate name.
if nil != pullRequest["number"] {
name = fmt.Sprintf("pr#%v_%s", pullRequest["number"], commitID)
name = fmt.Sprintf("pr_%v_%s", pullRequest["number"], commitID)
} else {
name = fmt.Sprintf("pr_%s", commitID)
}
Expand Down Expand Up @@ -649,7 +649,7 @@ func generateVersionFromGitlabPRData(payload api.WebhookGitlab) (name string, de

// generate name
if nil != attributes["id"] {
name = fmt.Sprintf("pr#%v_%s", attributes["id"], commitID)
name = fmt.Sprintf("pr_%v_%s", attributes["id"], commitID)
} else {
name = fmt.Sprintf("pr_%s", commitID)
}
Expand Down
21 changes: 18 additions & 3 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func handleEvent(event *api.Event) error {

// postHookEvent is the event finished post hook.
func postHookEvent(event *api.Event) {
mapOperation[event.Operation].PostHook(event)

w, err := LoadWorker(event)
if err != nil {
log.Errorf("load worker err: %v", err)
Expand Down Expand Up @@ -111,8 +113,6 @@ func postHookEvent(event *api.Event) {
log.Errorf("release worker node resource err: %v", err)
}
}

mapOperation[event.Operation].PostHook(event)
}

// createServiceHander is the create service handler.
Expand Down Expand Up @@ -159,7 +159,22 @@ func createVersionHandler(event *api.Event) error {
return err
}

return w.DoWork(event)
err = w.DoWork(event)
if err != nil {
return err
}

if event.Service.Repository.Webhook == api.GITHUB {
remote, err := remoteManager.FindRemote(event.Service.Repository.Webhook)
if err != nil {
log.ErrorWithFields("Unable to get remote according coderepository", log.Fields{"user_id": event.Service.UserID})
} else {
if err = remote.PostCommitStatus(&event.Service, &event.Version); err != nil {
log.Errorf("Unable to post commit status to github: %v", err)
}
}
}
return nil
}

// createVersionPostHook is the create version post hook.
Expand Down
20 changes: 11 additions & 9 deletions event/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/caicloud/cyclone/etcd"
"github.com/caicloud/cyclone/pkg/log"
"github.com/caicloud/cyclone/remote"
"github.com/caicloud/cyclone/resource"
"golang.org/x/net/context"
)

Expand All @@ -43,6 +44,8 @@ var (

// remote api manager
remoteManager *remote.Manager

resourceManager *resource.Manager
)

// Init init event manager
Expand Down Expand Up @@ -74,6 +77,7 @@ func Init(certPath string, registry api.RegistryCompose) {
go handlePendingEvents()

remoteManager = remote.NewManager()
resourceManager = resource.NewManager()
}

// watchEtcd watch unfinished events status change in etcd
Expand Down Expand Up @@ -232,16 +236,14 @@ func (el *List) loadListFromEtcd(etcd *etcd.Client) {
event, err := loadEventFromJSON(jsonEvent)
if err != nil {
log.Errorf("load event from etcd err: %v", err)
return
continue
}
log.Infof("load event to list: %s", event.EventID)
w, err := LoadWorker(&event)
if err != nil {
log.Errorf("load worker from event err: %v", err)
return
if event.Status == api.EventStatusPending {
el.addUnfinshedEvent(&event)
} else {
go CheckWorkerTimeOut(event)
}
go CheckWorkerTimeOut(event, w)
el.addUnfinshedEvent(&event)
}
}

Expand Down Expand Up @@ -359,13 +361,13 @@ func handlePendingEvents() {
event := *pendingEvents.GetFront()
err := handleEvent(&event)
if err != nil {
if err == Err_Unable_Support {
if err == resource.ErrUnableSupport {
log.Info("Waiting for resource to be relaesed...")
time.Sleep(time.Second * 10)
continue
}
// worker busy
if err == Err_Worker_Busy {
if err == ErrWorkerBusy {
log.Info("All system worker are busy, wait for 10 seconds")
time.Sleep(time.Second * 10)
continue
Expand Down
11 changes: 4 additions & 7 deletions event/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/caicloud/cyclone/docker"
"github.com/caicloud/cyclone/pkg/log"
"github.com/caicloud/cyclone/pkg/osutil"
"github.com/caicloud/cyclone/resource"
"github.com/caicloud/cyclone/store"
docker_client "github.com/fsouza/go-dockerclient"
)
Expand Down Expand Up @@ -63,9 +62,7 @@ const (
)

var (
Err_Worker_Busy = errors.New("Get worker docker host busy")
Err_Unable_Support = errors.New("Unable to support the request resource")
resourceManager = resource.NewManager()
ErrWorkerBusy = errors.New("Get worker docker host busy")
)

// RegistryCompose that compose the info about the registry
Expand Down Expand Up @@ -137,7 +134,7 @@ func GetWorkerDockerHost(event *api.Event) (string, error) {
}
if len(workerNodes) == 0 {
log.Errorf("Get worker docker host busy")
return "", Err_Worker_Busy
return "", ErrWorkerBusy
}

err = resourceManager.ApplyResource(event)
Expand Down Expand Up @@ -188,7 +185,7 @@ func (w *Worker) DoWork(event *api.Event) (err error) {
event.WorkerInfo.DueTime = time.Now().Add(time.Duration(WORKER_TIMEOUT))
err = SaveEventToEtcd(event)
log.Infof("save event worker info: %s, %v", w.containerID, err)
go CheckWorkerTimeOut(*event, w)
go CheckWorkerTimeOut(*event)
return nil
}

Expand All @@ -209,7 +206,7 @@ func (w *Worker) Fire() error {
}

// CheckWorkerTimeOut ensures that the events are not timed out.
func CheckWorkerTimeOut(e api.Event, w *Worker) {
func CheckWorkerTimeOut(e api.Event) {
var eventCopy api.Event
eventCopy = e
event := &eventCopy
Expand Down
27 changes: 13 additions & 14 deletions resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package resource

import (
"errors"
"fmt"
"sync"

Expand All @@ -26,7 +27,7 @@ import (
"github.com/caicloud/cyclone/store"
)

type ResourceManager struct {
type Manager struct {
memoryuser float64
memorycontainer float64
cpuuser float64
Expand All @@ -37,18 +38,19 @@ type ResourceManager struct {
}

var (
resourceManager *ResourceManager
ErrUnableSupport = errors.New("Unable to support the request resource")
resourceManager *Manager
)

// NewManager creates a new resource manager with default resource.
func NewManager() *ResourceManager {
func NewManager() *Manager {
if resourceManager == nil {
memoryuser := osutil.GetFloat64Env("MEMORYFORUSER", 4294967296.0) //4G
cpuuser := osutil.GetFloat64Env("CPUFORUSER", 4096.0)
memorycontainer := osutil.GetFloat64Env("MEMORYFORCONTAINER", 536870912.0) //512M
cpucontainer := osutil.GetFloat64Env("CPUFORCONTAINER", 512.0)

resourceManager = &ResourceManager{
resourceManager = &Manager{
memoryuser: memoryuser,
memorycontainer: memorycontainer,
cpuuser: cpuuser,
Expand All @@ -59,7 +61,7 @@ func NewManager() *ResourceManager {
}

// ApplyResource uses to apply resource for container.
func (resm *ResourceManager) ApplyResource(event *api.Event) error {
func (resm *Manager) ApplyResource(event *api.Event) error {
resm.lock.Lock()
defer resm.lock.Unlock()

Expand All @@ -83,7 +85,6 @@ func (resm *ResourceManager) ApplyResource(event *api.Event) error {
if resource.TotalResource.Memory < event.Version.BuildResource.Memory ||
resource.TotalResource.CPU < event.Version.BuildResource.CPU {
errResource := fmt.Errorf("the total resource < the request resource")
//steplog.InsertStepLog(event, steplog.ApplyResource, steplog.Stop, errResource)
log.Errorf("Unable to support the request resource %+v, because > the total resource", event.Service.UserID)
return errResource
}
Expand All @@ -103,16 +104,14 @@ func (resm *ResourceManager) ApplyResource(event *api.Event) error {
if resource.TotalResource.Memory < event.Version.BuildResource.Memory ||
resource.TotalResource.CPU < event.Version.BuildResource.CPU {
errResource := fmt.Errorf("the total resource < the request resource")
//steplog.InsertStepLog(event, steplog.ApplyResource, steplog.Stop, errResource)
log.Errorf("Unable to support the request resource %+v, because > the total resource", event.Service.UserID)
return errResource
}

if resource.LeftResource.Memory < event.Version.BuildResource.Memory ||
resource.LeftResource.CPU < event.Version.BuildResource.CPU {
errResource := fmt.Errorf("Unable to support the request resource")
log.Infof("Unable to support the request resource %+v", event.Service.UserID)
return errResource
return ErrUnableSupport
}

resource.LeftResource.Memory = resource.LeftResource.Memory - event.Version.BuildResource.Memory
Expand All @@ -129,7 +128,7 @@ func (resm *ResourceManager) ApplyResource(event *api.Event) error {
}

// ReleaseResource uses to add resource into db.
func (resm *ResourceManager) ReleaseResource(event *api.Event) error {
func (resm *Manager) ReleaseResource(event *api.Event) error {
ds := store.NewStore()
defer ds.Close()
resource, err := ds.FindResourceByID(event.Service.UserID)
Expand All @@ -148,21 +147,21 @@ func (resm *ResourceManager) ReleaseResource(event *api.Event) error {
}

// GetMemorycontainer func that get the default memory for container.
func (resm *ResourceManager) GetMemorycontainer() float64 {
func (resm *Manager) GetMemorycontainer() float64 {
return resm.memorycontainer
}

// GetCpucontainer func that get the default cpu for container.
func (resm *ResourceManager) GetCpucontainer() float64 {
func (resm *Manager) GetCpucontainer() float64 {
return resm.cpucontainer
}

// GetMemoryuser func that get the default memory for user.
func (resm *ResourceManager) GetMemoryuser() float64 {
func (resm *Manager) GetMemoryuser() float64 {
return resm.memoryuser
}

// GetCpuuser func that get the default cpu for user.
func (resm *ResourceManager) GetCpuuser() float64 {
func (resm *Manager) GetCpuuser() float64 {
return resm.cpuuser
}
14 changes: 9 additions & 5 deletions vendor/github.com/fsouza/go-dockerclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 42 additions & 8 deletions websocket/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/satori/go.uuid"
)

const (
DOCKER_IMAGE_LOG_FLAG = "layer"
)

//AnalysisMessage analysis message receive from the web client.
func AnalysisMessage(dp *DataPacket) bool {
sReceiveFrom := dp.GetReceiveFrom()
Expand Down Expand Up @@ -174,16 +178,46 @@ func PushTopic(wss *WSSession, pWatchLog *WatchLogPacket) {
}
}

byrLog := PacketPushLog(pWatchLog.Api, pWatchLog.UserId,
pWatchLog.ServiceId, pWatchLog.VersionId, string(msg.Value),
uuid.NewV4().String())
dpPacket := &DataPacket{
byrFrame: byrLog,
nFrameLen: len(byrLog),
sSendTo: wss.sSessionID,
str := string(msg.Value)
array := strings.Split(str, "\n")
for _, arr := range array {
if arr != "\r" && arr != "" {
if isDockerImageOperationLog(arr) {
// In order to achieve overlapping the log according to the same layer id,
// so extracted the layer id from the log into the ID section
// in the websockect package, then the UI received the webpacket can overlap
// the log according to the ID.
tmpss := strings.Split(arr, ":")
tmps := strings.Split(tmpss[0], " ")
tmp := tmps[1]
byrLog := PacketPushLog(pWatchLog.Api, pWatchLog.UserId,
pWatchLog.ServiceId, pWatchLog.VersionId, arr[6:], tmp)
dpPacket := &DataPacket{
byrFrame: byrLog,
nFrameLen: len(byrLog),
sSendTo: wss.sSessionID,
}
wss.Send(dpPacket)
} else {
number := uuid.NewV4().String()
byrLog := PacketPushLog(pWatchLog.Api, pWatchLog.UserId,
pWatchLog.ServiceId, pWatchLog.VersionId, arr,
number)
dpPacket := &DataPacket{
byrFrame: byrLog,
nFrameLen: len(byrLog),
sSendTo: wss.sSessionID,
}
wss.Send(dpPacket)
}
}
}
wss.Send(dpPacket)
time.Sleep(time.Millisecond * 100)
}
log.Infof("stop push %s to %s", sTopic, wss.GetSessionID())
}

// isDockerImageOperationLog check the log whether is the log of pulling or pushing docker image.
func isDockerImageOperationLog(log string) bool {
return strings.HasPrefix(log, DOCKER_IMAGE_LOG_FLAG)
}
Loading

0 comments on commit d64a57b

Please sign in to comment.