Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (
// WriteLock WriteMarker lock
type WriteLock struct {
AllocationID string `gorm:"primaryKey, column:allocation_id"`
SessionID string `gorm:"column:session_id"`
ConnectionID string `gorm:"column:connection_id"`
CreatedAt time.Time `gorm:"column:created_at"`
}

Expand Down
44 changes: 12 additions & 32 deletions code/go/0chain.net/blobbercore/writemarker/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/errors"
"github.com/0chain/gosdk/constants"
Expand All @@ -24,9 +23,8 @@ const (
)

type LockResult struct {
Status LockStatus `json:"status,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
RootNode *reference.HashNode `json:"root_node,omitempty"`
Status LockStatus `json:"status,omitempty"`
CreatedAt int64 `json:"created_at,omitempty"`
}

// Mutex WriteMarker mutex
Expand All @@ -35,16 +33,16 @@ type Mutex struct {
}

// Lock
func (m *Mutex) Lock(ctx context.Context, allocationID, sessionID string, requestTime *time.Time) (*LockResult, error) {
func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string, requestTime *time.Time) (*LockResult, error) {
m.Mutex.Lock()
defer m.Mutex.Unlock()

if len(allocationID) == 0 {
return nil, errors.Throw(constants.ErrInvalidParameter, "allocationID")
}

if len(sessionID) == 0 {
return nil, errors.Throw(constants.ErrInvalidParameter, "sessionID")
if len(connectionID) == 0 {
return nil, errors.Throw(constants.ErrInvalidParameter, "connectionID")
}

if requestTime == nil {
Expand All @@ -65,7 +63,7 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, sessionID string, reques
if errors.Is(err, gorm.ErrRecordNotFound) {
lock = datastore.WriteLock{
AllocationID: allocationID,
SessionID: sessionID,
ConnectionID: connectionID,
CreatedAt: *requestTime,
}

Expand All @@ -74,15 +72,9 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, sessionID string, reques
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

rootNode, err := reference.LoadRootNode(ctx, allocationID)
if err != nil {
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

return &LockResult{
Status: LockStatusOK,
CreatedAt: lock.CreatedAt,
RootNode: rootNode,
CreatedAt: lock.CreatedAt.Unix(),
}, nil

}
Expand All @@ -97,45 +89,33 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, sessionID string, reques
// locked, but it is timeout
if now.After(timeout) {

lock.SessionID = sessionID
lock.ConnectionID = connectionID
lock.CreatedAt = *requestTime

err = db.Save(&lock).Error
if err != nil {
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

rootNode, err := reference.LoadRootNode(ctx, allocationID)
if err != nil {
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

return &LockResult{
Status: LockStatusOK,
CreatedAt: lock.CreatedAt,
RootNode: rootNode,
CreatedAt: lock.CreatedAt.Unix(),
}, nil

}

//try lock by same session, return old lock directly
if lock.SessionID == sessionID && lock.CreatedAt.Equal(*requestTime) {
rootNode, err := reference.LoadRootNode(ctx, allocationID)
if err != nil {
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

if lock.ConnectionID == connectionID && lock.CreatedAt.Equal(*requestTime) {
return &LockResult{
Status: LockStatusOK,
CreatedAt: lock.CreatedAt,
RootNode: rootNode,
CreatedAt: lock.CreatedAt.Unix(),
}, nil
}

// pending
return &LockResult{
Status: LockStatusPending,
CreatedAt: lock.CreatedAt,
CreatedAt: lock.CreatedAt.Unix(),
}, nil

}
Expand Down
28 changes: 14 additions & 14 deletions code/go/0chain.net/blobbercore/writemarker/mutext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ func TestMutext_LockShouldWork(t *testing.T) {
tests := []struct {
name string
allocationID string
sessionID string
connectionID string
requestTime time.Time
mock func()
assert func(*testing.T, *LockResult, error)
}{
{
name: "Lock should work",
allocationID: "lock_allocation_id",
sessionID: "lock_session_id",
connectionID: "lock_connection_id",
requestTime: now,
mock: func() {

Expand All @@ -44,7 +44,7 @@ func TestMutext_LockShouldWork(t *testing.T) {
{
name: "retry lock by same request should work if it is not timeout",
allocationID: "lock_same_allocation_id",
sessionID: "lock_same_session_id",
connectionID: "lock_same_connection_id",
requestTime: now,
mock: func() {
gomocket.Catcher.NewMock().
Expand All @@ -53,7 +53,7 @@ func TestMutext_LockShouldWork(t *testing.T) {
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_same_allocation_id",
"session_id": "lock_same_session_id",
"connection_id": "lock_same_connection_id",
"created_at": now,
},
})
Expand All @@ -66,7 +66,7 @@ func TestMutext_LockShouldWork(t *testing.T) {
{
name: "lock should be pending if it already is locked by other session ",
allocationID: "lock_allocation_id",
sessionID: "lock_pending_session_id",
connectionID: "lock_pending_connection_id",
requestTime: time.Now(),
mock: func() {
gomocket.Catcher.NewMock().
Expand All @@ -75,7 +75,7 @@ func TestMutext_LockShouldWork(t *testing.T) {
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_allocation_id",
"session_id": "lock_session_id",
"connection_id": "lock_connection_id",
"created_at": time.Now().Add(-5 * time.Second),
},
})
Expand All @@ -88,7 +88,7 @@ func TestMutext_LockShouldWork(t *testing.T) {
{
name: "lock should ok if it is timeout",
allocationID: "lock_timeout_allocation_id",
sessionID: "lock_timeout_2nd_session_id",
connectionID: "lock_timeout_2nd_connection_id",
requestTime: now,
mock: func() {
gomocket.Catcher.NewMock().
Expand All @@ -97,7 +97,7 @@ func TestMutext_LockShouldWork(t *testing.T) {
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_timeout_allocation_id",
"session_id": "lock_timeout_1st_session_id",
"connection_id": "lock_timeout_1st_connection_id",
"created_at": time.Now().Add(31 * time.Second),
},
})
Expand All @@ -116,7 +116,7 @@ func TestMutext_LockShouldWork(t *testing.T) {
if it.mock != nil {
it.mock()
}
r, err := m.Lock(context.TODO(), it.allocationID, it.sessionID, &it.requestTime)
r, err := m.Lock(context.TODO(), it.allocationID, it.connectionID, &it.requestTime)

it.assert(test, r, err)

Expand All @@ -139,15 +139,15 @@ func TestMutext_LockShouldNotWork(t *testing.T) {
tests := []struct {
name string
allocationID string
sessionID string
connectionID string
requestTime time.Time
mock func()
assert func(*testing.T, *LockResult, error)
}{
{
name: "Lock should not work if request_time is timeout",
allocationID: "lock_allocation_id",
sessionID: "lock_session_id",
connectionID: "lock_connection_id",
requestTime: time.Now().Add(31 * time.Second),
mock: func() {
config.Configuration.WriteMarkerLockTimeout = 30 * time.Second
Expand All @@ -160,7 +160,7 @@ func TestMutext_LockShouldNotWork(t *testing.T) {
{
name: "retry lock by same request should not work if it is timeout",
allocationID: "lock_same_timeout_allocation_id",
sessionID: "lock_same_timeout_session_id",
connectionID: "lock_same_timeout_connection_id",
requestTime: now,
mock: func() {
gomocket.Catcher.NewMock().
Expand All @@ -169,7 +169,7 @@ func TestMutext_LockShouldNotWork(t *testing.T) {
WithReply([]map[string]interface{}{
{
"allocation_id": "lock_same_timeout_allocation_id",
"session_id": "lock_same_timeout_session_id",
"connection_id": "lock_same_timeout_connection_id",
"created_at": now.Add(-config.Configuration.WriteMarkerLockTimeout),
},
})
Expand All @@ -188,7 +188,7 @@ func TestMutext_LockShouldNotWork(t *testing.T) {
if it.mock != nil {
it.mock()
}
r, err := m.Lock(context.TODO(), it.allocationID, it.sessionID, &it.requestTime)
r, err := m.Lock(context.TODO(), it.allocationID, it.connectionID, &it.requestTime)

it.assert(test, r, err)

Expand Down