# 第一章 Bar合成

In [13]:
import pandas as pd

# Load the RB2310 bar data from a CSV file located next to the notebook.
# NOTE(review): the rendered output shows an "Unnamed: 0" column, which looks
# like a saved row index, and Time values at minute resolution - confirm.
m1_bar = pd.read_csv('./RB2310.csv')
# Bare expression so the DataFrame is rendered as the cell output.
m1_bar


Unnamed: 0,Time,Open,High,Low,Close,Vol
0,2022/11/4 10:12,3389.0,3390.0,3389.0,3390.0,9.0
1,2022/11/4 10:13,3390.0,3393.0,3390.0,3393.0,11.0
2,2022/11/4 10:14,3393.0,3393.0,3393.0,3393.0,4.0
3,2022/11/4 10:15,3393.0,3393.0,3391.0,3391.0,6.0
4,2022/11/4 10:31,3392.0,3396.0,3392.0,3396.0,58.0
...,...,...,...,...,...,...
43240,2023/5/12 22:56,3665.0,3666.0,3663.0,3664.0,2273.0
43241,2023/5/12 22:57,3664.0,3666.0,3663.0,3666.0,2561.0
43242,2023/5/12 22:58,3666.0,3673.0,3664.0,3672.0,13437.0
43243,2023/5/12 22:59,3672.0,3677.0,3671.0,3672.0,13143.0


In [14]:
from datetime import time
# Trading-session boundary times for the rb2310 futures contract.
futures_dict = {
    'morning_start': time(9,0),
    'morning_end': time(11,30),
    'afternoon_start': time(13,30),
    'afternoon_end': time(15,0),
    'night_start': time(21,0),
    'night_end': time(23,0),
    # Misspelled legacy key, kept as an alias so existing lookups keep working.
    'ningt_end': time(23,0),
    'day_break': time(15,0),
}



TypeError: 'module' object is not callable

In [None]:
from enum import Enum
class Exchange(Enum):
    """
    Enumeration of exchange / trading-venue codes.

    Each member's value is the string code of the venue. The value usually
    equals the member name, with two exceptions visible below:
    CBOT -> "CBT" and EUREX -> "EUX".
    """
    # Chinese
    CFFEX = "CFFEX"         # China Financial Futures Exchange
    SHFE = "SHFE"           # Shanghai Futures Exchange
    CZCE = "CZCE"           # Zhengzhou Commodity Exchange
    DCE = "DCE"             # Dalian Commodity Exchange
    INE = "INE"             # Shanghai International Energy Exchange
    GFEX = "GFEX"           # Guangzhou Futures Exchange
    SSE = "SSE"             # Shanghai Stock Exchange
    SZSE = "SZSE"           # Shenzhen Stock Exchange
    BSE = "BSE"             # Beijing Stock Exchange
    SHHK = "SHHK"           # Shanghai-HK Stock Connect
    SZHK = "SZHK"           # Shenzhen-HK Stock Connect
    SGE = "SGE"             # Shanghai Gold Exchange
    WXE = "WXE"             # Wuxi Steel Exchange
    CFETS = "CFETS"         # CFETS Bond Market Maker Trading System
    XBOND = "XBOND"         # CFETS X-Bond Anonymous Trading System

    # Global
    SMART = "SMART"         # Smart Router for US stocks
    NYSE = "NYSE"           # New York Stock Exchange
    NASDAQ = "NASDAQ"       # Nasdaq Exchange
    ARCA = "ARCA"           # ARCA Exchange
    EDGEA = "EDGEA"         # Direct Edge Exchange
    ISLAND = "ISLAND"       # Nasdaq Island ECN
    BATS = "BATS"           # Bats Global Markets
    IEX = "IEX"             # The Investors Exchange
    AMEX = "AMEX"           # American Stock Exchange
    TSE = "TSE"             # Toronto Stock Exchange
    NYMEX = "NYMEX"         # New York Mercantile Exchange
    COMEX = "COMEX"         # COMEX of CME
    GLOBEX = "GLOBEX"       # Globex of CME
    IDEALPRO = "IDEALPRO"   # Forex ECN of Interactive Brokers
    CME = "CME"             # Chicago Mercantile Exchange
    ICE = "ICE"             # Intercontinental Exchange
    SEHK = "SEHK"           # Stock Exchange of Hong Kong
    HKFE = "HKFE"           # Hong Kong Futures Exchange
    SGX = "SGX"             # Singapore Global Exchange
    CBOT = "CBT"            # Chicago Board of Trade (value differs from the member name)
    CBOE = "CBOE"           # Chicago Board Options Exchange
    CFE = "CFE"             # CBOE Futures Exchange
    DME = "DME"             # Dubai Mercantile Exchange
    EUREX = "EUX"           # Eurex Exchange (value differs from the member name)
    APEX = "APEX"           # Asia Pacific Exchange
    LME = "LME"             # London Metal Exchange
    BMD = "BMD"             # Bursa Malaysia Derivatives
    TOCOM = "TOCOM"         # Tokyo Commodity Exchange
    EUNX = "EUNX"           # Euronext Exchange
    KRX = "KRX"             # Korean Exchange
    OTC = "OTC"             # OTC Product (Forex/CFD/Pink Sheet Equity)
    IBKRATS = "IBKRATS"     # Paper Trading Exchange of IB

    # Special Function
    LOCAL = "LOCAL"         # For local generated data

class Interval(Enum):
    """
    Interval (time granularity) of bar data.

    The member value is the short string code of the interval.
    """
    MINUTE = "1m"   # one-minute bars
    HOUR = "1h"     # one-hour bars
    DAILY = "d"     # daily bars
    WEEKLY = "w"    # weekly bars
    TICK = "tick"   # tick-level data

