## 二、数据及研究方法

本部分首先介绍数据来源，然后详细说明 B-score 的构建方法及其有效性的检验方法。

### （一）数据获取

数据来自 CSMAR 数据库，选取 2014-2023 年全市场股票的财务数据。

**1. Safety 维度**

$$
Safety=\frac{1}{leverage}=\frac{1}{0.38\times MLEV+0.35\times DTOA+0.27\times BLEV}
$$

$$
MLEV=\frac{普通股市场价值+优先股+长期负债}{普通股市场价值}, BLEV=\frac{普通股账面价值+优先股+长期负债}{普通股账面价值}, DTOA=\frac{总负债}{总资产}
$$

其中，普通股市场价值用上一个交易日的股票收盘价计算，其余数据取自上一财年的年报；普通股账面价值 = 所有者权益总计 - 优先股。

**2. Cheapness 维度**

$$
PEG=\frac{市盈率(TTM)}{利润增长率\times 100}
$$

$$
利润增长率=\frac{利润总额_{n+1}}{利润总额_n}-1
$$

**3. Quality 维度**

$$
Quality=\frac{净利润}{营业利润}
$$

In [354]:
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

In [154]:
def fetch_data(rel_path):
    """Load one exported data folder into a cleaned DataFrame.

    The folder must contain a ``.csv`` data file and a ``.txt`` description
    file. Each description line is expected to look like ``code [name] ...``;
    the bracketed names, in file order, replace the CSV column names.

    Args:
        rel_path: Directory holding the ``.csv`` / ``.txt`` pair.

    Returns:
        DataFrame with renamed columns, missing values filled with 0, the date
        column ("交易月份" if present, otherwise "统计截止日期") parsed to
        datetime and, when a "报表类型" column exists, only the rows whose
        report type is "A".

    Raises:
        FileNotFoundError: If the folder has no ``.csv`` or no ``.txt`` file.
    """
    # Sorted so the choice is deterministic if several candidates exist.
    files = sorted(os.listdir(rel_path))

    def _first_with_ext(ext):
        # splitext copes with names that have no dot or several dots.
        for fname in files:
            if os.path.splitext(fname)[1] == ext:
                return fname
        raise FileNotFoundError(f"no {ext} file found in {rel_path!r}")

    csv_name = _first_with_ext(".csv")
    txt_name = _first_with_ext(".txt")

    # First column is read as text so leading zeros in codes are preserved.
    csv_data = pd.read_csv(os.path.join(rel_path, csv_name), dtype={0: str})

    name_list = []
    with open(os.path.join(rel_path, txt_name), encoding="utf-8") as txt_file:
        for line in txt_file:
            if " [" not in line:
                continue  # blank line or line without a bracketed name
            # Split only on the first " [" so later brackets in the
            # description text do not break the parsing.
            name = line.split(" [", 1)[1].split("]")[0]
            name_list.append(name)

    # rename columns
    csv_data.columns = name_list

    # clean the data
    csv_data = csv_data.fillna(0)
    if "交易月份" in csv_data.columns:
        csv_data["交易月份"] = pd.to_datetime(csv_data["交易月份"])
    else:
        csv_data["统计截止日期"] = pd.to_datetime(csv_data["统计截止日期"])

    # use type A report
    if "报表类型" in name_list:
        csv_data = csv_data[csv_data["报表类型"] == "A"]

    return csv_data

# Short alias -> name of the data folder under ../data/.
mapping = {
    "bal": "资产负债表",
    "inc": "利润表",
    "pe": "相对价值指标",
    "ret": "月个股回报率",
    "day": "公布日期",
}

# Load every folder once and keep the cleaned tables by alias.
raw_data = {}
for item, folder in mapping.items():
    path = f'../data/{folder}'
    raw_data[item] = fetch_data(path)

### （二）B-score 的构建

考虑到年报/季报的公布日期晚于统计截止日期，为了消除前瞻偏差（look-ahead bias），需要将财务数据按报告公布日期（而非统计截止日期）对齐。

In [355]:
def get_calendar(start="2013-12-31", end="2023-12-31"):
    """Return the monthly calendar as a list of ``"YYYY-MM"`` strings.

    Args:
        start: Any date inside the first month to include.
        end: Any date inside the last month to include.

    Returns:
        One label per calendar month from the month containing *start* to the
        month containing *end*, both inclusive. With the defaults this is
        ``"2013-12"`` through ``"2023-12"``.
    """
    # Monthly periods avoid the "M"/"ME" offset-alias difference between
    # pandas versions; period frequency "M" is accepted by all of them.
    months = pd.period_range(start=start, end=end, freq="M")
    return months.strftime("%Y-%m").tolist()

