forked from seaweedfs/seaweedfs
/
lookup.go
139 lines (122 loc) · 3.4 KB
/
lookup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package operation
import (
"context"
"encoding/json"
"errors"
"fmt"
"google.golang.org/grpc"
"math/rand"
"net/url"
"strings"
"time"
"github.com/bary321/seaweedfs-1/weed/pb/master_pb"
"github.com/bary321/seaweedfs-1/weed/util"
)
// Location describes one server on which a volume can be reached.
type Location struct {
	// Url is the host address used when building file URLs for the volume.
	Url string `json:"url,omitempty"`
	// PublicUrl is the public-facing address reported alongside Url.
	PublicUrl string `json:"publicUrl,omitempty"`
}
// LookupResult is the outcome of resolving a single volume id.
type LookupResult struct {
	// VolumeId is the volume id that was looked up.
	VolumeId string `json:"volumeId,omitempty"`
	// Locations lists the servers found for the volume.
	Locations []Location `json:"locations,omitempty"`
	// Error carries the failure message of the lookup; it is empty on success.
	Error string `json:"error,omitempty"`
}
// String renders the result as a single line containing the volume id, the
// locations in their default fmt representation, and the error message.
func (lr *LookupResult) String() string {
	locations := fmt.Sprint(lr.Locations)
	return "VolumeId:" + lr.VolumeId + ", Locations:" + locations + ", Error:" + lr.Error
}
// vc caches volume locations keyed by volume id so that repeated lookups can
// be answered without contacting the master. The lookup functions in this
// file store entries with a 10-minute duration, after which they are expected
// to be re-checked.
var vc VidCache
// Lookup returns the locations of volume vid.
//
// The volume location cache is consulted first. On a cache miss the master at
// server is queried, and a successful answer is stored in the cache for 10
// minutes. When the master query fails, its error is returned and nothing is
// cached.
func Lookup(server string, vid string) (ret *LookupResult, err error) {
	// Cache hit: answer directly without contacting the master.
	if cached, missErr := vc.Get(vid); missErr == nil {
		return &LookupResult{VolumeId: vid, Locations: cached}, nil
	}

	ret, err = do_lookup(server, vid)
	if err != nil {
		return ret, err
	}
	vc.Set(vid, ret.Locations, 10*time.Minute)
	return ret, nil
}
// do_lookup asks the master at server for the locations of volume vid by
// posting the volume id to its /dir/lookup HTTP endpoint and decoding the JSON
// reply. A transport error, an undecodable reply, or a reply carrying a
// non-empty error field all yield a nil result and a non-nil error.
func do_lookup(server string, vid string) (*LookupResult, error) {
	form := url.Values{"volumeId": {vid}}
	body, postErr := util.Post("http://"+server+"/dir/lookup", form)
	if postErr != nil {
		return nil, postErr
	}

	result := new(LookupResult)
	if decodeErr := json.Unmarshal(body, result); decodeErr != nil {
		return nil, decodeErr
	}
	// The master reports lookup failures inside the payload.
	if result.Error != "" {
		return nil, errors.New(result.Error)
	}
	return result, nil
}
// LookupFileId resolves fileId to a full HTTP URL on one of the servers
// holding its volume.
//
// fileId must contain exactly one comma; the part before the comma is looked
// up as the volume id. When several locations are known, one is picked at
// random. An error is returned for a malformed fileId, a failed lookup, or a
// volume without any location.
func LookupFileId(server string, fileId string) (fullUrl string, err error) {
	if strings.Count(fileId, ",") != 1 {
		return "", errors.New("Invalid fileId " + fileId)
	}
	vid := fileId[:strings.Index(fileId, ",")]

	result, err := Lookup(server, vid)
	if err != nil {
		return "", err
	}

	candidates := result.Locations
	if len(candidates) == 0 {
		return "", errors.New("File Not Found")
	}
	chosen := candidates[rand.Intn(len(candidates))]
	return "http://" + chosen.Url + "/" + fileId, nil
}
// LookupVolumeIds finds the locations of several volume ids, answering from
// the volume location cache where possible and asking the master for the rest.
//
// Volume ids already cached are returned without contacting the master. The
// remaining ids are resolved with a single LookupVolume gRPC call against
// server, dialed with grpcDialOption. Every entry of the master's reply is
// added to the returned map, including entries that carry a per-volume error
// (see LookupResult.Error). Only entries without an error are stored in the
// cache, for 10 minutes, so that failed lookups are retried on the next call.
//
// If the gRPC call itself fails, a nil map and the error are returned.
func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
	ret := make(map[string]LookupResult, len(vids))
	var unknownVids []string

	// Check the vid cache first.
	for _, vid := range vids {
		locations, cacheErr := vc.Get(vid)
		if cacheErr != nil {
			unknownVids = append(unknownVids, vid)
			continue
		}
		ret[vid] = LookupResult{VolumeId: vid, Locations: locations}
	}

	// Return success if all volume ids are known.
	if len(unknownVids) == 0 {
		return ret, nil
	}

	// Only query the volume ids that were not cached.
	err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
		req := &master_pb.LookupVolumeRequest{
			VolumeIds: unknownVids,
		}
		resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
		if grpcErr != nil {
			return grpcErr
		}

		for _, vidLocations := range resp.VolumeIdLocations {
			var locations []Location
			for _, loc := range vidLocations.Locations {
				locations = append(locations, Location{
					Url:       loc.Url,
					PublicUrl: loc.PublicUrl,
				})
			}
			// Cache only successful lookups. The previous condition was
			// inverted: it cached failed lookups and never cached good ones.
			if vidLocations.Error == "" {
				vc.Set(vidLocations.VolumeId, locations, 10*time.Minute)
			}
			ret[vidLocations.VolumeId] = LookupResult{
				VolumeId:  vidLocations.VolumeId,
				Locations: locations,
				Error:     vidLocations.Error,
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return ret, nil
}