In [None]:
"""
Basic data structure used for general trading function in the trading platform.
"""

from dataclasses import dataclass, field
from datetime import datetime
from logging import INFO
#from .constant import Exchange, Interval

@dataclass
class BaseData:
    """
    Common base for the data objects in this module.

    Any data object needs a gateway_name as source
    and should inherit base data.
    """

    # Name of the gateway that produced this object (required first argument).
    gateway_name: str

    # Free-form extra payload; excluded from the generated __init__
    # (init=False) and None until explicitly assigned.
    extra: dict = field(default=None, init=False)


@dataclass
class TickData(BaseData):
    """
    Tick data contains information about:
        * last trade in market
        * orderbook snapshot
        * intraday market statistics.

    ``symbol``, ``exchange`` and ``datetime`` are required (in addition to
    ``gateway_name`` from BaseData); every other field has a default.
    After construction ``vt_symbol`` is set to "<symbol>.<exchange value>".
    """

    symbol: str
    exchange: Exchange
    datetime: datetime

    name: str = ""
    volume: float = 0
    turnover: float = 0
    open_interest: float = 0
    last_price: float = 0
    last_volume: float = 0
    limit_up: float = 0
    limit_down: float = 0

    open_price: float = 0
    high_price: float = 0
    low_price: float = 0
    pre_close: float = 0

    # Bid prices, levels 1-5
    bid_price_1: float = 0
    bid_price_2: float = 0
    bid_price_3: float = 0
    bid_price_4: float = 0
    bid_price_5: float = 0

    # Ask prices, levels 1-5
    ask_price_1: float = 0
    ask_price_2: float = 0
    ask_price_3: float = 0
    ask_price_4: float = 0
    ask_price_5: float = 0

    # Bid volumes, levels 1-5
    bid_volume_1: float = 0
    bid_volume_2: float = 0
    bid_volume_3: float = 0
    bid_volume_4: float = 0
    bid_volume_5: float = 0

    # Ask volumes, levels 1-5
    ask_volume_1: float = 0
    ask_volume_2: float = 0
    ask_volume_3: float = 0
    ask_volume_4: float = 0
    ask_volume_5: float = 0

    # NOTE(review): presumably the local receive time of the tick - confirm.
    localtime: datetime = None

    def __post_init__(self) -> None:
        """Derive ``vt_symbol`` from the symbol and the exchange's value."""
        self.vt_symbol: str = f"{self.symbol}.{self.exchange.value}"


@dataclass
class BarData(BaseData):
    """
    Candlestick bar data of a certain trading period.

    ``symbol``, ``exchange`` and ``datetime`` are required (in addition to
    ``gateway_name`` from BaseData); the interval defaults to None and all
    numeric fields default to 0. After construction ``vt_symbol`` is set to
    "<symbol>.<exchange value>".
    """

    symbol: str
    exchange: Exchange
    datetime: datetime

    interval: Interval = None
    volume: float = 0
    turnover: float = 0
    open_interest: float = 0
    open_price: float = 0
    high_price: float = 0
    low_price: float = 0
    close_price: float = 0

    def __post_init__(self) -> None:
        """Derive ``vt_symbol`` from the symbol and the exchange's value."""
        self.vt_symbol: str = f"{self.symbol}.{self.exchange.value}"



In [None]:
"""
General utility functions.
"""

from datetime import datetime
from typing import Callable, Dict, Tuple, Union, Optional

#from .object import BarData, TickData
#from .constant import Exchange, Interval

