-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
165 lines (140 loc) · 3.46 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package diskv
import "shardmaster"
import "net/rpc"
import "time"
import "sync"
import "fmt"
import "crypto/rand"
import "math/big"
// Clerk is the client-side handle used to issue Get, Put and Append requests
// against the sharded key/value service. It routes each request using a
// cached shardmaster configuration.
type Clerk struct {
mu sync.Mutex // one RPC at a time; Get and PutAppend hold it for their whole duration
sm *shardmaster.Clerk // shardmaster client, queried for a configuration after each failed pass
config shardmaster.Config // configuration used for routing; MakeClerk leaves it at its zero value
// You'll have to modify Clerk.
}
// nrand returns a random int64 drawn from crypto/rand, uniformly distributed
// in the half-open range [0, 2^62).
//
// It panics if the random source cannot be read. The signature offers no way
// to report an error, and continuing with the nil result that rand.Int hands
// back on failure would only surface as an unexplained nil-pointer
// dereference.
func nrand() int64 {
	limit := big.NewInt(int64(1) << 62)
	n, err := rand.Int(rand.Reader, limit)
	if err != nil {
		panic("diskv: nrand: reading random source: " + err.Error())
	}
	return n.Int64()
}
// MakeClerk returns a new Clerk whose shardmaster client is built from the
// given shardmasters list. The cached configuration starts at its zero value.
func MakeClerk(shardmasters []string) *Clerk {
	// You'll have to modify MakeClerk.
	return &Clerk{sm: shardmaster.MakeClerk(shardmasters)}
}
// call sends an RPC to the rpcname handler on server srv
// with arguments args, waits for the reply, and leaves the
// reply in reply. The reply argument should be a pointer
// to a reply structure.
//
// The return value is true if the server responded, and false
// if call was not able to contact the server. In particular,
// the reply's contents are only valid if call returned true.
//
// You should assume that call will return an
// error after a while if the server is dead.
// Don't provide your own time-out mechanism.
//
// Please use call to send all RPCs, in client.go and server.go.
// Please don't change this function.
func call(srv string, rpcname string,
args interface{}, reply interface{}) bool {
// srv is dialed as a Unix-domain socket address; a fresh connection is
// opened for every call and closed again when call returns.
c, errx := rpc.Dial("unix", srv)
if errx != nil {
return false
}
defer c.Close()
err := c.Call(rpcname, args, reply)
if err == nil {
return true
}
// The RPC itself failed: print the error and report failure to the caller.
fmt.Println(err)
return false
}
// key2shard reports which shard a key belongs to: the value of the key's
// first byte modulo shardmaster.NShards. The empty key maps to shard 0.
//
// Please use this function,
// and please do not change it.
func key2shard(key string) int {
shard := 0
if len(key) > 0 {
// Only the first byte of the key takes part in shard selection.
shard = int(key[0])
}
shard %= shardmaster.NShards
return shard
}
// Get fetches the current value for key.
//
// It returns the Value field of the first reply whose Err is OK or ErrNoKey
// (the server is expected to leave Value as "" when the key does not exist).
// Every other outcome is retried forever: after each unsuccessful pass over
// the replica group the clerk sleeps 100ms and asks the shardmaster for a new
// configuration. The clerk's mutex is held for the whole call.
func (ck *Clerk) Get(key string) string {
	ck.mu.Lock()
	defer ck.mu.Unlock()

	// You'll have to modify Get().

	// The shard depends only on the key, so it is computed once up front.
	shard := key2shard(key)

	for {
		gid := ck.config.Shards[shard]
		if replicas, known := ck.config.Groups[gid]; known {
			// try each server in the shard's replication group.
		group:
			for _, srv := range replicas {
				req := &GetArgs{}
				req.Key = key
				var reply GetReply
				if !call(srv, "DisKV.Get", req, &reply) {
					// Server unreachable; move on to the next replica.
					continue
				}
				switch reply.Err {
				case OK, ErrNoKey:
					return reply.Value
				case ErrWrongGroup:
					// This group does not own the shard under its current
					// configuration; stop asking its other replicas.
					break group
				}
			}
		}

		time.Sleep(100 * time.Millisecond)

		// ask master for a new configuration.
		ck.config = ck.sm.Query(-1)
	}
}
// PutAppend sends a Put or Append request for key; op is passed through to
// the server as the Op field of the request.
//
// It returns once some replica answers with Err == OK. Every other outcome is
// retried forever: after each unsuccessful pass over the replica group the
// clerk sleeps 100ms and asks the shardmaster for a new configuration. The
// clerk's mutex is held for the whole call.
func (ck *Clerk) PutAppend(key string, value string, op string) {
	ck.mu.Lock()
	defer ck.mu.Unlock()

	// You'll have to modify PutAppend().

	// The shard depends only on the key, so it is computed once up front.
	shard := key2shard(key)

	for {
		gid := ck.config.Shards[shard]
		if replicas, known := ck.config.Groups[gid]; known {
			// try each server in the shard's replication group.
		group:
			for _, srv := range replicas {
				req := &PutAppendArgs{}
				req.Key = key
				req.Value = value
				req.Op = op
				var reply PutAppendReply
				if !call(srv, "DisKV.PutAppend", req, &reply) {
					// Server unreachable; move on to the next replica.
					continue
				}
				switch reply.Err {
				case OK:
					return
				case ErrWrongGroup:
					// This group does not own the shard under its current
					// configuration; stop asking its other replicas.
					break group
				}
			}
		}

		time.Sleep(100 * time.Millisecond)

		// ask master for a new configuration.
		ck.config = ck.sm.Query(-1)
	}
}
// Put issues a PutAppend request for key and value with op "Put".
// It does not return until PutAppend does.
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
// Append issues a PutAppend request for key and value with op "Append".
// It does not return until PutAppend does.
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}