# 导入库

In [2]:
import os
from pathlib import Path
# Move the process working directory two levels up from the current one.
# NOTE(review): presumably this lands on the project root so that packages such as
# `commodity_pool` resolve on import — confirm against the repository layout.
# The change is relative to the current directory, so re-running this cell climbs
# two more levels each time; restart the kernel before running it again.
os.chdir(Path(os.getcwd()).parent.parent)

In [3]:
# Display the working directory currently in effect for this kernel.
os.getcwd()

'D:\\LFProjects\\NewPythonProject'

# 商品池路径结构

商品池文件位于commodity_pool文件夹中，然后根据商品池类别设置文件夹，目前有FixedPool和DynamicPool两类，分别表示固定商品池和动态商品池。固定商品池又有许多种类，动态商品池也有许多种类,每个具体的商品池构成一个py文件，同时在py文件里面实现一个商品池类。例如：
![image.png](attachment:image.png)
commodity_pool文件夹中有DynamicPool文件夹，代表动态商品池这一类商品池，DynamicPool文件夹中又有一系列py文件，每个py文件中又有同名的类。
![image-2.png](attachment:image-2.png)

# 如何构建商品池

## 商品池基类BaseCommodityPool

1.根据商品池路径结构中所说，如果需要创建新的一类商品池，则需要在commodity_pool文件夹中创建一个子文件夹。\
2.决定是否要创建该类商品池的基类商品池。

In [4]:
from commodity_pool.base import BaseCommodityPool

### 商品池基类BaseCommodityPool的属性和方法

In [5]:
# List every attribute name of BaseCommodityPool except those starting with a double
# underscore (single-underscore names such as `_abc_impl` are still shown).
[i for i in dir(BaseCommodityPool) if not i.startswith("__")]

['_abc_impl',
 '_get_param_names',
 'compute_commodity_pool',
 'get_commodity_pool_value',
 'get_limited_date',
 'get_params',
 'get_string',
 'get_volume_per_symbol',
 'set_commodity_pool_value',
 'set_params']

从上面可知，BaseCommodityPool定义了一系列方法:\
1.compute_commodity_pool是确定商品池计算逻辑的方法，使用者需要在compute_commodity_pool方法中完成商品池的计算，得到商品池DataFrame，即index为交易日期, columns为品种代码，data为True or False，True表示纳入商品池，False表示不纳入商品池。同时在完成计算后将商品池DataFrame传给commodity_pool_value属性，并返回商品池DataFrame。\
2.获取数据的方法，如get_limited_date, get_volume_per_symbol。\
3.继承自BaseClass的set_params和get_params，用于修改参数和获取参数。

In [6]:
import inspect
import pandas as pd
from pathlib import Path
from pandas import DataFrame
from datetime import timedelta
from abc import ABC, abstractmethod

from bases.base import BaseClass
from data_manager.DailyDataManager import DailyDataManager
from data_manager.BasicsDataManager import BasicsDataManager
from data_manager.ContinuousContractDataManager import ContinuousContractDataManager