class BarGenerator:
    """
    For:
    1. generating 1 minute bar data from tick data
    2. generating x minute bar/x hour bar data from 1 minute data
    Notice:
    1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
    2. for x hour bar, x can be any number

    Finished bars are delivered through callbacks: ``on_bar`` receives each
    finished 1 minute bar, ``on_window_bar`` receives each finished window bar.
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ) -> None:
        """
        Constructor.

        on_bar: callback invoked with every finished 1 minute bar
            (from update_tick and generate).
        window: number of minutes/hours per window bar. It is used as a
            modulus when aggregating, so it must be non-zero before
            update_bar is called (the default 0 would raise
            ZeroDivisionError there).
        on_window_bar: callback invoked with every finished window bar;
            it is called unconditionally once a window completes, so it
            must be set when update_bar is used.
        interval: Interval.MINUTE selects minute-window aggregation in
            update_bar; any other value selects hour-window aggregation.
        """
        # 1 minute bar currently being built from ticks
        self.bar: BarData = None
        self.on_bar: Callable = on_bar

        self.interval: Interval = interval
        # Number of hour bars merged into the current window bar
        self.interval_count: int = 0

        # Hour bar currently being built from 1 minute bars
        self.hour_bar: BarData = None

        self.window: int = window
        # Window bar currently being built
        self.window_bar: BarData = None
        self.on_window_bar: Callable = on_window_bar

        # Last tick accepted by update_tick
        self.last_tick: TickData = None

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.

        Ticks whose last_price is falsy, or whose datetime is earlier than
        the previously accepted tick, are ignored. When the tick's minute or
        hour differs from the bar in progress, that bar is pushed to on_bar
        (with its datetime truncated to the minute) and a new bar is started
        from this tick.
        """
        new_minute: bool = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if not self.bar:
            new_minute = True
        elif (
            (self.bar.datetime.minute != tick.datetime.minute)
            or (self.bar.datetime.hour != tick.datetime.hour)
        ):
            # The bar in progress belongs to an earlier minute: finish and push it
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            # Start a new bar with all four prices set to this tick's last price
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            # The tick's own high_price field rose since the previous tick:
            # fold it in as well.
            # NOTE(review): presumably this is the intraday high, capturing
            # trades that happened between two ticks - confirm.
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            # Same treatment for a newly lowered low_price field
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            # Bar volume/turnover accumulate the difference of the tick's
            # volume/turnover against the previous tick; negative
            # differences are clamped to 0.
            volume_change: float = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

            turnover_change: float = tick.turnover - self.last_tick.turnover
            self.bar.turnover += max(turnover_change, 0)

        self.last_tick = tick

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator

        Dispatches to minute-window aggregation when the generator interval
        is Interval.MINUTE, otherwise to hour-window aggregation.
        """
        if self.interval == Interval.MINUTE:
            self.update_bar_minute_window(bar)
        else:
            self.update_bar_hour_window(bar)

    def update_bar_minute_window(self, bar: BarData) -> None:
        """
        Merge a 1 minute bar into the x minute window bar.

        The window bar is pushed to on_window_bar and reset when
        (bar.datetime.minute + 1) is a multiple of the window.
        """
        # If not inited, create window bar object
        if not self.window_bar:
            dt: datetime = bar.datetime.replace(second=0, microsecond=0)
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price,
                bar.high_price
            )
            self.window_bar.low_price = min(
                self.window_bar.low_price,
                bar.low_price
            )

        # Update close price/volume/turnover into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += bar.volume
        self.window_bar.turnover += bar.turnover
        self.window_bar.open_interest = bar.open_interest

        # Check if window bar completed
        if not (bar.datetime.minute + 1) % self.window:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

    def update_bar_hour_window(self, bar: BarData) -> None:
        """
        Merge a 1 minute bar into the hour bar in progress.

        An hour bar is finished either by the bar of minute 59 (which is
        merged first) or by the first bar of a different hour (which starts
        the next hour bar). Finished hour bars are passed to on_hour_bar.
        """
        # If not inited, create window bar object
        if not self.hour_bar:
            dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
            self.hour_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
                close_price=bar.close_price,
                volume=bar.volume,
                turnover=bar.turnover,
                open_interest=bar.open_interest
            )
            # The first bar is fully absorbed by the constructor call above
            return

        finished_bar: BarData = None

        # If minute is 59, update minute bar into window bar and push
        if bar.datetime.minute == 59:
            self.hour_bar.high_price = max(
                self.hour_bar.high_price,
                bar.high_price
            )
            self.hour_bar.low_price = min(
                self.hour_bar.low_price,
                bar.low_price
            )

            self.hour_bar.close_price = bar.close_price
            self.hour_bar.volume += bar.volume
            self.hour_bar.turnover += bar.turnover
            self.hour_bar.open_interest = bar.open_interest

            finished_bar = self.hour_bar
            self.hour_bar = None

        # If minute bar of new hour, then push existing window bar
        elif bar.datetime.hour != self.hour_bar.datetime.hour:
            finished_bar = self.hour_bar

            dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
            self.hour_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
                close_price=bar.close_price,
                volume=bar.volume,
                turnover=bar.turnover,
                open_interest=bar.open_interest
            )
        # Otherwise only update minute bar
        else:
            self.hour_bar.high_price = max(
                self.hour_bar.high_price,
                bar.high_price
            )
            self.hour_bar.low_price = min(
                self.hour_bar.low_price,
                bar.low_price
            )

            self.hour_bar.close_price = bar.close_price
            self.hour_bar.volume += bar.volume
            self.hour_bar.turnover += bar.turnover
            self.hour_bar.open_interest = bar.open_interest

        # Push finished window bar
        if finished_bar:
            self.on_hour_bar(finished_bar)

    def on_hour_bar(self, bar: BarData) -> None:
        """
        Handle a finished 1 hour bar.

        With window == 1 the hour bar is forwarded to on_window_bar as is.
        Otherwise hour bars are merged into a window bar, which is pushed
        and reset after every ``window`` hour bars.
        """
        if self.window == 1:
            self.on_window_bar(bar)
        else:
            if not self.window_bar:
                self.window_bar = BarData(
                    symbol=bar.symbol,
                    exchange=bar.exchange,
                    datetime=bar.datetime,
                    gateway_name=bar.gateway_name,
                    open_price=bar.open_price,
                    high_price=bar.high_price,
                    low_price=bar.low_price
                )
            else:
                self.window_bar.high_price = max(
                    self.window_bar.high_price,
                    bar.high_price
                )
                self.window_bar.low_price = min(
                    self.window_bar.low_price,
                    bar.low_price
                )

            self.window_bar.close_price = bar.close_price
            self.window_bar.volume += bar.volume
            self.window_bar.turnover += bar.turnover
            self.window_bar.open_interest = bar.open_interest

            # Count merged hour bars; push once the window is full
            self.interval_count += 1
            if not self.interval_count % self.window:
                self.interval_count = 0
                self.on_window_bar(self.window_bar)
                self.window_bar = None

    def generate(self) -> Optional[BarData]:
        """
        Generate the bar data and call callback immediately.

        Pushes the 1 minute bar in progress (if any) to on_bar with its
        datetime truncated to the minute, clears it, and returns it.
        Returns None when no bar was in progress.
        """
        bar: BarData = self.bar

        if self.bar:
            bar.datetime = bar.datetime.replace(second=0, microsecond=0)
            self.on_bar(bar)

        self.bar = None
        return bar


# 如何确定一个Tick属于哪一根K线

**合约的交易时段与K线周期的关系**

![bar_gen_1](./asset/bar_gen_1.png)

LD：合约上市日期（Listed date）

LTD：最新交易日（Latest trade date）

ULD：合约最后交易日（Unlisted date）

TTPD：每日交易时间（Trade time per day）

KW：K线周期（K-line width）

STTD：交易日开始时间（从LD起算）

STTK：K线开始时间（从LD起算）

Days = LD到LTD之间的交易日数（去掉节假日）

STTD = TTPD×Days

**每日交易时间TTPD的计算**
不同合约的交易时间段可能是不一样的，它们可以利用rqdatac的instruments(symbol)函数读取，返回结果中包含 trading_hours字段。如：

RB2010交易时间段：

trading_hours='21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'

AG2012交易时间段：

trading_hours='21:01-02:30,09:01-10:15,10:31-11:30,13:31-15:00'

MA2010交易时间段：

trading_hours='21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00'

trading_hours的形式：start1-stop1,start2-stop2，...,startN-stopN

![bar_gen_2](./asset/bar_gen_2.png)
注意把start和stop的单位换算为日内的分钟数量

**计算Tick所在K线的索引**
已知tick中包含时间datetime，K线的周期为KW，那么从上市日算起的各个时间：

利用tick.datetime求出所在交易日，从而求出STTD

TickTD：tick在交易日内的分钟数

TickLD：tick从上市开始的分钟数=STTD+TickTD

IndexK：tick所在K线的索引=int(TickLD/KW)

STTK：K线开始时间=IndexK*KW

# rqdatac可以提供的功能

**获取合约信息**
```python
rq.instruments(symbol)
```

返回值：
```python
Instrument(order_book_id='RB2010', symbol='螺纹钢2010', round_lot=1.0, contract_multiplier=10.0, underlying_order_book_id='null', underlying_symbol='RB', maturity_date='2020-10-15', type='Future', exchange='SHFE', listed_date='2019-10-16', de_listed_date='2020-10-15', margin_rate=0.09, trading_hours='21:01-23:00,09:01-10:15,10:31-11:30,13:31-15:00', market_tplus=0, industry_name='焦煤钢矿', product='Commodity')
```
其中：
```
listed_date——上市日期
de_listed_date——交割日期
trading_hours——交易时间段
```

**求最新交易日**
```python
 rq.get_latest_trading_date()
 ```

**求两个日期之间的交易日**
```python
rq.get_trading_dates(listed_date,last_trade_date)
```

**求下n个交易日**
```python
rq.get_next_trading_date(listed_date,n=days)
```

**获取交易时间段**
```python
rq.get_trading_hours(symbol,frequency='tick',expected_fmt='str')
rq.get_trading_hours(symbol,frequency='1m',expected_fmt='datetime')
rq.get_trading_hours(symbol,frequency='1m',expected_fmt='time')
```


**示例代码**
```python
from typing import Callable,List,Dict, Tuple, Union

import copy
import numpy as np
import talib
import vnpy.usertools.mylib as mylib

from vnpy.app.cta_strategy import (
    BarGenerator,
    ArrayManager
)

from vnpy.trader.object import (
    BarData,
    TickData,
    TradeData
)

#from vnpy.trader.constant import Interval
from enum import Enum

import datetime
import rqdatac as rq
from rqdatac.utils import to_date

import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")


def get_listed_date(symbol:str):
    """Return the listing date of ``symbol``: rqdatac's ``listed_date`` passed through ``to_date``."""
    return to_date(rq.instruments(symbol).listed_date)

