<a href="https://colab.research.google.com/github/dntlr4463/coding/blob/main/%ED%80%80%ED%8A%B8%20%EB%8D%B0%EC%9D%B4%ED%84%B0%20%EC%88%98%EC%A7%91%EB%AA%A8%EB%8D%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gc
import traceback
import datetime
from collections import defaultdict
import numpy as np
import pandas as pd
from itertools import product


class PostQWStockMultifactors:
    """
        Post Module & Calculator for Multifactors Based on Quantiwise Data
    """
    def __init__(self, configs=None):
        """
            Initialization
        """
        self.configs = configs

        self.main_code = configs['main_code']
        self.tau = configs['tau']
        self.windows = configs['windows']
        self.available_stocks = configs['available_stocks']
        self.eps = configs['eps']
        self.pn = configs['pn']
        self.np = configs['np']
        self.nn = configs['nn']
        self.mf_table_name = configs['mf_table_name']

        self.max_window = max(self.windows)

        self.trade_dates = None
        self.codes = None
        self.items = None
        self.info_items = None
        self.mf_list = None
        self.columns = None
        self._load_dates()
        self._load_codes()
        self._load_qw_items()
        self._load_qw_information_items()
        self._load_mf_list()

        self.calculator = FactorCalculator(tau=self.tau,
                                        posneg=self.pn,
                                        negpos=self.np,
                                        negneg=self.nn)
       
    def post_multifactors_us(self, codes=None, target_date=None, length=20,
                            output=False):
        """
            Post Multifactors for us Stock Market Based on Quantiwise data

            Args:
                codes: 종목코드 리스트
                target_date: 적재 대상 날짜
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
                output: 데이터 반환 여부 결정(default: False)
            Return:
                results: dict. post 결과 요약
                    'total_codes_#': 적재된 종목 코드 수(unique),
                    'total_length_#': 적재된 데이터 row 길이,
                    'total_columns_#': 총 컬럼 갯수,
                    'start_date': 시작일,
                    'end_date': 종료일,
                    'start_date_pos': 총 시계열 중 시작 시점,
                    'end_date_pos': 총 시계열 중 종료 시점
        """
        try:
            if codes is None:
                codes = self.codes

            factors = self.calc_multifactors(codes, target_date, length)

            res_codes = factors['code'].unique()
            res_cols = factors.columns.shape[0]
            dates = factors['date'].unique()
            start_date = dates.min()
            end_date = dates.max()

            start_pos = self.trade_dates.index(start_date)
            end_pos = self.trade_dates.index(end_date)

            insert_query = self._make_insert_query(
                self.mf_table_name, factors.columns.values)

            #TODO: Nan 처리
            factors = factors.replace({np.nan: None})
            factors = factors.replace({None: np.nan})
            factors = factors.replace({np.nan: None}).values.tolist()
            factors = list(map(tuple, factors))

            DBExecuteManager(
                constants.DBCONN_TYPE_WRITE).set_commit(insert_query,
                                                        is_many=True,
                                                        rows=factors)

            results = {
                'total_codes_#': len(res_codes),
                'total_length_#': len(factors),
                'total_columns_#': res_cols,
                'start_date': start_date,
                'end_date': end_date,
                'start_date_pos': start_pos,
                'end_date_pos': end_pos
            }

            gc.collect()

            if output:
                return results, factors
            else:
                return results

        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - post_multifactors_us"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    
    def _get_mktcaps(self, codes, start_date, end_date):
        """
            Load mktcap timeseries data
            Args:
                codes: 종목코드 리스트
                start_date: 데이터 시작 일자
                end_date: 데이터 마지막 일자
            Return:
                mktcaps: pd.DataFrame 시계열
        """
        item = "Mkt_Cap" # 시가총액
        mktcaps = self.get_qw_timeseries(codes, item, start_date, end_date)
        mktcaps = mktcaps.pivot_table(values='value', columns='code', index='date')

        return mktcaps
    
    
    def _calc_liquidity_factors(self, codes, start_date, end_date, length, interm_data):
        """
            Calculate liquidity factors
            Args:
                codes: 종목코드 리스트
                start_date: 데이터 시작 일자
                end_date: 데이터 마지막 일자
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
                interm_data: 중간 데이터 (이전 factor 연산에서 상속받는 값, dict) 
            Return:
                dict: 하위에 아래 정보를 포함한 dictionary
                    factors: pd.dataFrame 시계열 * (# rows X # columns)
                    dependencies: pd.dataFrame 다른 factor 연산 시 사용하는 중간 데이터
        """
        factors = defaultdict(None)
        dependencies = dict()
        internal_cache = dict()

        target_item_list = ['Turnover', 'Net_Purchase_Value_Individual', 'Net_Purchase_Value_Institutional', 'Net_Purchase_Value_Foreigner']
        mktcaps = interm_data['mktcaps']

        for item in target_item_list:
            values_df = self.get_qw_timeseries(codes, item, start_date, end_date)
            values_df = values_df.pivot_table(values='value', columns='code', index='date')
            data_type = item.split("_")[-1].lower()

            for w in self.windows:
                if item == 'Turnover':
                    is_caching = True
                    is_factor_origin_value = True
                    update_cache_key = f"turnover_{w}"
                    fname = update_cache_key
                    sub_type_list = ['mktcap']
                    factor_names = ["turnover_{sub_type}_{window}"]
                else:
                    is_caching = False
                    is_factor_origin_value = False
                    sub_type_list = ['turnover', 'mktcap']
                    factor_names = ["npv_{sub_type}_{data_type}_{window}"]

                mean_val = values_df.fillna(0).rolling(w).mean().iloc[-length:]
                if is_caching:
                    internal_cache[update_cache_key] = mean_val
                if is_factor_origin_value:
                    factors[fname] = mean_val

                for sub_type, factor_name in product(sub_type_list, factor_names):
                    cache_key = f"{sub_type}_{w}"
                    if cache_key in internal_cache.keys():
                        cached_val = internal_cache[cache_key]
                    elif sub_type == 'mktcap':
                        cached_val = mktcaps.fillna(0).rolling(w).mean().iloc[-length:]
                        internal_cache[cache_key] = cached_val
                    else:
                        raise ValueError(f"Not found value, sub_type: {sub_type}, window: {w}")

                    fname = factor_name.format(sub_type=sub_type, data_type=data_type, window=w)
                    factors[fname] = mean_val / cached_val

        rst = {'factor': factors, 'dependencies': dependencies}
        return rst
    
    
    def _calc_return_based_factors(self, codes, start_date, end_date, length):
        """
            Calculate return based factors
            Args:
                codes: 종목코드 리스트
                start_date: 데이터 시작 일자
                end_date: 데이터 마지막 일자
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
            Return:
                dict: 하위에 아래 정보를 포함한 dictionary
                    factors: pd.dataFrame 시계열 * (# rows X # columns)
                    dependencies: pd.dataFrame 다른 factor 연산 시 사용하는 중간 데이터
        """
        factors = defaultdict(None)
        dependencies = dict()

        returns = self.get_price_returns(codes, start_date=start_date, end_date=end_date, price=False)
        
        for w in self.windows:
            factors['volatility_'+str(w)] =\
                self.calculator.calc_volatility(returns, window=w).iloc[-length:]
            factors['skew_'+str(w)] =\
                self.calculator.calc_skew(returns, window=w).iloc[-length:]
            factors['lpm_'+str(w)] =\
                self.calculator.calc_lpm(returns, window=w).iloc[-length:]
            factors['pm_'+str(w)] =\
                self.calculator.calc_pm(returns, window=w, log=False).iloc[-length:]
            factors['rsi_'+str(w)] =\
                self.calculator.calc_rsi(returns, window=w).iloc[-length:]
            factors['vol_adj_pm_'+str(w)] =\
                self.calculator.calc_vol_adj_pm(returns, window=w).iloc[-length:]

        factors['pm_120_20'] =\
                self.calculator.calc_pm_reversal(returns, 120, 20).iloc[-length:]
        factors['pm_120_40'] =\
            self.calculator.calc_pm_reversal(returns, 120, 40).iloc[-length:]
        factors['pm_250_20'] =\
            self.calculator.calc_pm_reversal(returns, 250, 20).iloc[-length:]
        factors['pm_250_40'] =\
            self.calculator.calc_pm_reversal(returns, 250, 40).iloc[-length:]

        rst = {'factor': factors, 'dependencies': dependencies}
        return rst
    
    # 메소드 5
    def _calc_quality_factors(self, codes, start_date, end_date, length):
        """
            Calculate quality factors
            Args:
                codes: 종목코드 리스트
                start_date: 데이터 시작 일자
                end_date: 데이터 마지막 일자
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
            Return:
                dict: 하위에 아래 정보를 포함한 dictionary
                    factors: pd.dataFrame 시계열 * (# rows X # columns)
                    dependencies: pd.dataFrame 다른 factor 연산 시 사용하는 중간 데이터
        """
        factors = defaultdict(None)
        dependencies = dict()
        target_items = {'ROE_TTM':{'ffill': True}, 
                            'ROE_FY0':{'ffill': True}, 
                            'ROE_FY1':{'ffill': False}, 
                            'ROE_12MF':{'ffill': False}, 
                            'ROA_TTM':{'ffill': True}, 
                            'ROA_FY0':{'ffill': True}, 
                            'ROA_FY1':{'ffill': False}, 
                            'DE_Ratio':{'ffill': True}, 
                            'Net_Debt_Ratio':{'ffill': True}}

        for item, info in target_items.items():
            fname = item.lower()
            values_df = self.get_qw_timeseries(codes, item, start_date, end_date)
            if info['ffill']:
                values_df = values_df.pivot_table(values='value', columns='code', index='date').ffill().iloc[-length:]
            else:
                values_df = values_df.pivot_table(values='value', columns='code', index='date').iloc[-length:]

            factors[fname] = values_df / 100.

        rst = {'factor': factors, 'dependencies': dependencies}
        return rst

    # 메소드 6
    def _calc_value_factors(self, codes, start_date, end_date, length, interm_data):
        """
            Calculate value factors
            Args:
                codes: 종목코드 리스트
                start_date: 데이터 시작 일자
                end_date: 데이터 마지막 일자
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
                interm_data: 중간 데이터 (이전 factor 연산에서 상속받는 값, dict) 
            Return:
                dict: 하위에 아래 정보를 포함한 dictionary
                    factors: pd.dataFrame 시계열 * (# rows X # columns)
                    dependencies: pd.dataFrame 다른 factor 연산 시 사용하는 중간 데이터
        """
        factors = defaultdict(None)
        dependencies = dict()
        target_items = {'Sales_TTM':{'ffill': True, 'factor_name': 'sales_yield_ttm', 'value_dependency': True}, 
                        'Sales_FY0':{'ffill': True, 'factor_name': 'sales_yield_fy0', 'hist_dependency': True, 'value_dependency': True},
                        'Sales_FY1':{'ffill': False, 'factor_name': 'sales_yield_fy1', 'value_dependency': True}, 
                        'Sales_12MF':{'ffill': False, 'factor_name': 'sales_yield_12mf', 'hist_dependency': True, 'value_dependency': True},
                        'BV_TTM':{'ffill': True, 'factor_name': 'bp_ttm'}, 
                        'BV_FY0':{'ffill': True, 'factor_name': 'bp_fy0'}, 
                        'NP_TTM':{'ffill': True, 'factor_name': 'ep_ttm', 'value_dependency': True}, 
                        'NP_FY0':{'ffill': True, 'factor_name': 'ep_fy0', 'hist_dependency': True, 'value_dependency': True},
                        'NP_FY1':{'ffill': False, 'factor_name': 'ep_fy1', 'value_dependency': True},
                        'NP_12MF':{'ffill': False, 'factor_name': 'ep_12mf', 'hist_dependency': True, 'value_dependency': True},
                        'FCF_TTM':{'ffill': True, 'factor_name': 'fcf_yield_ttm'},
                        'FCF_FY0':{'ffill': True, 'factor_name': 'fcf_yield_fy0'},
                        'FCF_FY1':{'ffill': False, 'factor_name': 'fcf_yield_fy1'},
                        'FCF_12MF':{'ffill': False, 'factor_name': 'fcf_yield_12mf'}}

        mktcap = interm_data['mktcaps'].iloc[-length:]

        for item, info in target_items.items():
            fname = info['factor_name']
            values_df = self.get_qw_timeseries(codes, item, start_date, end_date)
            values_hist = values_df.pivot_table(values='value', columns='code', index='date')
            is_dependency_hist = info.get('hist_dependency', False)
            if is_dependency_hist:
                hist_dependency_key = f'{item.lower()}_hist'
                dependencies[hist_dependency_key] = values_hist
            
            if info['ffill']:
                values_df = values_hist.ffill().iloc[-length:]
            else:
                values_df = values_hist.iloc[-length:]

            is_val_dependency = info.get('value_dependency', False)
            if is_val_dependency:
                val_dependency_key = f'{item.lower()}'
                dependencies[val_dependency_key] = values_df

            factors[fname] = values_df / mktcap

        rst = {'factor': factors, 'dependencies': dependencies}
        return rst
    
    # 메소드 7
    def _calc_growth_factors(self, codes, start_date, end_date, length, interm_data):
        """
            Calculate growth factors
            Args:
                codes: 종목코드 리스트
                start_date: 데이터 시작 일자
                end_date: 데이터 마지막 일자
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
                interm_data: 중간 데이터 (이전 factor 연산에서 상속받는 값, dict) 
            Return:
                dict: 하위에 아래 정보를 포함한 dictionary
                    factors: pd.dataFrame 시계열 * (# rows X # columns)
                    dependencies: pd.dataFrame 다른 factor 연산 시 사용하는 중간 데이터
        """
        factors = defaultdict(None)
        dependencies = dict()

        # 중간 데이터 처리
        sales_fy0_hist = interm_data['sales_fy0_hist']
        sales_fy0 = interm_data['sales_fy0']
        sales_fy1 = interm_data['sales_fy1']
        sales_12mf = interm_data['sales_12mf_hist']
        s_12mf = interm_data['sales_12mf']
        sales_ttm = interm_data['sales_ttm']
        np_fy0_hist = interm_data['np_fy0_hist']
        np_fy0 = interm_data['np_fy0']
        np_fy1 = interm_data['np_fy1']
        n_12mf = interm_data['np_12mf']
        np_12mf = interm_data['np_12mf_hist']
        np_ttm = interm_data['np_ttm']

        sales_fy_1 = self.get_qw_timeseries(codes, 'Sales_FY_1', start_date, end_date)
        sales_fy_1 = sales_fy_1.pivot_table(values='value', columns='code', index='date')

        np_fy_1 = self.get_qw_timeseries(codes, 'NP_FY_1', start_date, end_date)
        np_fy_1 = np_fy_1.pivot_table(values='value', columns='code', index='date')

        factors['sales_growth_yoy'] = self.calculator.calc_growth(sales_fy0_hist, sales_fy_1).ffill().iloc[-length:]
        factors['sales_growth_fy1_fy0'] = self.calculator.calc_growth(sales_fy1, sales_fy0)
        factors['sales_growth_12mf_ttm'] = self.calculator.calc_growth(s_12mf, sales_ttm)

        s_12mf_s20 = sales_12mf.shift(20).iloc[-length:]
        factors['sales_growth_12mf_20d'] = self.calculator.calc_growth(s_12mf, s_12mf_s20)

        s_12mf_s60 = sales_12mf.shift(60).iloc[-length:]
        factors['sales_growth_12mf_60d'] = self.calculator.calc_growth(s_12mf, s_12mf_s60)

        factors['earnings_growth_yoy'] = self.calculator.calc_growth(np_fy0_hist, np_fy_1).ffill().iloc[-length:]
        factors['earnings_growth_fy1_fy0'] = self.calculator.calc_growth(np_fy1, np_fy0)
        factors['earnings_growth_12mf_ttm'] = self.calculator.calc_growth(n_12mf, np_ttm)

        n_12mf_s20 = np_12mf.shift(20).iloc[-length:]
        factors['earnings_growth_12mf_20d'] = self.calculator.calc_growth(n_12mf, n_12mf_s20)

        n_12mf_s60 = np_12mf.shift(60).iloc[-length:]
        factors['earnings_growth_12mf_60d'] = self.calculator.calc_growth(n_12mf, n_12mf_s60)

        rst = {'factor': factors, 'dependencies': dependencies}
        return rst

    # 메소드 8
    def _calc_yield_n_size_factors(self, codes, start_date, end_date, length, interm_data):
        """
            Calculate yield & size factors
            Args:
                codes: 종목코드 리스트
                start_date: 데이터 시작 일자
                end_date: 데이터 마지막 일자
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
                interm_data: 중간 데이터 (이전 factor 연산에서 상속받는 값, dict) 
            Return:
                dict: 하위에 아래 정보를 포함한 dictionary
                    factors: pd.dataFrame 시계열 * (# rows X # columns)
                    dependencies: pd.dataFrame 다른 factor 연산 시 사용하는 중간 데이터
        """
        factors = defaultdict(None)
        dependencies = dict()
        target_items = {'Dividend_TTM':{'factor_name':'dy_ttm'},
                        'Dividend_FY0':{'factor_name':'dy_fy0'}}

        mktcap = interm_data['mktcaps'].iloc[-length:]
        sales_fy0 = interm_data['sales_fy0']

        for item, info in target_items.items():
            fname = info['factor_name']
            values_df = self.get_qw_timeseries(codes, item, start_date, end_date)
            values_df = values_df.pivot_table(values='value', columns='code', index='date').ffill().iloc[-length:]

            factors[fname] = values_df / mktcap

        factors['log_mktcap'] = np.log(mktcap.where(mktcap > 0., np.nan))
        factors['log_sales'] = np.log(sales_fy0.where(sales_fy0 > 0., np.nan))

        rst = {'factor': factors, 'dependencies': dependencies}
        return rst
    
    # 메소드 9
    def calc_multifactors(self, codes, target_date=None, length=20,
                        set_cols=False):
        """
            Calculate multifactors
            Args:
                codes: 종목코드 리스트
                target_date: 적재 대상 날짜
                length: target_date 기준 과거 적재 대상 시계열 길이(default: 20)
                set_cols: DB table 컬럼 순서대로 정렬 여부
            Return:
                factors: pd.DataFrame 멀티팩터 시계열
                    * (# rows X # columns)
        """
        try:
            factors = defaultdict(None)
            interm_data = dict()
            target_date = self._convert_date(target_date, type='date')
            tdate_pos = self.trade_dates.index(target_date)

            start_point = tdate_pos - (length - 1) - (self.max_window - 1) - 60
            if start_point < 0:
                start_point = start_point + 60

            assert start_point >= 0

            start_date = self.trade_dates[start_point]

            mktcaps = self._get_mktcaps(codes, start_date, target_date)
            interm_data['mktcaps'] = mktcaps

            # Calculate liqudity factors
            liqudity_factors = self._calc_liquidity_factors(codes, start_date, target_date, length, interm_data)
            factors.update(liqudity_factors['factor'])
            interm_data.update(liqudity_factors['dependencies'])

            # Calculate return based factors
            return_based_factors = self._calc_return_based_factors(codes, start_date, target_date, length)
            factors.update(return_based_factors['factor'])
            interm_data.update(return_based_factors['dependencies'])

            # Calculate quality factors
            quality_factors = self._calc_quality_factors(codes, start_date, target_date, length)
            factors.update(quality_factors['factor'])
            interm_data.update(quality_factors['dependencies'])

            # Calculate Value factors
            value_factors = self._calc_value_factors(codes, start_date, target_date, length, interm_data)
            factors.update(value_factors['factor'])
            interm_data.update(value_factors['dependencies'])

            # Calculate growth factors
            growth_Factors = self._calc_growth_factors(codes, start_date, target_date, length, interm_data)
            factors.update(growth_Factors['factor'])
            interm_data.update(growth_Factors['dependencies'])

            # Calculate yield & size factors
            yield_n_size_factors = self._calc_yield_n_size_factors(codes, start_date, target_date, length, interm_data)
            factors.update(yield_n_size_factors['factor'])
            interm_data.update(yield_n_size_factors['dependencies'])

            factors = pd.concat(factors).unstack(-1).transpose()

            if set_cols:
                factors = factors[self.mf_list]

            # factors = factors[pd.notnull(factors['log_mktcap'])]
            factors = factors[pd.notnull(factors['log_sales'])]

            factors['available'] = pd.notnull(factors).sum(1)
            factors = factors[factors['available'] > 0]

            factors.reset_index(inplace=True)

            return factors[factors.columns[:-1]]

        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - calc_multifactors"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))

    # 메소드 10
    def get_qw_timeseries(self, codes, item, start_date=None, end_date=None):
        """
            Get stocky_base.quantiwise_timeseries data

            Args:
                codes: list or str
                start_date: datetime.date start_date
                end_date: datetime.date end_date
        """
        try:
            if isinstance(codes, str):
                codes = [codes]

            start_date = self._convert_date(start_date, type='str')
            end_date = self._convert_date(end_date, type='str')
            item_info = self.items[self.items['item'] == item]
            if item_info.empty:
                raise ValueError(f"Invalid item, item: {item}")

            query = f"""
                        select
                            `code`,`name`,`item_code`,`date`,`base_date`,
                            `val_type`,`value`
                        from
                            stocky_base.quantiwise_timeseries
                        where
                            `code` in ('{"','".join(codes)}')
                            and
                            `date` between
                            '{start_date}' and '{end_date}'
                            and
                            `item_code` = '{item_info.item_code.values[0]}'
                            and
                            `base_date` = '{item_info.base_date.values[0]}'
                    """
            results = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)

            results = pd.DataFrame(results)
            results = results.sort_values(['code', 'date']) # result는 code와 date 만으로 이루어진 dataframe 을 형성

            return results

        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - get_qw_timeseries"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
          
    # 메소드 11
    def get_price_returns(self, codes, start_date=None, end_date=None, price=False):
        """
            Get Rate of Returns

            Args:
                codes: list or str
                start_date: datetime.date start_date
                end_date: datetime.date end_date
                price: bool
                    * True: return price series
                    * False: return returns only
                    * default: False
        """
        try:
            if isinstance(codes, str):
                codes = [codes]

            start_date = self._convert_date(start_date, type='str')
            end_date = self._convert_date(end_date, type='str')

            if price:
                cols = ['code', 'date', 'value', 'return']
            else:
                cols = ['code', 'date', 'return']

            query = f"""
                        select
                            `{"`,`".join(cols)}`
                        from
                            stocky_base.quantiwise_timeseries_closing_price
                        where
                            `code` in ('{"','".join(codes)}')
                            and
                            `date` between
                            '{start_date}' and '{end_date}'
                    """
            results = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)

            results = pd.DataFrame(results)
            returns = pd.pivot_table(results, columns='code',
                                    index='date', values='return') / 100.

            if price:
                price = pd.pivot_table(results, columns='code',
                                    index='date', values='value')

                return returns, price

            return returns

        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - get_price"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    # 메소드 12
    def get_qw_information_series(self, codes, start_date=None, end_date=None):
        """
            Get stocky_base.quantiwise_timeseries_information data

            Args:
                codes: list or str
                start_date: datetime.date start_date
                end_date: datetime.date end_date
        """
        try:
            if isinstance(codes, str):
                codes = [codes]

            start_date = self._convert_date(start_date, type='str')
            end_date = self._convert_date(end_date, type='str')

            query = f"""
                        select
                            `code`,`item_code`,`date`,`base_date`,
                            `name`,`val_type`,`value`
                        from
                            stocky_base.quantiwise_timeseries_information
                        where
                            `code` in ('{"','".join(codes)}')
                            and
                            `date` between
                            '{start_date}' and '{end_date}'
                    """
            results = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)

            results = pd.DataFrame(results)

            results = results.sort_values(['code', 'item_code', 'date'])

            return results

        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - get_qw_information_series"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    # 메소드 13
    def _load_dates(self):
        """
            load trading dates(US Market)
        """
        try:
            query = f"""
                        select
                            date
                        from
                            stocky_base.quantiwise_timeseries_closing_price
                        where
                            code = '{self.main_code}'
                    """

            trade_dates =\
                DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)
            trade_dates = pd.DataFrame(trade_dates)
            trade_dates = trade_dates.values.ravel().tolist()

            self.trade_dates = trade_dates
        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - _load_dates"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
    
    # 메소드 14
    def _load_codes(self):
        """
            load stock codes(US Market)
        """
        try:
            query = """
                        select
                            distinct code
                        from
                            stocky_base.quantiwise_company
                        where
                            length(code) = 7
                    """
            codes =\
                DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)
            codes = pd.DataFrame(codes)
            codes = codes.values.ravel().tolist()

            self.codes = codes
        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - _load_codes"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    # 메소드 15
    def _load_qw_items(self):
        """
            Load Quantiwise Financial Items
        """
        try:
            query = """
                        select
                            `item`,`item_code`,`item_name`,`unit`, IFNULL(`base_date`, 'CPD') as `base_date`
                        from
                            stocky_base.lu_quantiwise_items
                        where
                            `type` = 'timeseries_company' and flag = 1; # table에 flag 존재?
                    """
            items = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)
            items = pd.DataFrame(items)

            self.items = items
        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - _load_qw_items"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    # 메소드 16
    def _load_qw_information_items(self):
        """
            Load Quantiwise Information Items
        """
        try:
            query = """
                        select
                            `item`,`item_code`,`item_name`,`unit`
                        from
                            stocky_base.lu_quantiwise_items
                        where
                            `type` = 'timeseries_information'
                    """
            items = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)
            items = pd.DataFrame(items)

            self.info_items = items
        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - _load_qw_information_items"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
    
    # 메소드 17
    def _convert_date(self, date, type='date'):
        if isinstance(date, datetime.date) and type == 'str':
            date = datetime.datetime.strftime(date, '%Y-%m-%d')
        elif isinstance(date, str) and type == 'date':
            date = datetime.datetime.strptime(date, "%Y-%m-%d").date()

        return date

    # 메소드 18
    def _load_mf_list(self):
        """
            Load Multifactor Item List
        """
        try:
            query = f"""
                        show columns from {self.mf_table_name}
                    """
            mf_list = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)
            mf_list = pd.DataFrame(mf_list).values[:, 0].ravel()

            self.mf_list = mf_list[2:]
            self.columns = mf_list
        except Exception as err:
            error_comment =\
                "[ERROR] PostQWStockMultifactors - _load_mf_list"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    # 메소드 19
    def _make_insert_query(self, table, key_list):
        """
            kb_common 활용 데이터 truncated insert query 만드는 함수
            Args:
                table: str. 'db_name.table_name'
                key_list: list. key list
            Returns:
                insert_query: str. insert query
        """
        str_key_column = "(" + ", ".join([str(key)
                                        for key in key_list]) + ")"
        str_values = "(" + ", ".join(["%s"] * len(key_list)) + ")"

        update_cond_skeleton = "{key} = values({key})"
        str_after_update = ", ".join([update_cond_skeleton.format(key=key)
                                    for key in key_list])

        insert_query = """
                        insert into
                            {table} {str_key_column}
                        values
                            {str_values}
                        on duplicate key update
                            {update_cond_skeleton}
                    """.format(table=table,
                            str_key_column=str_key_column,
                            str_values=str_values,
                            update_cond_skeleton=str_after_update)

        return insert_query # kb table insert

    
