/
cp.go
130 lines (112 loc) · 3.45 KB
/
cp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package utils
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"github.com/duc-cnzj/mars/v4/internal/contracts"
"github.com/duc-cnzj/mars/v4/internal/utils/recovery"
app "github.com/duc-cnzj/mars/v4/internal/app/helper"
"github.com/duc-cnzj/mars/v4/internal/mlog"
"github.com/dustin/go-humanize"
"github.com/mholt/archiver/v3"
)
type defaultArchiver struct{}
func NewDefaultArchiver() contracts.Archiver {
return &defaultArchiver{}
}
func (m *defaultArchiver) Archive(sources []string, destination string) error {
return archiver.Archive(sources, destination)
}
func (m *defaultArchiver) Open(path string) (io.ReadCloser, error) {
return os.Open(path)
}
func (m *defaultArchiver) Remove(path string) error {
return os.Remove(path)
}
type fileCopier struct {
archiver contracts.Archiver
executor contracts.RemoteExecutor
}
func NewFileCopier(executor contracts.RemoteExecutor, archiver contracts.Archiver) contracts.PodFileCopier {
return &fileCopier{executor: executor, archiver: archiver}
}
func (fc *fileCopier) Copy(namespace, pod, container, fpath, targetContainerDir string, clientSet kubernetes.Interface, config *restclient.Config) (*contracts.CopyFileToPodResult, error) {
var (
errbf, outbf = bytes.NewBuffer([]byte{}), bytes.NewBuffer([]byte{})
reader, outStream = io.Pipe()
uploader = app.Uploader()
localUploader = app.LocalUploader()
)
if targetContainerDir == "" {
targetContainerDir = "/tmp"
}
st, err := uploader.Stat(fpath)
if err != nil {
return nil, err
}
if st.Size() > app.Config().MaxUploadSize() {
return nil, fmt.Errorf("最大不得超过 %s, 你上传的文件大小是 %s", humanize.Bytes(app.Config().MaxUploadSize()), humanize.Bytes(uint64(st.Size())))
}
baseName := filepath.Base(fpath)
path := filepath.Join(filepath.Dir(fpath), baseName+".tar.gz")
mlog.Debugf("[CopyFileToPod]: %v", path)
var localPath string = fpath
// 如果是非 local 类型的,需要远程下载到 local 进行打包,再上传到容器
if uploader.Type() != contracts.Local {
read, err := uploader.Read(fpath)
if err != nil {
return nil, err
}
defer read.Close()
if localUploader.Exists(localPath) {
localUploader.Delete(localPath)
}
put, err := localUploader.Put(localPath, read)
if err != nil {
return nil, err
}
localPath = put.Path()
defer localUploader.Delete(localPath)
}
if err := fc.archiver.Archive([]string{localPath}, path); err != nil {
return nil, err
}
defer fc.archiver.Remove(path)
src, err := fc.archiver.Open(path)
if err != nil {
return nil, err
}
wg := sync.WaitGroup{}
wg.Add(1)
defer wg.Wait()
go func(reader *io.PipeReader, outStream *io.PipeWriter, src io.ReadCloser) {
defer func() {
reader.Close()
outStream.Close()
src.Close()
wg.Done()
}()
defer recovery.HandlePanic("CopyFileToPod")
if _, err := io.Copy(outStream, src); err != nil {
mlog.Error(err)
}
}(reader, outStream, src)
err = fc.executor.
WithCommand([]string{"tar", "-zmxf", "-", "-C", targetContainerDir}).
WithMethod("POST").
WithContainer(namespace, pod, container).
Execute(context.TODO(), clientSet, config, reader, outbf, errbf, false, nil)
return &contracts.CopyFileToPodResult{
TargetDir: targetContainerDir,
ErrOut: errbf.String(),
StdOut: outbf.String(),
ContainerPath: filepath.Join(targetContainerDir, baseName),
FileName: baseName,
}, err
}