-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgdrive_pyinteract.py
179 lines (119 loc) · 5.3 KB
/
gdrive_pyinteract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from pydrive.drive import GoogleDrive
from pydrive.auth import GoogleAuth
import os
class client:
    """Namespace for the shared PyDrive handles.

    ``authenticate_client`` stores the ``GoogleAuth`` object on
    ``client.gauth`` and the ``GoogleDrive`` object on ``client.drive``.
    The class itself (not an instance) is what gets passed to the helpers
    in this module.
    """
def set_client_config_path(path_to_secrets_file):
    """Register the location of the OAuth client-secrets file.

    Overwrites the ``client_config_file`` entry of the class-level
    ``GoogleAuth.DEFAULT_SETTINGS`` mapping.

    Args:
        path_to_secrets_file: Path of the client-secrets JSON file.

    NOTE(review): presumably this has to run before ``authenticate_client``
    so that the ``GoogleAuth()`` it constructs picks the path up -- confirm.
    """
    default_settings = GoogleAuth.DEFAULT_SETTINGS
    default_settings['client_config_file'] = path_to_secrets_file
def authenticate_client(creds_path):
    """Authenticate against Google Drive and return the populated ``client``.

    Credentials cached in ``creds_path`` are reused when present: they are
    refreshed if the access token has expired and authorized as-is
    otherwise. When no credentials could be loaded, the local-webserver
    OAuth flow is run. In every case the resulting credentials are written
    back to ``creds_path``.

    Args:
        creds_path: File used to load and save the cached credentials.

    Returns:
        The module-level ``client`` class, with ``client.gauth`` and
        ``client.drive`` set.

    Note:
        Saved credentials expire after some time and may fail to refresh.
        When that happens, delete the credentials file (e.g. mycreds.txt)
        and run the program again; a new, valid file is created
        automatically.
    """
    auth = GoogleAuth()
    client.gauth = auth

    # Pick up whatever a previous run cached, if anything.
    auth.LoadCredentialsFile(creds_path)

    if auth.credentials is not None:
        if auth.access_token_expired:
            # Cached token is stale: ask for a fresh one.
            auth.Refresh()
        else:
            # Cached token is still good: just initialise with it.
            auth.Authorize()
    else:
        # Nothing cached: run the interactive flow. The extra flow
        # parameters presumably make Google issue a refresh token.
        auth.GetFlow()
        auth.flow.params.update({'access_type': 'offline'})
        auth.flow.params.update({'approval_prompt': 'force'})
        auth.LocalWebserverAuth()

    # Persist the (possibly new or refreshed) credentials for the next run.
    auth.SaveCredentialsFile(creds_path)

    client.drive = GoogleDrive(auth)
    return client
def list_files(client, folder_id):
    """Return the non-trashed entries whose parent is ``folder_id``.

    Args:
        client: Object exposing the Drive handle as ``client.drive``.
        folder_id: Drive ID of the folder to list.

    Returns:
        Whatever ``ListFile(...).GetList()`` yields for the query.
    """
    query = {'q': f"'{folder_id}' in parents and trashed=false"}
    listing = client.drive.ListFile(query)
    return listing.GetList()
def get_id(client, folder_path):
    """Resolve a '/'-separated Drive path to the ID of its last component.

    The path is walked from the Drive root: at each level the non-trashed
    children of the current folder are listed and the first entry whose
    title equals the next path component is followed. Empty components
    (leading, trailing or doubled slashes) are ignored.

    Args:
        client: Object exposing the Drive handle as ``client.drive``.
        folder_path: Path such as ``"folder/sub_folder/file_name"``.

    Returns:
        The ID of the final component, or ``False`` when the path is empty,
        any component cannot be found, or the lookup raises.
    """
    try:
        components = [part for part in folder_path.split('/') if part]
        if not components:
            return False

        current_id = 'root'
        for name in components:
            children = client.drive.ListFile(
                {'q': f"'{current_id}' in parents and trashed=false"}
            ).GetList()
            current_id = next(
                (child['id'] for child in children if child['title'] == name),
                None,
            )
            if current_id is None:
                # Stop here: carrying on would report the parent's ID (or
                # None) as if the whole path had been resolved.
                return False
        return current_id
    except Exception:
        # Best-effort lookup: API or input errors are reported as
        # "not found". KeyboardInterrupt/SystemExit still propagate.
        return False
def upload_file(client, target_folder_path , home_path, file_name):
    """Upload one local file into a Drive folder addressed by path.

    Args:
        client: Object exposing the Drive handle as ``client.drive``.
        target_folder_path: '/'-separated Drive path of the destination
            folder, resolved with ``get_id``.
        home_path: Local directory that contains the file.
        file_name: Name of the file inside ``home_path``; also used as the
            title of the uploaded Drive file.

    Returns:
        ``False`` when the destination folder cannot be resolved, otherwise
        ``None`` once the upload has finished.

    Example:
        upload_file(client,
                    '<drive_folder_name>/<drive_folder_name>/.../<drive_folder_name>',
                    rf"C:/.../<system_directory_name>", "<file_name>")
    """
    target_id = get_id(client, target_folder_path)
    # Treat every falsy result (False or None) as "not found"; comparing
    # with ``== False`` would let None through and create the file with a
    # parent ID of None.
    if not target_id:
        print(f"[ ERROR in gprocesses.py ] : {target_folder_path} not found.")
        return False

    drive_file = client.drive.CreateFile({
        'title': file_name,
        'parents': [{'id': target_id}],
    })
    drive_file.SetContentFile(os.path.join(str(home_path), file_name))
    drive_file.Upload()
