forked from taggledevel2/ratchet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sftp_reader.go
140 lines (122 loc) · 4.06 KB
/
sftp_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package processors
import (
"github.com/dailyburn/ratchet/data"
"github.com/dailyburn/ratchet/util"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
)
// SftpReader reads a single object at a given path, or walks through the
// directory specified by the path (SftpReader.Walk must be set to true).
//
// To only send full paths (and not file contents), set FileNamesOnly to true.
// If FileNamesOnly is set to true, DeleteObjects will be ignored.
//
// Construct it with NewSftpReader (connection opened lazily) or
// NewSftpReaderByClient (caller-supplied connection).
type SftpReader struct {
// IoReader is embedded; file contents are sent by pointing its Reader at
// the opened remote file and delegating to its ProcessData.
IoReader // embeds IoReader
// parameters holds the server, username, auth methods and path to read.
parameters *util.SftpParameters
// client is the sftp connection, either supplied by the caller or created
// on the first ProcessData call.
client *sftp.Client
// DeleteObjects, when true, removes each remote file after its contents
// have been sent.
DeleteObjects bool
// Walk, when true, makes ProcessData walk the tree rooted at the path
// instead of reading the path as a single object.
Walk bool
// FileNamesOnly, when true, sends each object's path rather than its contents.
FileNamesOnly bool
// initialized records whether client has already been set up.
initialized bool
// CloseOnFinish, when true, makes Finish close the client.
CloseOnFinish bool
}
// NewSftpReader builds an SftpReader for the given server, username and path.
// No connection is opened here: it is established the first time the reader
// receives data. CloseOnFinish defaults to true, so Finish closes that
// connection; set CloseOnFinish to false to manage the connection manually.
func NewSftpReader(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpReader {
	params := &util.SftpParameters{
		Server:      server,
		Username:    username,
		Path:        path,
		AuthMethods: authMethods,
	}

	// initialized, DeleteObjects and FileNamesOnly keep their false zero value.
	reader := &SftpReader{
		parameters:    params,
		CloseOnFinish: true,
	}
	reader.IoReader.LineByLine = true
	return reader
}
// NewSftpReaderByClient builds an SftpReader on top of an existing connection
// to the remote server, so the reader is marked as already initialized.
// CloseOnFinish defaults to false here: Finish will *not* close the supplied
// client unless CloseOnFinish is set to true.
func NewSftpReaderByClient(client *sftp.Client, path string) *SftpReader {
	reader := new(SftpReader)
	reader.parameters = &util.SftpParameters{Path: path}
	reader.client = client
	// The caller already owns a live client, so no lazy connect is needed.
	reader.initialized = true
	reader.IoReader.LineByLine = true
	// DeleteObjects, FileNamesOnly and CloseOnFinish keep their false zero value.
	return reader
}
// ProcessData makes sure a client is available, then either walks the tree
// rooted at the configured path and sends each object separately (Walk is
// true) or sends the single object at that path. Output goes to outputChan;
// errors are reported on killChan. The incoming payload d is not used.
func (r *SftpReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) {
	r.ensureInitialized(killChan)
	if r.client == nil {
		// Without a client every call below would dereference a nil pointer.
		// This is reached when connecting failed (ensureInitialized has
		// already reported that error on killChan) or when a nil client was
		// handed to NewSftpReaderByClient.
		return
	}

	if r.Walk {
		r.walk(outputChan, killChan)
		return
	}
	r.sendObject(r.parameters.Path, outputChan, killChan)
}
// Finish closes the connection to the remote server when CloseOnFinish is
// set; otherwise it does nothing. Neither channel is used.
func (r *SftpReader) Finish(outputChan chan data.JSON, killChan chan error) {
	if !r.CloseOnFinish {
		return
	}
	r.CloseClient()
}
// CloseClient allows you to manually close the connection to the remote
// client (as the remote client itself is not exported). It is a no-op when no
// client has been established yet, which can happen with a reader created by
// NewSftpReader that never connected.
func (r *SftpReader) CloseClient() {
	if r.client == nil {
		return
	}
	// The signature offers no way to hand a close error back to the caller,
	// so it is deliberately discarded.
	_ = r.client.Close()
}
// String returns the fixed name of this processor.
func (r *SftpReader) String() string {
	const name = "SftpReader"
	return name
}
// ensureInitialized connects to the remote server the first time it is
// called, using the server, username and auth methods stored in parameters.
// Later calls return immediately once a client has been set up. A connection
// error is reported on killChan and leaves the reader uninitialized, so no nil
// client is recorded as if it were usable.
func (r *SftpReader) ensureInitialized(killChan chan error) {
	if r.initialized {
		return
	}

	client, err := util.SftpClient(r.parameters.Server, r.parameters.Username, r.parameters.AuthMethods)
	util.KillPipelineIfErr(err, killChan)
	if err != nil {
		// Do not store the client or flip initialized on failure.
		return
	}

	r.client = client
	r.initialized = true
}
// walk traverses the remote tree rooted at the configured path and sends
// every non-directory entry through sendObject. Errors raised by the walker
// are reported on killChan and the affected entry is skipped.
func (r *SftpReader) walk(outputChan chan data.JSON, killChan chan error) {
	walker := r.client.Walk(r.parameters.Path)
	for walker.Step() {
		err := walker.Err()
		util.KillPipelineIfErr(err, killChan)
		if err != nil {
			// The entry could not be visited, so its Stat() result must not
			// be trusted (it may be nil, e.g. when the root itself could not
			// be stat'ed). Skip it instead of dereferencing it.
			continue
		}
		if walker.Stat().IsDir() {
			continue
		}
		r.sendObject(walker.Path(), outputChan, killChan)
	}
}
// sendObject dispatches a single remote path: only the path itself is sent
// when FileNamesOnly is set, otherwise the file's contents are sent.
func (r *SftpReader) sendObject(path string, outputChan chan data.JSON, killChan chan error) {
	if !r.FileNamesOnly {
		r.sendFile(path, outputChan, killChan)
		return
	}
	r.sendFilePath(path, outputChan, killChan)
}
// sendFilePath wraps path in a util.SftpPath, converts it with data.NewJSON
// and sends the result on outputChan. A conversion error is reported on
// killChan.
func (r *SftpReader) sendFilePath(path string, outputChan chan data.JSON, killChan chan error) {
	payload, err := data.NewJSON(util.SftpPath{Path: path})
	util.KillPipelineIfErr(err, killChan)
	outputChan <- payload
}
// sendFile opens the remote file at path and streams its contents through the
// embedded IoReader to outputChan. When DeleteObjects is set, the remote file
// is removed afterwards. Open and remove errors are reported on killChan.
func (r *SftpReader) sendFile(path string, outputChan chan data.JSON, killChan chan error) {
	file, err := r.client.Open(path)
	util.KillPipelineIfErr(err, killChan)
	if err != nil {
		// There is no file handle to read from or close.
		return
	}

	// Read inside a closure so the handle is closed as soon as reading is
	// done, and before any removal below. NOTE(review): some servers may
	// refuse to remove a file that still has an open handle.
	func() {
		defer file.Close()
		r.IoReader.Reader = file
		r.IoReader.ProcessData(nil, outputChan, killChan)
	}()

	if !r.DeleteObjects {
		return
	}
	err = r.client.Remove(path)
	util.KillPipelineIfErr(err, killChan)
}