In [3]:
from abc import ABC, abstractmethod
import os
import logging
import time
from typing import List
import redis
from tradingday.tradingday import TradingDays


logger = logging.getLogger(__name__)
logging.basicConfig(
    level=os.environ.get("LOGLEVEL", "INFO"),
    format="%(levelname)-3s %(name)s: %(message)s",
)


class PubSubServiceBase(ABC):
    def __init__(
        self,
        host="localhost",
        port=6379,
        db=0,
        sub_streams: List[str] = None,
        pub_stream_by_default: str = None,
        group: str = "cg",
    ):
        logger.info("\n[{}] try init redis. ".format(self.__class__.__name__))
        self.db_ = redis.Redis(host=host, port=port, db=db)

        # ensure sub_streams exitst
        for stream_obj in sub_streams:
            if not self.db_.exists(stream_obj):
                msg_id = self.db_.xadd(stream_obj, {"": ""}, id=b"0-1")
                self.db_.xdel(stream_obj, msg_id)

            # create
            try:
                self.db_.xgroup_create(
                    name=stream_obj, groupname=group, id="0-0", mkstream=False
                )
            except redis.exceptions.ResponseError as exc:
                if not exc.args[0].startswith("BUSYGROUP"):
                    raise

        logger.info(
            "\n[{}] init redis ok. group is {}\n[{}] sub stream(s) are {}\n[{}] pub stream(s) are [{}]".format(
                self.__class__.__name__,
                group,
                self.__class__.__name__,
                sub_streams,
                self.__class__.__name__,
                pub_stream_by_default,
            )
        )

        self.group_ = group
        self.consumer_name_ = group + ".tosaka"
        # self.cg_.set_id() 默认行为F是从最后一次deliver的地方开始读，
        # 这也是我们想要的，只要把每次pending的拿来消费就可以保证消息不丢失了

        self.pub_stream_by_default_ = pub_stream_by_default
        self.sub_streams_ = sub_streams

        self.stop_flag_ = False

    @abstractmethod
    def consume_message(self, stream, msg_id, content):
        """
        定义如何消费一个监听到的消息(来自于 self.sub_streams), 消费顺利的话需要 ack 这个消息
        默认是打印这个消息, 子类需要自定义如何消费

        stream : source of message
        msg_id : a message id(at least for ack)
        content: content of the message

        """

    def ack_message(self, stream, msg_id):
        """
        处理完的消息都应该被及时 ack, 否则该 service 会认为该条消息没有消费成功，在 service 下一次
        启动时, 可能会检查 pending 列表，再次消费该条消息

        stream : source of message
        msg_id : a message id(used to ack)

        """
        self.db_.xack(stream, self.group_, msg_id)

    def push_message(self, msg: dict, stream: str = None):
        self.db_.xadd(
            stream or self.pub_stream_by_default_,
            msg,
            id="*",
            maxlen=None,
            approximate=True,
        )

    def resolve_pending(self):
        # Check if has unack message
        # for stream_obj in self.sub_streams_:
        for stream_obj in self.sub_streams_:
            pending_list = self.db_.xpending_range(
                stream_obj,
                self.group_,
                min="-",
                max="+",
                count=10000000,
                consumername=self.consumer_name_,
            )

            logger.info(
                "\n[{}] pending list <{}> size is {}".format(
                    self.__class__.__name__, stream_obj, len(pending_list)
                )
            )
            for pending_obj in pending_list:
                msg_id = pending_obj["message_id"]
                msg_list = self.db_.xrange(
                    name=stream_obj, min=msg_id, max=msg_id, count=1000000
                )
                msg_id, content = msg_list[0]
                msg_id = str(msg_id, "utf-8")
                content = {
                    str(key, "utf-8"): str(value, "utf-8")
                    for key, value in content.items()
                }
                self.consume_message(stream_obj, msg_id, content)

    def try_read(self, block_time=20):
        # stream_items is a list of [stream_name, message_list]
        stream_items = self.db_.xreadgroup(
            self.group_,
            self.consumer_name_,
            {i: ">" for i in self.sub_streams_},
            block=block_time,
            count=100000000,
        )
        if len(stream_items) == 0:
            return
        # item is a list of stream_name and message_list
        # item has only two elements
        for item in stream_items:
            stream_name = item[0]
            msg_list = item[1]
            for msg in msg_list:
                msg_id, content = msg
                msg_id = str(msg_id, "utf-8")
                content = {
                    str(key, "utf-8"): str(value, "utf-8")
                    for key, value in content.items()
                }
                self.consume_message(stream_name, msg_id, content)

    def listen(self, resolve_pending: bool = True):
        if resolve_pending:
            self.resolve_pending()

        while not self.stop_flag_:
            try:
                self.try_read(block_time=100)
            except redis.exceptions.ConnectionError as conn_error:
                logger.error(
                    f"[{self.__class__.__name__}] Redis ConnectionError {conn_error}."
                )
                self.reconnect()

    def stop(self):
        self.stop_flag_ = True

    def reconnect(self):
        stop_retry_flag = False
        while not stop_retry_flag:
            try:
                _ = self.db_.exists(self.sub_streams_[0])
                stop_retry_flag = True
                logger.info(f"[{self.__class__.__name__}] Redis reconnect success.")
            except Exception as e:
                logger.error(
                    f"[{self.__class__.__name__}] Redis reconnect error {e} Retry in 5s."
                )
                time.sleep(5)


