
Commit

update metadata watch
SAKURA-CAT committed Apr 5, 2024
1 parent 04cfcf2 commit aa25de1
Showing 8 changed files with 317 additions and 112 deletions.
27 changes: 22 additions & 5 deletions swanlab/cloud/_log_collector.py
@@ -11,6 +11,9 @@
from typing import List
from .utils import ThreadUtil, ThreadTaskABC
import asyncio
from swanlab.log import swanlog
from .utils import LogQueue
from swanlab.error import UpLoadError


class LogCollectorTask(ThreadTaskABC):
@@ -28,20 +31,30 @@ class LogCollectorTask(ThreadTaskABC):
"""

def __init__(self):
self.container: List = []
self.container: List[LogQueue.MsgType] = []
"""
Log container, storing the log messages fetched from the pipe
"""
self.lock = False

async def upload(self):
"""
Trigger all collected upload events in one batch and upload the log messages
All requests are network requests, so they are handled asynchronously and gathered here in one place
"""
tasks = [x() for x in self.container]
await asyncio.gather(*tasks)
# assume the upload takes 1 second
await asyncio.sleep(1)
tasks = [x[0].value['upload'](x[1]) for x in self.container]
results = await asyncio.gather(*tasks)
# check every result
has_error = False
for index, result in enumerate(results):
# if something went wrong
if isinstance(result, UpLoadError):
has_error = True
continue
elif isinstance(result, Exception):
has_error = True
swanlog.error(f"upload logs error: {result}, it might be a swanlab bug, data will be lost!")
continue

async def task(self, u: ThreadUtil, *args):
"""
@@ -52,9 +65,13 @@ async def task(self, u: ThreadUtil, *args):
self.container.extend(u.queue.get_all())
# print("Thread " + u.name + " collected log messages: ", self.container)
if u.timer.can_run(self.UPLOAD_TIME, len(self.container) == 0):
if self.lock:
return swanlog.debug("upload task still in progress, skipped")
self.lock = True
await self.upload()
# clear the container
self.container.clear()
self.lock = False

async def callback(self, u: ThreadUtil, *args):
"""
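The message format the collector now consumes is a `(FileType, payload)` tuple: `x[0].value['upload']` looks up the async uploader registered on the enum member and `x[1]` is passed to it. Below is a minimal sketch of that dispatch pattern, with a hypothetical `FILE` member and `upload_files` coroutine standing in for the real definitions in `swanlab/cloud/files_types.py`; `return_exceptions=True` is assumed so that failures surface as result values, matching the `isinstance` checks in the diff above.

```python
import asyncio
from enum import Enum
from typing import List, Tuple

async def upload_files(paths: List[str]):
    # Hypothetical uploader; the real handlers live behind FileType.
    print(f"uploading {paths}")

class FileType(Enum):
    # Each member's value maps to its async upload handler.
    FILE = {"upload": upload_files}

async def flush(container: List[Tuple[FileType, list]]):
    # Mirrors LogCollectorTask.upload(): dispatch every queued message
    # through the handler registered on its FileType member.
    tasks = [msg[0].value["upload"](msg[1]) for msg in container]
    # return_exceptions=True turns raised errors into result values,
    # which is what the isinstance(result, ...) checks expect.
    return await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(flush([(FileType.FILE, ["config.yaml"])]))
```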
82 changes: 31 additions & 51 deletions swanlab/cloud/dog/log_sniffer.py
@@ -8,84 +8,64 @@
Log sniffer
The sniffer never uploads; it only collects, forwarding the gathered logs and diff information to the log aggregator
"""
from ..utils import ThreadTaskABC, ThreadUtil, LogQueueABC
from ..utils import ThreadTaskABC, ThreadUtil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff
from typing import List, Tuple, Dict
from queue import Queue
from .sniffer_queue import SnifferQueue
from .metadata_handle import MetaHandle
from typing import List
from ..files_types import FileType
import asyncio

q = Queue()


class SnifferQueue(LogQueueABC):

def __init__(self, readable: bool = True, writable: bool = True):
super().__init__(q, readable, writable)


class LogSnifferHandler(FileSystemEventHandler):
"""
Log sniffer handler, responsible for reacting to log changes; in use, one instance should correspond to the watcher of one concrete folder type
Since log changes can be very frequent, a timer is needed to batch-process all changes within a time window
How a change is handled is decided by the handler functions predefined on the given LogType; this class only collects log messages
By default watchdog uses two threads: one watches file system events, the other processes them and triggers the corresponding callbacks
so there is no need to worry about multithreading issues here
"""

def __init__(self, watched_path: str):
"""
Initialize the log sniffer handler
:param watched_path: the watched path, used as the initial baseline
"""
self.watched_path = watched_path
self.queue = SnifferQueue(readable=False, writable=True)

def on_modified(self, event):
"""
By design files are never deleted, only modified, so only file-modification events need handling and everything else can be ignored
"""
if event.is_directory:
# directory modified, nothing to do
# print(f"Directory modified: {event.src_path}, watched directory: {self.watched_path}")
pass
else:
print(f"File modified: {event.src_path}, watched directory: {self.watched_path}")


class LogSnifferTask(ThreadTaskABC):
"""
Log sniffer, watches for log changes and wraps the changed parts before sending them to the log aggregator
"""
SNIFFER_TIMEOUT = 2
__QUEUE: Queue = Queue()

def __init__(self, observer_paths: List[Tuple[str,]]):
def __init__(self, meta_path: str):
"""
Initialize the log sniffer
:param observer_paths: the watched paths and the log type associated with each path
:param meta_path: path of the metadata folder; only the metadata folder needs sniffing for now, so only this path is passed in
if other folders need sniffing later, this can be changed to take a handle class instead
"""
self.__sniffer_queue = SnifferQueue(readable=True, writable=False)
self.__sniffer_queue = SnifferQueue(self.__QUEUE, readable=True, writable=False)
"""
Sniffer queue, holding the log messages collected from LogSnifferHandler
Sniffer queue, holding the log messages collected from a series of handlers
"""
self.__observer = Observer(timeout=self.SNIFFER_TIMEOUT)
for watched_path, in observer_paths:
self.__observer.schedule(LogSnifferHandler(watched_path),
watched_path,
recursive=True)
self.__observer.schedule(MetaHandle(self.__QUEUE, watched_path=meta_path),
meta_path,
recursive=True)
# observer, start!
self.__observer.start()

async def callback(self, u: ThreadUtil, *args):
# file events may arrive late, so wait for a while
# await asyncio.sleep(1.5)
await asyncio.sleep(1.5)
self.__observer.stop()
self.pass_msg(u)

def pass_msg(self, u: ThreadUtil):
all_sniffer_msg: List = self.__sniffer_queue.get_all()
if not all_sniffer_msg or len(all_sniffer_msg) == 0:
return
# deduplicate; since files is the only metadata type for now, deduplicating it alone is enough
# iterate over all messages
files = {FileType.FILE: []}
for msg in all_sniffer_msg:
for path in msg[0]:
if path not in files[FileType.FILE]:
files[FileType.FILE].append(path)
new_msg = (FileType.FILE, files[FileType.FILE])
u.queue.put(new_msg)

async def task(self, u: ThreadUtil, *args):
"""
Task body: collect all processed log messages here, then parse, wrap and send them to the log aggregator
:param u: thread utility
"""
# aggregate the log messages here
print("log sniffer task started")
self.pass_msg(u)
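`pass_msg` collapses everything the handlers queued into one deduplicated message before handing it to the aggregator. A standalone sketch of that merge under the message shape used above — `(paths, FileType)` in, `(FileType, paths)` out; the `FileType` stand-in here is hypothetical:

```python
from enum import Enum
from typing import Dict, List, Tuple

class FileType(Enum):
    # Stand-in for swanlab.cloud.files_types.FileType.
    FILE = "file"

def merge_msgs(all_sniffer_msg: List[Tuple[List[str], FileType]]) -> Tuple[FileType, List[str]]:
    # Mirrors LogSnifferTask.pass_msg(): flatten every (paths, type) message
    # and drop duplicate paths while keeping first-seen order.
    files: Dict[FileType, List[str]] = {FileType.FILE: []}
    for msg in all_sniffer_msg:
        for path in msg[0]:
            if path not in files[FileType.FILE]:
                files[FileType.FILE].append(path)
    # Note the order flip: downstream consumers expect (FileType, paths).
    return FileType.FILE, files[FileType.FILE]

msgs = [(["a/config.yaml"], FileType.FILE),
        (["a/config.yaml", "a/requirements.txt"], FileType.FILE)]
print(merge_msgs(msgs))
# (<FileType.FILE: 'file'>, ['a/config.yaml', 'a/requirements.txt'])
```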
79 changes: 79 additions & 0 deletions swanlab/cloud/dog/metadata_handle.py
@@ -0,0 +1,79 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
r"""
@DATE: 2024/4/5 18:20
@File: metadata_handle.py
@IDE: pycharm
@Description:
Metadata handler: the watchdog sniffs the metadata folder and sends metadata messages to the aggregator
"""
from ..files_types import FileType
from .sniffer_queue import SnifferQueue
from typing import Union, List
from watchdog.events import FileSystemEventHandler, FileSystemEvent
from swanlab.log import swanlog
import os
from queue import Queue


class MetaHandle(FileSystemEventHandler):
ValidFiles = ['config.yaml', 'requirements.txt', 'swanlab-metadata.json']
"""
List of valid metadata files; only these files are transferred, and any other file triggers a warning
"""

ModifiableFiles = [ValidFiles[0], ValidFiles[2]]
"""
可修改的元数据文件列表(其他只会传输一次)
"""

def __init__(self, queue: Queue, watched_path: str):
"""
Initialize the metadata sniffer handler
:param watched_path: the watched path, used as the initial baseline
"""
self.watched_path = watched_path
self.queue = SnifferQueue(queue, readable=False)
# snapshot of the metadata folder
self.snapshot = None
# delay timer for deferred processing
self.timer = None
# list of already-uploaded files
self.uploaded_files = []
self.on_init_upload()

def list_all_meta_files(self) -> List[str]:
"""
List all metadata files
"""
files = [x for x in os.listdir(self.watched_path) if os.path.isfile(self.fmt_file_path(x)[0])]
return [x for x in files if x in self.ValidFiles]

def fmt_file_path(self, file_name: Union[List[str], str]) -> List[str]:
"""
Format the file paths (join each file name with the watched directory)
"""
if isinstance(file_name, str):
file_name = [file_name]
return [os.path.join(self.watched_path, x) for x in file_name]

def on_init_upload(self):
"""
Run one file scan at instantiation: all ValidFiles under watched_path are bundled into a single msg and sent to the queue
"""
meta_files = self.list_all_meta_files()
if len(meta_files) == 0:
return swanlog.warning("empty meta files, it might be a bug?")
self.queue.put((self.fmt_file_path(meta_files), FileType.FILE))

def on_modified(self, event: FileSystemEvent) -> None:
"""
Triggered when a file is modified
"""
if event.is_directory:
return
file_name = os.path.basename(event.src_path)
if file_name not in self.ModifiableFiles:
# ignored
return swanlog.warning(f"file {file_name} is not allowed to be modified")
self.queue.put((self.fmt_file_path(file_name), FileType.FILE))
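MetaHandle is registered on a single watchdog Observer per metadata directory, mirroring LogSnifferTask.__init__ above. A minimal self-contained sketch of that wiring, with a plain FileSystemEventHandler standing in for MetaHandle and ./swanlog as a hypothetical watched directory (it must exist before scheduling):

```python
import time
from queue import Queue
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class DemoHandler(FileSystemEventHandler):
    # Stand-in for MetaHandle: push (paths, tag) messages into a shared queue.
    def __init__(self, queue: Queue):
        self.queue = queue

    def on_modified(self, event):
        if not event.is_directory:
            self.queue.put(([event.src_path], "file"))

q = Queue()
observer = Observer(timeout=2)  # same SNIFFER_TIMEOUT as LogSnifferTask
observer.schedule(DemoHandler(q), "./swanlog", recursive=True)
observer.start()
try:
    time.sleep(5)  # sniff for a while; the real task stops via callback()
finally:
    observer.stop()
    observer.join()
print(q.qsize(), "event(s) captured")
```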
46 changes: 46 additions & 0 deletions swanlab/cloud/dog/sniffer_queue.py
@@ -0,0 +1,46 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
r"""
@DATE: 2024/4/5 21:23
@File: sniffer_queue.py
@IDE: pycharm
@Description:
Sniffer queue, responsible for collecting the log messages registered by all sniffer threads
"""
from typing import List
from queue import Queue
from ..utils import LogQueue


class SnifferQueue(LogQueue):

def __init__(self, queue: Queue, readable: bool = True, writable: bool = True):
super().__init__(queue, readable, writable)

def put(self, msg: LogQueue.MsgType):
"""
Write a log message into the pipe; the aggregator will process the queued messages in turn
:param msg: the log message
"""
super().put(msg)

def get(self) -> LogQueue.MsgType:
"""
Read a log message from the pipe
:return: the log message
"""
return super().get()

def get_all(self) -> List[LogQueue.MsgType]:
"""
Read all log messages from the pipe
:return: the log messages
"""
return super().get_all()

def put_all(self, msgs: List[LogQueue.MsgType]):
"""
Write a batch of log messages into the pipe
:param msgs: the log messages
"""
super().put_all(msgs)
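SnifferQueue is a thin permission-typed view over one shared queue.Queue: MetaHandle gets a write-only view (readable=False) while LogSnifferTask drains it through a read-only one. A sketch of that split, assuming the LogQueue base simply enforces the readable/writable flags (its actual checks are not shown in this commit):

```python
from queue import Empty, Queue
from typing import List, Tuple

class ViewQueue:
    # Minimal stand-in for LogQueue/SnifferQueue: one shared Queue,
    # exposed through views that enforce read/write permissions.
    MsgType = Tuple  # the real MsgType is defined on LogQueue

    def __init__(self, queue: Queue, readable: bool = True, writable: bool = True):
        self.queue, self.readable, self.writable = queue, readable, writable

    def put(self, msg: MsgType):
        assert self.writable, "this view is read-only"
        self.queue.put(msg)

    def get_all(self) -> List[MsgType]:
        assert self.readable, "this view is write-only"
        msgs = []
        while True:
            try:
                msgs.append(self.queue.get_nowait())
            except Empty:
                break
        return msgs

shared = Queue()
writer = ViewQueue(shared, readable=False)   # handed to MetaHandle
reader = ViewQueue(shared, writable=False)   # handed to LogSnifferTask
writer.put((["config.yaml"], "file"))
print(reader.get_all())  # [(['config.yaml'], 'file')]
```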
