-
Notifications
You must be signed in to change notification settings - Fork 0
/
googledriveapi_async.py
219 lines (192 loc) · 8 KB
/
googledriveapi_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import os
import sys
import argparse
import asyncio
import json
from collections import defaultdict
import aiohttp
from aiohttp import web
import aiofiles
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client import file, client, tools
from tqdm import tqdm
DEFAULT_CONCUR_REQ = 30
MAX_CONCUR_REQ = 50
class UploadError(Exception):
    """Raised when a single file fails to upload to Google Drive.

    Attributes:
        file_path: local path of the file that failed to upload
        folder_id: id of the Drive folder the upload targeted
    """

    def __init__(self, file_path, folder_id):
        # Pass a readable message up to Exception so str(exc) / print(exc)
        # identifies the failure (previously it printed an empty string).
        super().__init__(
            'failed to upload {!r} to folder {!r}'.format(file_path, folder_id))
        self.file_path = file_path
        self.folder_id = folder_id
def create_drive():
    """Authorize access to the user's Google Drive and return a service.

    Cached credentials are read from 'storage.json'. When they are missing
    or invalid, an OAuth consent flow is launched from
    'client_secret_drive.json' and the result is cached back to the store.

    Returns:
        A Drive v3 service object ready for API calls.
    """
    scopes = 'https://www.googleapis.com/auth/drive.file'
    store = file.Storage('storage.json')
    creds = store.get()
    if not creds or creds.invalid:
        # No usable cached credentials: run the interactive grant flow.
        print("Access Grant Needed")
        flow = client.flow_from_clientsecrets('client_secret_drive.json', scopes)
        creds = tools.run_flow(flow, store)
    return build('drive', 'v3', http=creds.authorize(Http()))
def get_token():
    """Return the OAuth access token cached in 'storage.json'.

    Raises:
        FileNotFoundError: if 'storage.json' does not exist yet.
        KeyError: if the stored credentials lack an "access_token" field.
    """
    with open('storage.json', 'r') as handle:
        stored = json.load(handle)
    return stored["access_token"]
async def post_file(session, file_path, folder_id):
    """Post a single file to the designated Google Drive folder.

    Performs a multipart upload (metadata part + file content part) to the
    Drive v3 upload endpoint, authenticated with the module-level `token`.

    Args:
        session: aiohttp ClientSession to issue the request on.
        file_path: absolute path of the local file,
            e.g. C:/Git/GoogleDriveAPI/test2.jpg
        folder_id: id of the target Drive folder,
            e.g. '1Q6gaU4kHaLRN5psS4S_2Yx_*******'

    Raises:
        aiohttp.ClientResponseError: when Drive responds with a non-200
            status; the response body is included in the message.
    """
    global token
    file_name = file_path.split(os.path.sep)[-1]
    url = "https://www.googleapis.com/upload/drive/v3/files"
    file_metadata = {"name": file_name,
                     "parents": [folder_id],
                     }
    data = aiohttp.FormData()
    data.add_field(
        "metadata",
        json.dumps(file_metadata),
        content_type="application/json; charset=UTF-8",
    )
    # The whole file is read into memory: uploads go out in one multipart shot.
    async with aiofiles.open(file_path, mode='rb') as f:
        chunk = await f.read()  # fixed typo: was 'chuck'
    data.add_field("file", chunk)
    headers = {"Authorization": "Bearer {}".format(token)}
    params = {"uploadType": "multipart"}
    async with session.post(url, data=data, params=params, headers=headers) as resp:
        if resp.status == 200:
            return
        # BUG FIX: the original passed resp.text (an un-awaited coroutine
        # method) into the exception and instantiated the abstract
        # aiohttp.web.HTTPException base. Await the body and raise the
        # proper client-side exception instead; the caller catches
        # Exception broadly, so this stays backward compatible.
        body = await resp.text()
        raise aiohttp.ClientResponseError(
            resp.request_info,
            resp.history,
            status=resp.status,
            message=body or resp.reason,
            headers=resp.headers,
        )
async def upload_file(session, semaphore, file_path, folder_id):
    """Upload one file to Drive under a concurrency limit.

    Acquires *semaphore* before posting so that at most the configured
    number of requests are in flight at once, and converts any failure
    into an UploadError carrying (file_path, folder_id) for retry logic.

    Args:
        session: aiohttp ClientSession to issue the request on.
        semaphore: asyncio.Semaphore bounding concurrent uploads.
        file_path: absolute path of the local file,
            e.g. C:/Git/GoogleDriveAPI/test2.jpg
        folder_id: id of the target Drive folder,
            e.g. '1Q6gaU4kHaLRN5psS4S_2Yx_*******'

    Raises:
        UploadError: wrapping whatever exception the upload produced.
    """
    async with semaphore:
        try:
            await post_file(session, file_path, folder_id)
        except Exception as cause:
            raise UploadError(file_path, folder_id) from cause
async def upload_files(file_paths, folder_name, folder_id, concur_req=DEFAULT_CONCUR_REQ):
    """Upload a batch of files concurrently to one Drive folder.

    Progress is shown with tqdm as individual uploads complete; failures
    are collected rather than raised so the caller can retry them.

    Args:
        file_paths: list of local file paths,
            e.g. ['C:/Git/GoogleDriveAPI/hello.txt', ...]
        folder_name: local folder name, e.g. 'train2014'
        folder_id: target folder's id in Google Drive,
            e.g. '1FzI5QChbh4Q-nEQGRu8D-********'
        concur_req: maximum concurrent connections allowed.

    Returns:
        List of (file_path, folder_id) tuples that failed to upload.
    """
    print('Uploading {} files to {}...'.format(len(file_paths), folder_name))
    sys.stdout.flush()
    failures = []
    async with aiohttp.ClientSession() as session:
        limiter = asyncio.Semaphore(concur_req)
        pending = [upload_file(session, limiter, path, folder_id)
                   for path in file_paths]
        # Consume uploads in completion order so the progress bar advances
        # as soon as any of them finishes.
        for finished in tqdm(asyncio.as_completed(pending), total=len(file_paths)):
            try:
                await finished
            except UploadError as exc:
                print(exc)
                failures.append((exc.file_path, exc.folder_id))
    return failures
