Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions shock-server/controller/node/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,9 @@ func IndexTypedRequest(ctx context.Context) {
return
}

if err := n.SetIndexInfo(idxType, idxInfo); err != nil {
logger.Error("err@node.SetIndexInfo: " + err.Error())
n.SetIndexInfo(idxType, idxInfo)
if err := n.Save(); err != nil {
logger.Error("err@node.Save: " + err.Error())
}

if conf.LOG_PERF {
Expand Down
2 changes: 1 addition & 1 deletion shock-server/controller/node/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (cr *NodeController) Replace(id string, ctx context.Context) error {
}
}

err = n.Update(params, files)
err = n.Update(params, files, false)
if err != nil {
err_msg := "err@node_Update: " + id + ": " + err.Error()
logger.Error(err_msg)
Expand Down
4 changes: 2 additions & 2 deletions shock-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func mapRoutes() {
return nil
})

goweb.Map("/openparts", func(ctx context.Context) error {
ids := node.LockMgr.GetNodes()
goweb.Map("/locked", func(ctx context.Context) error {
ids := node.LockMgr.GetLocked()
return responder.RespondWithData(ctx, ids)
})

Expand Down
2 changes: 2 additions & 0 deletions shock-server/node/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (nr *NodeReaper) Handle() {
logger.Error(err_msg)
}
}
// remove old nodes from Locker, value is hours old
LockMgr.RemoveOldNodes(1)
}
}

Expand Down
6 changes: 3 additions & 3 deletions shock-server/node/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (node *Node) SetFile(file FormFile) (err error) {
if err != nil {
return
}

os.Rename(file.Path, node.FilePath())
node.File.Name = file.Name
node.File.Size = fileStat.Size()
Expand All @@ -38,7 +39,7 @@ func (node *Node) SetFile(file FormFile) (err error) {
Format: "dynamic",
CreatedOn: time.Now(),
}
err = node.Save()

return
}

Expand Down Expand Up @@ -115,7 +116,6 @@ func (node *Node) SetFileFromSubset(subsetIndices FormFile) (err error) {
CreatedOn: time.Now(),
}

err = node.Save()
return
}

Expand Down Expand Up @@ -210,7 +210,7 @@ func (node *Node) SetFileFromPath(path string, action string) (err error) {
} else {
node.File.Path = path
}
err = node.Save()

return
}

Expand Down
105 changes: 105 additions & 0 deletions shock-server/node/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package node

import (
"fmt"
"sync"
"time"
)

var (
LockMgr = NewLocker()
)

func NewLocker() *Locker {
return &Locker{
nodes: map[string]*NodeLock{},
}
}

type Locker struct {
nodes map[string]*NodeLock
sync.Mutex
}

type NodeLock struct {
isLocked bool
updated time.Time
writeLock chan int
}

func (n *NodeLock) init() {
n.isLocked = false
n.updated = time.Now()
n.writeLock = make(chan int, 1)
n.writeLock <- 1 // Put the initial value into the channel
}

func (n *NodeLock) lock(id string) (err error) {
select {
case <-n.writeLock: // Grab the ticket - here is where we wait
case <-time.After(time.Minute * 30):
err = fmt.Errorf("Timeout!! Waited 30 mins on lock for node %s", id)
return
}
n.isLocked = true
n.updated = time.Now()
return
}

func (n *NodeLock) unlock() {
n.isLocked = false
n.updated = time.Now()
n.writeLock <- 1 // Release the ticket
}

func (l *Locker) LockNode(id string) (err error) {
// add if missing, may happen if shock restarted
if _, ok := l.nodes[id]; !ok {
l.AddNode(id)
}
err = l.nodes[id].lock(id)
return
}

func (l *Locker) UnlockNode(id string) {
// skip missing id
if _, ok := l.nodes[id]; ok {
l.nodes[id].unlock()
}
}

func (l *Locker) GetLocked() (ids []string) {
l.Lock()
for id, n := range l.nodes {
if n.isLocked {
ids = append(ids, id)
}
}
l.Unlock()
return
}

func (l *Locker) AddNode(id string) {
l.Lock()
l.nodes[id] = new(NodeLock)
l.nodes[id].init()
l.Unlock()
}

func (l *Locker) RemoveNode(id string) {
l.Lock()
delete(l.nodes, id)
l.Unlock()
}

func (l *Locker) RemoveOldNodes(hours int) {
currTime := time.Now()
expireTime := currTime.Add(time.Duration(hours*-1) * time.Hour)
l.Lock()
for id, n := range l.nodes {
if (!n.isLocked) && n.updated.Before(expireTime) {
delete(l.nodes, id)
}
}
l.Unlock()
}
75 changes: 37 additions & 38 deletions shock-server/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"io/ioutil"
"os"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -130,13 +131,13 @@ func CreateNodeUpload(u *user.User, params map[string]string, files FormFiles) (
return
}