class FactorCalculator:
    """
        Class for Factor Calculation
    """
    def __init__(self, tau=0., posneg=None, negpos=None, negneg=None):
        self.tau = tau

        self.pn = posneg
        self.np = negpos
        self.nn = negneg

    def calc_volatility(self, returns, window=5):
        """
            calculate volatility of stock returns

            Args:
                returns: pd.DataFrame (trade_dates X stocks)
                    * 종목 별 수익률 시계열
                window: moving window
            Return:
                volatility: pd.DataFrame (trade_dates X stocks)
        """
        try:
            volatility = returns.rolling(window, min_periods=window).std()

            return volatility
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_volatility"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))

    def calc_skew(self, returns, window=5):
        """
            calculate skewness of stock returns

            Args:
                returns: pd.DataFrame (trade_dates X stocks)
                    * 종목 별 수익률 시계열
                window: moving window
            Return:
                skew: pd.DataFrame (trade_dates X stocks)
        """
        try:
            skew = returns.rolling(window, min_periods=window).skew()

            return skew
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_skew"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    def calc_lpm(self, returns, window=5):
        """
            calculate lower partial moment

            Args:
                returns: pd.DataFrame (trade_dates X stocks)
                    * 종목 별 수익률 시계열
                window: moving window
            Return:
                lpm: pd.DataFrame (trade_dates X stocks)
        """
        try:
            lpm = self.tau - returns
            lpm = lpm.where(lpm > 0, 0).rolling(window, min_periods=window).mean()

            return lpm
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_lpm"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    def calc_pm(self, returns, window=5, log=False):
        """
            calculate price momentum

            Args:
                returns: pd.DataFrame (trade_dates X stocks)
                    * 종목 별 수익률 시계열
                window: moving window
                log: 로그수익률 여부(default: False)
            Return:
                pm: pd.DataFrame (trade_dates X stocks)
        """
        try:
            if not log:
                returns = np.log(returns + 1.)

            pm = returns.rolling(window, min_periods=window).sum()
            pm = np.exp(pm) - 1.

            return pm
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_pm"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))
            
    def calc_rsi(self, returns, window=5):
        """
            calculate Relative Strength Index

            Args:
                returns: pd.DataFrame (trade_dates X stocks)
                    * 종목 별 수익률 시계열
                window: moving window
            Return:
                rsi: pd.DataFrame (trade_dates X stocks)
        """
        try:
            au = (returns > 0).astype(float).rolling(window, min_periods=window).mean()
            ad = (returns <= 0).astype(float).rolling(window, min_periods=window).mean()

            rsi = au / (au + ad)
            return rsi
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_rsi"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))

    def calc_pm_reversal(self, returns, window=250, rec_window=20):
        """
            calculate price momentum with reversal

            Args:
                returns: pd.DataFrame (trade_dates X stocks)
                    * 종목 별 수익률 시계열
                window: moving window
                rec_window: 계산에서 제외되어야 할 최근 window
            Return:
                pmr: pd.DataFrame (trade_dates X stocks)
        """
        try:
            pm = self.calc_pm(returns, window=window-rec_window)
            pmr = pm.shift(rec_window)
            return pmr
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_pm_reversal"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))

    def calc_vol_adj_pm(self, returns, window=5, eps=1e-6):
        """
            calculate volatility adjusted price momentum

            Args:
                returns: pd.DataFrame (trade_dates X stocks)
                    * 종목 별 수익률 시계열
                window: moving window
            Return:
                vpm: pd.DataFrame (trade_dates X stocks)
        """
        try:
            pm = self.calc_pm(returns, window=window)
            vol = self.calc_volatility(returns, window=window)

            vpm = pm / (vol + eps)
            return vpm
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_vol_adj_pm"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))

    def calc_growth(self, numerator, denominator):
        """
            calculate growth

            Args:
                numerator: pd.DataFrame (trade_dates X stocks)
                    * 분모
                denominator: pd.DataFrame (trade_dates X stocks)
                    * 분자
            Return:
                growth: pd.DataFrame (trade_dates X stocks)
                    * 분모 / 분자 - 1
        """
        try:
            nu_pos = (numerator > 0).astype(float)
            de_pos = (denominator > 0).astype(float)
            nu_neg = (numerator <= 0).astype(float)
            de_neg = (denominator <= 0).astype(float)

            growth = (nu_pos * de_pos * ((numerator / denominator) - 1.)) +\
                (nu_pos * de_neg * self.pn) +\
                (nu_neg * de_pos * self.np) +\
                (nu_neg * de_neg * self.nn)

            return growth
        except Exception as err:
            error_comment =\
                "[ERROR] FactorCalculator - calc_growth"
            error_message = traceback.format_exc()
            raise Exception("{} / {} / {}".format(str(err),
                error_comment, error_message))