def get_de_listed_date(symbol:str):
    """Return the de-listing (delivery) date of ``symbol``: rqdatac's ``de_listed_date`` passed through ``to_date``."""
    return to_date(rq.instruments(symbol).de_listed_date)


class Timeunit(Enum):
    """ Unit in which an elapsed trading time is reported. """
    SECOND = '1s'
    MINUTE = '1m'
    HOUR = '1h'


class TradeHours(object):
    """
    Trading sessions of one contract, indexed by trading day.

    After construction the instance holds:
        listed_date / de_listed_date: first and last date of the contract.
        trade_index_date: {trading-day index -> date}.
        trade_date_index: {date -> (trading-day index, [(start, stop), ...])}
            where start/stop are timezone-aware datetimes in CHINA_TZ.
        time_dn_pairs: session template produced by _get_trading_times_dn.

    All ``dt`` arguments must be timezone-aware datetimes, because they are
    compared against the timezone-aware session boundaries.
    """
    def __init__(self,symbol:str):
        """Store the contract symbol and build the trading-day tables."""
        self.symbol = symbol
        self.init()

    @staticmethod
    def _localize(dt:datetime.datetime):
        """
        Return ``dt``'s wall-clock time as an aware datetime in CHINA_TZ.

        pytz zones must be attached with localize(); passing them through
        ``tzinfo=`` / ``replace(tzinfo=...)`` silently selects the zone's
        first (local mean time) offset instead of +08:00. Any tzinfo already
        present is dropped first, mirroring ``replace(tzinfo=...)``.
        """
        return CHINA_TZ.localize(dt.replace(tzinfo=None))

    def init(self):
        """ Build the trading-day dictionaries and the per-day session lists. """
        self.listed_date = get_listed_date(self.symbol)
        self.de_listed_date = get_de_listed_date(self.symbol)

        self.trade_date_index = {}   # date -> (trading-day index, sessions)
        self.trade_index_date = {}   # trading-day index -> date

        # Every trading date of the contract, numbered from 0.
        trade_dates = rq.get_trading_dates(self.listed_date,self.de_listed_date)
        days = 0
        for td in trade_dates:
            # Placeholder value; replaced by an (index, sessions) tuple below.
            self.trade_date_index[td] = days
            self.trade_index_date[days] = td
            days += 1

        trading_hours = rq.get_trading_hours(self.symbol,date=self.listed_date,frequency='tick',expected_fmt='datetime')

        self.time_dn_pairs = self._get_trading_times_dn(trading_hours)

        # Day 0 uses the sessions exactly as returned for the listing date.
        trading_hours0 = [(self._localize(start),self._localize(stop)) for start,stop in trading_hours]
        self.trade_date_index[self.listed_date] = (0,trading_hours0)
        for day in range(1,days):
            td = self.trade_index_date[day]
            trade_datetimes = []
            for (start,dn1),(stop,dn2) in self.time_dn_pairs:
                # start: session start time, dn1: trading-day offset of the start (<= 0)
                # stop : session stop time,  dn2: calendar days from start date to stop date
                d = self.trade_index_date[day+dn1]
                start_dt = datetime.datetime.combine(d,start)
                stop_dt = datetime.datetime.combine(d,stop)+datetime.timedelta(days=dn2)
                trade_datetimes.append((self._localize(start_dt),self._localize(stop_dt)))
            self.trade_date_index[td] = (day,trade_datetimes)

    def _get_trading_times_dn(self,trading_hours:List[Tuple[datetime.datetime,datetime.datetime]]):
        """
        Convert session datetimes into a date-independent template
        (internal helper, not meant for external use).

        Returns [((start1,dn1_1),(stop1,dn2_1)), ..., ((startN,dn1_N),(stopN,dn2_N))]
        in the same order as ``trading_hours`` where:
            startK: session start time, dn1_K: trading-day offset of the start
                    (0 = the trading day itself, -1 = the previous trading day)
            stopK : session stop time,  dn2_K: 1 when the session crosses
                    midnight (start > stop), else 0
        """
        if not trading_hours:
            return []

        # Walk the sessions backwards, from the last one of the trading day.
        start_stops = [(start.time(),stop.time()) for start,stop in trading_hours][::-1]

        pre_start,pre_stop = start_stops[0]
        dn1 = 0
        dn2 = 1 if pre_start > pre_stop else 0
        time_dn_pairs = [((pre_start,dn1),(pre_stop,dn2))]
        for start,stop in start_stops[1:]:
            # Going backwards, a later clock time means we stepped into an earlier day.
            if start > pre_start:
                dn1 -= 1
            dn2 = 1 if start > stop else 0
            time_dn_pairs.insert(0,((start,dn1),(stop,dn2)))
            pre_start = start

        return time_dn_pairs

    def get_date_tradetimes(self,date:datetime.date):
        """ Return (trading-day index, sessions) of ``date``, or (None, []) when unknown. """
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        return idx,trade_times

    def get_trade_datetimes(self,dt:datetime.datetime,allday:bool=False):
        """
        Locate the trading session containing ``dt``.

        Returns (trading-day index, sessions). ``sessions`` holds every
        session of that trading day when ``allday`` is true, otherwise only
        the (start, stop) pair containing ``dt``. Returns (None, []) when
        ``dt`` is not inside any session.
        """
        # Anything before the very first session of the contract is invalid.
        idx0,trade_times0 = self.get_date_tradetimes(self.listed_date)
        if not trade_times0:
            return None,[]
        start0,stop0 = trade_times0[0]
        if dt < start0:
            return None,[]

        # Find the trading-day index of dt's calendar date, rolling forward
        # over non-trading dates. The de-listing date itself is a valid
        # trading day, and index 0 (the listing date) is a valid hit.
        date,dn = dt.date(),0
        days = None
        while date <= self.de_listed_date:
            days,ths = self.trade_date_index.get(date,(None,[]))
            if days is not None:
                break
            dn += 1
            date = (dt+datetime.timedelta(days=dn)).date()
        # Nothing found up to the de-listing date: not a valid trading time.
        if days is None:
            return (None,[])

        # A night session belongs to the following trading day, so check the
        # neighbouring trading days as well.
        for day in (days,days+1,days-1):
            date = self.trade_index_date.get(day,None)
            if not date:
                continue

            idx,trade_dts = self.get_date_tradetimes(date)
            if not trade_dts:
                continue

            day_start = trade_dts[0][0]     # first session start of this trading day
            day_stop = trade_dts[-1][1]     # last session stop of this trading day
            if dt < day_start or dt > day_stop:
                continue

            for start,stop in trade_dts:
                if start <= dt < stop:
                    if allday:
                        return idx,trade_dts
                    return idx,[(start,stop)]

        return None,[]

    def get_trade_time_perday(self):
        """ Return the total trading time of one trading day, in whole minutes. """
        TTPD = datetime.timedelta()

        # Only durations matter here, so any fixed naive date works as anchor.
        anchor = datetime.date(2000,1,1)
        for (start,dn1),(stop,dn2) in self.time_dn_pairs:
            start_dt = datetime.datetime.combine(anchor,start)
            stop_dt = datetime.datetime.combine(anchor,stop) + datetime.timedelta(days=dn2)
            TTPD += stop_dt - start_dt
        return int(TTPD.total_seconds()/60)

    def get_trade_time_inday(self,dt:datetime.datetime,unit:Timeunit=Timeunit.MINUTE):
        """
        Return the trading time elapsed inside dt's trading day up to ``dt``.

        unit: Timeunit.SECOND / MINUTE / HOUR select the (truncated integer)
        unit of the result; any other value returns the raw timedelta.
        Returns None when ``dt`` is not inside a trading session.
        """
        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            return None

        TTID = datetime.timedelta()
        for start,stop in trade_times:
            if dt > stop:
                # Session entirely in the past: count it in full.
                TTID += stop - start
            elif dt > start:
                # Session containing dt: count the elapsed part and stop.
                TTID += dt - start
                break
            else:
                break

        if unit == Timeunit.SECOND:
            return TTID.seconds
        elif unit == Timeunit.MINUTE:
            return int(TTID.seconds/60)
        elif unit == Timeunit.HOUR:
            return int(TTID.seconds/3600)
        else:
            return TTID

    def convet_to_datetime(self,day:int,minutes:int):
        """
        Convert ``minutes`` of trading time inside trading day ``day`` into a datetime.

        Returns None when the day index is unknown or ``minutes`` is not
        smaller than the day's total trading minutes.
        """
        date = self.trade_index_date.get(day,None)
        if date is None:
            return None
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        if not trade_times:
            return None
        for (start,stop) in trade_times:
            session_minutes = int((stop - start).seconds/60)
            if minutes < session_minutes:
                return start + datetime.timedelta(minutes=minutes)
            # Not in this session: consume it and try the next one.
            minutes -= session_minutes
        return None

    def get_bar_window(self,dt:datetime.datetime,window:int,interval:Interval=Interval.MINUTE):
        """
        Return (start, stop) datetimes of the bar containing ``dt``.

        Bars have a fixed length in trading time (``window`` minutes, hours,
        trading days or 5-trading-day weeks depending on ``interval``) and are
        laid out back to back starting from the listing date.
        Returns (None, None) when ``dt`` is outside trading sessions, the
        interval is unsupported, or the bar width is not positive.
        """
        bar_windows = (None,None)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            return bar_windows

        # Trading minutes per trading day
        TTPD = self.get_trade_time_perday()

        # Trading minutes elapsed inside dt's trading day
        TTID = self.get_trade_time_inday(dt,unit=Timeunit.MINUTE)

        # Trading minutes elapsed since the listing date
        total_minites = day*TTPD + TTID

        # Bar width in trading minutes
        if interval == Interval.MINUTE:
            bar_width = window
        elif interval == Interval.HOUR:
            bar_width = 60*window
        elif interval == Interval.DAILY:
            bar_width = TTPD*window
        elif interval == Interval.WEEKLY:
            bar_width = TTPD*window*5
        else:
            return bar_windows

        # A non-positive width cannot partition the timeline.
        if bar_width <= 0:
            return bar_windows

        # Bar boundaries in trading minutes since listing
        start_m = (total_minites // bar_width) * bar_width
        stop_m = start_m + bar_width

        # Convert both boundaries back to datetimes
        start_dt = self.convet_to_datetime(start_m // TTPD,start_m % TTPD)
        stop_dt = self.convet_to_datetime(stop_m // TTPD,stop_m % TTPD)

        return start_dt,stop_dt
```

**执行结果**
```python
RB2010 2020-07-28'd index: 191
RB2010's start:2020-07-27 21:01:00-stop:2020-07-27 23:00:00，timedelta=2:00:00
RB2010's start:2020-07-28 09:01:00-stop:2020-07-28 10:15:00，timedelta=1:15:00
RB2010's start:2020-07-28 10:31:00-stop:2020-07-28 11:30:00，timedelta=1:00:00
RB2010's start:2020-07-28 13:31:00-stop:2020-07-28 15:00:00，timedelta=1:30:00
RB2010's TTPD=5:45:00
RB2010's STTD=45 days, 18:15:00
TickTime=2020-07-28 14:37:00,tt_total=45 days, 23:37:00,STTK=45 days, 23:30:00
```

In [None]:
# 一个等交易长度、固定位置的K线产生器
from typing import Callable,List,Dict, Tuple, Union

#import vnpy.usertools.mylib as mylib

#from vnpy.app.cta_strategy import (
#    BarGenerator,
#    ArrayManager
#)

#from vnpy.trader.object import (
#    BarData,
#    TickData,
#    TradeData
#)

#from vnpy.trader.constant import Interval
from enum import Enum

import datetime
import rqdatac as rq
from rqdatac.utils import to_date

import pytz
CHINA_TZ = pytz.timezone("Asia/Shanghai")


def get_listed_date(symbol:str):
    """Return the listing date of ``symbol``: rqdatac's ``listed_date`` passed through ``to_date``."""
    return to_date(rq.instruments(symbol).listed_date)

def get_de_listed_date(symbol:str):
    """Return the de-listing (delivery) date of ``symbol``: rqdatac's ``de_listed_date`` passed through ``to_date``."""
    return to_date(rq.instruments(symbol).de_listed_date)


class Timeunit(Enum):
    """ Unit in which an elapsed trading time is reported. """
    SECOND = '1s'
    MINUTE = '1m'
    HOUR = '1h'


class TradeHours(object):
    """Trading-session calendar of one contract.

    Attributes built by ``init``:
        listed_date / de_listed_date: first and last date of the contract.
        trade_index_date: ``{day index: trading date}``, day index counted from 0.
        trade_date_index: ``{trading date: (day index, [(start, stop), ...])}`` where
            every ``start``/``stop`` is a CHINA_TZ-aware datetime of one session.
        time_dn_pairs: the session template ``[((start, dn1), (stop, dn2)), ...]``
            produced by ``_get_trading_times_dn``.
    """
    def __init__(self,symbol:str):
        self.symbol = symbol
        self.init()

    @staticmethod
    def _localize(dt:datetime.datetime) -> datetime.datetime:
        """Attach CHINA_TZ to the wall-clock value of ``dt``.

        pytz zones must be attached with ``localize``; passing them through
        ``tzinfo=`` / ``replace(tzinfo=...)`` selects the zone's historical LMT
        offset instead of the regular UTC offset. Any tzinfo already present on
        ``dt`` is discarded first, mirroring what ``replace(tzinfo=...)`` did.
        """
        return CHINA_TZ.localize(dt.replace(tzinfo=None))

    def init(self):
        """Build the trading-day indexes and the per-day session lists."""
        self.listed_date = get_listed_date(self.symbol)
        self.de_listed_date = get_de_listed_date(self.symbol)

        # Every trading date of the contract, numbered from 0.
        trade_dates = rq.get_trading_dates(self.listed_date,self.de_listed_date)
        self.trade_index_date = dict(enumerate(trade_dates))   # day index -> trading date
        self.trade_date_index = {}                             # trading date -> (day index, sessions)
        days = len(self.trade_index_date)

        # The sessions of the listing date serve as the template for all other days.
        trading_hours = rq.get_trading_hours(self.symbol,date=self.listed_date,frequency='tick',expected_fmt='datetime')

        self.time_dn_pairs = self._get_trading_times_dn(trading_hours)

        trading_hours0 = [(self._localize(start),self._localize(stop)) for start,stop in trading_hours]
        self.trade_date_index[self.listed_date] = (0,trading_hours0)
        for day in range(1,days):
            td = self.trade_index_date[day]
            trade_datetimes = []
            for (start,dn1),(stop,dn2) in self.time_dn_pairs:
                # dn1 (<= 0) selects the trading day on which the session starts,
                # dn2 is the number of calendar days from the start date to the stop date.
                d = self.trade_index_date[day+dn1]
                trade_datetimes.append((
                    self._localize(datetime.datetime.combine(d,start)),
                    self._localize(datetime.datetime.combine(d,stop)+datetime.timedelta(days=dn2))))
            self.trade_date_index[td] = (day,trade_datetimes)

    def _get_trading_times_dn(self,trading_hours:List[Tuple[datetime.datetime,datetime.datetime]]):
        """
        Turn one day's sessions into a date-independent template (internal use).

        Result: ``[((start1,dn1_1),(stop1,dn2_1)), ..., ((startN,dn1_N),(stopN,dn2_N))]``
        in the same order as ``trading_hours``, where
            startK / stopK: session start / stop as ``datetime.time``;
            dn1_K: offset (0 or negative) of the session's start day relative to the trading day;
            dn2_K: 1 when the session crosses midnight (start > stop), otherwise 0.
        """
        if len(trading_hours) == 0:
            return []

        # Walk from the last session back to the first. Whenever an earlier
        # session starts at a later time of day than the one after it, it must
        # have started on the previous day.
        pairs = []
        dn1 = 0
        later_start = None
        for start_dt,stop_dt in reversed(trading_hours):
            start,stop = start_dt.time(),stop_dt.time()
            if later_start is not None and start > later_start:
                dn1 -= 1
            dn2 = 1 if start > stop else 0
            pairs.append(((start,dn1),(stop,dn2)))
            later_start = start

        pairs.reverse()
        return pairs

    def get_date_tradetimes(self,date:datetime.date):
        """Return ``(day index, sessions)`` of trading date ``date``; ``(None, [])`` if unknown."""
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        return idx,trade_times

    def get_trade_datetimes(self,dt:datetime.datetime,allday:bool=False):
        """
        Locate the trading session that contains ``dt``.

        Returns ``(day index, sessions)``. ``sessions`` holds only the matching
        ``(start, stop)`` pair, or all sessions of that trading day when
        ``allday`` is true. ``(None, [])`` is returned when ``dt`` is not inside
        any session (sessions are half-open: ``start <= dt < stop``).
        """
        # Anything before the very first session of the contract is invalid.
        idx0,trade_times0 = self.get_date_tradetimes(self.listed_date)
        if not trade_times0:
            return None,[]
        first_start,_ = trade_times0[0]
        if dt < first_start:
            return None,[]

        # Find the first trading date at or after dt's calendar date.
        # Day index 0 (the listing date) is a valid hit, so test against None.
        date,dn = dt.date(),0
        days = None
        while date <= self.de_listed_date:
            days,_ = self.trade_date_index.get(date,(None,[]))
            if days is not None:
                break
            dn += 1
            date = (dt+datetime.timedelta(days=dn)).date()
        # Nothing found up to the de-listing date: not a valid trading time.
        if days is None:
            return None,[]

        # A session may belong to the neighbouring trading day (e.g. a night
        # session), so check that day, the next one and the previous one.
        for day in (days,days+1,days-1):
            date = self.trade_index_date.get(day,None)
            if not date:
                continue

            idx,trade_dts = self.get_date_tradetimes(date)
            if not trade_dts:
                continue

            day_start = trade_dts[0][0]     # start of the first session of that trading day
            day_stop = trade_dts[-1][1]     # stop of the last session of that trading day
            if dt < day_start or dt > day_stop:
                continue

            for start,stop in trade_dts:
                if start <= dt < stop:
                    if allday:
                        return idx,trade_dts
                    return idx,[(start,stop)]

        return None,[]

    def get_trade_time_perday(self):
        """Return the total trading time of one trading day, in whole minutes."""
        # Only the session lengths matter, so measure them on a fixed anchor
        # date; dn2 alone accounts for a session that crosses midnight.
        anchor = datetime.date(2000,1,1)
        total = datetime.timedelta(0)
        for (start,dn1),(stop,dn2) in self.time_dn_pairs:
            start_dt = datetime.datetime.combine(anchor,start)
            stop_dt = datetime.datetime.combine(anchor,stop) + datetime.timedelta(days=dn2)
            total += stop_dt - start_dt
        return int(total.total_seconds()/60)

    def get_trade_time_inday(self,dt:datetime.datetime,unit:Timeunit=Timeunit.MINUTE):
        """
        Return the trading time elapsed within dt's trading day up to ``dt``.

        unit: Timeunit.SECOND -> seconds, Timeunit.MINUTE -> whole minutes,
              Timeunit.HOUR -> whole hours; any other value returns the raw timedelta.
        Returns None when ``dt`` is not inside a trading session.
        """
        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            return None

        elapsed = datetime.timedelta(0)
        for start,stop in trade_times:
            if dt > stop:
                # Session completely in the past: count it in full.
                elapsed += stop - start
            elif dt > start:
                # Session in progress: count the part up to dt and stop.
                elapsed += dt - start
                break
            else:
                break

        seconds = int(elapsed.total_seconds())
        if unit == Timeunit.SECOND:
            return seconds
        elif unit == Timeunit.MINUTE:
            return int(seconds/60)
        elif unit == Timeunit.HOUR:
            return int(seconds/3600)
        else:
            return elapsed

    def convet_to_datetime(self,day:int,minutes:int):
        """
        Convert "``minutes`` of trading time into trading day ``day``" to a datetime.

        An offset equal to the length of the sessions before it resolves to the
        start of the following session. Returns None when ``day`` is not a known
        day index or ``minutes`` is not smaller than that day's trading time.
        """
        date = self.trade_index_date.get(day,None)
        if date is None:
            return None
        idx,trade_times = self.trade_date_index.get(date,(None,[]))
        if not trade_times:
            return None
        for (start,stop) in trade_times:
            session_minutes = int((stop - start).total_seconds()/60)
            if minutes < session_minutes:
                return start + datetime.timedelta(minutes=minutes)
            minutes -= session_minutes
        return None

    def get_bar_window(self,dt:datetime.datetime,window:int,interval:Interval=Interval.MINUTE):
        """
        Return ``(start, stop)`` of the fixed-position bar that contains ``dt``.

        Bars are laid out back to back on the contract's cumulative trading-time
        axis (minutes since the first trading day), each ``window`` intervals wide.
        ``(None, None)`` is returned when ``dt`` is outside trading hours, the
        interval is unsupported or the resulting bar width is not positive.
        Either element may be None when it falls outside the indexed trading days
        (see ``convet_to_datetime``).
        """
        bar_windows = (None,None)

        day,trade_times = self.get_trade_datetimes(dt,allday=True)
        if not trade_times:
            return bar_windows

        # Trading minutes per trading day.
        TTPD = self.get_trade_time_perday()

        # Trading minutes elapsed within dt's trading day.
        TTID = self.get_trade_time_inday(dt,unit=Timeunit.MINUTE)
        if TTID is None:
            return bar_windows

        # Position of dt on the cumulative trading-minute axis.
        total_minites = day*TTPD + TTID

        # Bar width in trading minutes.
        if interval == Interval.MINUTE:
            bar_width = window
        elif interval == Interval.HOUR:
            bar_width = 60*window
        elif interval == Interval.DAILY:
            bar_width = TTPD*window
        elif interval == Interval.WEEKLY:
            bar_width = TTPD*window*5
        else:
            return bar_windows

        # A zero/negative width or an empty trading day would divide by zero below.
        if bar_width <= 0 or TTPD <= 0:
            return bar_windows

        # Bar boundaries on the cumulative trading-minute axis.
        start_m = (total_minites // bar_width) * bar_width
        stop_m = start_m + bar_width

        # Convert both boundaries back to (day index, minutes in day) -> datetime.
        start_dt = self.convet_to_datetime(start_m // TTPD,start_m % TTPD)
        stop_dt = self.convet_to_datetime(stop_m // TTPD,stop_m % TTPD)

        return start_dt,stop_dt

class FixedBarGenerator(BarGenerator):
    """ 固定位置K线生成器 """
    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE,
        symbol:str=''
    ):
        """
        Set up the parent generator and the trading-hours calendar of ``symbol``.

        ``on_bar``, ``window``, ``on_window_bar`` and ``interval`` are forwarded
        unchanged to the parent class; ``symbol`` is used to build the
        ``TradeHours`` instance that positions the window bars.
        """
        super().__init__(on_bar,window,on_window_bar,interval)
        self.trade_hours = TradeHours(symbol)

        # Boundaries of the window bar being built; unknown until the first bar arrives.
        self.kx_start = None
        self.kx_stop = None

    def update_bar(self,bar:BarData) -> None:
        """
        Update a 1-minute bar into the generator.

        The bar is merged into the current window bar whose boundaries
        [kx_start, kx_stop) come from TradeHours.get_bar_window. When the bar
        falls at or after kx_stop, the finished window bar is passed to
        on_window_bar and a new window is opened for the incoming bar.

        NOTE(review): bar.datetime is compared with the datetimes returned by
        get_bar_window, so it presumably has to be timezone-aware - confirm.
        """

        # Drop the bar if its timestamp is earlier than the current window bar's timestamp.
        if self.window_bar and bar.datetime < self.window_bar.datetime:
            return

        # No window known yet: compute the window that contains this bar.
        if (self.kx_start,self.kx_stop) == (None,None):
            self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
            if (self.kx_start,self.kx_stop) == (None,None):
                # The bar is outside trading hours (or the window cannot be computed): ignore it.
                return

        # If not inited, create window bar object
        if (not self.window_bar):
            # Open a new window bar stamped with the window start time.
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=self.kx_start,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )

        # NOTE(review): get_bar_window can return a stop of None while start is set
        # (window end beyond the indexed trading days); the comparisons with
        # self.kx_stop below would then raise - confirm whether that can occur.
        elif self.kx_start <= bar.datetime and bar.datetime < self.kx_stop:
            # The 1-minute bar belongs to the current window bar: extend high/low.
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        elif bar.datetime >= self.kx_stop:       # Check if window bar completed
            # Emit the finished window bar, then start over with the incoming bar.
            self.on_window_bar(self.window_bar)
            self.window_bar = None

            self.kx_start,self.kx_stop = self.trade_hours.get_bar_window(bar.datetime,self.window,self.interval)
            if (self.kx_start,self.kx_stop) == (None,None):
                # Not within a trading session
                return

            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=self.kx_start,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
            )

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest