/
filer.go
315 lines (279 loc) · 12.4 KB
/
filer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
package command
import (
"fmt"
"net"
"net/http"
"os"
"runtime"
"sort"
"strings"
"time"
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
weed_server "github.com/chrislusf/seaweedfs/weed/server"
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
f FilerOptions
filerStartS3 *bool
filerS3Options S3Options
filerStartWebDav *bool
filerWebDavOptions WebDavOption
filerStartIam *bool
filerIamOptions IamOptions
)
type FilerOptions struct {
masters map[string]pb.ServerAddress
mastersString *string
ip *string
bindIp *string
port *int
portGrpc *int
publicPort *int
filerGroup *string
collection *string
defaultReplicaPlacement *string
disableDirListing *bool
maxMB *int
dirListingLimit *int
dataCenter *string
rack *string
enableNotification *bool
disableHttp *bool
cipher *bool
metricsHttpPort *int
saveToFilerLimit *int
defaultLevelDbDirectory *string
concurrentUploadLimitMB *int
debug *bool
debugPort *int
localSocket *string
showUIDirectoryDelete *bool
}
func init() {
cmdFiler.Run = runFiler // break init cycle
f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.portGrpc = cmdFiler.Flag.Int("port.grpc", 0, "filer server grpc listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack")
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port")
filerS3Options.portGrpc = cmdFiler.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port")
filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
// start webdav on filer
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")
filerWebDavOptions.port = cmdFiler.Flag.Int("webdav.port", 7333, "webdav server http listen port")
filerWebDavOptions.collection = cmdFiler.Flag.String("webdav.collection", "", "collection to create the files")
filerWebDavOptions.replication = cmdFiler.Flag.String("webdav.replication", "", "replication to create the files")
filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file")
filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
// start iam on filer
filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service")
filerIamOptions.ip = cmdFiler.Flag.String("iam.ip", *f.ip, "iam server http listen ip address")
filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port")
}
func filerLongDesc() string {
desc := `start a file server which accepts REST operation for any files.
//create or overwrite the file, the directories /path/to will be automatically created
POST /path/to/file
//get the file content
GET /path/to/file
//create or overwrite the file, the filename in the multipart request will be used
POST /path/to/
//return a json format subdirectory and files listing
GET /path/to/
The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
If the "filer.toml" is not found, an embedded filer store will be created under "-defaultStoreDir".
The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
Supported Filer Stores:
`
storeNames := make([]string, len(filer.Stores))
for i, store := range filer.Stores {
storeNames[i] = "\t" + store.GetName()
}
sort.Strings(storeNames)
storeList := strings.Join(storeNames, "\n")
return desc + storeList
}
var cmdFiler = &Command{
UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
Short: "start a file server that points to a master server, or a list of master servers",
Long: filerLongDesc(),
}
func runFiler(cmd *Command, args []string) bool {
if *f.debug {
go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil)
}
util.LoadConfiguration("security", false)
go stats_collect.StartMetricsServer(*f.metricsHttpPort)
filerAddress := util.JoinHostPort(*f.ip, *f.port)
startDelay := time.Duration(2)
if *filerStartS3 {
filerS3Options.filer = &filerAddress
filerS3Options.bindIp = f.bindIp
filerS3Options.localFilerSocket = f.localSocket
go func() {
time.Sleep(startDelay * time.Second)
filerS3Options.startS3Server()
}()
startDelay++
}
if *filerStartWebDav {
filerWebDavOptions.filer = &filerAddress
go func() {
time.Sleep(startDelay * time.Second)
filerWebDavOptions.startWebDav()
}()
startDelay++
}
if *filerStartIam {
filerIamOptions.filer = &filerAddress
filerIamOptions.masters = f.mastersString
go func() {
time.Sleep(startDelay * time.Second)
filerIamOptions.startIamServer()
}()
}
f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
f.startFiler()
return true
}
func (fo *FilerOptions) startFiler() {
defaultMux := http.NewServeMux()
publicVolumeMux := defaultMux
if *fo.publicPort != 0 {
publicVolumeMux = http.NewServeMux()
}
if *fo.portGrpc == 0 {
*fo.portGrpc = 10000 + *fo.port
}
if *fo.bindIp == "" {
*fo.bindIp = *fo.ip
}
defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc)
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
Masters: fo.masters,
FilerGroup: *fo.filerGroup,
Collection: *fo.collection,
DefaultReplication: *fo.defaultReplicaPlacement,
DisableDirListing: *fo.disableDirListing,
MaxMB: *fo.maxMB,
DirListingLimit: *fo.dirListingLimit,
DataCenter: *fo.dataCenter,
Rack: *fo.rack,
DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp,
Host: filerAddress,
Cipher: *fo.cipher,
SaveToFilerLimit: int64(*fo.saveToFilerLimit),
ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
}
if *fo.publicPort != 0 {
publicListeningAddress := util.JoinHostPort(*fo.bindIp, *fo.publicPort)
glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress)
publicListener, localPublicListner, e := util.NewIpAndLocalListeners(*fo.bindIp, *fo.publicPort, 0)
if e != nil {
glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e)
}
go func() {
if e := http.Serve(publicListener, publicVolumeMux); e != nil {
glog.Fatalf("Volume server fail to serve public: %v", e)
}
}()
if localPublicListner != nil {
go func() {
if e := http.Serve(localPublicListner, publicVolumeMux); e != nil {
glog.Errorf("Volume server fail to serve public: %v", e)
}
}()
}
}
glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port)
filerListener, filerLocalListener, e := util.NewIpAndLocalListeners(
*fo.bindIp, *fo.port,
time.Duration(10)*time.Second,
)
if e != nil {
glog.Fatalf("Filer listener error: %v", e)
}
// starting grpc server
grpcPort := *fo.portGrpc
grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
if grpcLocalL != nil {
go grpcS.Serve(grpcLocalL)
}
go grpcS.Serve(grpcL)
httpS := &http.Server{Handler: defaultMux}
if runtime.GOOS != "windows" {
if *fo.localSocket == "" {
*fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
}
if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
}
go func() {
// start on local unix socket
filerSocketListener, err := net.Listen("unix", *fo.localSocket)
if err != nil {
glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
}
httpS.Serve(filerSocketListener)
}()
}
if filerLocalListener != nil {
go func() {
if err := httpS.Serve(filerLocalListener); err != nil {
glog.Errorf("Filer Fail to serve: %v", e)
}
}()
}
if err := httpS.Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}
}