err = node.Update(params, files)
// update saves node
err = node.Update(params, files, true)
if err != nil {
err = fmt.Errorf("(node.Update) %s", err.Error())
node.Rmdir()
return
}

err = node.Save()
return
}

Expand Down Expand Up @@ -219,9 +220,9 @@ func CreateNodesFromArchive(u *user.User, params map[string]string, files FormFi

// save nodes, only return those that were created / saved
for _, n := range tempNodes {
if err = n.Save(); err != nil {
if serr := n.Save(); serr != nil {
n.Rmdir()
return nil, err
continue
}
nodes = append(nodes, n)
}
Expand Down Expand Up @@ -264,6 +265,13 @@ func (node *Node) DynamicIndex(name string) (idx index.Index, err error) {
}

func (node *Node) Delete() (err error) {
// lock node
err = LockMgr.LockNode(node.Id)
if err != nil {
return
}
defer LockMgr.RemoveNode(node.Id)

// check to make sure this node isn't referenced by a vnode
virtualNodes := Nodes{}
if _, err = dbFind(bson.M{"file.virtual_parts": node.Id}, &virtualNodes, "", nil); err != nil {
Expand Down Expand Up @@ -306,7 +314,8 @@ func (node *Node) Delete() (err error) {
if err = dbDelete(bson.M{"id": node.Id}); err != nil {
return err
}
return node.Rmdir()
err = node.Rmdir()
return
}

func (node *Node) DeleteIndex(indextype string) (err error) {
Expand All @@ -319,22 +328,8 @@ func (node *Node) DeleteIndex(indextype string) (err error) {
return
}

func (node *Node) SetIndexInfo(indextype string, idxinfo IdxInfo) (err error) {
func (node *Node) SetIndexInfo(indextype string, idxinfo IdxInfo) {
node.Indexes[indextype] = idxinfo
err = node.Save()
return
}

func (node *Node) SetFileFormat(format string) (err error) {
node.File.Format = format
err = node.Save()
return
}

func (node *Node) SetPriority(priority int) (err error) {
node.Priority = priority
err = node.Save()
return
}

func (node *Node) SetExpiration(expire string) (err error) {
Expand All @@ -356,21 +351,6 @@ func (node *Node) SetExpiration(expire string) (err error) {
}

node.Expiration = currTime.Add(expireTime)
err = node.Save()
return
}

func (node *Node) RemoveExpiration() (err error) {
// reset to empty time
node.Expiration = time.Time{}
err = node.Save()
return
}

func (node *Node) ClearRevisions() (err error) {
// empty the revisions array
node.Revisions = []Node{}
err = node.Save()
return
}

Expand All @@ -384,7 +364,6 @@ func (node *Node) SetAttributes(attr FormFile) (err error) {
if err != nil {
return
}
err = node.Save()
return
}

Expand All @@ -393,6 +372,26 @@ func (node *Node) SetAttributesFromString(attributes string) (err error) {
if err != nil {
return
}
err = node.Save()
return
}

func (node *Node) UpdateDataTags(types string) {
tagslist := strings.Split(types, ",")
for _, newtag := range tagslist {
if contains(node.Tags, newtag) {
continue
}
node.Tags = append(node.Tags, newtag)
}
}

func (node *Node) UpdateLinkages(ltype string, ids string, operation string) {
var link linkage
link.Type = ltype
idList := strings.Split(ids, ",")
for _, id := range idList {
link.Ids = append(link.Ids, id)
}
link.Operation = operation
node.Linkages = append(node.Linkages, link)
}
13 changes: 1 addition & 12 deletions shock-server/node/parts.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,7 @@ func (node *Node) initParts(partsCount string, compressionFormat string) (err er
Parts: make([]partsFile, count),
Compression: compressionFormat,
}
if err = node.Save(); err != nil {
return err
}

// add node id to LockMgr
LockMgr.AddNode(node.Id)
return
}

Expand Down Expand Up @@ -82,7 +77,6 @@ func (node *Node) addVirtualParts(ids []string) (err error) {
} else {
return err
}
err = node.Save()
return
}

Expand Down Expand Up @@ -116,7 +110,6 @@ func (node *Node) addPart(n int, file *FormFile) (err error) {
if err = os.Rename(file.Path, fmt.Sprintf("%s/parts/%d", node.Path(), n+1)); err != nil {
return err
}
err = node.Save()
return
}

Expand All @@ -126,10 +119,6 @@ func (node *Node) closeParts(allowEmpty bool) (err error) {
if err = node.SetFileFromParts(allowEmpty); err != nil {
return err
}
if err = os.RemoveAll(node.Path() + "/parts/"); err != nil {
return err
}
// remove node id from LockMgr
LockMgr.RemoveNode(node.Id)
err = os.RemoveAll(node.Path() + "/parts/")
return
}
Loading