/
content.go
107 lines (94 loc) · 2.65 KB
/
content.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package shuttle
import (
"context"
rpcevent "github.com/application-research/estuary/shuttle/rpc/event"
"github.com/application-research/estuary/util"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
)
// ConsolidateContent builds a TakeContent command covering every entry in
// contents and sends it to loc via sendRPCMessage.
//
// For each content, the address info of its current Location is looked up with
// m.AddrInfo. A lookup error aborts the whole call and is returned as-is,
// before anything is sent. When the lookup yields a non-nil address it is
// attached as the single peer of that content's fetch entry; otherwise the
// entry is sent without peers.
func (m *manager) ConsolidateContent(ctx context.Context, loc string, contents []util.Content) error {
	m.log.Debugf("attempting to send consolidate content cmd to %s", loc)

	take := &rpcevent.TakeContent{}
	for i := range contents {
		item := &contents[i]

		addr, err := m.AddrInfo(item.Location)
		if err != nil {
			return err
		}

		fetch := rpcevent.ContentFetch{
			ID:     item.ID,
			Cid:    item.Cid.CID,
			UserID: item.UserID,
		}
		if addr != nil {
			fetch.Peers = []*peer.AddrInfo{addr}
		}
		take.Contents = append(take.Contents, fetch)
	}

	cmd := &rpcevent.Command{
		Op:     rpcevent.CMD_TakeContent,
		Params: rpcevent.CmdParams{TakeContent: take},
	}
	return m.sendRPCMessage(ctx, loc, cmd)
}
// PinContent sends an AddPin command for cont to loc via sendRPCMessage and
// returns whatever error that call reports.
//
// The command carries the content's ID (as DBID), its user ID and its CID.
// origins is forwarded unchanged as the command's peer list.
func (m *manager) PinContent(ctx context.Context, loc string, cont util.Content, origins []*peer.AddrInfo) error {
	pin := &rpcevent.AddPin{
		DBID:   cont.ID,
		UserId: cont.UserID,
		Cid:    cont.Cid.CID,
		Peers:  origins,
	}
	cmd := &rpcevent.Command{
		Op:     rpcevent.CMD_AddPin,
		Params: rpcevent.CmdParams{AddPin: pin},
	}
	return m.sendRPCMessage(ctx, loc, cmd)
}
// CommPContent logs at info level and then sends a ComputeCommP command for
// the CID data to loc via sendRPCMessage, returning that call's error.
func (m *manager) CommPContent(ctx context.Context, loc string, data cid.Cid) error {
	m.log.Infof("sending commp")

	commP := &rpcevent.ComputeCommP{Data: data}
	cmd := &rpcevent.Command{
		Op:     rpcevent.CMD_ComputeCommP,
		Params: rpcevent.CmdParams{ComputeCommP: commP},
	}
	return m.sendRPCMessage(ctx, loc, cmd)
}
// UnpinContent sends an UnpinContent command to loc via sendRPCMessage and
// returns that call's error. conts is forwarded unchanged as the command's
// Contents list.
func (m *manager) UnpinContent(ctx context.Context, loc string, conts []uint64) error {
	unpin := &rpcevent.UnpinContent{Contents: conts}
	cmd := &rpcevent.Command{
		Op:     rpcevent.CMD_UnpinContent,
		Params: rpcevent.CmdParams{UnpinContent: unpin},
	}
	return m.sendRPCMessage(ctx, loc, cmd)
}
// AggregateContent sends an AggregateContent command to loc via
// sendRPCMessage and returns that call's error.
//
// The command identifies the aggregate by zone's ID (as DBID) and user ID, and
// lists the ID, Name and CID of every entry in zoneContents. zone is
// dereferenced without a nil check, so callers must pass a non-nil pointer.
func (m *manager) AggregateContent(ctx context.Context, loc string, zone *util.Content, zoneContents []util.Content) error {
	// Declared with var so the list stays nil when zoneContents is empty.
	var members []rpcevent.AggregateContent
	for i := range zoneContents {
		child := &zoneContents[i]
		members = append(members, rpcevent.AggregateContent{
			ID:   child.ID,
			Name: child.Name,
			CID:  child.Cid.CID,
		})
	}

	aggregate := &rpcevent.AggregateContents{
		DBID:     zone.ID,
		UserID:   zone.UserID,
		Contents: members,
	}
	cmd := &rpcevent.Command{
		Op:     rpcevent.CMD_AggregateContent,
		Params: rpcevent.CmdParams{AggregateContent: aggregate},
	}
	return m.sendRPCMessage(ctx, loc, cmd)
}
// SplitContent sends a SplitContent command to loc via sendRPCMessage and
// returns that call's error. cont and size are forwarded unchanged as the
// command's Content and Size fields.
func (m *manager) SplitContent(ctx context.Context, loc string, cont uint64, size int64) error {
	split := &rpcevent.SplitContent{
		Content: cont,
		Size:    size,
	}
	cmd := &rpcevent.Command{
		Op:     rpcevent.CMD_SplitContent,
		Params: rpcevent.CmdParams{SplitContent: split},
	}
	return m.sendRPCMessage(ctx, loc, cmd)
}