class BaseCommodityPool(BaseClass):
    """Common base for every commodity pool.

    A concrete pool subclasses this class and implements
    ``compute_commodity_pool``. The base class wires up the data managers and
    offers two shared data helpers (``get_limited_date`` and
    ``get_volume_per_symbol``) plus accessors for the computed pool.

    Attributes
    ----------
    group : str
        Name of the directory that contains the file defining the concrete class.
    name : str
        Name of the concrete class.
    daily_volume : DataFrame or None
        Result of the last ``get_volume_per_symbol`` call.
    listed_date_df : DataFrame or None
        Result of the last ``get_limited_date`` call.
    all_instruments : DataFrame or None
        Instrument table reused by ``get_limited_date`` once it has been fetched.
    commodity_pool_value : DataFrame or None
        The computed pool, as stored by ``set_commodity_pool_value`` or a subclass.
    """

    def __init__(self, **params) -> None:
        """Forward ``params`` to the parent class and create the data managers."""
        super().__init__(**params)

        # The group is the folder holding the subclass' source file.
        self.group: str = Path(inspect.getfile(self.__class__)).parent.name
        self.name: str = self.__class__.__name__

        self.daily_data_manager: DailyDataManager = DailyDataManager()
        self.basics_data_manager: BasicsDataManager = BasicsDataManager()
        self.continuous_contract_data_manager: ContinuousContractDataManager = ContinuousContractDataManager()

        # Filled lazily by the helper methods below.
        self.daily_volume: DataFrame = None
        self.listed_date_df: DataFrame = None
        self.all_instruments: DataFrame = None
        self.commodity_pool_value: DataFrame = None

    def get_limited_date(self, days: int = 60) -> DataFrame:
        """Build the listing date and the "limited" date of each underlying symbol.

        The listing date of a symbol is taken from the first row of that symbol
        after sorting the instrument table by ``contract``. The limited date is
        the listing date plus ``days`` calendar days.

        Parameters
        ----------
        days : int, default 60
            Number of calendar days added to the listing date.

        Returns
        -------
        DataFrame
            Columns ``listed_date`` and ``limited_date`` (both datetimes). The
            frame is also stored on ``self.listed_date_df``.
        """
        if not isinstance(self.all_instruments, DataFrame):
            # Keep the fetched table so later calls do not hit the data manager again;
            # the original checked this attribute but never populated it.
            self.all_instruments = self.basics_data_manager.get_all_instruments()
        all_instruments = self.all_instruments

        # NOTE(review): on pandas >= 2.0 `GroupBy.nth` acts as a filter and keeps the
        # original row labels instead of the group keys — confirm the pinned pandas
        # version if callers rely on the index being `underlying_symbol`.
        listed_date_df = (
            all_instruments
            .sort_values(by='contract')
            .groupby('underlying_symbol', as_index=True)['listed_date']
            .nth(0)
            .to_frame('listed_date')
        )
        listed_date_df['listed_date'] = pd.to_datetime(listed_date_df['listed_date'])
        listed_date_df['limited_date'] = listed_date_df['listed_date'] + timedelta(days=days)

        self.listed_date_df = listed_date_df
        return listed_date_df

    def get_volume_per_symbol(self, window: int = 60) -> DataFrame:
        """Compute the rolling mean of the daily traded volume of each symbol.

        The volume is first summed over all rows sharing the same ``datetime``
        and ``underlying_symbol``, then averaged over a rolling window.

        Parameters
        ----------
        window : int, default 60
            Length of the rolling window, in rows of the date-indexed table.

        Returns
        -------
        DataFrame
            Long-format frame with the ``datetime`` and ``underlying_symbol``
            columns plus a ``volume`` column holding the rolling mean. The frame
            is also stored on ``self.daily_volume``.
        """
        daily_data = self.daily_data_manager.get_daily_data()
        volume_by_symbol = (
            daily_data
            .groupby(['datetime', 'underlying_symbol'], as_index=True)['volume']
            .sum()
            .unstack(level=-1)
        )
        # Honour the `window` argument (it used to be ignored in favour of a literal 60).
        # min_periods=0 yields a mean as soon as one observation is available.
        daily_volume = (
            volume_by_symbol
            .rolling(window=window, min_periods=0)
            .mean()
            .stack()
            .to_frame('volume')
            .reset_index()
        )
        self.daily_volume = daily_volume
        return daily_volume

    def set_commodity_pool_value(self, commodity_pool_value: DataFrame) -> None:
        """Store ``commodity_pool_value`` as the current pool."""
        self.commodity_pool_value = commodity_pool_value

    def get_commodity_pool_value(self) -> DataFrame:
        """Return the stored pool (``None`` until one has been set)."""
        return self.commodity_pool_value

    @abstractmethod
    def compute_commodity_pool(self) -> DataFrame:
        """Compute and return the pool; must be implemented by subclasses."""
        raise NotImplementedError

    def __repr__(self):
        """Return ``commodity_pool(group=..., name=..., <param>=<value>, ...)``."""
        fields = [f"group={self.group}", f"name={self.name}"]
        # Append the pool's own parameters after the fixed group/name fields.
        fields.extend(f"{key}={value}" for key, value in self.get_params().items())
        return "commodity_pool(" + ", ".join(fields) + ")"

    def get_string(self) -> str:
        """Return the same text as ``repr(self)``."""
        return self.__repr__()

