/
grpc_sending_files_client.go
135 lines (116 loc) · 3.49 KB
/
grpc_sending_files_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package main
import (
"io"
"os"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
"github.com/amazingchow/grpc-playground/grpc-file-transfer-tool/api"
"github.com/amazingchow/grpc-playground/grpc-file-transfer-tool/common"
)
// GRPCStreamClient gRPC流客户端
type GRPCStreamClient struct {
logger zerolog.Logger
cfg *GRPCStreamClientCfg
client api.GrpcStreamServiceClient
conn *grpc.ClientConn
}
// GRPCStreamClientCfg gRPC流客户端配置
type GRPCStreamClientCfg struct {
Address string `json:"address"`
ChunkSize int `json:"chunk_size"`
Compressed bool `json:"compressed"`
RootCert string `json:"root_cert"`
}
// NewGRPCStreamClient 返回GRPCStreamClient实例.
func NewGRPCStreamClient(cfg *GRPCStreamClientCfg) (*GRPCStreamClient, error) {
var (
opts = []grpc.DialOption{}
err error
)
if cfg.Address == "" {
return nil, errors.Errorf("address must be specified")
}
if cfg.ChunkSize <= 0 {
return nil, errors.Errorf("chunk_size must be specified")
} else if cfg.ChunkSize > (1 << 22) {
return nil, errors.Errorf("chunk_size must be less than 4MB")
}
if cfg.Compressed {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
}
if cfg.RootCert != "" {
creds, err := credentials.NewClientTLSFromFile(cfg.RootCert, "SummyChou") // change the serverNameOverride for yourself
if err != nil {
return nil, errors.Wrapf(err, "failed to create tls-grpc-client using root-cert '%s'", cfg.RootCert)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
cli := &GRPCStreamClient{}
cli.logger = zerolog.New(os.Stdout).With().Str("from", "grpc stream client").Logger()
cli.cfg = cfg
if cli.conn, err = grpc.Dial(cfg.Address, opts...); err != nil {
return nil, errors.Wrapf(err, "failed to create tls-grpc-connection with address %s", cfg.Address)
}
cli.client = api.NewGrpcStreamServiceClient(cli.conn)
return cli, nil
}
// Close 停止运行gRPC流客户端.
func (cli *GRPCStreamClient) Close() {
if cli.conn != nil {
cli.conn.Close() // nolint
}
}
// UploadFile 上传文件.
func (cli *GRPCStreamClient) UploadFile(ctx context.Context, fn string) (*common.Stats, error) {
var (
status *api.UploadStatus
stats = &common.Stats{}
)
fd, err := os.Open(fn)
if err != nil {
return nil, errors.Wrapf(err, "failed to open file '%s'", fn)
}
defer fd.Close()
stream, err := cli.client.Upload(ctx)
if err != nil {
return nil, errors.Wrapf(err, "failed to create upload stream for file %s", fn)
}
defer func() {
stream.CloseSend() // nolint
}()
// start to send
stats.StartedAt = time.Now()
buffer := make([]byte, cli.cfg.ChunkSize)
WRITE_LOOP:
for {
n, err := fd.Read(buffer)
if err != nil {
if err == io.EOF {
break WRITE_LOOP
}
return nil, errors.Wrapf(err, "failed unexpectedly while copying from file to buffer")
}
if err = stream.Send(&api.FileChunk{
Content: buffer[:n],
}); err != nil {
return nil, errors.Wrapf(err, "failed to send chunk via grpc stream")
}
}
// finish to receive
stats.FinishedAt = time.Now()
status, err = stream.CloseAndRecv()
if err != nil {
return nil, errors.Wrapf(err, "failed to receive upstream status response")
}
if status.Code != api.UploadStatusCode_STATUS_CODE_OK {
return nil, errors.Errorf("upload failed, msg: %s", status.Message)
}
return stats, nil
}