In [110]:
import pandas as pd
import numpy as np
import os 
from datetime import datetime, timedelta
import re
from pandas.tseries.offsets import MonthEnd
from pandas.tseries.offsets import DateOffset


기본 메서드

In [5]:
def get_today(form='%Y-%m-%d'):
    mapping = {
        '%Y%m%d': datetime.now().strftime("%Y%m%d"),
        'yyyymmdd': datetime.now().strftime("%Y%m%d"),
        '%Y-%m-%d': datetime.now().strftime("%Y-%m-%d"),
        'yyyy-mm-dd': datetime.now().strftime("%Y-%m-%d"),
        'datetime': datetime.now(),
        '%Y%m%d%H': datetime.now().strftime("%Y%m%d%H"),
    }
    today = mapping[form]
    return today 

def scan_files_including_regex(file_folder, regex, option='name'):
    with os.scandir(file_folder) as files:
        lst = [file.name for file in files if re.findall(regex, file.name)]
    
    mapping = {
        'name': lst,
        'path': [os.path.join(file_folder, file_name) for file_name in lst]
    }
    return mapping[option]

def format_date(date):
    date = date.replace('-', '')
    date = datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d')
    return date

def save_df_to_file(df, file_folder, subject, file_memo, file_code,input_date, include_index=False, file_extension='.csv', archive=False, archive_folder='./archive'):
    def get_today(form='%Y%m%d'):
        return datetime.now().strftime(form)
    try:
#         new_folder_path = os.path.join(file_folder, new_folder_name)
#         os.makedirs(new_folder_path, exist_ok=True)
        save_time = get_today()
        file_name = f'dataset-{subject}-{file_memo}-code{file_code}-date{input_date}-save{save_time}{file_extension}'
        file_path = os.path.join(file_folder, file_name)
        if os.path.exists(file_path) and archive:
            df_archive = pd.read_csv(file_path)
            os.makedirs(archive_folder, exist_ok=True)
            archive_file_name = 'archive-' + file_name
            archive_file_path = os.path.join(archive_folder, archive_file_name)
            df_archive.to_csv(archive_file_path, index=False)
            print(f'Archived: {archive_file_path}')
        df.to_csv(file_path, index=include_index, encoding='utf-8-sig')
        print(f'Saved: {file_path}')
    except Exception as e:
        print(f"Error: {e}")