In [None]:
import numpy as np

QWMF_CALC_CONFIG = {
    'main_code': 'XOM-US',# 종목코드
    'windows': [5, 10, 20, 40, 60, 90, 120, 250],# T일
    'tau': 0.,
    'available_stocks': None,
    'eps': 1e-6,
    'pn': -100000000,
    'np': -200000000,
    'nn': -300000000,
    'mf_table_name': 'stocky_analysis.multifactors_us'
}

QWMF_SCORE_CONFIG = {
    'methods': ['adj_zscore', 'min_max', 'percentile', 'zscore'],
    'score_table_name_format': 'stocky_analysis.multifactor_scores_us_{method}',
    'factor_definition_table_name': 'stocky_analysis.lu_us_stock_market_factor_definition'
}

In [None]:
#모듈시작
PQWL=PostQWStockMultifactors(QWMF_CALC_CONFIG)

NameError: ignored

In [None]:
#적재모듈
for i in range(0,10080):
  try:
    PQWL.post_multifactors_us(PQWL._load_codes()[i], '2023-01-31', 4710) == True

  except:
    pass

In [None]:
import numpy as np
import pandas as pd
from functools import lru_cache


class PostStockMultifactorsDailyScore:
    """
        Post Module Daily Multifactor Scores
    """
    def __init__(self, configs):
        self.configs = configs
        self.mf_table_name = configs['mf_table_name']
        self.score_table_format = configs['score_table_name_format']
        self.factor_def_table_name = configs['factor_definition_table_name']
        self.pn = configs['pn']
        self.np = configs['np']
        self.nn = configs['nn']
        self.method_list = configs['methods']

        self.factor_sign = self._get_multifactor_sign()

    @lru_cache(maxsize=10)
    def _get_multifactors(self, date, codes):
        """
            multifactor 데이터 조회 메소드

        Args:
            date (str): 조회일자
            codes (list, optional): 조회 대상 기업 목록. Defaults to None.

        Returns:
            pd.DataFrame: multifactor dataframe
        """

        cond_codes = ""
        if codes:
            cond_codes = f"AND codes in {tuple(codes)}"

        query = f"""
                SELECT  *
                  FROM  {self.mf_table_name}
                 WHERE  date = '{date}' {cond_codes}
                 """

        rows = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)        
        mf = pd.DataFrame(rows)

        return mf

    def _get_multifactor_sign(self):
        """
            factor definition 기준 정보 조회 메소드

        Returns:
            list of dict: factor definition 목록
        """        
        query = f""" 
                SELECT factor_group, factor, `order` 
                  FROM {self.factor_def_table_name};
                """
        rows = DBExecuteManager(constants.DBCONN_TYPE_READ).get_fetchall(query)
        sign = pd.DataFrame(rows)

        return sign
    
    def _perform_adj_zscore(self, data):
        """
            normalize 연산 메소드 (adjusted zscore)

        Args:
            data (pd.DataFrame): multifactor dataframe

        Returns:
            pd.DataFrame: normalized multifactor dataframe
        """

        for col in data.columns:
            if col in ('code', 'date'):
                continue
            if self.factor_sign[self.factor_sign.factor == col]['order'].item() == 0:
                sign = 1
            else:
                sign = -1
            data[col] = sign * (data[col] - data[col].mean())/data[col].std()
            data.loc[data[col]>=0, col] = data.loc[data[col]>=0, col]+1
            data.loc[data[col]<0, col] = 1/(1-data.loc[data[col]<0, col])
        
        return data

    def _perform_min_max(self, data):
        """
            normalize 연산 메소드 (min-max)

        Args:
            data (pd.DataFrame): multifactor dataframe

        Returns:
            pd.DataFrame: normalized multifactor dataframe
        """

        for col in data.columns:
            if col in ('code', 'date'):
                continue
            if self.factor_sign[self.factor_sign.factor == col]['order'].item() == 0:
                data[col] = (data[col] - data[col].min())/(data[col].max()-data[col].min()+1e-6)
            else:
                data[col] = 1- (data[col] - data[col].min())/(data[col].max()-data[col].min()+1e-6)

        return data
    
    def _perform_percentile(self, data):
        """
            normalize 연산 메소드 (percentile)

        Args:
            data (pd.DataFrame): multifactor dataframe

        Returns:
            pd.DataFrame: normalized multifactor dataframe
        """

        for col in data.columns:
            if col in ('code', 'date'):
                continue
            if self.factor_sign[self.factor_sign.factor == col]['order'].item() == 0:
                data[col] = data[col].rank(pct = True)
            else:
                data[col] = 1- data[col].rank(pct = True)

        return data

    def _perform_zscore(self, data):
        """
            normalize 연산 메소드 (zscore)

        Args:
            data (pd.DataFrame): multifactor dataframe

        Returns:
            pd.DataFrame: normalized multifactor dataframe
        """

        for col in data.columns:
            if col in ('code', 'date'):
                continue
            if self.factor_sign[self.factor_sign.factor == col]['order'].item() == 0:
                sign = 1
            else:
                sign = -1
            data[col] = sign * (data[col] - data[col].mean())/data[col].std()

        return data
        
    def _calculate_daily_score(self, factor_score, method):
        """
            multifactor 데이터를 normalized 연산하는 메소드  \n
            해당 메소드에서 각 normalize 메소드를 호출 \n
            메소드명 포맷: _perform_{normalize method name}

        Args:
            factor_score (pd.DataFrame): multifactor dataframe
            method (str): normalize 메소드

        Returns:
            pd.DataFrame: normalized multifactor dataframe
        """

        data = factor_score.copy()
        func_name = f"_perform_{method}"
        func = getattr(self, func_name)
        data = func(data)

        return data
    
    def _fill_data(self, data, method):
        """
            NULL 데이터 처리 메소드

        Args:
            data (pd.DataFrame): normalized multifactor dataframe
            method (str): fill 메소드

        Raises:
            ValueError: 잘못된 fill method를 사용한 경우

        Returns:
            pd.DataFrame: NULL 처리가 완료된 multifactor score 데이터
        """
        
        if method == 'min_value':
            for col in data.columns:
                if col in ('code', 'date','score_type'):
                    continue
                if self.factor_sign[self.factor_sign.factor == col]['order'].item() == 0:
                    val = data[col].min()
                else:
                    val = data[col].max()
                data.loc[data[col].isnull(), col] = val
                data.loc[data[col].isin([self.pn, self.np, self.nn]), col] = val
        
        elif method == 'minus1':
            for col in data.columns:
                if col in ('code', 'date','score_type'):
                    continue
                if self.factor_sign[self.factor_sign.factor == col]['order'].item() == 0:
                    val = data[col].min()
                else:
                    val = data[col].max()
                if val >=0 or val<0:
                    val = -1
                data.loc[data[col].isnull(), col] = val
                data.loc[data[col].isin([self.pn, self.np, self.nn]), col] = val
        
        else:
            raise KeyError(f"Invalid fill method: {method}")

        return data
    
    def _insert_score(self, method, scores):
        """
            연산 결과를 저장하는 메소드

        Args:
            method (str): normalize 메소드
            scores (pd.DataFrame): noremalized multifactor dataframe
        """        

        table_name = self.score_table_format.format(method=method)
        columns = tuple(scores.columns)
        str_columns = str(columns).replace('\'','')
        str_values = ", ".join(["%s"] * len(columns))
        str_upsert_cond = ", ".join(f"{col} = values({col})" for col in columns)

        query = f"""
                INSERT INTO {table_name} 
                    {str_columns} 
                VALUES ({str_values})
                ON DUPLICATE KEY UPDATE {str_upsert_cond}
                """

        rows = list()
        for _, row in scores.iterrows():
            row['date'] = str(row['date'])
            row = row.replace({np.nan:None})
            rows.append(tuple(row))

        DBExecuteManager(constants.DBCONN_TYPE_WRITE).set_commit(query, is_many = True, rows = rows)
        
    def post_score(self, date, method, codes = None):
        """
            multifactor 데이터의 값을 normalize하는 메소드

        Args:
            date (str): multifactor 데이터 조회 일자
            method (str): normalize 방법
            codes (list, optional): 조회 대상 기업 목록. Defaults to None.

        Raises:
            KeyError: 잘못된 method가 설정된 경우
            ValueError: 입력한 날짜(date)의 multifactor 데이터가 없는 경우

        Returns:
            dict: 연산 결과 정보
        """     

        method = method.lower()
        if method not in self.method_list:
            raise KeyError(f'Invalid scoring method: {method}, available method list: {self.method_list}')

        fill_method = 'min_value'
        if method == 'min_max':
            fill_method = 'minus1'

        multifactors = self._get_multifactors(date, codes)
        if multifactors.empty:
            raise ValueError(f'Not found multifactor data, date: {date}')

        scores = self._calculate_daily_score(multifactors, method = method)
        scores = self._fill_data(scores, method = fill_method)
        self._insert_score(method, scores)

        rst = {'date': date, 'method': method, 'fill': fill_method, 'count': len(scores)}

        return rst

     def filtering_score(self):
      query = 
          """
              select *
              from stocky_test.multifactor_global_ubs
          """
      rows = #SQL연결
      rows = pd.DataFrame(rows)

      code_date = rows.groupby('code')['date'].min().reset_index()
      codes = code_date[code_date['date'] == datetime.date(2019,1,2)]['code'].unique().tolist()
      data = rows[rows['code'].isin(codes)]

      x = data.loc[data.isnull().sum(1)>0]['code'].unique().tolist()
      aaa = pd.DataFrame(data[data['code']=='UBS-US'].isnull().sum())
      aaa.columns = ['is']
      data = data.loc[:,~data.columns.isin(aaa[aaa['is']>0].index.tolist())]
      x = data.loc[data.isnull().sum(1)>0]
      final = data[~data.isin(x)]

      return final 