def upload_folder(client, target_folder_path, home_path):
    """Recursively upload a local directory into a Drive folder.

    A Drive folder named after the last component of ``home_path`` is
    created inside the destination, then every regular file and
    sub-directory of ``home_path`` is uploaded beneath it.

    Args:
        client: Object exposing the Drive handle as ``client.drive``.
        target_folder_path: '/'-separated Drive path of the destination
            folder, resolved with ``get_id``.
        home_path: Local directory to upload.

    Returns:
        ``False`` when the destination folder cannot be resolved, otherwise
        ``None`` once the upload has finished.

    Example:
        upload_folder(client,
                      '<drive_folder_name>/<drive_folder_name>/.../<drive_folder_name>',
                      rf"C:/.../<folder_name>")
    """
    target_id = get_id(client, target_folder_path)
    # Treat every falsy result (False or None) as "not found".
    if not target_id:
        print(f"[ ERROR in gprocesses.py ] : {target_folder_path} not found.")
        return False

    # abspath also strips a trailing separator, which would otherwise make
    # basename() return '' and create an untitled Drive folder.
    _upload_folder_into(client, target_id, os.path.abspath(str(home_path)))


def _upload_folder_into(client, parent_id, local_dir):
    """Create a Drive folder for ``local_dir`` under ``parent_id`` and fill it."""
    folder = client.drive.CreateFile({
        'title': os.path.basename(local_dir),
        'parents': [{'id': parent_id}],
        'mimeType': 'application/vnd.google-apps.folder',
    })
    folder.Upload()

    # Address the new folder by the ID Drive assigned to it instead of
    # re-resolving it by title: titles are not unique on Drive, and a
    # lookup by path costs one listing per path component for every child.
    folder_id = folder['id']

    for name in os.listdir(local_dir):
        path = os.path.join(local_dir, name)
        if os.path.isfile(path):
            drive_file = client.drive.CreateFile({
                'title': name,
                'parents': [{'id': folder_id}],
            })
            drive_file.SetContentFile(path)
            drive_file.Upload()
        elif os.path.isdir(path):
            _upload_folder_into(client, folder_id, path)
def download_file(client, target_file_path, home_path):
    """Download one Drive file, addressed by path, into a local directory.

    The file is saved inside ``home_path`` under its Drive title.

    Args:
        client: Object exposing the Drive handle as ``client.drive``.
        target_file_path: '/'-separated Drive path of the file, resolved
            with ``get_id``.
        home_path: Existing local directory to save the file in.

    Returns:
        ``False`` when the file cannot be resolved, otherwise ``None`` once
        the download has finished.

    Example:
        download_file(client,
                      "<drive_folder_name>/<drive_folder_name>/.../<file_name>",
                      "C:/.../<system_directory_name>")
    """
    target_id = get_id(client, target_file_path)
    # Treat every falsy result (False or None) as "not found".
    if not target_id:
        print(f"[ ERROR in gprocesses.py ] : {target_file_path} not found.")
        return False

    drive_file = client.drive.CreateFile({'id': target_id})
    # Write to an explicit destination path rather than chdir()-ing into
    # home_path: the working directory is process-wide state and would be
    # left changed if the download raised.
    drive_file.GetContentFile(os.path.join(str(home_path), drive_file['title']))
def download_folder(client, target_folder_path, home_path, files_only = True):
    """Download the direct contents of a Drive folder into a local directory.

    Args:
        client: Object exposing the Drive handle as ``client.drive``.
        target_folder_path: '/'-separated Drive path of the folder,
            resolved with ``get_id``.
        home_path: Local directory to download into.
        files_only: When true (default) the entries are saved directly in
            ``home_path``. When false, a sub-directory named after the last
            component of ``target_folder_path`` is created inside
            ``home_path`` and the entries are saved there.

    Returns:
        ``False`` when the folder cannot be resolved, otherwise ``None``
        once all downloads have finished.

    Note:
        Every entry returned by ``list_files`` is passed to
        ``GetContentFile`` unfiltered; sub-folders are not descended into.

    Example:
        download_folder(client,
                        "<drive_folder_name>/<drive_folder_name>/.../<drive_folder_name>",
                        "C:/.../<system_directory_name>")
    """
    target_id = get_id(client, target_folder_path)
    # Treat every falsy result (False or None) as "not found".
    if not target_id:
        print(f"[ ERROR in gprocesses.py ] : {target_folder_path} not found.")
        return False

    home_path = str(home_path)
    if not files_only:
        # rstrip so a trailing '/' does not yield an empty folder name.
        folder_name = target_folder_path.rstrip('/').rsplit('/', 1)[-1]
        home_path = os.path.join(home_path, folder_name)
        print(home_path)
        # Use the default mode: a directory created with 0o666 lacks the
        # search (execute) bit, so on POSIX nothing could be written into
        # it. An already existing directory is reused.
        if not os.path.isdir(home_path):
            os.mkdir(home_path)

    for entry in list_files(client, target_id):
        # Write to an explicit destination path rather than chdir()-ing:
        # the working directory is process-wide state and would be left
        # changed if a download raised part-way through.
        entry.GetContentFile(os.path.join(home_path, entry['title']))