In [155]:
class M8186:
    def __init__(self, fund_code, start_date =None, end_date = None, menu_code = '8186'):
        self.fund_code = fund_code
        self.menu_code = menu_code
        self.start_date = start_date
        self.end_date = end_date
        self.df = None  # 데이터프레임을 위한 초기화



    def open_df_raw(self):
        lst = scan_files_including_regex(file_folder = './캡스톤데이터2', regex = f'menu{self.menu_code}-code{self.fund_code}')
        lst = sorted(lst, reverse = True)
        file_path = lst[0]
        full_path = os.path.join(os.getcwd(), '캡스톤데이터2', file_path)
        df = pd.read_csv(full_path)
        return df
    
    def get_df_ref(self):
        self.df = self.open_df_raw()
        self.df = self.df[['일자','KOSPI지수', 'KOSDAQ지수', 'KOSPI200지수', '수정기준가']]

        if self.start_date is None:
            self.start_date = self.df['일자'].min()

        if self.end_date is None:
            self.end_date = self.df['일자'].max()

        return self.df

    def open_df_SPX_index_raw(self):
        # 'dataset-index' 폴더에서 'dataset-price-' 패턴을 포함하는 파일 목록을 가져옴
        lst = scan_files_including_regex('./dataset-index', 'dataset-price-')
        lst = sorted(lst, reverse = True)
        file_path = lst[0]
        full_path = os.path.join(os.getcwd(), 'dataset-index', file_path)
        df = pd.read_csv(full_path)

        df['SPX INDEX'] = pd.to_numeric(df['SPX INDEX'], errors='coerce')
        df = df.dropna(subset=['SPX INDEX']).reset_index(drop=True)
        df.rename(columns={'SPX INDEX': index_name.lower(), 'ticker': '일자'}, inplace=True)
        return df



    def fill_zero_with_previous(self):
        columns = ['수정기준가', 'KOSPI지수', 'KOSPI200지수', 'KOSDAQ지수']
        for column in columns:
            self.df[column] = self.df[column].replace(0, None)
            self.df[column] = self.df[column].ffill()
        return self.df

    def convert_to_float(self):
        columns = ['수정기준가', 'KOSPI지수', 'KOSPI200지수', 'KOSDAQ지수']
        for column in columns:
            self.df[column] = self.df[column].apply(lambda x: float(x.replace(',', '' )) if isinstance (x,str) else x)
        return self.df 
            
    def filter_by_date_range(self):
        # '일자' 컬럼을 datetime 타입으로 변환
        self.df['일자'] = pd.to_datetime(self.df['일자'])

        # start_date와 end_date를 기준으로 데이터 필터링
        self.df = self.df[(self.df['일자'] >= self.start_date) & (self.df['일자'] <= self.end_date)]
        return self.df

    # def calculate_cumulative_return(self):
    #     columns = ['수정기준가', 'KOSPI지수', 'KOSPI200지수', 'KOSDAQ지수']
    #     for column_name in columns:
    #         initial_value = self.df[column_name].iloc[0]
    #         self.df[column_name + ' (%)'] = ((self.df[column_name] - initial_value) / initial_value) * 100
    #         # 첫 번째 행의 수익률을 0으로 설정 (시작점이므로)
    #         self.df.loc[self.df.index[0], column_name + ' (%)'] = 0
    #     return self.df

    def calculate_cumulative_return_for_df(self, df):
        df = df.copy()  # 명시적으로 데이터프레임 복사본 생성
        columns = ['수정기준가', 'KOSPI지수', 'KOSPI200지수', 'KOSDAQ지수']
        for column_name in columns:
            if column_name in df.columns:
                initial_value = df[column_name].iloc[0]
                updated_values = ((df[column_name] - initial_value) / initial_value) * 100
                updated_values.iloc[0] = 0  # 첫 번째 행의 수익률을 0으로 설정
                df.loc[:, column_name + ' (%)'] = updated_values
        return df



    def filter_for_period(self, months):
        if months is not None:
            # 현재 가장 최근 날짜를 구함
            df_end_date = self.df['일자'].max()

            # 지정된 개월 수만큼 과거 날짜를 계산
            period_start_date = df_end_date - DateOffset(months=months)

            # period_start_date보다 이전 데이터를 필터링
            filtered_df = self.df[self.df['일자'] >= period_start_date]

            return filtered_df
        else:
            # months가 None이면 전체 데이터프레임 반환
            return self.df


    def generate_period_df(self):
        # self.df의 최대 및 최소 날짜 찾기
        df_start_date = self.df['일자'].min()
        df_end_date = self.df['일자'].max()

        # 가능한 모든 기간을 검사하여 default_periods 설정
        potential_periods = [1, 3, 6, 12, 24, 36, 48, 60]
        default_periods = []

        for period in potential_periods:
            period_start_date = df_end_date - DateOffset(months=period)
            if period_start_date >= df_start_date:
                default_periods.append(period)

        period_dfs = {}  # 각 기간에 해당하는 데이터프레임을 저장할 딕셔너리

        # 각 기간에 대한 데이터프레임 생성
        for period in default_periods:
            period_dfs[f"{period}m"] = self.filter_for_period(period)

        # YTD 데이터프레임 생성
        # 현재 연도 필터링
        current_year = pd.Timestamp.now().year
        current_year_df = self.df[self.df['일자'].dt.year == current_year]

        # 현재 연도 데이터가 1월 1일부터 시작하는지 확인
        if current_year_df['일자'].min() == pd.Timestamp(year=current_year, month=1, day=1):
            period_dfs['YTD'] = current_year_df

        return period_dfs

    def get_final_cumulative_returns(self, period_dfs):
        final_returns_data = []

        for period, df in period_dfs.items():
            last_row = df.iloc[-1]
            row_data = {
                '기간': period,
                '수정기준가': last_row.get('수정기준가 (%)', None),
                'KOSPI': last_row.get('KOSPI지수 (%)', None),
                'KOSPI200': last_row.get('KOSPI200지수 (%)', None),
                'KOSDAQ': last_row.get('KOSDAQ지수 (%)', None)
            }
            final_returns_data.append(row_data)

        # 데이터를 기반으로 새로운 데이터프레임 생성
        final_returns_df = pd.DataFrame(final_returns_data)
        final_returns_df.set_index('기간', inplace=True)

        return final_returns_df
    
    def process_period_dfs(self):
        # 각 기간별 데이터프레임을 생성
        period_dfs = self.generate_period_df()

        # 각 데이터프레임에 대해 누적 수익률 계산
        for period, df in period_dfs.items():
            period_dfs[period] = self.calculate_cumulative_return_for_df(df)

        # 전체 기간에 대한 누적수익률 추가
        period_dfs['전체 기간'] = self.calculate_cumulative_return_for_df(self.df)
        
        # 각 기간별 누적수익률의 마지막 값으로 구성된 데이터프레임을 반환
        final_returns_df = self.get_final_cumulative_returns(period_dfs)

        return final_returns_df


    #기간별 수익률을 위한 메인 메서드 
    def main(self):
        self.get_df_ref()
        self.filter_by_date_range()
        self.convert_to_float()
        self.fill_zero_with_previous()
        #convert_to_float 이 먼저 실행되고 fill_zero_with_previous 가 실행되어야 함 
        final_returns_df = self.process_period_dfs()

        return final_returns_df

    


In [150]:
m = M8186(fund_code = 'A00002')
m.main()


Unnamed: 0_level_0,수정기준가,KOSPI,KOSPI200,KOSDAQ
기간,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1m,-4.130104,-7.589237,-6.473631,-12.475328
3m,-4.011951,-13.469296,-11.590764,-21.354317
전체 기간,2.808,-8.685352,-6.444996,-11.758712


In [151]:
m.open_df_SPX_index_raw()

In [156]:
m8186_instance = M8186(fund_code='some_code')

# S&P500 지수 데이터 불러오기
m8186_instance.open_df_SPX_index_raw()

# 불러온 S&P500 지수 데이터 출력
print(m8186_instance.df_spx)

         ticker SPX INDEX
0         field   PX_LAST
1          date       NaN
2    2021-01-04   3700.65
3    2021-01-05   3726.86
4    2021-01-06   3748.14
..          ...       ...
725  2023-11-16   4508.24
726  2023-11-17   4514.02
727  2023-11-20   4547.38
728  2023-11-21   4538.19
729  2023-11-22   4556.62

[730 rows x 2 columns]
