/
content.go
86 lines (77 loc) · 2.78 KB
/
content.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package contentmgr
import (
"context"
"sync"
"github.com/application-research/estuary/util"
"github.com/ipfs/go-cid"
"go.opentelemetry.io/otel/trace"
"gorm.io/gorm"
"github.com/application-research/estuary/config"
"github.com/application-research/estuary/node"
"github.com/application-research/estuary/shuttle"
"github.com/application-research/filclient"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
blocks "github.com/ipfs/go-block-format"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
)
type IManager interface {
GarbageCollect(ctx context.Context) error
RemoveContent(ctx context.Context, contID uint, now bool) error
RefreshContent(ctx context.Context, cont uint64) error
OffloadContents(ctx context.Context, conts []uint64) (int, error)
ClearUnused(ctx context.Context, spaceRequest int64, loc string, users []uint, dryrun bool) (*collectionResult, error)
GetRemovalCandidates(ctx context.Context, all bool, loc string, users []uint) ([]removalCandidateInfo, error)
UnpinContent(ctx context.Context, contid uint) error
GetContent(id uint64) (*util.Content, error)
TryRetrieve(ctx context.Context, maddr address.Address, c cid.Cid, ask *retrievalmarket.QueryResponse) error
RecordRetrievalFailure(rfr *util.RetrievalFailureRecord) error
RefreshContentForCid(ctx context.Context, c cid.Cid) (blocks.Block, error)
}
// manager is the concrete type that NewManager returns as an IManager.
// It bundles the external dependencies handed to NewManager together with
// the in-memory bookkeeping (maps and their locks) created there.
type manager struct {
	// Dependencies supplied by the caller of NewManager.
	db         *gorm.DB                // database handle; GetContent queries it
	fc         *filclient.FilClient    // filclient instance passed to NewManager
	blockstore node.EstuaryBlockstore  // store found underneath the tracking blockstore
	node       *node.Node              // node passed to NewManager
	cfg        *config.Estuary         // configuration passed to NewManager
	log        *zap.SugaredLogger      // logger passed to NewManager
	shuttleMgr shuttle.IManager        // shuttle manager passed to NewManager
	tracer     trace.Tracer            // obtained via otel.Tracer("content")
	// notifyBlockstore is copied from the node's NotifBlockstore field.
	notifyBlockstore *node.NotifyBlockstore
	// NOTE(review): presumably retrLk guards retrievalsInProgress — confirm
	// against the methods that touch the map.
	retrLk               sync.Mutex
	retrievalsInProgress map[uint64]*util.RetrievalProgress
	// NOTE(review): what contentLk protects is not visible in this file section.
	contentLk sync.RWMutex
	// NOTE(review): presumably inflightCidsLk guards inflightCids; the meaning
	// of the uint value is not shown here — verify against its users.
	inflightCids   map[cid.Cid]uint
	inflightCidsLk sync.Mutex
}
// NewManager wires the supplied dependencies into a manager and returns it
// as an IManager.
//
// The blockstore is taken from underneath tbs and must implement
// node.EstuaryBlockstore; the type assertion is unchecked, so NewManager
// panics if it does not. nd must be non-nil because its NotifBlockstore
// field is read. The tracer is obtained from otel under the name "content",
// and both bookkeeping maps start out empty.
func NewManager(
	db *gorm.DB,
	fc *filclient.FilClient,
	tbs *util.TrackingBlockstore,
	nd *node.Node,
	cfg *config.Estuary,
	log *zap.SugaredLogger,
	shuttleMgr shuttle.IManager,
) IManager {
	// Unwrap the tracking layer first; a mismatch here is a wiring error and
	// surfaces as a panic, exactly as an inline assertion would.
	underlying := tbs.Under().(node.EstuaryBlockstore)

	mgr := &manager{
		db:         db,
		fc:         fc,
		blockstore: underlying,
		node:       nd,
		cfg:        cfg,
		log:        log,
		shuttleMgr: shuttleMgr,
	}

	// State derived or created here rather than passed in by the caller.
	mgr.tracer = otel.Tracer("content")
	mgr.notifyBlockstore = nd.NotifBlockstore
	mgr.retrievalsInProgress = make(map[uint64]*util.RetrievalProgress)
	mgr.inflightCids = make(map[cid.Cid]uint)

	return mgr
}
// GetContent loads the content record whose id column equals id.
//
// It returns a pointer to the loaded record, or nil together with the
// database error (returned unchanged, not wrapped) when the query fails.
func (m *manager) GetContent(id uint64) (*util.Content, error) {
	record := new(util.Content)

	result := m.db.First(record, "id = ?", id)
	if result.Error != nil {
		return nil, result.Error
	}

	return record, nil
}