## 单类商品池的基类

某一类商品池往往具有共同的特性，通过定义这类商品池的基类，可以简化其中各个具体商品池的计算逻辑，常见的情形：\
1.商品池在计算时一些代码是公共的。\
2.同类商品池中的不同商品池需要同一个获取数据的方法。

注：目前还没有构建单类商品池的基类。

## 以DynamicPool3为例构建商品池

1.动态商品池DynamicPool3继承自商品池基类BaseCommodityPool。\
2.在`__init__`中定义了参数q, window，并利用super()调用父类的`__init__`方法。\
3.在compute_commodity_pool中定义该动态商品池的计算逻辑，商品池DataFrame计算完毕后将其传给commodity_pool_value属性，并返回商品池DataFrame。

In [7]:
import numpy as np
from pandas import DataFrame
from commodity_pool.base import BaseCommodityPool

class DynamicPool3(BaseCommodityPool):
    """Dynamic commodity pool 3.

    For every symbol, the daily open interest (summed over all rows of that
    symbol on that day) is averaged over a rolling window. On each day, a
    symbol is included in the pool when its rolling average is at or above
    the ``q`` quantile taken across symbols on that day.

    Parameters
    ----------
    q : float, default 0.25
        Quantile used as the daily inclusion threshold.
    window : int, default 126
        Length of the rolling window.
    """

    # Symbols that are never admitted to the pool: their rolling values are blanked
    # before the quantile is taken, so they neither qualify nor shift the threshold.
    # NOTE(review): the reason for excluding exactly these symbols is not visible
    # here — confirm with the strategy owner.
    _EXCLUDED_SYMBOLS = ('IF', 'IH', 'IC', 'T', 'TF', 'TS', 'SC', 'NR', 'LU', 'BC')

    def __init__(self, q: float = 0.25, window: int = 126) -> None:
        """Pass ``q`` and ``window`` to the base class as the pool's parameters."""
        super().__init__(q=q, window=window)

    def compute_commodity_pool(self) -> DataFrame:
        """Compute the pool membership table.

        Returns
        -------
        DataFrame
            Boolean frame with one row per ``datetime`` and one column per
            ``underlying_symbol``; ``True`` marks inclusion in the pool. The
            frame is also stored on ``self.commodity_pool_value``.
        """
        daily_data = self.daily_data_manager.get_daily_data()

        # Wide table: rows are dates, columns are symbols, values are total open interest.
        daily_open_interest = (
            daily_data
            .groupby(['datetime', 'underlying_symbol'], as_index=True)['open_interest']
            .sum()
            .unstack(level=-1)
        )
        daily_rolling_open_interest = daily_open_interest.rolling(
            window=self.window, min_periods=0
        ).mean()

        # A boolean column mask only touches excluded symbols that are actually present,
        # so a data set lacking some of them neither fails nor gains extra columns.
        is_excluded = daily_rolling_open_interest.columns.isin(self._EXCLUDED_SYMBOLS)
        daily_rolling_open_interest.loc[:, is_excluded] = np.nan

        # 'higher' makes the threshold an observed value, so `>=` always admits the
        # symbol sitting exactly on the quantile.
        daily_quantile = daily_rolling_open_interest.quantile(
            q=self.q, axis=1, interpolation='higher'
        )

        # Row-wise comparison against that day's threshold; a comparison involving NaN
        # is False, so excluded or missing symbols are left out of the pool.
        commodity_pool_value = daily_rolling_open_interest.ge(daily_quantile, axis=0)

        self.commodity_pool_value = commodity_pool_value
        return commodity_pool_value