From 02eac6bb1d68cc16bf2c1c8b8925a88898feb8b0 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 1 Apr 2016 11:39:34 -0700 Subject: [PATCH 1/2] Add metadata example code --- Documentation/grpc-metadata.md | 6 +- examples/metadata/helloworld/helloworld.pb.go | 368 ++++++++++++++++++ examples/metadata/helloworld/helloworld.proto | 71 ++++ .../helloworld_metadata_client/main.go | 294 ++++++++++++++ .../helloworld_metadata_server/main.go | 236 +++++++++++ 5 files changed, 971 insertions(+), 4 deletions(-) create mode 100644 examples/metadata/helloworld/helloworld.pb.go create mode 100644 examples/metadata/helloworld/helloworld.proto create mode 100644 examples/metadata/helloworld_metadata_client/main.go create mode 100644 examples/metadata/helloworld_metadata_server/main.go diff --git a/Documentation/grpc-metadata.md b/Documentation/grpc-metadata.md index b387e880527..74baa411829 100644 --- a/Documentation/grpc-metadata.md +++ b/Documentation/grpc-metadata.md @@ -76,8 +76,7 @@ func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeRespo ## Sending and receiving metadata - client side -[//]: # "TODO: uncomment next line after example source added" -[//]: # "Real metadata sending and receiving examples are available [here](TODO:example_dir)." +Client side metadata sending and receiving examples are available [here](../examples/metadata/helloworld_metadata_client/main.go). ### Sending metadata @@ -138,8 +137,7 @@ trailer := stream.Trailer() ## Sending and receiving metadata - server side -[//]: # "TODO: uncomment next line after example source added" -[//]: # "Real metadata sending and receiving examples are available [here](TODO:example_dir)." +Server side metadata sending and receiving examples are available [here](../examples/metadata/helloworld_metadata_server/main.go). ### Receiving metadata diff --git a/examples/metadata/helloworld/helloworld.pb.go b/examples/metadata/helloworld/helloworld.pb.go new file mode 100644 index 00000000000..c4a7ced7aaa --- /dev/null +++ b/examples/metadata/helloworld/helloworld.pb.go @@ -0,0 +1,368 @@ +// Code generated by protoc-gen-go. +// source: helloworld.proto +// DO NOT EDIT! + +/* +Package helloworld is a generated protocol buffer package. + +It is generated from these files: + helloworld.proto + +It has these top-level messages: + HelloRequest + HelloReply + StreamingHelloRequest + StreamingHelloReply +*/ +package helloworld + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +const _ = proto.ProtoPackageIsVersion1 + +// The request message containing the user's name. +type HelloRequest struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` +} + +func (m *HelloRequest) Reset() { *m = HelloRequest{} } +func (m *HelloRequest) String() string { return proto.CompactTextString(m) } +func (*HelloRequest) ProtoMessage() {} +func (*HelloRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +// The response message containing the greetings +type HelloReply struct { + Message string `protobuf:"bytes,1,opt,name=message" json:"message,omitempty"` +} + +func (m *HelloReply) Reset() { *m = HelloReply{} } +func (m *HelloReply) String() string { return proto.CompactTextString(m) } +func (*HelloReply) ProtoMessage() {} +func (*HelloReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +// The request message containing a list of names +type StreamingHelloRequest struct { + Names []string `protobuf:"bytes,1,rep,name=names" json:"names,omitempty"` +} + +func (m *StreamingHelloRequest) Reset() { *m = StreamingHelloRequest{} } +func (m *StreamingHelloRequest) String() string { return proto.CompactTextString(m) } +func (*StreamingHelloRequest) ProtoMessage() {} +func (*StreamingHelloRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +// The response message containing a list of greetings +type StreamingHelloReply struct { + Messages []string `protobuf:"bytes,1,rep,name=messages" json:"messages,omitempty"` +} + +func (m *StreamingHelloReply) Reset() { *m = StreamingHelloReply{} } +func (m *StreamingHelloReply) String() string { return proto.CompactTextString(m) } +func (*StreamingHelloReply) ProtoMessage() {} +func (*StreamingHelloReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func init() { + proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest") + proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply") + proto.RegisterType((*StreamingHelloRequest)(nil), "helloworld.StreamingHelloRequest") + proto.RegisterType((*StreamingHelloReply)(nil), "helloworld.StreamingHelloReply") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion1 + +// Client API for Greeter service + +type GreeterClient interface { + // Sends a greeting + SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) + // Sends a list of names, receives a list of replies + ServerStreamingSayHello(ctx context.Context, in *StreamingHelloRequest, opts ...grpc.CallOption) (Greeter_ServerStreamingSayHelloClient, error) + // Sends a list of requests, receives a reply with list of greetings + ClientStreamingSayHello(ctx context.Context, opts ...grpc.CallOption) (Greeter_ClientStreamingSayHelloClient, error) + // Keep sending and receiving request and reply + BidirectionalStreamingSayHello(ctx context.Context, opts ...grpc.CallOption) (Greeter_BidirectionalStreamingSayHelloClient, error) +} + +type greeterClient struct { + cc *grpc.ClientConn +} + +func NewGreeterClient(cc *grpc.ClientConn) GreeterClient { + return &greeterClient{cc} +} + +func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { + out := new(HelloReply) + err := grpc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *greeterClient) ServerStreamingSayHello(ctx context.Context, in *StreamingHelloRequest, opts ...grpc.CallOption) (Greeter_ServerStreamingSayHelloClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Greeter_serviceDesc.Streams[0], c.cc, "/helloworld.Greeter/ServerStreamingSayHello", opts...) + if err != nil { + return nil, err + } + x := &greeterServerStreamingSayHelloClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Greeter_ServerStreamingSayHelloClient interface { + Recv() (*HelloReply, error) + grpc.ClientStream +} + +type greeterServerStreamingSayHelloClient struct { + grpc.ClientStream +} + +func (x *greeterServerStreamingSayHelloClient) Recv() (*HelloReply, error) { + m := new(HelloReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *greeterClient) ClientStreamingSayHello(ctx context.Context, opts ...grpc.CallOption) (Greeter_ClientStreamingSayHelloClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Greeter_serviceDesc.Streams[1], c.cc, "/helloworld.Greeter/ClientStreamingSayHello", opts...) + if err != nil { + return nil, err + } + x := &greeterClientStreamingSayHelloClient{stream} + return x, nil +} + +type Greeter_ClientStreamingSayHelloClient interface { + Send(*HelloRequest) error + CloseAndRecv() (*StreamingHelloReply, error) + grpc.ClientStream +} + +type greeterClientStreamingSayHelloClient struct { + grpc.ClientStream +} + +func (x *greeterClientStreamingSayHelloClient) Send(m *HelloRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *greeterClientStreamingSayHelloClient) CloseAndRecv() (*StreamingHelloReply, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(StreamingHelloReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *greeterClient) BidirectionalStreamingSayHello(ctx context.Context, opts ...grpc.CallOption) (Greeter_BidirectionalStreamingSayHelloClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Greeter_serviceDesc.Streams[2], c.cc, "/helloworld.Greeter/BidirectionalStreamingSayHello", opts...) + if err != nil { + return nil, err + } + x := &greeterBidirectionalStreamingSayHelloClient{stream} + return x, nil +} + +type Greeter_BidirectionalStreamingSayHelloClient interface { + Send(*HelloRequest) error + Recv() (*HelloReply, error) + grpc.ClientStream +} + +type greeterBidirectionalStreamingSayHelloClient struct { + grpc.ClientStream +} + +func (x *greeterBidirectionalStreamingSayHelloClient) Send(m *HelloRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *greeterBidirectionalStreamingSayHelloClient) Recv() (*HelloReply, error) { + m := new(HelloReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for Greeter service + +type GreeterServer interface { + // Sends a greeting + SayHello(context.Context, *HelloRequest) (*HelloReply, error) + // Sends a list of names, receives a list of replies + ServerStreamingSayHello(*StreamingHelloRequest, Greeter_ServerStreamingSayHelloServer) error + // Sends a list of requests, receives a reply with list of greetings + ClientStreamingSayHello(Greeter_ClientStreamingSayHelloServer) error + // Keep sending and receiving request and reply + BidirectionalStreamingSayHello(Greeter_BidirectionalStreamingSayHelloServer) error +} + +func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { + s.RegisterService(&_Greeter_serviceDesc, srv) +} + +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(HelloRequest) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(GreeterServer).SayHello(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +func _Greeter_ServerStreamingSayHello_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(StreamingHelloRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(GreeterServer).ServerStreamingSayHello(m, &greeterServerStreamingSayHelloServer{stream}) +} + +type Greeter_ServerStreamingSayHelloServer interface { + Send(*HelloReply) error + grpc.ServerStream +} + +type greeterServerStreamingSayHelloServer struct { + grpc.ServerStream +} + +func (x *greeterServerStreamingSayHelloServer) Send(m *HelloReply) error { + return x.ServerStream.SendMsg(m) +} + +func _Greeter_ClientStreamingSayHello_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(GreeterServer).ClientStreamingSayHello(&greeterClientStreamingSayHelloServer{stream}) +} + +type Greeter_ClientStreamingSayHelloServer interface { + SendAndClose(*StreamingHelloReply) error + Recv() (*HelloRequest, error) + grpc.ServerStream +} + +type greeterClientStreamingSayHelloServer struct { + grpc.ServerStream +} + +func (x *greeterClientStreamingSayHelloServer) SendAndClose(m *StreamingHelloReply) error { + return x.ServerStream.SendMsg(m) +} + +func (x *greeterClientStreamingSayHelloServer) Recv() (*HelloRequest, error) { + m := new(HelloRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Greeter_BidirectionalStreamingSayHello_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(GreeterServer).BidirectionalStreamingSayHello(&greeterBidirectionalStreamingSayHelloServer{stream}) +} + +type Greeter_BidirectionalStreamingSayHelloServer interface { + Send(*HelloReply) error + Recv() (*HelloRequest, error) + grpc.ServerStream +} + +type greeterBidirectionalStreamingSayHelloServer struct { + grpc.ServerStream +} + +func (x *greeterBidirectionalStreamingSayHelloServer) Send(m *HelloReply) error { + return x.ServerStream.SendMsg(m) +} + +func (x *greeterBidirectionalStreamingSayHelloServer) Recv() (*HelloRequest, error) { + m := new(HelloRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Greeter_serviceDesc = grpc.ServiceDesc{ + ServiceName: "helloworld.Greeter", + HandlerType: (*GreeterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SayHello", + Handler: _Greeter_SayHello_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "ServerStreamingSayHello", + Handler: _Greeter_ServerStreamingSayHello_Handler, + ServerStreams: true, + }, + { + StreamName: "ClientStreamingSayHello", + Handler: _Greeter_ClientStreamingSayHello_Handler, + ClientStreams: true, + }, + { + StreamName: "BidirectionalStreamingSayHello", + Handler: _Greeter_BidirectionalStreamingSayHello_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, +} + +var fileDescriptor0 = []byte{ + // 256 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9, + 0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88, + 0x28, 0x29, 0x71, 0xf1, 0x78, 0x80, 0x78, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42, + 0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92, + 0x1a, 0x17, 0x17, 0x54, 0x4d, 0x41, 0x4e, 0xa5, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x71, + 0x62, 0x3a, 0x4c, 0x11, 0x8c, 0xab, 0xa4, 0xcb, 0x25, 0x1a, 0x5c, 0x52, 0x94, 0x9a, 0x98, 0x9b, + 0x99, 0x97, 0x8e, 0x62, 0xa8, 0x08, 0x17, 0x2b, 0xc8, 0xa0, 0x62, 0xa0, 0x06, 0x66, 0xa0, 0x06, + 0x08, 0x47, 0xc9, 0x90, 0x4b, 0x18, 0x5d, 0x39, 0xc8, 0x7c, 0x29, 0x2e, 0x0e, 0xa8, 0x81, 0x30, + 0xf5, 0x70, 0xbe, 0xd1, 0x75, 0x26, 0x2e, 0x76, 0xf7, 0xa2, 0xd4, 0xd4, 0x92, 0xd4, 0x22, 0x21, + 0x3b, 0x2e, 0x8e, 0xe0, 0xc4, 0x4a, 0xb0, 0x46, 0x21, 0x09, 0x3d, 0x24, 0x4f, 0x22, 0x5b, 0x2d, + 0x25, 0x86, 0x45, 0x06, 0x68, 0x8b, 0x12, 0x83, 0x50, 0x04, 0x97, 0x78, 0x70, 0x6a, 0x51, 0x59, + 0x6a, 0x11, 0xdc, 0x11, 0x70, 0xe3, 0x14, 0x91, 0x35, 0x61, 0xf5, 0x12, 0x6e, 0x73, 0x0d, 0x18, + 0x41, 0x26, 0x3b, 0xe7, 0x64, 0xa6, 0xe6, 0x95, 0x60, 0x9a, 0x8c, 0xdb, 0xa1, 0xf2, 0xf8, 0xec, + 0x04, 0x9b, 0xac, 0x01, 0x32, 0x59, 0xce, 0x29, 0x33, 0x25, 0xb3, 0x28, 0x35, 0xb9, 0x24, 0x33, + 0x3f, 0x2f, 0x31, 0x87, 0x14, 0x0b, 0x70, 0xba, 0x58, 0x83, 0xd1, 0x80, 0x31, 0x89, 0x0d, 0x9c, + 0x34, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xa1, 0xde, 0x85, 0xa2, 0x2e, 0x02, 0x00, 0x00, +} diff --git a/examples/metadata/helloworld/helloworld.proto b/examples/metadata/helloworld/helloworld.proto new file mode 100644 index 00000000000..194da9bbfc2 --- /dev/null +++ b/examples/metadata/helloworld/helloworld.proto @@ -0,0 +1,71 @@ +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package helloworld; + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} + +// The request message containing a list of names +message StreamingHelloRequest { + repeated string names = 1; +} + +// The response message containing a list of greetings +message StreamingHelloReply { + repeated string messages = 1; +} + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello(HelloRequest) returns (HelloReply) { + } + // Sends a list of names, receives a list of replies + rpc ServerStreamingSayHello(StreamingHelloRequest) + returns (stream HelloReply) { + } + // Sends a list of requests, receives a reply with list of greetings + rpc ClientStreamingSayHello(stream HelloRequest) + returns (StreamingHelloReply) { + } + // Keep sending and receiving request and reply + rpc BidirectionalStreamingSayHello(stream HelloRequest) + returns (stream HelloReply) { + } +} diff --git a/examples/metadata/helloworld_metadata_client/main.go b/examples/metadata/helloworld_metadata_client/main.go new file mode 100644 index 00000000000..c53503d161b --- /dev/null +++ b/examples/metadata/helloworld_metadata_client/main.go @@ -0,0 +1,294 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package main + +import ( + "io" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/metadata/helloworld" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" +) + +const ( + address = "localhost:9527" + timestampFormat = time.StampNano // "Jan _2 15:04:05.000" +) + +func unaryCallWithMetadata(c pb.GreeterClient, name string) { + grpclog.Printf("------------ unary ------------") + // create metadata and context + md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + ctx := metadata.NewContext(context.Background(), md) + + // call RPC + var header, trailer metadata.MD + r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name}, grpc.Header(&header), grpc.Trailer(&trailer)) + if err != nil { + grpclog.Fatalf("failed to call SayHello: %v", err) + } + + if t, ok := header["timestamp"]; ok { + grpclog.Printf("timestamp from header:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + if l, ok := header["location"]; ok { + grpclog.Printf("location from header:") + for i, e := range l { + grpclog.Printf(" %d. %s", i, e) + } + } + grpclog.Printf("message:") + grpclog.Printf(" - %s", r.Message) + if t, ok := trailer["timestamp"]; ok { + grpclog.Printf("timestamp from trailer:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } +} + +func serverStreamingWithMetadata(c pb.GreeterClient, names []string) { + grpclog.Printf("------------ server streaming ------------") + // create metadata and context + md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + ctx := metadata.NewContext(context.Background(), md) + + // call RPC + stream, err := c.ServerStreamingSayHello(ctx, &pb.StreamingHelloRequest{Names: names}) + if err != nil { + grpclog.Fatalf("failed to call ServerStreamingSayHello: %v", err) + } + + // read header + header, err := stream.Header() + if err != nil { + grpclog.Fatalf("failed to get header from stream: %v", err) + } + if t, ok := header["timestamp"]; ok { + grpclog.Printf("timestamp from header:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + if l, ok := header["location"]; ok { + grpclog.Printf("location from header:") + for i, e := range l { + grpclog.Printf(" %d. %s", i, e) + } + } + + // read response + var rpcStatus error + grpclog.Printf("message:") + for { + r, err := stream.Recv() + if err != nil { + rpcStatus = err + break + } + grpclog.Printf(" - %s", r.Message) + } + if rpcStatus != io.EOF { + grpclog.Fatalf("failed to finish server streaming: %v", rpcStatus) + } + + // read trailer + trailer := stream.Trailer() + if t, ok := trailer["timestamp"]; ok { + grpclog.Printf("timestamp from trailer:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } +} + +func clientStreamWithMetadata(c pb.GreeterClient, names []string) { + grpclog.Printf("------------ client streaming ------------") + // create metadata and context + md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + ctx := metadata.NewContext(context.Background(), md) + + // call RPC + stream, err := c.ClientStreamingSayHello(ctx) + if err != nil { + grpclog.Fatalf("failed to call ClientStreamingSayHello: %v\n", err) + } + + // read header + header, err := stream.Header() + if err != nil { + grpclog.Fatalf("failed to get header from stream: %v", err) + } + if t, ok := header["timestamp"]; ok { + grpclog.Printf("timestamp from header:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + if l, ok := header["location"]; ok { + grpclog.Printf("location from header:") + for i, e := range l { + grpclog.Printf(" %d. %s", i, e) + } + } + + // send request to stream + for _, name := range names { + if err := stream.Send(&pb.HelloRequest{Name: name}); err != nil { + grpclog.Fatalf("failed to send streaming: %v\n", err) + } + } + + // read response + r, err := stream.CloseAndRecv() + if err != nil { + grpclog.Fatalf("failed to CloseAndRecv: %v\n", err) + } + grpclog.Printf("message:") + for _, m := range r.Messages { + grpclog.Printf(" - %s\n", m) + } + + // read trailer + trailer := stream.Trailer() + if t, ok := trailer["timestamp"]; ok { + grpclog.Printf("timestamp from trailer:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } +} + +func bidirectionalWithMetadata(c pb.GreeterClient, names []string) { + grpclog.Printf("------------ bidirectional ------------") + // create metadata and context + md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + ctx := metadata.NewContext(context.Background(), md) + + // call RPC + stream, err := c.BidirectionalStreamingSayHello(ctx) + if err != nil { + grpclog.Fatalf("failed to call BidirectionalStreamingSayHello: %v\n", err) + } + + go func() { + // read header + header, err := stream.Header() + if err != nil { + grpclog.Fatalf("failed to get header from stream: %v", err) + } + if t, ok := header["timestamp"]; ok { + grpclog.Printf("timestamp from header:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + if l, ok := header["location"]; ok { + grpclog.Printf("location from header:") + for i, e := range l { + grpclog.Printf(" %d. %s", i, e) + } + } + + // send request + for _, name := range names { + if err := stream.Send(&pb.HelloRequest{Name: name}); err != nil { + grpclog.Fatalf("failed to send streaming: %v\n", err) + } + } + stream.CloseSend() + }() + + // read response + var rpcStatus error + grpclog.Printf("message:") + for { + r, err := stream.Recv() + if err != nil { + rpcStatus = err + break + } + grpclog.Printf(" - %s", r.Message) + } + if rpcStatus != io.EOF { + grpclog.Fatalf("failed to finish server streaming: %v", rpcStatus) + } + + // read trailer + trailer := stream.Trailer() + if t, ok := trailer["timestamp"]; ok { + grpclog.Printf("timestamp from trailer:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + +} + +var names = []string{ + "Anne", + "Hope", + "Margeret", + "Jamar", + "Judson", + "Carrol", +} + +func main() { + // Set up a connection to the server. + conn, err := grpc.Dial(address, grpc.WithInsecure()) + if err != nil { + grpclog.Fatalf("did not connect: %v", err) + } + defer conn.Close() + + c := pb.NewGreeterClient(conn) + + unaryCallWithMetadata(c, names[0]) + time.Sleep(1 * time.Second) + + serverStreamingWithMetadata(c, names) + time.Sleep(1 * time.Second) + + clientStreamWithMetadata(c, names) + time.Sleep(1 * time.Second) + + bidirectionalWithMetadata(c, names) +} diff --git a/examples/metadata/helloworld_metadata_server/main.go b/examples/metadata/helloworld_metadata_server/main.go new file mode 100644 index 00000000000..c52dd23e252 --- /dev/null +++ b/examples/metadata/helloworld_metadata_server/main.go @@ -0,0 +1,236 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package main + +import ( + "fmt" + "io" + "math/rand" + "net" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + pb "google.golang.org/grpc/examples/metadata/helloworld" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" +) + +const ( + port = ":9527" + timestampFormat = time.StampNano + smallDuration = time.Second +) + +var greetingWords = []string{ + "Aloha", + "Ahoy", + "Bonjour", + "G'day", + "Hello", + "Hey", + "Hi", + "Hola", + "Howdy", + "Sup", + "What's up", + "Yo", +} + +type server struct{} + +// SayHello implements unary call handler with metadata handling. +func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { + grpclog.Printf("----------- SayHello -----------") + // create trailer, using defer to record timestamp of function return + defer func() { + trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + grpc.SetTrailer(ctx, trailer) + }() + + // read metadata from client + md, ok := metadata.FromContext(ctx) + if !ok { + return nil, grpc.Errorf(codes.DataLoss, "SayHello: failed to get metadata") + } + if t, ok := md["timestamp"]; ok { + grpclog.Printf("timestamp from metadata:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + + // create and send header + header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) + grpc.SendHeader(ctx, header) + + grpclog.Printf("request received: %v, sending greeting", in) + + return &pb.HelloReply{Message: fmt.Sprintf("%s, %s", greetingWords[rand.Intn(len(greetingWords))], in.Name)}, nil +} + +// ServerStreamingSayHello implements server streaming handler with metadata handling. +func (s *server) ServerStreamingSayHello(in *pb.StreamingHelloRequest, stream pb.Greeter_ServerStreamingSayHelloServer) error { + grpclog.Printf("----------- ServerStreamingSayHello -----------") + // create trailer, using defer to record timestamp of function return + defer func() { + trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + stream.SetTrailer(trailer) + }() + + // read metadata from client + md, ok := metadata.FromContext(stream.Context()) + if !ok { + return grpc.Errorf(codes.DataLoss, "ServerStreamingSayHello: failed to get metadata") + } + if t, ok := md["timestamp"]; ok { + grpclog.Printf("timestamp from metadata:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + + // create and send header + header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) + stream.SendHeader(header) + + grpclog.Printf("request received: %v\n", in) + + // read request and send response + for _, name := range in.Names { + grpclog.Printf("sending greeting for %v\n", name) + err := stream.Send(&pb.HelloReply{Message: fmt.Sprintf("%s, %s", greetingWords[rand.Intn(len(greetingWords))], name)}) + if err != nil { + return err + } + } + return nil +} + +// ClientStreamingSayHello implements client streaming handler with metadata handling +func (s *server) ClientStreamingSayHello(stream pb.Greeter_ClientStreamingSayHelloServer) error { + grpclog.Printf("----------- ClientStreamingSayHello -----------") + // create trailer, using defer to record timestamp of function return + defer func() { + trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + stream.SetTrailer(trailer) + }() + + // read metadata from client + md, ok := metadata.FromContext(stream.Context()) + if !ok { + return grpc.Errorf(codes.DataLoss, "ServerStreamingSayHello: failed to get metadata") + } + if t, ok := md["timestamp"]; ok { + grpclog.Printf("timestamp from metadata:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + + // create and send header + header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) + stream.SendHeader(header) + + // read request and send response + var messages []string + for { + in, err := stream.Recv() + if err == io.EOF { + grpclog.Printf("sending all greetings") + return stream.SendAndClose(&pb.StreamingHelloReply{Messages: messages}) + } + grpclog.Printf("request received: %v, building greeting", in) + if err != nil { + return err + } + messages = append(messages, fmt.Sprintf("%s, %s", greetingWords[rand.Intn(len(greetingWords))], in.Name)) + } +} + +// BidirectionalStreamingSayHello implements bidirectional streaming handler with metadata handling +func (s *server) BidirectionalStreamingSayHello(stream pb.Greeter_BidirectionalStreamingSayHelloServer) error { + grpclog.Printf("----------- BidirectionalStreamingSayHello -----------") + // create trailer, using defer to record timestamp of function return + defer func() { + trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) + stream.SetTrailer(trailer) + }() + + // read metadata from client + md, ok := metadata.FromContext(stream.Context()) + if !ok { + return grpc.Errorf(codes.DataLoss, "BidirectionalStreamingSayHello: failed to get metadata") + } + + if t, ok := md["timestamp"]; ok { + grpclog.Printf("timestamp from metadata:") + for i, e := range t { + grpclog.Printf(" %d. %s", i, e) + } + } + + // create and send header + header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) + stream.SendHeader(header) + + // read request and send response + for { + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + grpclog.Printf("request received %v, sending greeting", in) + if err := stream.Send(&pb.HelloReply{Message: fmt.Sprintf("%s, %s", greetingWords[rand.Intn(len(greetingWords))], in.Name)}); err != nil { + return err + } + } +} + +func main() { + rand.Seed(time.Now().UnixNano()) + lis, err := net.Listen("tcp", port) + if err != nil { + grpclog.Fatalf("failed to listen: %v", err) + } + grpclog.Printf("server listening at port %v", port) + + s := grpc.NewServer() + pb.RegisterGreeterServer(s, &server{}) + s.Serve(lis) +} From 208a0d739f83f55e64a91b30d85db6db8da70894 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 25 Apr 2016 10:44:27 -0700 Subject: [PATCH 2/2] Update proto generated code --- examples/metadata/helloworld/helloworld.pb.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/examples/metadata/helloworld/helloworld.pb.go b/examples/metadata/helloworld/helloworld.pb.go index c4a7ced7aaa..4cc81648bc6 100644 --- a/examples/metadata/helloworld/helloworld.pb.go +++ b/examples/metadata/helloworld/helloworld.pb.go @@ -87,7 +87,7 @@ var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion1 +const _ = grpc.SupportPackageIsVersion2 // Client API for Greeter service @@ -233,16 +233,22 @@ func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } -func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(HelloRequest) if err := dec(in); err != nil { return nil, err } - out, err := srv.(GreeterServer).SayHello(ctx, in) - if err != nil { - return nil, err + if interceptor == nil { + return srv.(GreeterServer).SayHello(ctx, in) } - return out, nil + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/helloworld.Greeter/SayHello", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) + } + return interceptor(ctx, in, info, handler) } func _Greeter_ServerStreamingSayHello_Handler(srv interface{}, stream grpc.ServerStream) error {