In [None]:
# UBS 업데이트 적재모듈
df = PQWL.calc_multifactors('UBS-US', '2023-05-17', length=1)
pymysql.install_as_MySQLdb()
engine = create_engine("mysql+mysqldb://root:password@localhost:3306/qw_test", encoding='utf-8')
conn = engine.connect()
df.to_sql(name='multifactors_global_ubs', con=engine, if_exists='append', index=False)

In [None]:
# 필터링 모듈 시작
PSMF = PostStockMultifactorsDailyScore(PS)

In [None]:
# first_data
first_data = PSMF.filtering_score()

In [None]:
# MinMax Normalization
rst = None
for d in first_data['date'].unique().tolist():
  tmp = first_data[first_data['date']==d]
  mm_tmp = PSMF._perform_min_max(tmp)

  if rst is not None:
    rst = pd.concat((rst, mm_tmp))

   else:
    rst = mm_tmp 

In [None]:
# Ranking
rst = rst['date'].between(datetime.date(2022,11,30), datetime.date(2023,5,15))
rst_column = rst.loc[:, ['code', 'date']]
rst_data = rst.groupby('date').rank(method='max', ascending=False, na_option = 'keep')
mm_scores_timeseries = pd.concat([rst_column, rst_data], axis = 1, ignore_index = False)
df = mm_scores_timeseries.sort_values('date', ascending=True)
re_mm_score = df.reset_index(drop=True)
re_mm_score