Skip to content

Commit

Permalink
delete dir in repair (#1196)
Browse files Browse the repository at this point in the history
* delete dir in repair

* replace ioutil

* fix unit test

* fix list worker test

* fix rename dir error
  • Loading branch information
Hitenjain14 authored and YarikRevich committed Aug 30, 2023
1 parent da1e646 commit f18a038
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 30 deletions.
2 changes: 1 addition & 1 deletion zboxcore/allocationchange/renameobject.go
Expand Up @@ -47,7 +47,7 @@ func (ch *RenameFileChange) ProcessChange(rootRef *fileref.Ref, _ map[string]str
if ch.ObjectTree.GetType() == fileref.FILE {
affectedRef = &(ch.ObjectTree.(*fileref.FileRef)).Ref
} else {
affectedRef = ch.ObjectTree.(*fileref.Ref)
err = errors.New("invalid_rename_op", "Object to rename is not a file use move instead")
}

affectedRef.Path = pathutil.Join(parentPath, ch.NewName)
Expand Down
7 changes: 6 additions & 1 deletion zboxcore/sdk/allocation_test.go
Expand Up @@ -1601,7 +1601,9 @@ func TestAllocation_ListDirFromAuthTicket(t *testing.T) {
DataShards: 2,
ParityShards: 2,
}

if tt.parameters.expectedResult != nil {
tt.parameters.expectedResult.deleteMask = zboxutil.NewUint128(1).Lsh(uint64(a.DataShards + a.ParityShards)).Sub64(1)
}
if tt.setup != nil {
if teardown := tt.setup(t, tt.name, a, &mockClient); teardown != nil {
defer teardown(t)
Expand Down Expand Up @@ -1881,6 +1883,9 @@ func TestAllocation_listDir(t *testing.T) {
DataShards: 2,
ParityShards: 2,
}
if tt.parameters.expectedResult != nil {
tt.parameters.expectedResult.deleteMask = zboxutil.NewUint128(1).Lsh(uint64(a.DataShards + a.ParityShards)).Sub64(1)
}
a.InitAllocation()
sdkInitialized = true
for i := 0; i < numBlobbers; i++ {
Expand Down
8 changes: 6 additions & 2 deletions zboxcore/sdk/listworker.go
Expand Up @@ -58,6 +58,7 @@ type ListResult struct {
UpdatedAt common.Timestamp `json:"updated_at"`
Children []*ListResult `json:"list"`
Consensus `json:"-"`
deleteMask zboxutil.Uint128 `json:"-"`
}

func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode, blobberIdx int, rspCh chan<- *listResponse) {
Expand Down Expand Up @@ -146,7 +147,9 @@ func (req *ListRequest) getlistFromBlobbers() []*listResponse {

func (req *ListRequest) GetListFromBlobbers() (*ListResult, error) {
lR := req.getlistFromBlobbers()
result := &ListResult{}
result := &ListResult{
deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1),
}
selected := make(map[string]*ListResult)
childResultMap := make(map[string]*ListResult)
var err error
Expand All @@ -157,6 +160,7 @@ func (req *ListRequest) GetListFromBlobbers() (*ListResult, error) {
if ti.err != nil {
err = ti.err
errNum++
result.deleteMask = result.deleteMask.And(zboxutil.NewUint128(1).Lsh(uint64(ti.blobberIdx)).Not())
continue
}
if ti.ref == nil {
Expand Down Expand Up @@ -193,7 +197,7 @@ func (req *ListRequest) GetListFromBlobbers() (*ListResult, error) {
}
}

if errNum == len(lR) {
if errNum >= req.consensusThresh && !req.forRepair {
return nil, err
}

Expand Down
5 changes: 3 additions & 2 deletions zboxcore/sdk/listworker_test.go
Expand Up @@ -299,8 +299,9 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) {
}
got, _ := req.GetListFromBlobbers()
expectedResult := &ListResult{
Type: mockType,
Size: 0,
Type: mockType,
Size: 0,
deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1),
}
if !tt.wantErr {
require.EqualValues(expectedResult, got)
Expand Down
15 changes: 15 additions & 0 deletions zboxcore/sdk/repairworker.go
Expand Up @@ -79,6 +79,21 @@ func (r *RepairRequest) iterateDir(a *Allocation, dir *ListResult) {
return
}
}
if len(dir.Children) == 0 {
if dir.deleteMask.CountOnes() > 0 {
l.Logger.Info("Deleting minority shards for the path :", zap.Any("path", dir.Path))
consensus := dir.deleteMask.CountOnes()
if consensus < a.DataShards {

err := a.deleteFile(dir.Path, 0, consensus, dir.deleteMask)
if err != nil {
l.Logger.Error("repair_file_failed", zap.Error(err))
return
}
r.filesRepaired++
}
}
}
for _, childDir := range dir.Children {
if r.checkForCancel(a) {
return
Expand Down
38 changes: 14 additions & 24 deletions zboxcore/sdk/rollback.go
Expand Up @@ -5,7 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"mime/multipart"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -73,7 +73,7 @@ func GetWritemarker(allocID, allocTx, id, baseUrl string) (*LatestPrevWriteMarke
time.Sleep(time.Duration(r) * time.Second)
continue
}
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("writemarker error response %s with status %d", body, resp.StatusCode)
Expand Down Expand Up @@ -182,7 +182,7 @@ func (rb *RollbackBlobber) processRollback(ctx context.Context, tx string) error
return
}

respBody, err = ioutil.ReadAll(resp.Body)
respBody, err = io.ReadAll(resp.Body)
if err != nil {
l.Logger.Error("Response read: ", err)
return
Expand Down Expand Up @@ -245,8 +245,11 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) {

versionMap := make(map[string][]*RollbackBlobber)

var prevVersion string
var latestVersion string
var (
prevVersion string
latestVersion string
highestTS int64
)

for rb := range markerChan {

Expand All @@ -256,10 +259,14 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) {

version := rb.lpm.LatestWM.FileMetaRoot

if highestTS < rb.lpm.LatestWM.Timestamp {
prevVersion = latestVersion
highestTS = rb.lpm.LatestWM.Timestamp
latestVersion = version
}

if prevVersion == "" {
prevVersion = version
} else if prevVersion != version && latestVersion == "" {
latestVersion = version
}

if _, ok := versionMap[version]; !ok {
Expand All @@ -274,23 +281,6 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) {
return Commit, nil
}

maxTimestamp := int64(0)
for _, rb := range versionMap[latestVersion] {
if rb.lpm.LatestWM.Timestamp > maxTimestamp {
maxTimestamp = rb.lpm.LatestWM.Timestamp
}
}
toFlip := false
for _, rb := range versionMap[prevVersion] {
if rb.lpm.LatestWM.Timestamp > maxTimestamp {
toFlip = true
break
}
}
if toFlip {
prevVersion, latestVersion = latestVersion, prevVersion
}

req := a.DataShards

if len(versionMap[latestVersion]) > req {
Expand Down

0 comments on commit f18a038

Please sign in to comment.