forked from simonedegiacomi/gphotosuploader
/
uploader.go
188 lines (152 loc) · 5.12 KB
/
uploader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package utils
import (
"fmt"
"log"
"os"
"path/filepath"
"sync"
"github.com/GaPhi/gphotosuploader/api"
"github.com/GaPhi/gphotosuploader/auth"
)
// Simple client used to implement the tool that can upload multiple photos or videos at once
type ConcurrentUploader struct {
credentials auth.CookieCredentials
// Optional field to specify the destination album
albumId string
// Buffered channel to limit concurrent uploads
concurrentLimiter chan bool
// Map of uploaded files (used as a set)
uploadedFiles map[string]bool
// Waiting group used for the implementation of the Wait method
waitingGroup sync.WaitGroup
// Flag to indicate if the client is waiting for all the upload to finish
waiting bool
// Flag to indicate that no other upload shall be attempted
stopUploads bool
CompletedUploads chan string
IgnoredUploads chan string
Errors chan error
}
// Creates a new ConcurrentUploader using the specified credentials.
// The second argument is the id of the album in which images are going to be added when uploaded. Use an empty string
// if you don't want to move the images in to a specific album. The third argument is the maximum number of concurrent
// uploads (which must not be 0).
func NewUploader(credentials auth.CookieCredentials, albumId string, maxConcurrentUploads int) (*ConcurrentUploader, error) {
if maxConcurrentUploads <= 0 {
return nil, fmt.Errorf("maxConcurrentUploads must be greater than zero")
}
return &ConcurrentUploader{
credentials: credentials,
albumId: albumId,
concurrentLimiter: make(chan bool, maxConcurrentUploads),
uploadedFiles: make(map[string]bool),
CompletedUploads: make(chan string),
IgnoredUploads: make(chan string),
Errors: make(chan error),
}, nil
}
// Add files to the list of already uploaded files
func (u *ConcurrentUploader) AddUploadedFiles(files ...string) {
for _, name := range files {
u.uploadedFiles[name] = true
}
}
// Enqueue a new upload. You must not call this method while waiting for some uploads to finish (The method return an
// error if you try to do it).
// Due to the fact that this method is asynchronous, if nil is return it doesn't mean the the upload was completed:
// for that use the Errors and CompletedUploads channels
func (u *ConcurrentUploader) EnqueueUpload(filePath string) error {
if u.waiting {
return fmt.Errorf("can't add new uploads while waiting queued uploads to finish")
}
// We need to use the absolute path of the file, to avoid multiple uploads of the same file if the tool is executed
// from different directories
if !filepath.IsAbs(filePath) {
if abs, err := filepath.Abs(filePath); err != nil {
log.Printf("uploader: Can't get the absolute path of file to upload, using relative path. Error: %v\n", err)
} else {
filePath = abs
}
}
if u.wasFileAlreadyUploaded(filePath) {
u.IgnoredUploads <- filePath
return nil
}
// Check if the file is an image or a video
if valid, err := IsImageOrVideo(filePath); err != nil {
u.sendError(filePath, err)
return nil
} else if !valid {
u.IgnoredUploads <- filePath
return nil
}
if u.stopUploads {
u.Errors <- fmt.Errorf("stopping uploads (%v)", filePath)
return nil
}
started := make(chan bool)
go u.uploadFile(filePath, started)
<-started
return nil
}
func (u *ConcurrentUploader) wasFileAlreadyUploaded(filePath string) bool {
_, uploaded := u.uploadedFiles[filePath]
return uploaded
}
func (u *ConcurrentUploader) uploadFile(filePath string, started chan bool) {
u.joinGroupAndWaitForTurn(started)
defer u.leaveGroupAndNotifyNextUpload()
// Open the file
file, err := os.Open(filePath)
if err != nil {
u.sendError(filePath, err)
return
}
defer func(file *os.File) {
_ = file.Close()
}(file)
// Create options
options, err := api.NewUploadOptionsFromFile(file)
if err != nil {
u.sendError(filePath, err)
return
}
options.AlbumId = u.albumId
// Create a new upload
upload, err := api.NewUpload(options, u.credentials)
if err != nil {
u.sendError(filePath, err)
return
}
// Try to upload the image
if u.stopUploads {
u.sendError(filePath, fmt.Errorf("stopping uploads"))
} else if _, err := upload.Upload(); err != nil {
u.stopUploads = true
u.sendError(filePath, err)
} else {
u.uploadedFiles[filePath] = true
u.CompletedUploads <- filePath
}
}
func (u *ConcurrentUploader) sendError(filePath string, err error) {
u.Errors <- fmt.Errorf("Error with '%s': %s\n", filePath, err)
}
func (u *ConcurrentUploader) joinGroupAndWaitForTurn(started chan bool) {
u.waitingGroup.Add(1)
started <- true
// Insert something in the channel. We remove values from it only when we complete an upload, blocking the
// goroutines if we exceed the maxConcurrentUpload
u.concurrentLimiter <- true
}
func (u *ConcurrentUploader) leaveGroupAndNotifyNextUpload() {
u.waitingGroup.Done()
// Remove a value to empty the channel or to unlock a waiting gorutine
<-u.concurrentLimiter
}
// Blocks this goroutine until all the upload are completed. You can not add uploads when a goroutine call this method
func (u *ConcurrentUploader) WaitUploadsCompleted() {
u.waiting = true
u.waitingGroup.Wait()
u.waiting = false
}