From eb031bc3b3d67c4dee89c096125e914713db69ce Mon Sep 17 00:00:00 2001 From: darjun Date: Sun, 10 May 2020 00:00:32 +0800 Subject: [PATCH] std - rpc --- README.md | 3 +- rpc/asynchronous/client.go | 56 ++++++++++++++++++++++ rpc/get-started/client.go | 38 +++++++++++++++ rpc/get-started/server/main.go | 42 ++++++++++++++++ rpc/name/client.go | 26 ++++++++++ rpc/name/server/main.go | 31 ++++++++++++ rpc/newserver/main.go | 29 ++++++++++++ rpc/servecodec/client.go | 66 ++++++++++++++++++++++++++ rpc/servecodec/server/main.go | 87 ++++++++++++++++++++++++++++++++++ rpc/serveconn/client.go | 26 ++++++++++ rpc/serveconn/server/main.go | 37 +++++++++++++++ rpc/tcp/client.go | 26 ++++++++++ rpc/tcp/server/main.go | 29 ++++++++++++ 13 files changed, 495 insertions(+), 1 deletion(-) create mode 100644 rpc/asynchronous/client.go create mode 100644 rpc/get-started/client.go create mode 100644 rpc/get-started/server/main.go create mode 100644 rpc/name/client.go create mode 100644 rpc/name/server/main.go create mode 100644 rpc/newserver/main.go create mode 100644 rpc/servecodec/client.go create mode 100644 rpc/servecodec/server/main.go create mode 100644 rpc/serveconn/client.go create mode 100644 rpc/serveconn/server/main.go create mode 100644 rpc/tcp/client.go create mode 100644 rpc/tcp/server/main.go diff --git a/README.md b/README.md index 8ad14d5..be342a8 100644 --- a/README.md +++ b/README.md @@ -41,4 +41,5 @@ 35. [zerolog](https://darjun.github.io/2020/04/24/godailylib/zerolog) 36. [nutsdb](https://darjun.github.io/2020/04/25/godailylib/nutsdb) 37. [sqlc](https://darjun.github.io/2020/04/28/godailylib/sqlc) -38. [xorm](https://darjun.github.io/2020/05/07/godailylib/xorm) \ No newline at end of file +38. [xorm](https://darjun.github.io/2020/05/07/godailylib/xorm) +39. [rpc](https://darjun.github.io/2020/05/08/godailylib/rpc) \ No newline at end of file diff --git a/rpc/asynchronous/client.go b/rpc/asynchronous/client.go new file mode 100644 index 0000000..21e3029 --- /dev/null +++ b/rpc/asynchronous/client.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "log" + "net/rpc" + "time" +) + +type Args struct { + A, B int +} + +type Quotient struct { + Quo, Rem int +} + +func main() { + client, err := rpc.DialHTTP("tcp", ":1234") + if err != nil { + log.Fatal("dialing:", err) + } + + args1 := &Args{7, 8} + var reply int + multiplyReply := client.Go("Arith.Multiply", args1, &reply, nil) + + args2 := &Args{15, 6} + var quo Quotient + divideReply := client.Go("Arith.Divide", args2, &quo, nil) + + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + + var multiplyReplied, divideReplied bool + for !multiplyReplied || !divideReplied { + select { + case replyCall := <-multiplyReply.Done: + if err := replyCall.Error; err != nil { + fmt.Println("Multiply error:", err) + } else { + fmt.Printf("Multiply: %d*%d=%d\n", args1.A, args1.B, reply) + } + multiplyReplied = true + case replyCall := <-divideReply.Done: + if err := replyCall.Error; err != nil { + fmt.Println("Divide error:", err) + } else { + fmt.Printf("Divide: %d/%d=%d...%d\n", args2.A, args2.B, quo.Quo, quo.Rem) + } + divideReplied = true + case <-ticker.C: + fmt.Println("tick") + } + } +} diff --git a/rpc/get-started/client.go b/rpc/get-started/client.go new file mode 100644 index 0000000..39e4ad1 --- /dev/null +++ b/rpc/get-started/client.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "log" + "net/rpc" +) + +type Args struct { + A, B int +} + +type Quotient struct { + Quo, Rem int +} + +func main() { + client, err := rpc.DialHTTP("tcp", ":1234") + if err != nil { + log.Fatal("dialing:", err) + } + + args := &Args{7, 8} + var reply int + err = client.Call("Arith.Multiply", args, &reply) + if err != nil { + log.Fatal("Multiply error:", err) + } + fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) + + args = &Args{15, 6} + var quo Quotient + err = client.Call("Arith.Divide", args, &quo) + if err != nil { + log.Fatal("Divide error:", err) + } + fmt.Printf("Divide: %d/%d=%d...%d\n", args.A, args.B, quo.Quo, quo.Rem) +} diff --git a/rpc/get-started/server/main.go b/rpc/get-started/server/main.go new file mode 100644 index 0000000..0f406d7 --- /dev/null +++ b/rpc/get-started/server/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "errors" + "log" + "net/http" + "net/rpc" +) + +type Args struct { + A, B int +} + +type Quotient struct { + Quo, Rem int +} + +type Arith int + +func (t *Arith) Multiply(args *Args, reply *int) error { + *reply = args.A * args.B + return nil +} + +func (t *Arith) Divide(args *Args, quo *Quotient) error { + if args.B == 0 { + return errors.New("divide by 0") + } + + quo.Quo = args.A / args.B + quo.Rem = args.A % args.B + return nil +} + +func main() { + arith := new(Arith) + rpc.Register(arith) + rpc.HandleHTTP() + if err := http.ListenAndServe(":1234", nil); err != nil { + log.Fatal("serve error:", err) + } +} diff --git a/rpc/name/client.go b/rpc/name/client.go new file mode 100644 index 0000000..9260766 --- /dev/null +++ b/rpc/name/client.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "log" + "net/rpc" +) + +type Args struct { + A, B int +} + +func main() { + client, err := rpc.DialHTTP("tcp", ":1234") + if err != nil { + log.Fatal("dialing:", err) + } + + args := &Args{7, 8} + var reply int + err = client.Call("math.Multiply", args, &reply) + if err != nil { + log.Fatal("Multiply error:", err) + } + fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) +} diff --git a/rpc/name/server/main.go b/rpc/name/server/main.go new file mode 100644 index 0000000..0473f90 --- /dev/null +++ b/rpc/name/server/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "log" + "net/http" + "net/rpc" +) + +type Args struct { + A, B int +} + +type Quotient struct { + Quo, Rem int +} + +type Arith int + +func (t *Arith) Multiply(args *Args, reply *int) error { + *reply = args.A * args.B + return nil +} + +func main() { + arith := new(Arith) + rpc.RegisterName("math", arith) + rpc.HandleHTTP() + if err := http.ListenAndServe(":1234", nil); err != nil { + log.Fatal("serve error:", err) + } +} diff --git a/rpc/newserver/main.go b/rpc/newserver/main.go new file mode 100644 index 0000000..607a986 --- /dev/null +++ b/rpc/newserver/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "log" + "net/http" + "net/rpc" +) + +type Args struct { + A, B int +} + +type Arith int + +func (t *Arith) Multiply(args *Args, reply *int) error { + *reply = args.A * args.B + return nil +} + +func main() { + arith := new(Arith) + server := rpc.NewServer() + server.RegisterName("math", arith) + server.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath) + + if err := http.ListenAndServe(":1234", nil); err != nil { + log.Fatal("serve error:", err) + } +} diff --git a/rpc/servecodec/client.go b/rpc/servecodec/client.go new file mode 100644 index 0000000..13c26c5 --- /dev/null +++ b/rpc/servecodec/client.go @@ -0,0 +1,66 @@ +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "log" + "net" + "net/rpc" +) + +type Args struct { + A, B int +} + +type JsonClientCodec struct { + rwc io.ReadWriteCloser + dec *json.Decoder + enc *json.Encoder + encBuf *bufio.Writer +} + +func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec { + encBuf := bufio.NewWriter(conn) + return &JsonClientCodec{conn, json.NewDecoder(conn), json.NewEncoder(encBuf), encBuf} +} + +func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) { + if err = c.enc.Encode(r); err != nil { + return + } + if err = c.enc.Encode(body); err != nil { + return + } + return c.encBuf.Flush() +} + +func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error { + return c.dec.Decode(r) +} + +func (c *JsonClientCodec) ReadResponseBody(body interface{}) error { + return c.dec.Decode(body) +} + +func (c *JsonClientCodec) Close() error { + return c.rwc.Close() +} + +func main() { + conn, err := net.Dial("tcp", ":1234") + if err != nil { + log.Fatal("dial error:", err) + } + + client := rpc.NewClientWithCodec(NewJsonClientCodec(conn)) + + args := &Args{7, 8} + var reply int + err = client.Call("Arith.Multiply", args, &reply) + if err != nil { + log.Fatal("Multiply error:", err) + } + fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) +} diff --git a/rpc/servecodec/server/main.go b/rpc/servecodec/server/main.go new file mode 100644 index 0000000..271e0d8 --- /dev/null +++ b/rpc/servecodec/server/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "bufio" + "encoding/json" + "io" + "log" + "net" + "net/rpc" +) + +type Args struct { + A, B int +} + +type Arith int + +func (t *Arith) Multiply(args *Args, reply *int) error { + *reply = args.A * args.B + return nil +} + +type JsonServerCodec struct { + rwc io.ReadWriteCloser + dec *json.Decoder + enc *json.Encoder + encBuf *bufio.Writer + closed bool +} + +func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec { + buf := bufio.NewWriter(conn) + return &JsonServerCodec{conn, json.NewDecoder(conn), json.NewEncoder(buf), buf, false} +} + +func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error { + return c.dec.Decode(r) +} + +func (c *JsonServerCodec) ReadRequestBody(body interface{}) error { + return c.dec.Decode(body) +} + +func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { + if err = c.enc.Encode(r); err != nil { + if c.encBuf.Flush() == nil { + log.Println("rpc: json error encoding response:", err) + c.Close() + } + return + } + if err = c.enc.Encode(body); err != nil { + if c.encBuf.Flush() == nil { + log.Println("rpc: json error encoding body:", err) + c.Close() + } + return + } + return c.encBuf.Flush() +} + +func (c *JsonServerCodec) Close() error { + if c.closed { + return nil + } + c.closed = true + return c.rwc.Close() +} + +func main() { + l, err := net.Listen("tcp", ":1234") + if err != nil { + log.Fatal("listen error:", err) + } + + arith := new(Arith) + rpc.Register(arith) + + for { + conn, err := l.Accept() + if err != nil { + log.Fatal("accept error:", err) + } + + go rpc.ServeCodec(NewJsonServerCodec(conn)) + } +} diff --git a/rpc/serveconn/client.go b/rpc/serveconn/client.go new file mode 100644 index 0000000..138f064 --- /dev/null +++ b/rpc/serveconn/client.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "log" + "net/rpc" +) + +type Args struct { + A, B int +} + +func main() { + client, err := rpc.Dial("tcp", ":1234") + if err != nil { + log.Fatal("dialing:", err) + } + + args := &Args{7, 8} + var reply int + err = client.Call("Arith.Multiply", args, &reply) + if err != nil { + log.Fatal("Multiply error:", err) + } + fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) +} diff --git a/rpc/serveconn/server/main.go b/rpc/serveconn/server/main.go new file mode 100644 index 0000000..d31d6d7 --- /dev/null +++ b/rpc/serveconn/server/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "log" + "net" + "net/rpc" +) + +type Args struct { + A, B int +} + +type Arith int + +func (t *Arith) Multiply(args *Args, reply *int) error { + *reply = args.A * args.B + return nil +} + +func main() { + l, err := net.Listen("tcp", ":1234") + if err != nil { + log.Fatal("listen error:", err) + } + + arith := new(Arith) + rpc.Register(arith) + + for { + conn, err := l.Accept() + if err != nil { + log.Fatal("accept error:", err) + } + + go rpc.ServeConn(conn) + } +} diff --git a/rpc/tcp/client.go b/rpc/tcp/client.go new file mode 100644 index 0000000..138f064 --- /dev/null +++ b/rpc/tcp/client.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "log" + "net/rpc" +) + +type Args struct { + A, B int +} + +func main() { + client, err := rpc.Dial("tcp", ":1234") + if err != nil { + log.Fatal("dialing:", err) + } + + args := &Args{7, 8} + var reply int + err = client.Call("Arith.Multiply", args, &reply) + if err != nil { + log.Fatal("Multiply error:", err) + } + fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) +} diff --git a/rpc/tcp/server/main.go b/rpc/tcp/server/main.go new file mode 100644 index 0000000..f1d31e7 --- /dev/null +++ b/rpc/tcp/server/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "log" + "net" + "net/rpc" +) + +type Args struct { + A, B int +} + +type Arith int + +func (t *Arith) Multiply(args *Args, reply *int) error { + *reply = args.A * args.B + return nil +} + +func main() { + l, err := net.Listen("tcp", ":1234") + if err != nil { + log.Fatal("listen error:", err) + } + + arith := new(Arith) + rpc.Register(arith) + rpc.Accept(l) +}