/
pool.go
161 lines (136 loc) · 3.75 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package pool
import (
"errors"
"fmt"
"sync"
"time"
"github.com/code-to-go/safepool/core"
"github.com/code-to-go/safepool/security"
"github.com/code-to-go/safepool/sql"
"github.com/code-to-go/safepool/storage"
)
// Bandwidth is an integer class describing how much network capacity is
// available; it is the key type of HouseKeepingPeriods.
type Bandwidth int

// Bandwidth classes, numbered 0, 1 and 2 in this order.
// NOTE(review): HighBandwith is misspelled, but it is an exported name and
// is kept as is for compatibility.
const (
	LowBandwidth Bandwidth = iota
	MediumBandwidth
	HighBandwith
)

var (
	// AvailableBandwidth is the bandwidth class currently in effect; it
	// starts at HighBandwith.
	AvailableBandwidth = HighBandwith

	// HouseKeepingPeriods associates each bandwidth class with a duration:
	// the higher the bandwidth, the shorter the period.
	HouseKeepingPeriods = map[Bandwidth]time.Duration{
		HighBandwith:    5 * time.Minute,
		MediumBandwidth: 20 * time.Minute,
		LowBandwidth:    60 * time.Minute,
	}
)
// Sentinel errors exposed by the pool package. Compare against them with
// errors.Is.
var (
	ErrNoStorage     = errors.New("no Storage available")
	ErrNotTrusted    = errors.New("the author is not a trusted user")
	ErrNotAuthorized = errors.New("no authorization for this file")
	ErrAlreadyExist  = errors.New("pool already exists")
	ErrInvalidToken  = errors.New("provided token is invalid: missing name or configs")
	ErrInvalidId     = errors.New("provided id not a valid ed25519 public key")
	ErrInvalidConfig = errors.New("provided config is invalid: missing name or configs")
	ErrInvalidName   = errors.New("provided pool has invalid name")
	ErrNoSyncClock   = errors.New("cannot sync with global time server")
)
// Consumer is a callback interface that receives a Pool and, for Accept, a
// Head.
// NOTE(review): no implementation or caller is visible in this part of the
// file; the descriptions below are inferred from the signatures only.
type Consumer interface {
// TimeOffset returns a time.Time for the given pool.
// NOTE(review): presumably the point in time from which the consumer wants
// to receive content - confirm with the implementations.
TimeOffset(s *Pool) time.Time
// Accept returns a bool for the given pool and head.
// NOTE(review): presumably true when the consumer takes the head - confirm.
Accept(s *Pool, h Head) bool
}
// Config is the JSON-serializable description of a pool.
type Config struct {
// Name is serialized as "name".
Name string `json:"name"`
// Public is serialized as "public".
// NOTE(review): presumably the list of public storage locations - confirm.
Public []string `json:"public"`
// Private is serialized as "private".
// NOTE(review): presumably the list of private storage locations - confirm.
Private []string `json:"private"`
// Apps is serialized as "apps".
Apps []string `json:"apps"`
// LifeSpanHours is serialized as "lifeSpan"; note that the corresponding
// field of Pool uses the different JSON key "lifeSpanHours".
LifeSpanHours int `json:"lifeSpan"`
}
// Pool is the runtime state of a single pool. The exported fields carry JSON
// tags; the unexported ones are internal state and are not serialized by
// encoding/json.
//
// A Pool embeds a sync.Mutex by value, so it must be used through a pointer
// and never copied.
type Pool struct {
// Name identifies the pool: it is the key passed to the SQL helpers
// (sqlGetFeeds, sqlReset), to the exchangers' Delete and it forms the
// config key "pool/<Name>".
Name string `json:"name"`
// Id is serialized as "id".
Id uint64 `json:"id"`
// Self is serialized as "self".
// NOTE(review): presumably the identity of the local user - confirm.
Self security.Identity `json:"self"`
// Apps is serialized as "apps".
Apps []string `json:"apps"`
// LifeSpanHours is serialized as "lifeSpanHours".
LifeSpanHours int `json:"lifeSpanHours"`
// Trusted is serialized as "trusted".
Trusted bool `json:"trusted"`
// Connection is serialized as "connection".
Connection string `json:"connection"`
// e is the storage printed by ToString.
// NOTE(review): presumably the exchanger currently in use - confirm.
e storage.Storage
// exchangers are the storages closed by Close and asked to delete the pool
// by Delete.
exchangers []storage.Storage
masterKeyId uint64
masterKey []byte
lastAccessSync time.Time
lastReadAccessFile string
lastReplica time.Time
lastReplicaSlot string
// NOTE(review): presumably used to signal the replica routine to quit
// (Close calls stopReplica) - confirm.
quitReplica chan bool
// ctime is the last value returned by getCTime; it is read and written
// under the package-level ctimeLock.
ctime int64
// mutex is held by Close and Delete for their whole duration.
mutex sync.Mutex
}
// Head is the metadata record of an entry in a pool; (*Pool).List returns the
// heads read by sqlGetFeeds.
type Head struct {
// Id is serialized as "id".
Id uint64 `json:"id"`
// Name is serialized as "name".
Name string `json:"name"`
// Size is serialized as "size".
// NOTE(review): presumably the content size in bytes - confirm.
Size int64 `json:"size"`
// Hash is serialized as "hash".
Hash []byte `json:"hash"`
// ModTime is serialized as "modTime".
ModTime time.Time `json:"modTime"`
// AuthorId is serialized as "authorId".
AuthorId string `json:"authorId"`
// Signature is serialized as "signature".
Signature []byte `json:"signature"`
// Meta is serialized as "meta".
Meta []byte `json:"meta"`
// CTime is excluded from JSON (tag "-").
CTime int64 `json:"-"`
// Slot is excluded from JSON (tag "-").
Slot string `json:"-"`
}
// Untyped integer identifiers.
// NOTE(review): judging by the names they select between plain and forced
// creation - confirm against the code that consumes them.
const (
	ID_CREATE       = iota // 0
	ID_FORCE_CREATE        // 1
)
// Package-level settings.
var (
	// ForceCreation is false by default.
	// NOTE(review): its consumers are not visible in this part of the file.
	ForceCreation bool

	// CacheSizeMB defaults to 16; going by the name, the unit is megabytes.
	CacheSizeMB int = 16

	// FeedDateFormat is a time layout that renders a date as yyyymmdd.
	FeedDateFormat string = "20060102"
)
// List returns the pool names reported by sqlListPool.
//
// The error returned by sqlListPool is discarded: the caller only receives
// whatever slice came back with it.
func List() []string {
	pools, _ := sqlListPool()
	return pools
}
// AcceptFunc is a callback that receives a single Head and returns nothing.
type AcceptFunc func(feed Head)
// All is the empty string.
// NOTE(review): presumably a wildcard meaning "no filter" for some lookup;
// its users are not visible in this part of the file - confirm.
const All = ""
// List returns the heads stored for this pool, as read by sqlGetFeeds with
// the pool name and the given ctime.
// NOTE(review): ctime is presumably a lower bound on Head.CTime - confirm
// against sqlGetFeeds.
//
// When core.IsErr flags the lookup error, the result is nil together with
// that error.
func (p *Pool) List(ctime int64) ([]Head, error) {
	feeds, err := sqlGetFeeds(p.Name, ctime)
	if failed := core.IsErr(err, "cannot read Pool feeds: %v"); failed {
		return nil, err
	}
	return feeds, err
}
// Close calls stopReplica and then closes every exchanger of the pool.
//
// The pool mutex is held for the whole operation and released with defer, so
// the pool does not stay locked if stopReplica or an exchanger panics.
func (p *Pool) Close() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	p.stopReplica()
	for _, e := range p.exchangers {
		// Close has no return value, so an exchanger's error cannot be
		// reported; it is dropped and the remaining exchangers still close.
		_ = e.Close()
	}
}
// Delete asks every exchanger of the pool to delete the entry named after
// the pool.
//
// The pool mutex is held for the whole operation and released with defer, so
// the pool does not stay locked if an exchanger panics. Whatever the
// exchangers return is not inspected.
func (p *Pool) Delete() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	for _, e := range p.exchangers {
		e.Delete(p.Name)
	}
}
// Users returns the identities produced by p.sqlGetAccesses(false), together
// with its error. The second value returned by sqlGetAccesses is not used.
func (p *Pool) Users() ([]security.Identity, error) {
	users, _, err := p.sqlGetAccesses(false)
	return users, err
}
// Leave removes the local traces of the pool: it resets the pool's SQL state
// through sqlReset and then deletes the configs stored under the key
// "pool/<Name>".
//
// It returns the sqlReset error when core.IsErr flags it; in that case the
// configs are left untouched. Any result of sql.DelConfigs is not inspected.
func (p *Pool) Leave() error {
	err := sqlReset(p.Name)
	// Pass the pool name, not the *Pool itself: Pool has no String method,
	// so a %s verb on p would dump the whole struct into the message.
	if core.IsErr(err, "cannot reset pool %s: %v", p.Name) {
		return err
	}
	sql.DelConfigs(fmt.Sprintf("pool/%s", p.Name))
	return nil
}
// ToString renders the pool as "<Name> [<e>]", where e is formatted with the
// default fmt representation.
func (p *Pool) ToString() string {
	return p.Name + " [" + fmt.Sprint(p.e) + "]"
}
// ctimeLock guards the ctime field of every Pool; it is shared by all pools.
var ctimeLock sync.Mutex

// getCTime returns a timestamp in microseconds since the Unix epoch that is
// strictly greater than the previous value handed out for this pool, and
// stores it in p.ctime.
//
// The value is the current wall-clock time when that is already past
// p.ctime. Otherwise (two calls within the same microsecond, or a clock that
// was stepped back) it is p.ctime+1, so the call never spins while holding
// the package-wide lock waiting for the clock to catch up.
func (p *Pool) getCTime() int64 {
	ctimeLock.Lock()
	defer ctimeLock.Unlock()

	ctime := time.Now().UnixMicro()
	if ctime <= p.ctime {
		ctime = p.ctime + 1
	}
	p.ctime = ctime
	return ctime
}