From b1f2c1189a00b3bc46c6eb1a29b9120ce59be2a8 Mon Sep 17 00:00:00 2001
From: aoaostar
Date: Mon, 26 Jul 2021 21:56:24 +0800
Subject: [PATCH] Add scheduled token refresh
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 AliyunDrive.py | 59 ++++++++++++++++++++++++++++++--------------------
 Client.py      | 34 +++++++++++++++++++----------
 common.py      | 23 +++++++++++++++-----
 main.py        | 42 ++++++++++++++++++++++++++---------
 4 files changed, 108 insertions(+), 50 deletions(-)

diff --git a/AliyunDrive.py b/AliyunDrive.py
index c360601..455eb76 100644
--- a/AliyunDrive.py
+++ b/AliyunDrive.py
@@ -8,12 +8,11 @@
 import json
 import math
 import os
-import sys
 
 import requests
 from tqdm import tqdm
 
+from common import DATA, LOCK_TOKEN_REFRESH
 import common
-from common import LOCK, DATA
 
 requests.packages.urllib3.disable_warnings()
@@ -42,7 +41,10 @@ def __init__(self, drive_id, root_path, chunk_size=10485760):
         self.file_id = 0
         self.part_number = 0
         self.filesize = 0
-        self.headers = {}
+        self.headers = {
+            'authorization': DATA['access_token'],
+            'content-type': 'application/json;charset=UTF-8'
+        }
         self.id = None
 
     def load_task(self, task):
@@ -91,7 +93,7 @@ def load_file(self, filepath, realpath):
         self.print(message, 'info')
 
     def token_refresh(self):
-        LOCK.acquire()
+        LOCK_TOKEN_REFRESH.acquire()
         try:
             data = {"refresh_token": DATA['config']['REFRESH_TOKEN']}
             post = requests.post(
@@ -100,7 +102,8 @@ def token_refresh(self):
                 headers={
                     'content-type': 'application/json;charset=UTF-8'
                 },
-                verify=False
+                verify=False,
+                timeout=3
             )
             try:
                 post_json = post.json()
@@ -111,14 +114,15 @@ def token_refresh(self):
                 self.print('refresh_token已经失效', 'warn')
                 raise e
 
-            access_token = post_json['access_token']
+            DATA['access_token'] = post_json['access_token']
             self.headers = {
-                'authorization': access_token,
+                'authorization': DATA['access_token'],
                 'content-type': 'application/json;charset=UTF-8'
             }
             DATA['config']['REFRESH_TOKEN'] = post_json['refresh_token']
         finally:
-            LOCK.release()
+            LOCK_TOKEN_REFRESH.release()
+        return True
 
     def create(self, parent_file_id):
         create_data = {
@@ -143,7 +147,8 @@ def create(self, parent_file_id):
             verify=False
         )
         requests_post_json = request_post.json()
-        self.check_auth(requests_post_json, lambda: self.create(parent_file_id))
+        if not self.check_auth(requests_post_json):
+            return self.create(parent_file_id)
         # 覆盖已有文件
         if DATA['config']['OVERWRITE'] and requests_post_json.get('exist'):
             if self.recycle(requests_post_json.get('file_id')):
@@ -171,7 +176,8 @@ def get_upload_url(self):
             verify=False
         )
         requests_post_json = requests_post.json()
-        self.check_auth(requests_post_json, self.get_upload_url)
+        if not self.check_auth(requests_post_json):
+            return self.get_upload_url()
         self.print('【%s】上传地址刷新成功' % self.filename, 'info')
         return requests_post_json.get('part_info_list')
 
@@ -203,7 +209,8 @@ def upload(self):
                         pass
                     else:
                         self.print(res.text, 'error')
-                        res.raise_for_status()
+                        # res.raise_for_status()
+                        return False
                 self.part_number += 1
                 udata = {
                     "part_number": self.part_number,
@@ -224,8 +231,8 @@ def complete(self):
         )
         requests_post_json = complete_post.json()
 
-        self.check_auth(requests_post_json, self.complete)
-
+        if not self.check_auth(requests_post_json):
+            return self.complete()
         self.finish_time = common.get_timestamp()
         self.spend_time = self.finish_time - self.start_time
 
@@ -244,14 +251,16 @@ def create_folder(self, folder_name, parent_folder_id):
             "check_name_mode": "refuse",
             "type": "folder"
         }
-        create_post = requests.post(
+        requests_post = requests.post(
            'https://api.aliyundrive.com/adrive/v2/file/createWithFolders',
             data=json.dumps(create_data),
             headers=self.headers,
             verify=False
         )
-        requests_post_json = create_post.json()
-        self.check_auth(requests_post_json, lambda: self.create_folder(folder_name, parent_folder_id))
+
+        requests_post_json = requests_post.json()
+        if not self.check_auth(requests_post_json):
+            return self.create_folder(folder_name, parent_folder_id)
         return requests_post_json.get('file_id')
 
     def get_parent_folder_id(self, filepath):
@@ -264,7 +273,7 @@ def get_parent_folder_id(self, filepath):
         parent_folder_name = os.sep
         if len(filepath_split) > 0:
             for folder in filepath_split:
-                if folder == '':
+                if folder in ['', 'root']:
                     continue
                 parent_folder_id = self.create_folder(folder, parent_folder_id)
                 parent_folder_name = parent_folder_name.rstrip(os.sep) + os.sep + folder
@@ -302,19 +311,21 @@ def recycle(self, file_id):
             verify=False
         )
         requests_post_json = requests_post.json()
-        self.check_auth(requests_post_json, lambda: self.recycle(file_id))
+        if not self.check_auth(requests_post_json):
+            return self.recycle(file_id)
         return True
 
-    def check_auth(self, response_json, func):
-        if 'code' in response_json:
-            self.print(response_json, 'error')
+    def check_auth(self, response_json):
         if response_json.get('code') == 'AccessTokenInvalid':
             self.print('AccessToken已失效,尝试刷新AccessToken中', 'info')
             if self.token_refresh():
-                self.print('AccessToken刷新成功,返回创建上传任务', 'info')
-                return func()
+                self.print('AccessToken刷新成功,准备返回', 'info')
+                return False
             self.print('无法刷新AccessToken,准备退出', 'error')
-            sys.exit()
+        if 'code' in response_json.keys():
+            self.print(response_json, 'error')
+            common.suicide()
+        return True
 
     def print(self, message, print_type):
         func = 'print_' + print_type
diff --git a/Client.py b/Client.py
index 4154936..defc1ee 100644
--- a/Client.py
+++ b/Client.py
@@ -6,15 +6,16 @@
 
 # +-------------------------------------------------------------------
 # 配置信息
+
 import json
 import os
 import sqlite3
 import sys
 
 from AliyunDrive import AliyunDrive
-from common import get_running_path, print_error, get_config, DATA, get_config_file_path, qualify_path, \
-    get_all_file_relative, print_success, LOCK, get_db_file_path, save_task, get_timestamp, print_warn, date, \
-    print_info
+from common import get_running_path, get_config, DATA, get_config_file_path, qualify_path, \
+    get_all_file_relative, LOCK, get_db_file_path, save_task, get_timestamp, date, suicide
+import common
 
 
 class Client():
@@ -29,8 +30,8 @@ def __upload(self, drive):
             drive.upload()
         except Exception as e:
             status = False
-            for index in range(DATA['config']['RETRY']):
-                print_warn('【%s】正在尝试第%d次重试!' % (drive.filename, index), drive.id)
+            for index in range(int(DATA['config']['RETRY'])):
+                self.print('【%s】正在尝试第%d次重试!' % (drive.filename, index), 'warn', drive.id)
             if drive.upload():
                 status = True
                 break
@@ -59,16 +60,16 @@ def init_config(self):
             "RESIDENT": False,
         }
         if not os.path.exists(get_config_file_path()):
-            print_error('请配置好config.json后重试')
+            self.print('请配置好config.json后重试', 'error')
             with open(get_config_file_path(), 'w') as f:
                 f.write(json.dumps(config))
-            sys.exit()
+            suicide()
 
         try:
             config.update(get_config())
             DATA['config'] = config
         except Exception as e:
-            print_error('请配置好config.json后重试')
+            self.print('请配置好config.json后重试', 'error')
             raise e
 
     def init_command_line_parameter(self):
@@ -136,6 +137,7 @@ def init_database(self):
                 constraint task_log_pk
                     primary key autoincrement,
             task_id INTEGER,
+            log_level TEXT default 'info' not null,
             content TEXT default '' not null,
             create_time INTEGER default 0 not null
         );''')
@@ -148,7 +150,7 @@ def upload_file(self, task):
         # 加载任务队列
         drive.load_task(task)
         # 刷新token
-        drive.token_refresh()
+        # drive.token_refresh()
         drive.load_file(task['filepath'], task['realpath'])
         # 创建目录
         LOCK.acquire()
@@ -173,7 +175,9 @@ def upload_file(self, task):
         if 'rapid_upload' in create_post_json and create_post_json['rapid_upload']:
             drive.finish_time = get_timestamp()
             drive.spend_time = drive.finish_time - drive.start_time
-            print_success('【{filename}】秒传成功!消耗{s}秒'.format(filename=drive.filename, s=drive.spend_time), drive.id)
+
+            self.print('【{filename}】秒传成功!消耗{s}秒'.format(filename=drive.filename, s=drive.spend_time), 'success',
+                       drive.id)
             drive.status = 1
             return drive
         # 上传
@@ -196,9 +200,13 @@ def save_task(self, task):
             "part_number",
             "chunk_size",
         ]
+
         data = {}
         for v in tmp:
             data[v] = task.__getattribute__(v)
+            if data[v] is None:
+                data[v] = ''
+
         return save_task(task_id, data)
 
     def print_config_info(self):
@@ -211,4 +219,8 @@ def print_config_info(self):
 当前时间:%s%s
 =================================================
         ''' % (date(get_timestamp()), s)
-        print_info(content)
+        self.print(content, 'info')
+
+    def print(self, message, print_type, id=0):
+        func = 'print_' + print_type
+        return getattr(common, func)(message, id)
diff --git a/common.py b/common.py
index 2c12140..e290192 100644
--- a/common.py
+++ b/common.py
@@ -16,7 +16,10 @@
 from sqlite import sqlite
 
 LOCK = threading.Lock()
+LOCK_TOKEN_REFRESH = threading.Lock()
 DATA = {
+    'access_token': '',
+    'time_period': 600,
     'config': {},
     'folder_id_dict': {},
     'task_template': {
@@ -36,6 +39,11 @@
 }
 
 
+
+def suicide():
+    os._exit(1)
+
+
 # 处理路径
 def qualify_path(path):
     if not path:
@@ -142,24 +150,28 @@ def get_all_file_relative(path):
 
 
 def print_info(message, id=None):
+    message = message.__str__()
     # i = random.randint(34, 37)
     i = 36
-    log(message, id)
+    log(message, id, 'info')
     print('\033[7;30;{i}m{message}\033[0m'.format(message=message, i=i))
 
 
 def print_warn(message, id=None):
-    log(message, id)
+    message = message.__str__()
+    log(message, id, 'warn')
     print('\033[7;30;33m{message}\033[0m'.format(message=message))
 
 
 def print_error(message, id=None):
-    log(message, id)
+    message = message.__str__()
+    log(message, id, 'error')
     print('\033[7;30;31m{message}\033[0m'.format(message=message))
 
 
 def print_success(message, id=None):
-    log(message, id)
+    message = message.__str__()
+    log(message, id, 'success')
     print('\033[7;30;32m{message}\033[0m'.format(message=message))
 
 
@@ -173,12 +185,13 @@ def get_timestamp():
     return int(time.time())
 
 
-def log(message, id=None):
+def log(message, id=None, log_level='info'):
     if not id is None:
         db = get_db()
         idata = {
             'task_id': id,
             'content': message,
+            'log_level': log_level,
             'create_time': get_timestamp(),
         }
         db.table('task_log').insert(idata)
diff --git a/main.py b/main.py
index 927caea..751dd95 100644
--- a/main.py
+++ b/main.py
@@ -7,15 +7,16 @@
 # +-------------------------------------------------------------------
 import os
-import sys
 import time
 
-from concurrent.futures import ThreadPoolExecutor, ALL_COMPLETED, wait
+from concurrent.futures import ThreadPoolExecutor
 
+from AliyunDrive import AliyunDrive
 from Client import Client
-from common import DATA, print_error, get_db, get_timestamp, print_info, load_task, create_task
+from common import DATA, print_error, get_db, get_timestamp, print_info, load_task, create_task, suicide
 
 if __name__ != '__main__':
-    sys.exit()
+    suicide()
+
 
 client = Client()
 # 配置信息初始化
@@ -53,26 +54,47 @@ def thread(task):
 
 
 def distribute_thread(tasks):
-    if not DATA['config']['MULTITHREADING'] or DATA['config']['MAX_WORKERS'] <= 0:
+    if not DATA['config']['MULTITHREADING'] or int(DATA['config']['MAX_WORKERS']) <= 0:
         for task in tasks:
             thread(task)
     else:
         with ThreadPoolExecutor(max_workers=int(DATA['config']['MAX_WORKERS'])) as executor:
-            future_list = []
             for task in tasks:
                 # 提交线程
-                future = executor.submit(thread, task)
-                future_list.append(future)
+                executor.submit(thread, task)
+
+
+# 定时任务
+def crontab():
+    def crontab_tasks():
+        # 定时刷新token
+        (AliyunDrive(DATA['config']['DRIVE_ID'], DATA['config']['ROOT_PATH'],
+                     DATA['config']['CHUNK_SIZE'])).token_refresh()
+
+    time_period = DATA['time_period']
+    crontab_tasks()
+    while True:
+        if time_period <= 0:
+            try:
+                crontab_tasks()
+            except Exception as e:
+                print_error(e.__str__())
+            finally:
+                time_period = DATA['time_period']
+        else:
+            time_period -= 1
+            time.sleep(1)
 
-            wait(future_list, return_when=ALL_COMPLETED)
 
 
+(ThreadPoolExecutor()).submit(crontab)
 while True:
     client.tasks = load_task()
     if len(client.tasks) <= 0:
         if not DATA['config']['RESIDENT']:
-            break
+            suicide()
         else:
             print_info('当前无任务,等待新的任务队列中', 0)
             time.sleep(5)
     distribute_thread(client.tasks)
+
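
For readers skimming the patch: the scheduling added to main.py (the crontab() function submitted to a ThreadPoolExecutor) together with the shared state and lock added to common.py reduce to the background-refresh pattern sketched below. This is a minimal illustrative sketch, not code from the repository; the names STATE, REFRESH_LOCK, REFRESH_INTERVAL, refresh_token and refresher_loop are placeholders standing in for DATA, LOCK_TOKEN_REFRESH, DATA['time_period'], AliyunDrive.token_refresh and crontab.

import threading
import time

# Shared state and a dedicated lock, mirroring DATA['access_token'] and
# LOCK_TOKEN_REFRESH in the patch (placeholder names).
STATE = {'access_token': ''}
REFRESH_LOCK = threading.Lock()
REFRESH_INTERVAL = 600  # seconds; mirrors DATA['time_period']


def refresh_token():
    # Placeholder for AliyunDrive.token_refresh(): the real method POSTs the
    # stored refresh_token and saves the returned access_token while holding
    # the lock, so readers always see a consistent value.
    with REFRESH_LOCK:
        STATE['access_token'] = 'token-%d' % int(time.time())
    return True


def refresher_loop():
    # Refresh once at startup, then roughly every REFRESH_INTERVAL seconds,
    # counting down one second at a time like crontab() in main.py.
    refresh_token()
    remaining = REFRESH_INTERVAL
    while True:
        if remaining <= 0:
            try:
                refresh_token()
            finally:
                remaining = REFRESH_INTERVAL
        else:
            remaining -= 1
            time.sleep(1)


if __name__ == '__main__':
    threading.Thread(target=refresher_loop, daemon=True).start()
    time.sleep(3)  # keep the demo alive briefly; the daemon thread exits with it

Because the worker thread only counts down and sleeps, upload threads keep reading the latest token from shared state and are only ever blocked for the instant the lock is held during a refresh.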