def get_symbols(raw_data):
    """Return the sorted symbols that appear in every table of *raw_data*.

    Each table's symbol column is "证券代码" when that column exists and
    "股票代码" otherwise. An empty *raw_data* yields an empty list.
    """
    common = None
    for table in raw_data.values():
        code_col = "股票代码" if "证券代码" not in table.columns else "证券代码"
        codes = set(table[code_col].tolist())
        # The first table seeds the set; later tables narrow it (intersection).
        common = codes if common is None else common & codes

    if common is None:
        return []
    return sorted(common)

def init_df():
    """Return a zero-filled frame: one row per calendar month, one column per symbol.

    Rows come from ``get_calendar()`` and columns from ``get_symbols`` applied
    to the module-level ``raw_data``.
    """
    months = get_calendar()
    symbols = get_symbols(raw_data)
    return pd.DataFrame(0, index=months, columns=symbols)

def store_csv(df, name):
    """Write *df* as ``<name>.csv`` inside the ``../output/`` directory."""
    target = "".join(("../output/", name, ".csv"))
    df.to_csv(target)

In [356]:
def process_data(raw_data, key, variables=None):
    """Build one month-by-symbol table per variable, aligned to publication month.

    For every variable, the value reported for a (symbol, statement date) pair
    is placed in the month in which that report was published, taken from the
    first "报告公布日期" per ("证券代码", "统计截止日期") in ``raw_data["day"]``.
    Zeros are then treated as missing and forward-filled down the months, rows
    that are entirely missing are dropped, and any remaining gaps are set to 0.
    Each finished table is saved through ``store_csv`` under the variable name.

    Args:
        raw_data: Dict of source tables; must contain *key* and ``"day"``.
        key: Which table in *raw_data* supplies the variables.
        variables: Column names to process. Defaults to every column of the
            source table from the fifth one onward.

    Returns:
        Dict mapping variable name to its processed DataFrame. Variables whose
        ``<var>.csv`` already exists in ``../output/`` are skipped and are
        therefore NOT present in the returned dict.
    """
    source = raw_data[key]
    if variables is None:
        variables = source.columns[4:]
    # The symbol column is named differently in some tables (e.g. "pe").
    index_name = "证券代码" if "证券代码" in source.columns else "股票代码"

    # Publication month of each (symbol, statement date) report. This does not
    # depend on the variable, so it is built once rather than per iteration.
    report_df = raw_data["day"].groupby(["证券代码", "统计截止日期"])["报告公布日期"].first()
    report_df = pd.to_datetime(report_df, errors="coerce")
    report_df = report_df.dropna()
    report_df = report_df.dt.strftime("%Y-%m")

    clean_data = {}
    for var in tqdm(variables, desc="variables"):
        if f"{var}.csv" in os.listdir("../output/"):  # skip existed
            continue

        # With a scalar `values`, pivot returns plain statement-date columns,
        # so cells are addressed directly as (symbol, date).
        tmp_tb = pd.pivot(source, index=index_name, columns="统计截止日期", values=var)

        # A fresh frame per variable keeps values and dropped rows of one
        # variable from leaking into the next; float dtype lets it hold
        # fractional values and NaN without an implicit upcast.
        tmp_df = init_df().astype(float)

        for (symbol, day), month in report_df.items():  # match by unique symbol and day
            if month not in tmp_df.index or symbol not in tmp_df.columns:
                continue
            # A published report may have no row in this particular table.
            if symbol not in tmp_tb.index or day not in tmp_tb.columns:
                continue
            tmp_df.loc[month, symbol] = tmp_tb.loc[symbol, day]

        # 0 marks "no report this month": carry the latest published value forward.
        tmp_df = tmp_df.replace(0, np.nan).ffill()
        tmp_df = tmp_df.dropna(axis=0, how="all").fillna(0)
        store_csv(tmp_df, name=var)

        clean_data[var] = tmp_df

    return clean_data

In [None]:
# Valuation table: only the trailing-twelve-month P/E column is needed.
pe_data = process_data(raw_data, key="pe", variables=["市盈率（PE）TTM"])
# Balance sheet and income statement: process every default variable column.
bal_data = process_data(raw_data, key="bal")
inc_data = process_data(raw_data, key="inc")