ModuleNotFoundError: No module named 'tradingday'

In [10]:
class Fuker(PubSubServiceBase):
    def __init__(
        self,
        host="localhost",
        port=6379,
        db=1,
        sub_stream: str = "",
        pub_stream: str = "",
        group: str = "Gourp_Demo",
        table_map={
            "JsyMinBarArcImporter": "jsy.MinBar",
        },
        root: str = "/test_root",
    ):
        super().__init__(
            host=host,
            port=port,
            db=db,
            sub_streams = sub_stream,
            pub_stream_by_default=pub_stream,
            group=group,
        )

        self.table_map = table_map
        self.root = root

    def consume_message(self, stream, msg_id, content):
        pass



In [46]:
god = Fuker(host = "bishop", port = 6379, db = 6, sub_stream = "FilesOfAccounting", pub_stream = "FilesOfAccounting", group = "GroupJupyter")

INFO __main__: 
[Fuker] try init redis. 
INFO __main__: 
[Fuker] init redis ok. group is GroupJupyter
[Fuker] sub stream(s) are FilesOfAccounting
[Fuker] pub stream(s) are [FilesOfAccounting]


In [51]:
from pathlib import Path
books = ["kunpeng1"]
ids = {"dingli1":"zz_dingli1", "jinglun1":"zx_jinglun1", "jinglun2":"zx_jinglun2", "jinglun3":"zx_jinglun3", "kunpeng1":"lh_kunpeng1"}
for book in books:
    root_path = Path(f"/var/lib/wonder/warehouse/mail/{book}")
    print(root_path)
    for child in root_path.iterdir():
        message = {"date":f"{child.name}","book_id":ids[book],"file_path":(root_path / child / "qifei").as_posix()}
        print(message)
        god.push_message(message)

/var/lib/wonder/warehouse/mail/kunpeng1
{'date': '20220614', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220614/qifei'}
{'date': '20220523', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220523/qifei'}
{'date': '20220525', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220525/qifei'}
{'date': '20220527', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220527/qifei'}
{'date': '20220621', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220621/qifei'}
{'date': '20220630', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220630/qifei'}
{'date': '20220715', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220715/qifei'}
{'date': '20220622', 'book_id': 'lh_kunpeng1', 'file_path': '/var/lib/wonder/warehouse/mail/kunpeng1/20220622/qifei'}
{'date': '202207