def upload_folder(folder_path, folder_id, concur_req=DEFAULT_CONCUR_REQ, retry=True):
    """Upload a folder tree to Google Drive, mirroring its structure.

    Walks *folder_path*, creates a matching Drive folder for every
    sub-directory, and uploads the files of each directory concurrently.
    Failures are written to 'failed.json' as {folder_id: [file_path, ...]}
    and optionally retried once.

    Args:
        folder_path: target folder's local path,
            e.g. 'C:/Git/GoogleDriveAPI/test'
        folder_id: target folder's id in Google Drive,
            e.g. '1FzI5QChbh4Q-nEQGRu8D-********'
        concur_req: maximum concurrent connections allowed.
        retry: retries uploading failed uploads once if True.
    """
    global drive
    loop = asyncio.get_event_loop()
    # Maps each local directory path to its Drive folder id as we create them.
    path_to_id = {folder_path: folder_id}
    failed = defaultdict(list)
    for path, dirs, files in os.walk(folder_path):
        current_folder_id = path_to_id[path]
        # Mirror every sub-directory as a Drive folder before its files
        # are reached by the walk. ('subdir'/'created' replace the original
        # locals 'dir'/'file', which shadowed builtins.)
        for subdir in dirs:
            folder_metadata = {
                'name': subdir,
                'mimeType': 'application/vnd.google-apps.folder',
                'parents': [current_folder_id],
            }
            created = drive.files().create(body=folder_metadata, fields='id').execute()
            path_to_id[os.path.join(path, subdir)] = created.get('id')
        folder_name = path.split(os.path.sep)[-1]
        file_paths = [os.path.join(path, name) for name in files]
        fails = loop.run_until_complete(
            upload_files(file_paths, folder_name, current_folder_id, concur_req))
        for failed_path, target_id in fails:
            failed[target_id].append(failed_path)
    with open('failed.json', 'w') as f:
        json.dump(failed, f)
    if retry:
        with open('failed.json', 'r') as f:
            failed = json.load(f)
        if failed:
            print('Retrying to upload failed ones...')
            sys.stdout.flush()
            new_failed = defaultdict(list)
            for target_id, paths in failed.items():
                if paths:
                    # Recover the folder name from the first failed path.
                    folder_name = paths[0].split(os.path.sep)[-2]
                    fails = loop.run_until_complete(
                        upload_files(paths, folder_name, target_id, concur_req))
                    for new_path, new_id in fails:
                        new_failed[new_id].append(new_path)
            with open('failed.json', 'w') as f:
                json.dump(new_failed, f)
    # NOTE: closing the loop means this function cannot be called twice in
    # one process — preserved from the original behavior.
    loop.close()
if __name__ == '__main__':
    '''
    python googledriveapi_async.py C:\Git\PytorchBasic\caption\data_dir 1CwX9S8mJSL_43oJgLdoiNoF55yU7Vj**
    '''
    # CLI argument parsing is currently disabled in favor of the hard-coded
    # values below; uncomment to take folder_path/folder_id from the shell.
    # parser = argparse.ArgumentParser(
    #     description='Upload folder including all sub-folders to google drive.')
    # parser.add_argument('folder_path',
    #                     help='folder_path: local folder path to upload'
    #                     '(e.g.) C:\Git\PytorchBasic')
    # parser.add_argument('folder_id',
    #                     help='folder_id: target folder\'s id in google drive'
    #                     '(e.g.) 1FzI5QChbh4Q-nEQGRu8D-********')
    # parser.add_argument('concur_req', nargs='?', default=DEFAULT_CONCUR_REQ, type=int,
    #                     help='concur_req: maximum number of concurrent connections'
    #                     '(Default) DEFAULT_CONCUR_REQ')
    # parser.add_argument('enable_retry', nargs='?', default=True, type=bool,
    #                     help='retry uploading failed uploads if set to True'
    #                     '(Default) True')
    # args = parser.parse_args()
    # if not os.path.isdir(args.folder_path):
    #     print('*** Folder path error: invalid path')
    #     parser.print_usage()
    #     sys.exit(1)
    # folder_path = args.folder_path
    # folder_id = args.folder_id
    # concur_req = args.concur_req
    # enable_retry = args.enable_retry
    # Hard-coded run configuration (see the disabled parser above).
    folder_path = r'C:\Users\chsze\Desktop\Georgia Tech\Second Semester\Deep Learning\Project\DATASET'
    folder_id = '1oHa63uLxPcdBkSVixbv_skMt-nOJ7z**'
    concur_req = DEFAULT_CONCUR_REQ
    enable_retry = True
    # NOTE(review): forcing the selector policy is presumably to avoid
    # aiohttp issues with the Windows Proactor event loop — confirm; this
    # line also makes the script Windows-only as written.
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    # Module-level globals: `drive` is read by upload_folder, `token` by
    # post_file. Both must be initialized before upload_folder runs.
    drive = create_drive()
    token = get_token()
    upload_folder(folder_path, folder_id, concur_req, enable_retry)