# 财务报表数据清洗

> 针对wind 三张表的原始数据进行数据清洗、数据填充、报表离散化、TTM计算

## 0 - 导入数据

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime
from pandas.tseries.offsets import MonthEnd

WindData_path = '/root/pbcsf/WindDataBase/data/'

df_balance=pd.read_csv(WindData_path + 'asharebalancesheet.csv', encoding='utf-8', low_memory=False)
df_income=pd.read_csv(WindData_path + 'ashareincome.csv', encoding='utf-8', low_memory=False)
df_cashflow=pd.read_csv(WindData_path + 'asharecashflow.csv', encoding='utf-8', low_memory=False)

In [10]:
print(len(df_balance))
print(df_balance.columns.to_list())
df_balance.head(20)[['REPORT_PERIOD','ANN_DT','ACTUAL_ANN_DT']]

610532
['OBJECT_ID', 'S_INFO_WINDCODE', 'WIND_CODE', 'ANN_DT', 'REPORT_PERIOD', 'STATEMENT_TYPE', 'CRNCY_CODE', 'MONETARY_CAP', 'TRADABLE_FIN_ASSETS', 'NOTES_RCV', 'ACCT_RCV', 'OTH_RCV', 'PREPAY', 'DVD_RCV', 'INT_RCV', 'INVENTORIES', 'CONSUMPTIVE_BIO_ASSETS', 'DEFERRED_EXP', 'NON_CUR_ASSETS_DUE_WITHIN_1Y', 'SETTLE_RSRV', 'LOANS_TO_OTH_BANKS', 'PREM_RCV', 'RCV_FROM_REINSURER', 'RCV_FROM_CEDED_INSUR_CONT_RSRV', 'RED_MONETARY_CAP_FOR_SALE', 'OTH_CUR_ASSETS', 'TOT_CUR_ASSETS', 'FIN_ASSETS_AVAIL_FOR_SALE', 'HELD_TO_MTY_INVEST', 'LONG_TERM_EQY_INVEST', 'INVEST_REAL_ESTATE', 'TIME_DEPOSITS', 'OTH_ASSETS', 'LONG_TERM_REC', 'FIX_ASSETS', 'CONST_IN_PROG', 'PROJ_MATL', 'FIX_ASSETS_DISP', 'PRODUCTIVE_BIO_ASSETS', 'OIL_AND_NATURAL_GAS_ASSETS', 'INTANG_ASSETS', 'R_AND_D_COSTS', 'GOODWILL', 'LONG_TERM_DEFERRED_EXP', 'DEFERRED_TAX_ASSETS', 'LOANS_AND_ADV_GRANTED', 'OTH_NON_CUR_ASSETS', 'TOT_NON_CUR_ASSETS', 'CASH_DEPOSITS_CENTRAL_BANK', 'ASSET_DEP_OTH_BANKS_FIN_INST', 'PRECIOUS_METALS', 'DERIVATIVE_FI

Unnamed: 0,REPORT_PERIOD,ANN_DT,ACTUAL_ANN_DT
0,20131231,20150324.0,20150324.0
1,20180331,20180428.0,20180428.0
2,20180331,20180428.0,20180428.0
3,20171231,20180331.0,20180331.0
4,20161231,20180331.0,20180331.0
5,20171231,20180331.0,20180331.0
6,20161231,20180331.0,20180331.0
7,20171231,20180331.0,20180331.0
8,20161231,20180331.0,20180331.0
9,20201231,20210402.0,20210402.0


## 1 - 基本数据清洗

### 1.1 时间戳转化
### 1.2 保留最新报表，保留主板股票
### 1.3 剔除无用数据
* 1998年之前只有年报
* 1998 - 2002只有年报和半年报
### 1.4 一家公司一年只有四种报告类型，3，6，9，12

In [11]:
"""
STEP 1 : DATA CLEANING
依次运行以下四个函数
"""

def date_type_change(df,date_column_list):
    '''
    将指定的列转换日期格式
    '''
    for date_column in date_column_list:
        df[date_column] = pd.to_datetime(df[date_column], format='%Y%m%d')
    return df


def data_filter(df):
    '''
    仅保留最新更新的报表数据，仅保留主板股票
    '''
    # ？
    df = df.loc[(df['STATEMENT_TYPE'] == 408005000) | (df['STATEMENT_TYPE'] == 408001000)].sort_values(
    ['S_INFO_WINDCODE', 'REPORT_PERIOD', 'STATEMENT_TYPE'])
    
    # 保留最新的报表数据
    # TODO 这样的保留方式是否会使一些信息没有被有效的利用，因为刚开始有一些报表如果不是最后的一个，就会认为这时候没有任何报表信息，这是不符合实际投资的
    df.drop_duplicates(subset=['S_INFO_WINDCODE', 'REPORT_PERIOD'], keep='last', inplace=True)
    # 保留主板的股票
    df = df[(df['WIND_CODE'].str.startswith('0')) | (df['WIND_CODE'].str.startswith('3'))
                       | (df['WIND_CODE'].str.startswith('6'))].copy()    
    return df


def drop_useless_data(income_sub, time_var='REPORT_PERIOD'):
    '''
    This function is used to drop useless finance data: 
        1) we can only trust annual report before 1998;
        2) we can only trust semi-annual report between 1998 and 2002

    :param income_sub: a pandas DataFrame need to be converted
    :param time_var: time indicator, 'REPORT_PERIOD' is default value

    :return: a pandas DataFrame which will not contain any useless finance data.
    
    '''

    income_sub['year_temp'] = income_sub[time_var].apply(lambda x: int(x.year))
    income_sub['month_temp'] = income_sub[time_var].apply(lambda x: x.month)

    # 1998 年以前只能有年报
    temp_index = income_sub[(income_sub['year_temp'] < 1998) & (income_sub['month_temp'] != 12)].index
    income_sub.drop(index=temp_index, inplace=True)


    # 1998 到 2002 年以前只能有半年报
    temp_index = income_sub[
        (income_sub['year_temp'] >= 1998) & (income_sub['year_temp'] < 2002) & (income_sub['month_temp'] == 3)].index
    income_sub.drop(index=temp_index, inplace=True)

    temp_index = income_sub[
        (income_sub['year_temp'] >= 1998) & (income_sub['year_temp'] < 2002) & (income_sub['month_temp'] == 9)].index
    income_sub.drop(index=temp_index, inplace=True)


    del income_sub['year_temp']
    del income_sub['month_temp']

    return income_sub


def get_month_right(balance_data, time_var='REPORT_PERIOD'):
    '''
    一家公司一年只能有这4种报告类型

    :param income_sub: a pandas DataFrame need to be converted
    :param time_var: time indicator, 'REPORT_PERIOD' is default value

    :return: a pandas DataFrame which will not contain any useless finance data.
    '''    
    balance_data1 = balance_data.copy()
    balance_data1['month_temp'] = balance_data1[time_var].apply(lambda x: x.month)
    balance_data1 = balance_data1[(balance_data1['month_temp'] == 3) | (balance_data1['month_temp'] == 6) |
                                  (balance_data1['month_temp'] == 9) | (balance_data1['month_temp'] == 12)]

    del balance_data1['month_temp']
    return balance_data1

In [12]:
df_balance=date_type_change(df_balance,['REPORT_PERIOD','ANN_DT','ACTUAL_ANN_DT'])
df_income=date_type_change(df_income,['REPORT_PERIOD','ANN_DT','ACTUAL_ANN_DT'])
df_cashflow=date_type_change(df_cashflow,['REPORT_PERIOD','ANN_DT','ACTUAL_ANN_DT'])

df_balance=data_filter(df_balance)
df_income=data_filter(df_income)
df_cashflow=data_filter(df_cashflow)

df_balance=drop_useless_data(df_balance)
df_income=drop_useless_data(df_income)
df_cashflow=drop_useless_data(df_cashflow)

df_balance=get_month_right(df_balance)
df_income=get_month_right(df_income)
df_cashflow=get_month_right(df_cashflow)

# 2 - 获取最新交易日期

* 1） 通过比较ANN_DT和ACTUAL_ANN_DT,取较小值，生成TRADE_DT
* 2） 计算TRADE_DT和REPORT_PERIOD之间的days gap，如果gap大于400天则认为数据存在问题，将TRADE_DT的数据重制为PERIOD延后1或2或4个月的时间，具体取决于季报，半年报，年报的种类
* 3） 如果ANN_DT和ACTUAL_ANN_DT存在缺失，直接在period的基础上进行延后

In [13]:
#传入值balance是未处理的财务报表
#作用是删去间隔时间过长的样本，返回值是清洗过后的报表

#month_adjust的作用是获取异常日期的调整数，季报调整一个月，半年报调整两个月，年报调整4个月（2019年报为6个月）
def month_adjust(array):
    '''
    get_ann_dt调用的子函数，作用是给财报发布日期缺失的样本添加日期，即向后顺延n个月，n由报告期决定
    '''
    if((array.REPORT_PERIOD.month==3)|(array.REPORT_PERIOD.month==9)):
        month_adjust_num=1
    elif(array.REPORT_PERIOD.month==6):
        month_adjust_num=2
    elif(array.REPORT_PERIOD.month==12):
        if(array.REPORT_PERIOD.year==2019):
            month_adjust_num=6
        else:
            month_adjust_num=4
    else:
        print('error')
    array.TRADE_DT=array.REPORT_PERIOD+MonthEnd(month_adjust_num)
    return array


def get_ann_dt(balance):
    
    '''
    基于财务报表公告日期来定位出财报数据最早可用的时间
    
    ANN_DT   公告日期 ：指定期报告公布的日期
    ACTUAL_ANN_DT   实际公告日期：指出了更正公告的日期


    * 检查并处理财报公布日期与财报年度差距或过小问题
    * 检查并处理处理财报公布日期早于财报年度问题
    
    ANN_DT>ACTUAL_ANN_DT原因： 07年新旧准则替换，我们会把20080630中报公布的新准则数据20070630调整数复制出来，替换成20070630当年公布的旧准则数据。
    用ACTUAL_ANN_DT来记录旧准则公布的时间，ANN_DT是新准则公布的时间。这时的ANN_DT是大于ACTUAL_ANN_DT的。除去这种情况ANN_DT是记录正确的公告日期。

    '''
    
    balance['year_temp'] = balance['REPORT_PERIOD'].apply(lambda x: int(x.year))
    balance['month_temp'] = balance['REPORT_PERIOD'].apply(lambda x: x.month)
    
    balance['TRADE_DT'] = np.nanmin(balance.loc[:, ['ANN_DT', 'ACTUAL_ANN_DT']], axis=1)
        #balance['TRADE_DT'] = pd.to_datetime(balance['TRADE_DT'], format='%Y%m%d')
    balance['gap_num'] = balance['TRADE_DT'] - balance['REPORT_PERIOD']
    balance['gap_num'] = balance['gap_num'].apply(lambda x: x.days)
    
    # TODO if gap_num higher than 4*22 = 88 days or less than min_gap_days set gap to 4 month
    
    temp_index = balance[(balance['month_temp']==12)&(balance['gap_num'] > 400)&(balance['year_temp']!=2019)].index
    # todo 在2008年1月1日以前的，直接通过+时间来处理，具体处理方式如下：年报+4，半年报+2，季报+1
    balance.loc[temp_index, 'TRADE_DT'] = balance.loc[temp_index, 'REPORT_PERIOD'] + MonthEnd(4)
    

    temp_index = balance[(balance['month_temp']==3)&(balance['gap_num'] > 400)].index
    # todo 在2008年1月1日以前的，直接通过+时间来处理，具体处理方式如下：年报+4，半年报+2，季报+1
    balance.loc[temp_index, 'TRADE_DT'] = balance.loc[temp_index, 'REPORT_PERIOD'] + MonthEnd(1)
    
    
    temp_index = balance[(balance['month_temp']==9)&(balance['gap_num'] > 400)].index
    # todo 在2008年1月1日以前的，直接通过+时间来处理，具体处理方式如下：年报+4，半年报+2，季报+1
    balance.loc[temp_index, 'TRADE_DT'] = balance.loc[temp_index, 'REPORT_PERIOD'] + MonthEnd(1)

    # 没有6是因为半年报没有这样gap过大的情况
    
    # 处理report date都为NaN的情况（？）
    temp_index = balance[pd.isnull(balance[ 'TRADE_DT']) == True].index
    balance.loc[temp_index] = balance.loc[temp_index].apply(lambda x:month_adjust(x),axis=1)
    balance.drop(['year_temp','month_temp'],axis=1,inplace=True)
    
    return balance

In [14]:
df_balance=get_ann_dt(df_balance)
df_income=get_ann_dt(df_income)
df_cashflow=get_ann_dt(df_cashflow)

In [6]:
"""
生成每个公司对应财务季的财务报表最早可用时间，通过合并三张财务报表得到
"""

#返回值是['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']的list
#此函数的作用是统一化三张报表的TRADE_DT,如果日期存在差异则取最小值，合并后TRADE_DT_x代表传入的第一个报表，TRADE_DT_y为第二个，TRADE_DT为第三个
#传入的三张报表数据必须是一个股票一个会计期一个报表的情况（也就是不同的报表类型只保留一份）
def create_TRADE_DT(df_balance,df_income,df_cashflow):
    '''
    #返回值是['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']的list
    #此函数的作用是统一化三张报表的TRADE_DT,如果日期存在差异则取最小值，合并后TRADE_DT_x代表传入的第一个报表，TRADE_DT_y为第二个，TRADE_DT为第三个
    #传入的三张报表数据必须是一个股票一个会计期一个报表的情况（也就是不同的报表类型只保留一份
    # 由于之前已经保留了每个报告期的最新报表，所以这个条件自动符合    
    '''
    df_balance_s=df_balance[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    df_income_s=df_income[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    df_cashflow_s=df_cashflow[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    
    df_balance_s.rename({'TRADE_DT':'TRADE_DT_balance'},axis=1,inplace=True)
    df_income_s.rename({'TRADE_DT':'TRADE_DT_income'},axis=1,inplace=True)
    df_cashflow_s.rename({'TRADE_DT':'TRADE_DT_cashflow'},axis=1,inplace=True)
    
    df_combine=pd.merge(df_balance_s,df_income_s,on=['S_INFO_WINDCODE','REPORT_PERIOD'],how='outer')
    df_combine=pd.merge(df_combine,df_cashflow_s,on=['S_INFO_WINDCODE','REPORT_PERIOD'],how='outer')
    
    df_combine['TRADE_DT']=df_combine[['TRADE_DT_balance','TRADE_DT_income','TRADE_DT_cashflow']].min(axis=1)#把日期更新为最小值
    
    df_TRADEDT=df_combine[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    
    return df_TRADEDT

# TODO 很多问题
def update_TRADE_DT(df_balance,df_income,df_cashflow,df_TRADEDT_old,startdate=None,enddate=None):
    '''
    #返回值是一个更新后的df_TRADEDT_update
    #df_balance,df_income,df_cashflow的要求与create_TRADE_DT相同
    #df_TRADEDT_old是旧版本的df_TRADEDT
    #startdate和enddate要求传入字符串如“2000-01-01”，并且这里的起始和结束针对的是REPORT_PERIOD而不是TRADE_DT
    #比如startdate传入“2000-01-01”，enddate传入“2010-12-31”，那么1999年的年报（在2000年发表）不会包括在内
    #如果startdate不传入参数，则默认为最新一期报告的前12个月，如果enddate不传入参数，则默认为调用函数当天
    '''
    
    df_balance_s=df_balance[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    df_income_s=df_income[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    df_cashflow_s=df_cashflow[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    
    
    df_balance_s.rename({'TRADE_DT':'TRADE_DT_balance'},axis=1,inplace=True)
    df_income_s.rename({'TRADE_DT':'TRADE_DT_income'},axis=1,inplace=True)
    df_cashflow_s.rename({'TRADE_DT':'TRADE_DT_cashflow'},axis=1,inplace=True)
    

    if(startdate==None):
        # 如果没有输入start date，则取填充数据最大报表期的最大值的前一年
        startdate=df_balance_s.REPORT_PERIOD.max()-MonthEnd(12)
    else:
        startdate=pd.to_datetime(startdate)

    df_balance_s=df_balance_s[df_balance_s['REPORT_PERIOD']>=startdate].copy()
    df_income_s=df_income_s[df_income_s['REPORT_PERIOD']>=startdate].copy()
    df_cashflow_s=df_cashflow_s[df_cashflow_s['REPORT_PERIOD']>=startdate].copy()   

    if(enddate!=None):
        enddate=pd.to_datetime(enddate)
        df_balance_s=df_balance_s[df_balance_s['REPORT_PERIOD']<=enddate].copy()
        df_income_s=df_income_s[df_income_s['REPORT_PERIOD']<=enddate].copy()
        df_cashflow_s=df_cashflow_s[df_cashflow_s['REPORT_PERIOD']<=enddate].copy()        

    df_combine=pd.merge(df_balance_s,df_income_s,on=['S_INFO_WINDCODE','REPORT_PERIOD'],how='outer')
    df_combine=pd.merge(df_combine,df_cashflow_s,on=['S_INFO_WINDCODE','REPORT_PERIOD'],how='outer')

    df_combine['TRADE_DT']=df_combine[['TRADE_DT_balance','TRADE_DT_income','TRADE_DT_cashflow']].min(axis=1)#把日期更新为最小值

    df_TRADEDT_new=df_combine[['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']].copy()
    df_TRADEDT_old_1=df_TRADEDT_old[(df_TRADEDT_old['REPORT_PERIOD'])<startdate].copy()
    
    if(enddate!=None):
        df_TRADEDT_old_2=df_TRADEDT_old[(df_TRADEDT_old['REPORT_PERIOD'])>enddate].copy()
        df_TRADEDT_update=pd.concat([df_TRADEDT_old_1,df_TRADEDT_new])
    else:
        # TODO BUG
        df_TRADEDT_update=pd.concat([df_TRADEDT_old_1,df_TRADEDT_old_2,df_TRADEDT_new])

    return df_TRADEDT_update

# 3 - 填充缺失报表

## 3.1 填充缺失REPORT_PERIOD

In [10]:
def fill_row_stk(df_stk,report_date_list):
    
    '''
    #该函数是fill_row函数对每个股票分别调用的子函数，不需要自行调用
    #df_stk是该股票的数据切片,report_date_list是所有的REPORT_PERIOD数据
    #返回值是每个股票填补空行后的数据
    '''
    start_date=df_stk.REPORT_PERIOD.min()
    end_date=df_stk.REPORT_PERIOD.max()
    # 找出理论上应该有的报表期
    stk_report_date=[report_date for report_date in report_date_list if((report_date<=end_date)and (report_date>=start_date))]
    # 如果现在的报表期和理论的报表期相同，则说明没有报表期缺失
    if(len(stk_report_date)==df_stk.shape[0]):
        pass
    else:
        df_stk.index=df_stk.REPORT_PERIOD
        df_stk=df_stk.reindex(stk_report_date) # reindex会按照大小顺序自动补齐index中缺失的值，并且在所有的col填nan
        df_stk['S_INFO_WINDCODE']=[df_stk['S_INFO_WINDCODE'].unique()[0]]*df_stk.shape[0]
        df_stk.REPORT_PERIOD=df_stk.index # 借助index补齐periods
        df_stk['year_temp'] = df_stk['REPORT_PERIOD'].apply(lambda x: int(x.year))
        df_stk['month_temp'] = df_stk['REPORT_PERIOD'].apply(lambda x: x.month)        
    return df_stk
   

def fill_row(df):    
    '''
    #该函数作用是找出全部观测值缺失的样本（比如某个季度没发表季报），并填充回去
    #df是传入的财务报告（要求每个股票每个报告期只保留一种报告类型）
    '''
    report_date_list=df.REPORT_PERIOD.unique()
    report_date_list.sort()
    df=df.groupby('S_INFO_WINDCODE',as_index=False).apply(lambda x:fill_row_stk(x,report_date_list))
    df.index=list(range(df.shape[0]))
    return df


In [None]:
df_balance=fill_row(df_balance)
df_income=fill_row(df_income)
df_cashflow=fill_row(df_cashflow)

以下所有的填充都是基于以上填充了nan之后的报表：以下的填充会检测是否存在nan值后进行对应的表的对应填充

## 3.2 填充流量表
（一年中进行累加的表）

In [62]:
#fill_empty_stk_flow和 fill_empty_stk_stock是fill_empty中调用的子函数，不需要自行调用，flow代表流量表，stock代表存量表

#fill_empty_stk_year_flow是fill_empty_flow在 #flow_fill_method='copy_front'调用的子函数
def fill_empty_stk_year_flow(df_stk_year,column_list):
    '''
    #fill_empty_stk_year_flow是fill_empty_flow在#flow_fill_method='copy_front'调用的子函数
    '''
    for column in column_list:
        if(df_stk_year[column].isnull().any()==False): # 如果不存在nan，则直接返回
            pass
        else:
        #  取出nan的idx，逐一进行填充         
        #  更简单的方法是用sorting，bfill，然后replace（nan，0）   
            empty_index_list=np.isnan(df_stk_year[column])[np.isnan(df_stk_year[column])].index.tolist()
            for empty_index in empty_index_list:
                try:
                    df_stk_year[column].loc[empty_index]=df_stk_year[column].loc[empty_index-1]
                except:#说明这是第一条，应该赋值为0
                    df_stk_year[column].loc[empty_index]=0
    
    return df_stk_year



#fill_empty_stk_flow是fill_empty_flow在#flow_fill_method='average_front'调用的子函数
def fill_empty_stk_flow(df_stk,column_list,max_retreat_year):
    '''
    #fill_empty_stk_year_flow是fill_empty_flow在#flow_fill_method='copy_front'调用的子函数
    '''
    for column in column_list: # 对每一个需要计算的column进行处理
        
        if((df_stk[column].isnull().any()==False)|(df_stk[column].isnull().all()==True)): # 全为空或者全部非空则不需要处理
            pass
        else:
            empty_index_list=np.isnan(df_stk[column])[np.isnan(df_stk[column])].index.tolist() # 找出空值所在的index，此前通过重置index已经保证了所有的index都是连续的
            min_nonempty_index=df_stk[column].notna()[df_stk[column].notna()].index.min() # 找出索引最小的非空值（也就是该股票在该项目首次发布数值的时间）
            empty_index_list=[empty_index for empty_index in empty_index_list if (empty_index>min_nonempty_index)] # 找出可以被填补的缺失值（此前有数据可追溯的缺失值）
            if(len(empty_index_list)==0):
                continue
            
            last_nonempty_index=min(empty_index_list)-1 # last_nonempty_index记录的是距离当前缺失值最近的非缺失值索引，方便判断时间间隔是否超过了max_retreat_year
            
            for i in range(len(empty_index_list)):
            
                empty_index=empty_index_list[i]
                if(empty_index==empty_index_list[i-1]+1): # 判断这个值的上一个值在原始报表中是否为空值             
                    last_nonempty_index=last_nonempty_index # 如果上一个值为空值，则last_nonempty_index不更新
                else:
                    last_nonempty_index=empty_index-1 # 如果上一个值不为空值，则last_nonempty_index更新为该空值的上一个值
                    
                last_nonempty_time=df_stk.REPORT_PERIOD.loc[last_nonempty_index] # 获取最近的非缺失值发布的时间
                this_time=df_stk.REPORT_PERIOD.loc[empty_index] # 获取当前的时间
            
                if(this_time-last_nonempty_time>=datetime.timedelta(days=max_retreat_year*365)): # 超过了max_retreat_year，不填充
                    pass
                else: 
                    # 这里用empty index-1是合理的，因为就算前一个数据是填充得到的，只要gap不超过max year（gap是由last empty计算的），则填充的值都是相同的
                    last_period=df_stk.month_temp.loc[empty_index-1]
                    this_period=df_stk.month_temp.loc[empty_index]
                    df_stk[column].loc[empty_index]=df_stk[column].loc[empty_index-1]*this_period/last_period 
            
    return df_stk
                                    

def fill_empty_flow(df_test,column_list,flow_fill_method='average_front',max_retreat_year=1):
    '''
    # fill_empty_xxx函数的作用是将某一列的空缺值进行填补,fill_empty_flow是对流量表处理，fill_empty_stock是对存量表处理
    # 输出值是['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']以及填补缺失值后的列

    # 输入值df_test是等待处理的原始报表，column_list是需要填补缺失值的列名
    
    # flow_fill_method可选参数为'copy_front'和'average_front'
        #'copy_front'是把流量表中每年第一个空值替换为0，其余空值替换为前值，例如  nan 6 nan 12 替换成 0 6 6 12
        #'average_front'把流量表中的空值用前一个非空值计算出来，比如去年年报利润报告为12，今年一季报没有报告利润，那么补充的值为12/4*1=3
        #'copy_front'方法中每年一季报如果为为nan则替换为0，无法利用前一年数据，但'average_front'的一季报为空可以利用前一年的数据
    # max_retreat_year为最长回溯的时间，单位为年，默认值为1,由于'copy_front'不涉及到使用前一年的数据，因此该方法不需考虑max_retreat
    
    '''
    df_test.sort_values(['S_INFO_WINDCODE','REPORT_PERIOD'],inplace=True)  
    df_test.index=list(range(df_test.shape[0]))
    df_test['year_temp'] = df_test['REPORT_PERIOD'].apply(lambda x: int(x.year))
    df_test['month_temp'] = df_test['REPORT_PERIOD'].apply(lambda x: x.month)

    if(flow_fill_method=='copy_front'):
        df_test=df_test.groupby(['S_INFO_WINDCODE','year_temp']).apply(lambda x:fill_empty_stk_year_flow(x,column_list))
    elif(flow_fill_method=='average_front'):
        df_test=df_test.groupby('S_INFO_WINDCODE').apply(lambda x:fill_empty_stk_flow(x,column_list,max_retreat_year))
    else:
        print('输入的flow_fill_method参数有误，现返回原输入的dataframe')
        return df_test
        
    return_list=['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']+column_list
    
    return df_test[return_list]


## 3.3 填充存量表

In [63]:
#fill_empty_stk_stock是fill_empty_stock调用的子函数
def fill_empty_stk_stock(df_stk,column_list,max_retreat_year):
    '''
    对存量表进行处理，这是一个被调用的子函数
    '''
    for column in column_list:
        
        if((df_stk[column].isnull().any()==False)|(df_stk[column].isnull().all()==True)):#全为空或者全部非空则不需要处理
            pass
        
        else:
            
            empty_index_list=np.isnan(df_stk[column])[np.isnan(df_stk[column])].index.tolist() # 找出空值所在的index，此前通过重置index已经保证了所有的index都是连续的
            min_nonempty_index=df_stk[column].notna()[df_stk[column].notna()].index.min() # 找出索引最小的非空值（也就是该股票在该项目首次发布数值的时间）
            empty_index_list=[empty_index for empty_index in empty_index_list if (empty_index>min_nonempty_index)] # 找出可以被填补的缺失值（此前有数据可追溯的缺失值）
            
            if(len(empty_index_list)==0):
                continue
            
            last_nonempty_index=min(empty_index_list)-1 # last_nonempty_index记录的是距离当前缺失值最近的非缺失值索引，方便判断时间间隔是否超过了max_retreat_year
            for i in range(len(empty_index_list)):
            
                empty_index=empty_index_list[i]

                if(empty_index==empty_index_list[i-1]+1): # 判断这个值的上一个值在原始报表中是否为空值
                    last_nonempty_index=last_nonempty_index # 如果上一个值为空值，则last_nonempty_index不更新
                else:
                    last_nonempty_index=empty_index-1 # 如果上一个值不为空值，则last_nonempty_index更新
                    
                last_nonempty_time=df_stk.REPORT_PERIOD.loc[last_nonempty_index] # 获取最近的非缺失值发布的时间
                this_time=df_stk.REPORT_PERIOD.loc[empty_index]
            
                if(this_time-last_nonempty_time>=datetime.timedelta(days=max_retreat_year*365)): # 超过了max_retreat_year
                    pass
                else:
                    df_stk[column].loc[empty_index]=df_stk[column].loc[empty_index-1]

    return df_stk

#fill_empty_xxx函数的作用是将某一列的空缺值进行填补,fill_empty_flow是对流量表处理，fill_empty_stock是对存量表处理
#输出值是['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']以及填补缺失值后的列
#输入值df_test是等待处理的原始报表，column_list是需要填补缺失值的列名
def fill_empty_stock(df_test,column_list,max_retreat_year=1):
    
    df_test.sort_values(['S_INFO_WINDCODE','REPORT_PERIOD'],inplace=True)  
    df_test.index=list(range(df_test.shape[0]))
    df_test=df_test.groupby('S_INFO_WINDCODE').apply(lambda x:fill_empty_stk_stock(x,column_list,max_retreat_year))
    return_list=['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']+column_list
    
    return df_test[return_list]

# 4 - 生成DISCRETE报表 & TTM操作

## 4.1 DISCRETE报表

In [12]:
#生成上一个会计期末到本会计期间的单季度报表，例如1999/2003年的REPORT_PERIOD为6.30的单季度报表表示(1.1-4.30)/（4.1-6.30）期间的流量，仅针对流量表适用
#df是输入的财务报表，column_list是需要季度化的数据
#返回值是季度化后的财务报表，3为一季度，6为二季度（而非半年报），9为三季度，12为四季度（而非年报）
def weight_adjust(x):#当期跟上一期（如果没有上一期则为年初）之间的距离为3/6/9/12月时，应该需要调整的权重为1,1/2,1/3,1/4
    x.weight_adjust=3/x.month_cover
    return x

def set_day_gap(x):
    if(pd.isna(x.report_gap)):
        x.day_gap=x.REPORT_PERIOD.month*30
    else:
        x.day_gap=x.report_gap.days
    return x

def create_discrete(df,column_list):
    '''
    生成离散的季度报表，column_list输入的是需要离散的数据
    '''
    df=df[(['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']+column_list)].copy()
    df['day_gap']=[np.nan]*df.shape[0] # 新建一列数据
    df['weight_adjust']=[np.nan]*df.shape[0] # 新建一列数据
    df.sort_values(['S_INFO_WINDCODE','REPORT_PERIOD'],inplace=True)
    df['year_temp'] = df['REPORT_PERIOD'].apply(lambda x: int(x.year))
    df['month_temp'] = df['REPORT_PERIOD'].apply(lambda x: int(x.month))

    df['REPORT_PERIOD_shift1']=df.groupby(['S_INFO_WINDCODE'])['REPORT_PERIOD'].shift(1)
    df['report_gap']=(df['REPORT_PERIOD']-df['REPORT_PERIOD_shift1']) # 距离上一期期末/年初的时间gap
    df=df.apply(lambda x:set_day_gap(x),axis=1) # 把report_gap转为相隔的天数
    df['month_cover']=df['day_gap'].apply(lambda x:int(x/30)) # 把相隔的天数转为相隔的月份
    df=df.apply(lambda x:weight_adjust(x),axis=1) # 比如某些股票的第一份财报是2008年的9月份，因此需要把这个流量值除以3才能得到当季度
    
    
    for column in column_list:
        df[(column+'_shift1')]=df.groupby(['S_INFO_WINDCODE','year_temp'])[column].shift(1,fill_value=0)
        df.rename({column:(column+'_continue')},axis=1,inplace=True)
        df[column]=df[(column+'_continue')]-df[(column+'_shift1')]
        df[column]=df[column]*df['weight_adjust']
    
    df=df[(['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']+column_list)].copy()
    return df

## 4.2 TTM
针对已经加工的DISCRETE进行计算

In [20]:
#这是ttmDiscrete调用的子函数
def ttmDiscrete_stk(report_df_stk, label_list):
    for label in label_list: # 标记这个label在这个观测期是否为空,从而调整month_cover
        report_df_stk[label+'_nonemptysignal']=report_df_stk[label].apply(lambda x:0 if np.isnan(x) else 1)

    for i in range(report_df_stk.shape[0]):

        year_temp=report_df_stk.year_temp.iloc[i]
        month_temp=report_df_stk.month_temp.iloc[i]
        
        
        #下面判断的是需要过去一年的数据需要多少张报表
        #1998年之前只有年报，因此只需要获取一张报表即可
        #1998-2002有半年报，因此需要获取两张报表
        #2002年的9月需要获取2002的三季报，2002的半年报，2002的一季报（不需要2001年的年报是因为该离散数据数据范围是2001.6-2001.12，已经超出一年
        if(year_temp<1998):
            report_num=1
        elif(year_temp<2002):
            report_num=2
        elif((year_temp==2002) and (month_temp<=6)):
            report_num=2
        elif((year_temp==2002) and (month_temp==9)):
            report_num=3
        else:
            report_num=4
        
        report_select=report_df_stk.iloc[max(0,i-report_num+1):(i+1)]
        
        for label in label_list:
            # TODO BUG 这里是不是应该用select？
            report_num=np.sum(report_select[label+'_nonemptysignal']) #计算过去一年可用的报表数量
            label_sum=report_select[label].sum()
            if(report_num!=0):
                report_df_stk[label+'_ttm'].iloc[i]=(label_sum/report_num) * 4
                #正常来说可用的有4个，但如果因为某些原因出现缺失值，则进行权重调整
            else:
                report_df_stk[label+'_ttm'].iloc[i]=np.nan
    return report_df_stk


#作用是获取ttm值，report_df是单季报表，label_list是需要计算ttm的所有列，如['','']
#返回值为['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']+label_list_ttm
def ttmDiscrete(report_df, label_list):
    label_list_ttm=[(label+'_ttm') for label in label_list]
    for label_ttm in label_list_ttm:
        report_df[label_ttm]=[np.nan]*report_df.shape[0]
    
    report_df=report_df[(['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']+label_list+label_list_ttm)].copy()
    report_df['year_temp'] = report_df['REPORT_PERIOD'].apply(lambda x: int(x.year))
    report_df['month_temp'] = report_df['REPORT_PERIOD'].apply(lambda x: int(x.month))
    report_df.sort_values(['S_INFO_WINDCODE','TRADE_DT'],inplace=True)
    
    report_df=report_df.groupby(['S_INFO_WINDCODE']).apply(lambda x:ttmDiscrete_stk(x,label_list))

    report_df=report_df[(['S_INFO_WINDCODE','REPORT_PERIOD','TRADE_DT']+label_list_ttm)].copy()
    return report_df