# 特定の日時のリストを取得する 

In [1]:
import jpholiday
from pytz import timezone, utc
import datetime
import pickle
from pathlib import Path

In [2]:
import numpy as np
import pandas as pd

In [3]:
def check_jst_datetimes_to_naive(*arg_datetimes):
    """
    ＊*今のところ，ローカルが東京でないnaiveなdatetimeはそのまま通してしまう
    引数のタイムゾーンが同じかどうかチェックし，存在するなら日本であるかチェック
    awareな場合は，naiveに変更
    """
    jst_timezone = timezone("Asia/Tokyo")
    tz_info_set = set([one_datetime.tzinfo for one_datetime in arg_datetimes])
    if len(tz_info_set) > 1:
        raise Exception("timezones are different")
        
    datetimes_tzinfo = list(tz_info_set)[0]
    
    if datetimes_tzinfo is not None:  # 長さが1のはず
        if timezone(str(datetimes_tzinfo)) != jst_timezone:
            raise Exception("timezones must be Asia/Tokyo")
        # naiveなdatetimeに変更
        arg_datetimes = [one_datetime.replace(tzinfo=None) for one_datetime in arg_datetimes]
    
    # 引数が一つであるかどうか
    if len(arg_datetimes) > 1:  # 引数が複数の場合
        return tuple(arg_datetimes)
    else:  # 引数が一つの場合
        return arg_datetimes[0]

## 営業日・休日を取得する 

Optionの簡単化のため，HolidayGetterクラスを作成する．jpholidayを利用するものとCSVファイルを読み込むものの二つを用意する．

### 祝日のndarrayを取得する(jpholidayを利用)

以下はjpholidayを利用した素直な実装だが，遅い．そこで一度しか呼ばないようにすべき

In [4]:
class JPHolidayGetter:
    def get_holidays(self, start_date, end_date, with_name=False):
        """
        期間を指定して祝日を取得．jpholidayを利用して祝日を取得している．

        start_date: datetime.date
            開始時刻のdate
        end_datetime: datetime.date
            終了時刻のdate
        with_name: bool
            休日の名前を出力するかどうか
        to_date: bool
            出力をdatetime.datetimeにするかdatetime.dateにするか
        """
        assert isinstance(start_date, datetime.date) and isinstance(end_date, datetime.date)

        holidays_array = np.array(jpholiday.between(start_date, end_date))

        if not with_name:  # 祝日名がいらない場合
            return holidays_array[:,0].copy()

        return holidays_array
        

In [5]:
start_date = datetime.date(1960, 1, 1)
end_date = datetime.date(2021, 12, 31)

holidays_array = JPHolidayGetter().get_holidays(start_date, end_date, with_name=True)

In [6]:
holidays_array

array([[datetime.date(1960, 1, 1), '元日'],
       [datetime.date(1960, 1, 15), '成人の日'],
       [datetime.date(1960, 3, 20), '春分の日'],
       ...,
       [datetime.date(2021, 9, 23), '秋分の日'],
       [datetime.date(2021, 11, 3), '文化の日'],
       [datetime.date(2021, 11, 23), '勤労感謝の日']], dtype=object)

In [7]:
len(holidays_array)

937

### 祝日のndarrayを取得(CSVファイルを利用) 

思ったより速度が速いので，毎回これを呼ぶことも可能であるが，jpholidayのものを踏襲してこれも一度あるいはOptionを変更したときのみ呼ばれるようにする．

In [8]:
class CSVHolidayGetter:
    def __init__(self, csv_paths):
        if not isinstance(csv_paths, list):  # リストでないなら，リストにしておく
            csv_paths = [csv_paths]
            
        self.csv_paths = csv_paths
        
    def get_holidays(self, start_date, end_date, with_name=False):
        """
        期間を指定して祝日を取得．csvファイルを利用して祝日を取得している．

        start_date: datetime.date
            開始時刻のdate
        end_datetime: datetime.date
            終了時刻のdate
        with_name: bool
            休日の名前を出力するかどうか
        to_date: bool
            出力をdatetime.datetimeにするかdatetime.dateにするか
        """
        assert isinstance(start_date, datetime.date) and isinstance(end_date, datetime.date)
        
        # datetime.dateをpd.Timestampに変換(datetime.dateは通常pd.DatetimeIndexと比較できないため)
        start_timestamp = pd.Timestamp(start_date)
        end_timestamp = pd.Timestamp(end_date)
        
        for i, csv_path in enumerate(self.csv_paths):
            holiday_df = pd.read_csv(csv_path, 
                                     header=None,
                                     names=["date", "holiday_name"],
                                     index_col="date",
                                     parse_dates=True
                                    )
            if i == 0:
                left_df = holiday_df
            else:
                append_bool = ~holiday_df.index.isin(left_df.index)  # 左Dataframeに存在しない部分を追加
                left_df = left_df.append(holiday_df.loc[append_bool], sort=True)

        
        # 指定範囲内の祝日を取得
        holiday_in_span_index = (start_timestamp<=left_df.index)&(left_df.index<end_timestamp)
        holiday_in_span_df = left_df.loc[holiday_in_span_index]
        
        holiday_in_span_date_array = holiday_in_span_df.index.date
        holiday_in_span_name_array = holiday_in_span_df.loc[:,"holiday_name"].values
        holiday_in_span_array = np.stack([holiday_in_span_date_array,
                                          holiday_in_span_name_array
                                         ],
                                         axis=1
                                        )
        
        if not with_name:  # 祝日名がいらない場合
            return holiday_in_span_date_array
            
        return holiday_in_span_array
            

In [9]:
source_paths = [Path("source/holiday_naikaku.csv"),
                Path("source/holiday_api.csv")
               ]

holiday_getter = CSVHolidayGetter(source_paths)

start_date = datetime.date(1960, 1, 1)
end_date = datetime.date(2021, 12, 31)

holidays_array = holiday_getter.get_holidays(start_date, end_date, with_name=True)
holidays_array

array([[datetime.date(1960, 1, 1), '元日'],
       [datetime.date(1960, 1, 15), '成人の日'],
       [datetime.date(1960, 3, 20), '春分の日'],
       ...,
       [datetime.date(2021, 9, 23), '秋分の日'],
       [datetime.date(2021, 11, 3), '文化の日'],
       [datetime.date(2021, 11, 23), '勤労感謝の日']], dtype=object)

In [10]:
len(holidays_array)

913

In [11]:
def temp_func():
    holidays_array = holiday_getter.get_holidays(start_date, end_date, with_name=True)

from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
prf.add_module(CSVHolidayGetter)                                                                                          
#prf.add_function()                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0 s
File: <ipython-input-8-73157d16bb73>
Function: __init__ at line 2

Line #      Hits         Time  Per Hit   % Time  Line Contents
     2                                               def __init__(self, csv_paths):
     3                                                   if not isinstance(csv_paths, list):  # リストでないなら，リストにしておく
     4                                                       csv_paths = [csv_paths]
     5                                                       
     6                                                   self.csv_paths = csv_paths

Total time: 0.259739 s
File: <ipython-input-8-73157d16bb73>
Function: get_holidays at line 8

Line #      Hits         Time  Per Hit   % Time  Line Contents
     8                                               def get_holidays(self, start_date, end_date, with_name=False):
     9                                                   """
    10                                                   期間を指定して祝日を取得

このメソッドはそれほど呼ばないので速度が遅くてもかまわない．それに速度の向上は期待できない．

## Option

jpholidayの休日取得速度が遅いため(csvとしても毎回読み込むのは非効率なため)，パッケージとして読み込む時にあらかじめ利用する休日をarrayとして保持しておくOptionを作成する．さらに利用する休日の期間やcsvとjpholidayの切り替え，休日曜日の切り替え，日中時間の切り替えなどを行うことができる．

In [29]:
class Option():
    """
    オプションの指定のためのクラス
    holiday_start_year: int
        利用する休日の開始年
    holiday_end_year: int
        利用する休日の終了年
    backend: str
        休日取得のバックエンド．csvかjpholidayのいずれかが選べる
    csv_source_paths: list of str or pathlib.Path
        バックエンドをcsvにした場合の休日のソースcsvファイル
    holiday_weekdays: list of int
        休日曜日の整数のリスト
    intraday_borders: list of list of 2 datetime.time
        日中を指定する境界時間のリストのリスト
    """
    def __init__(self):
        self._holiday_start_year = datetime.datetime.now().year-5
        self._holiday_end_year = datetime.datetime.now().year
        
        self._backend = "csv"
        self._csv_source_paths = [Path("source/holiday_naikaku.csv"),]
        
        self.make_holiday_getter()  # HolidayGetterを作成
        self.make_holidays()  # アトリビュートに追加
        
        self._holiday_weekdays = [5,6]  # 土曜日・日曜日
        self._intraday_borders = [[datetime.time(9,0), datetime.time(11,30)],
                                  [datetime.time(12,30), datetime.time(15,0)]]
        
    
    def make_holiday_getter(self):
        if self.backend == "jp_holiday":
            self._holiday_getter = JPHolidayGetter()
        elif self.backend == "csv":
            self._holiday_getter = CSVHolidayGetter(self.csv_source_paths)
        
    
    def make_holidays(self):
        """
        利用する休日のarrayとDatetimeIndexをアトリビュートとして作成
        """
        self._holidays_date_array = self._holiday_getter.get_holidays(start_date=datetime.date(self.holiday_start_year,1,1),
                                                    end_date=datetime.date(self.holiday_end_year,12,31),
                                                    with_name=False,
                                                   )
        self._holidays_datetimeindex =  pd.DatetimeIndex(self._holidays_date_array)
        
    
    @property
    def holiday_start_year(self):
        return self._holiday_start_year
    
    @holiday_start_year.setter
    def holiday_start_year(self, year):
        assert isinstance(year,int)
        self._holiday_start_year = year
        self.make_holidays()  # アトリビュートに追加
    
    @property
    def holiday_end_year(self):
        return self._holiday_end_year

    @holiday_end_year.setter
    def holiday_end_year(self, year):
        assert isinstance(year,int)
        self._holiday_end_year = year
        self.make_holidays()  # アトリビュートに追加
    
    @property
    def backend(self):
        return self._backend
    
    @backend.setter
    def backend(self, backend_str):
        if backend_str not in ("jp_holiday","csv"):
            raise Exception("backend must be 'jp_holiday' or 'csv'.")
        self._backend = backend_str
        self.make_holiday_getter()  # HolidayGetterを作成
    
    @property
    def csv_source_paths(self):
        #中身の確認
        for csv_source_path in self._csv_source_paths:
            if not isinstance(csv_source_path, str) and not isinstance(csv_source_path, Path):
                raise Exception("csv_source_paths must be list of str or pathlib.Path")
        
        return self._csv_source_paths
    
    @csv_source_paths.setter
    def csv_source_paths(self, path_list):
        self.csv_source_paths = path_list
        
    @property
    def holidays_date_array(self):
        return self._holidays_date_array
    
    @property
    def holidays_datetimeindex(self):
        return self._holidays_datetimeindex
    
    @property
    def holiday_weekdays(self):
        # 中身の確認
        for weekday in self._holiday_weekdays:
            if not isinstance(weekday, int):
                raise Exception("holiday_weekdays must be list of integer(0<=x<=6)")
                
        return self._holiday_weekdays
    
    @holiday_weekdays.setter
    def holiday_weekdays(self, weekdays_list):
        self._holiday_weekdays = weekdays_list
    
    @property
    def intraday_borders(self):
        # 中身の確認
        for border in self._intraday_borders:
            if not isinstance(border, list) or len(border) != 2:
                raise Exception("intraday_borders must be list of list whitch has 2 datetime.time")
                
            for border_time in border:
                if not isinstance(border_time, datetime.time):
                    raise Exception("intraday_borders must be list of list whitch has 2 datetime.time")
             
        return self._intraday_borders
    
    @intraday_borders.setter
    def intraday_borders(self, borders_list):
        self._intraday_borders = borders_list

### Optionのコンストラクト 

In [30]:
option = Option()

### 休日を取得する関数

In [31]:
def get_holidays_jp(start_date, end_date, with_name=False, independent=False):
        """
        期間を指定して祝日を取得．

        start_date: datetime.date
            開始時刻のdate
        end_datetime: datetime.date
            終了時刻のdate
        with_name: bool
            休日の名前を出力するかどうか
        to_date: bool
            出力をdatetime.datetimeにするかdatetime.dateにするか
        independent: bool，default:False
            休日をoptionから独立させるかどうか．FalseならばOption内で保持する休日が取得される
        """
        assert isinstance(start_date, datetime.date) and isinstance(end_date, datetime.date)
        
        if not independent:
            # datetime.dateをpd.Timestampに変換(datetime.dateは通常pd.DatetimeIndexと比較できないため)
            start_timestamp = pd.Timestamp(start_date)
            end_timestamp = pd.Timestamp(end_date)

            holidays_in_span_index = (start_timestamp<=option.holidays_datetimeindex)&(option.holidays_datetimeindex<end_timestamp)  # DatetimeIndexを使うことに注意
            holidays_in_span_array = option.holidays_date_array[holidays_in_span_index]  # ndarrayを使う

            return holidays_in_span_array
        else:
            if option.backend == "jp_holiday":
                holiday_getter = JPHolidayGetter()
            elif option.backend == "csv":
                holiday_getter = CSVHolidayGetter(option.csv_source_paths)
                
            holidays_array = holiday_getter.get_holidays(start_date=start_date,
                                                         end_date=end_date,
                                                         with_name=with_name
                                                        ) 
            return  holidays_array

In [32]:
start_date = datetime.date(1960, 1, 1)
end_date = datetime.date(2021, 12, 31)

holidays_array = get_holidays_jp(start_date, 
                                end_date,
                                with_name=True,
                                independent=False
                               )
holidays_array

array([datetime.date(2016, 1, 1), datetime.date(2016, 1, 11),
       datetime.date(2016, 2, 11), datetime.date(2016, 3, 20),
       datetime.date(2016, 3, 21), datetime.date(2016, 4, 29),
       datetime.date(2016, 5, 3), datetime.date(2016, 5, 4),
       datetime.date(2016, 5, 5), datetime.date(2016, 7, 18),
       datetime.date(2016, 8, 11), datetime.date(2016, 9, 19),
       datetime.date(2016, 9, 22), datetime.date(2016, 10, 10),
       datetime.date(2016, 11, 3), datetime.date(2016, 11, 23),
       datetime.date(2016, 12, 23), datetime.date(2017, 1, 1),
       datetime.date(2017, 1, 2), datetime.date(2017, 1, 9),
       datetime.date(2017, 2, 11), datetime.date(2017, 3, 20),
       datetime.date(2017, 4, 29), datetime.date(2017, 5, 3),
       datetime.date(2017, 5, 4), datetime.date(2017, 5, 5),
       datetime.date(2017, 7, 17), datetime.date(2017, 8, 11),
       datetime.date(2017, 9, 18), datetime.date(2017, 9, 23),
       datetime.date(2017, 10, 9), datetime.date(2017, 11, 3),

In [33]:
def temp_func():
    holidays_array = get_holidays_jp(start_date, 
                                end_date,
                                with_name=True,
                                independent=False
                               )

from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(get_holidays_jp)                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0027388 s
File: <ipython-input-31-2ff14dc42e2a>
Function: get_holidays_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def get_holidays_jp(start_date, end_date, with_name=False, independent=False):
     2                                                   """
     3                                                   期間を指定して祝日を取得．
     4                                           
     5                                                   start_date: datetime.date
     6                                                       開始時刻のdate
     7                                                   end_datetime: datetime.date
     8                                                       終了時刻のdate
     9                                                   with_name: bool
    10                                                       休日の名前を出力するかどうか
    11                                          

ボトルネックはなさそう

## 営業日の取得 

optionの休日を参照する．

### 指定した期間の営業日のndarrayを取得する

In [34]:
def get_workdays_jp(start_date, end_date, return_as="date", end_include=False):
    """
    営業日を取得
    
    start_date: datetime.date
        開始時刻のdate
    end_datetime: datetime.date
        終了時刻のdate
    return_as: str, defalt: 'dt'
        返り値の形式
        - 'dt':pd.DatetimeIndex
        - 'date': datetime.date array
    end_include: bool
        最終日も含めて出力するか
    """
    assert isinstance(start_date, datetime.date) and isinstance(end_date, datetime.date)
    # 返り値の形式の指定
    return_as_set = {"dt", "date"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
    
    # datetime.dateをpd.Timestampに変換(datetime.dateは通常pd.DatetimeIndexと比較できないため)
    start_timestamp = pd.Timestamp(start_date)
    end_timestamp = pd.Timestamp(end_date)
    
    # 期間中のholidayを取得
    holidays_in_span_index = (start_timestamp<=option.holidays_datetimeindex)&(option.holidays_datetimeindex<end_timestamp)  # DatetimeIndexを使うことに注意
    holidays_in_span_array = option.holidays_date_array[holidays_in_span_index]  # ndarrayを使う

    # 期間中のdatetimeのarrayを取得
    if end_include:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date, freq="D")  # 最終日も含める
    else:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date-datetime.timedelta(days=1), freq="D")  # 最終日は含めない
    
    
    # 休日に含まれないもの，さらに土日に含まれないもののboolインデックスを取得
    holiday_bool_array = np.in1d(days_datetimeindex.date, holidays_in_span_array)  # 休日であるかのブール(pd.DatetimeIndex.isin)でもいい
    holiday_weekday_each_bool_arrays = [days_datetimeindex.weekday==weekday for weekday in option.holiday_weekdays]  # inを使うのを回避
    holiday_weekday_bool_array = np.logical_or.reduce(holiday_weekday_each_bool_arrays)  # 休日曜日
    
    workdays_bool_array = (~holiday_bool_array)&(~holiday_weekday_bool_array)  # 休日でなく休日曜日でない
    
    workdays_datetimeindex = days_datetimeindex[workdays_bool_array].copy()
    if return_as=="dt":
        return workdays_datetimeindex
    elif return_as=="date":
        return workdays_datetimeindex.date

In [35]:
start_date = datetime.datetime(2021, 1, 1)
end_date = datetime.datetime(2021, 12, 31)

workdays = get_workdays_jp(start_date, end_date, return_as="date", end_include=False)

In [167]:
workdays[:50]

array([datetime.date(2021, 1, 4), datetime.date(2021, 1, 5),
       datetime.date(2021, 1, 6), datetime.date(2021, 1, 7),
       datetime.date(2021, 1, 8), datetime.date(2021, 1, 12),
       datetime.date(2021, 1, 13), datetime.date(2021, 1, 14),
       datetime.date(2021, 1, 15), datetime.date(2021, 1, 18),
       datetime.date(2021, 1, 19), datetime.date(2021, 1, 20),
       datetime.date(2021, 1, 21), datetime.date(2021, 1, 22),
       datetime.date(2021, 1, 25), datetime.date(2021, 1, 26),
       datetime.date(2021, 1, 27), datetime.date(2021, 1, 28),
       datetime.date(2021, 1, 29), datetime.date(2021, 2, 1),
       datetime.date(2021, 2, 2), datetime.date(2021, 2, 3),
       datetime.date(2021, 2, 4), datetime.date(2021, 2, 5),
       datetime.date(2021, 2, 8), datetime.date(2021, 2, 9),
       datetime.date(2021, 2, 10), datetime.date(2021, 2, 12),
       datetime.date(2021, 2, 15), datetime.date(2021, 2, 16),
       datetime.date(2021, 2, 17), datetime.date(2021, 2, 18),
    

In [37]:
def temp_func():
    workdays = get_workdays_jp(start_date, end_date, return_as="date", end_include=False)

from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(get_workdays_jp)                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0264773 s
File: <ipython-input-34-e1372a9d4670>
Function: get_workdays_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def get_workdays_jp(start_date, end_date, return_as="date", end_include=False):
     2                                               """
     3                                               営業日を取得
     4                                               
     5                                               start_date: datetime.date
     6                                                   開始時刻のdate
     7                                               end_datetime: datetime.date
     8                                                   終了時刻のdate
     9                                               return_as: str, defalt: 'dt'
    10                                                   返り値の形式
    11                                                   - 'dt':pd.DatetimeIn

In [38]:
days_datetimeindex = pd.date_range(start=start_date, end=end_date, freq="D")

ボトルネックは以下の部分である．

In [39]:
%%timeit 
[days_datetimeindex.weekday==weekday for weekday in option.holiday_weekdays]

2.85 ms ± 429 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


ここで，weekdayアトリビュートの取得にほぼすべての時間が費やされていることに注意すれば

In [40]:
%%timeit
weekday_array = days_datetimeindex.weekday.values
[weekday_array==weekday for weekday in option.holiday_weekdays]

594 µs ± 44.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


これでholiday_weekdaysの個数に依存しなくなる．

### 営業日であるかどうか 

In [41]:
def check_workday_jp(select_date):
    """
    与えられたdatetime.dateが営業日であるかどうかを出力する
    select_date: datetime.date
        入力するdate
    """
    assert isinstance(select_date, datetime.date)
    select_date_array = np.array([select_date])  # データ数が一つのndarray
    # 休日であるかどうか
    is_holiday = np.in1d(select_date_array, option.holidays_date_array).item()  # データ数が一つのため，item
    
    # 休日曜日であるかどうか
    is_holiday_weekday = select_date.weekday() in option.holiday_weekdays
    
    is_workday = not any([is_holiday, is_holiday_weekday])
    
    return is_workday

In [42]:
select_date = datetime.date(2021, 1, 3)
check_workday_jp(select_date)

False

In [43]:
def temp_func():
    check_workday_jp(select_date)
    
from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(check_workday_jp)                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0068659 s
File: <ipython-input-41-45ef695fe0ae>
Function: check_workday_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def check_workday_jp(select_date):
     2                                               """
     3                                               与えられたdatetime.dateが営業日であるかどうかを出力する
     4                                               select_date: datetime.date
     5                                                   入力するdate
     6                                               """
     7         1        102.0    102.0      0.1      assert isinstance(select_date, datetime.date)
     8         1        459.0    459.0      0.7      select_date_array = np.array([select_date])  # データ数が一つのndarray
     9                                               # 休日であるかどうか
    10         1      67670.0  67670.0     98.6      is_holiday = np.in1d(select_date_array, option.holi

np.in1dの速度とコンストラクタの速度の問題がある．

In [144]:
%%timeit
select_date = datetime.date(2021, 1, 3)
select_date_array = select_date_array = np.array([select_date])
np.in1d(select_date_array, option.holidays_date_array).item()  # データ数が一つのため，item

5.99 ms ± 715 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


この問題では探すのが1つであるので，以下のようにdatetime.dateの比較演算子を利用できる．

In [145]:
%%timeit 
select_date = datetime.date(2021, 1, 3)
(option.holidays_date_array == select_date).sum() > 0

71.7 µs ± 10.3 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


### 非営業日を取得

同様に，すでに取得したholidayと照らし合わせる．

In [44]:
def get_not_workdays_jp(start_date, end_date, return_as="date", end_include=False):
    """
    非営業日を取得(土日or祝日)
    
    start_date: datetime.date
        開始時刻のdate
    end_datetime: datetime.date
        終了時刻のdate
    return_as: str, defalt: 'dt'
        返り値の形式
        - 'dt':pd.DatetimeIndex
        - 'date': datetime.date array
    end_include: bool
        最終日も含めて出力するか
    """
    assert isinstance(start_date, datetime.date) and isinstance(end_date, datetime.date)
    # 返り値の形式の指定
    return_as_set = {"dt", "date"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
    
    # datetime.dateをpd.Timestampに変換(datetime.dateは通常pd.DatetimeIndexと比較できないため)
    start_timestamp = pd.Timestamp(start_date)
    end_timestamp = pd.Timestamp(end_date)
    
    # 期間中のholidayを取得
    holidays_in_span_index = (start_timestamp<=option.holidays_datetimeindex)&(option.holidays_datetimeindex<end_timestamp)  # DatetimeIndexを使うことに注意
    holidays_in_span_array = option.holidays_date_array[holidays_in_span_index]  # ndarrayを使う

    # 期間中のdatetimeのarrayを取得
    if end_include:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date, freq="D")  # 最終日も含める
    else:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date-datetime.timedelta(days=1), freq="D")  # 最終日は含めない
    
    # 休日に含まれないもの，さらに休日曜日に含まれないもののboolインデックスを取得
    holiday_bool_array = np.in1d(days_datetimeindex.date, holidays_in_span_array)  # 休日であるかのブール(pd.DatetimeIndex.isin)でもいい
    holiday_weekday_each_bool_arrays = [days_datetimeindex.weekday==weekday for weekday in option.holiday_weekdays]  # inを使うのを回避
    holiday_weekday_bool_array = np.logical_or.reduce(holiday_weekday_each_bool_arrays)  # 休日曜日
    
    not_workdays_bool_array = holiday_bool_array | holiday_weekday_bool_array  # 休日あるいは休日曜日
    
    not_workdays_datetimeindex = days_datetimeindex[not_workdays_bool_array].copy()
    if return_as=="dt":
        return not_workdays_datetimeindex
    elif return_as=="date":
        return not_workdays_datetimeindex.date

In [45]:
start_datetime = datetime.date(2019, 1, 1)
end_datetime = datetime.date(2019, 12, 31)

not_workdays = get_not_workdays_jp(start_date, end_date, return_as="dt")

In [46]:
not_workdays

DatetimeIndex(['2021-01-01', '2021-01-02', '2021-01-03', '2021-01-09',
               '2021-01-10', '2021-01-11', '2021-01-16', '2021-01-17',
               '2021-01-23', '2021-01-24',
               ...
               '2021-11-27', '2021-11-28', '2021-12-04', '2021-12-05',
               '2021-12-11', '2021-12-12', '2021-12-18', '2021-12-19',
               '2021-12-25', '2021-12-26'],
              dtype='datetime64[ns]', length=119, freq=None)

プロファイリングは省略

## 指定した日数後の営業日を取得

In [191]:
def get_next_workday_jp(select_date, days=1, select_include=False, return_as="date"):
    """
    指定した日数後の営業日を取得
    select_date: datetime.date
        指定する日時
    days: int
        日数
    return_as: str, defalt: 'dt'
        返り値の形式
        - 'dt':pd.Timstamp
        - 'datetime': datetime.datetime array
    """
    assert isinstance(select_date, datetime.date)
    # 返り値の形式の指定
    return_as_set = {"dt", "date"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
        

    def get_next_workday_jp_gen(select_date):
        add_days = 1  # 追加日数の初期値
        while True:
            next_day = select_date + datetime.timedelta(days=add_days)
            if check_workday_jp(next_day):
                yield next_day
            add_days += 1  # 追加日数をさらに追加
    
    next_workday_gen = get_next_workday_jp_gen(select_date)
    
    for i in range(days):
        next_day = next(next_workday_gen)
    
    if return_as=="date":
        return next_day
    elif return_as=="dt":
        return pd.Timestamp(next_day)

In [192]:
select_date = datetime.date(2021, 1, 1)
next_workday = get_next_workday_jp(select_date, days=10, return_as="date")
next_workday

datetime.date(2021, 1, 18)

In [193]:
def temp_func():
    select_date = datetime.date(2021, 1, 1)
    next_workday = get_next_workday_jp(select_date, days=43, return_as="date")
    
from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(get_next_workday_jp)                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0262004 s
File: <ipython-input-191-eab34ca25371>
Function: get_next_workday_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def get_next_workday_jp(select_date, days=1, select_include=False, return_as="date"):
     2                                               """
     3                                               select_date: datetime.date
     4                                                   指定する日時
     5                                               return_as: str, defalt: 'dt'
     6                                                   返り値の形式
     7                                                   - 'dt':pd.Timstamp
     8                                                   - 'datetime': datetime.datetime array
     9                                               """
    10         1        218.0    218.0      0.1      assert isinstance(select_date, datetime.date)
   

ボトルネックは明らかであり，毎回get_next_workday_jpで判定していることである．結局のところ，普通の日にちのイテレーターと休日イテレーターを回しながら一つづつ判定するのが効果的と思われる．

In [195]:
select_date = datetime.date(2021, 1, 1)
select_days = 10

counter = 0
holiday_weekdays_set = set(option.holiday_weekdays)  #setにした方が高速？

holiday_bigger_select_index = (option.holidays_date_array<=select_date).sum()
holiday_iter = iter(option.holidays_date_array[holiday_bigger_select_index:])
def days_gen(select_date):
    add_days = 1  # select_dateを含まない
    while True:
        yield select_date + datetime.timedelta(days=add_days)
        add_days += 1
days_iter = days_gen(select_date)

# 以下二つのイテレーターを比較し，one_dayが休日に含まれる場合，カウントしカウントが指定に達した場合終了する
one_day = next(days_iter)

one_holiday = next(holiday_iter)

while True:
    if one_day==one_holiday:  #その日が祝日である
        one_holiday = next(holiday_iter)
    else:
        if not one_day.weekday() in holiday_weekdays_set:  #その日が休日曜日である
            #print(one_day)
            counter += 1  # カウンターをインクリメント
    
    if counter >= select_days:
        break
    
    one_day = next(days_iter)

In [196]:
one_day

datetime.date(2021, 1, 18)

In [197]:
datetime.date(2021,1,1) == datetime.date(2021,1,1)

True

## 指定した日数分の営業日を取得

In [200]:
def get_workdays_number_jp(start_date, days, return_as="date"):
    """
    指定した日数分の営業日を取得
    start_date: datetime.date
        開始日時
    days: int
        日数
    return_as: str, defalt: 'dt'
        返り値の形式
        - 'dt':pd.Timstamp
        - 'datetime': datetime.datetime array
    """
    end_date = get_next_workday_jp(start_date, days=days, return_as="date")
    return get_workdays_jp(start_date, end_date, end_include=True, return_as=return_as)

In [201]:
start__date = datetime.date(2021, 1, 1)
workdays = get_workdays_number_jp(select_date, days=10, return_as="date")
workdays

array([datetime.date(2021, 1, 4), datetime.date(2021, 1, 5),
       datetime.date(2021, 1, 6), datetime.date(2021, 1, 7),
       datetime.date(2021, 1, 8), datetime.date(2021, 1, 12),
       datetime.date(2021, 1, 13), datetime.date(2021, 1, 14),
       datetime.date(2021, 1, 15), datetime.date(2021, 1, 18)],
      dtype=object)

## データフレームに関しての処理

### データの用意 

awareなdatetimeを持つDataFrame

In [52]:
with open("aware_stock_df.pickle", "rb") as f:
    aware_stock_df = pickle.load(f)

aware_stock_df

Unnamed: 0_level_0,Open_6502,High_6502,Low_6502,Close_6502,Volume_6502
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-11-04 00:00:00+09:00,,,,,
2020-11-04 00:01:00+09:00,,,,,
2020-11-04 00:02:00+09:00,,,,,
2020-11-04 00:03:00+09:00,,,,,
2020-11-04 00:04:00+09:00,,,,,
...,...,...,...,...,...
2020-11-30 23:55:00+09:00,,,,,
2020-11-30 23:56:00+09:00,,,,,
2020-11-30 23:57:00+09:00,,,,,
2020-11-30 23:58:00+09:00,,,,,


naiveなdatetimeを持つDataFrame

In [53]:
with open("naive_stock_df.pickle", "rb") as f:
    naive_stock_df = pickle.load(f)

naive_stock_df.at_time(datetime.time(9,0))

Unnamed: 0_level_0,Open_6502,High_6502,Low_6502,Close_6502,Volume_6502
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-11-04 09:00:00,2669.0,2670.0,2658.0,2664.0,93000.0
2020-11-05 09:00:00,2631.0,2641.0,2630.0,2639.0,55000.0
2020-11-06 09:00:00,2615.0,2644.0,2609.0,2617.0,90700.0
2020-11-09 09:00:00,2711.0,2719.0,2707.0,2710.0,0.0
2020-11-10 09:00:00,2780.0,2782.0,2770.0,2777.0,142700.0
2020-11-11 09:00:00,,,,,
2020-11-12 09:00:00,2788.0,2800.0,2781.0,2789.0,108400.0
2020-11-13 09:00:00,2750.0,2754.0,2741.0,2750.0,71500.0
2020-11-14 09:00:00,,,,,
2020-11-15 09:00:00,,,,,


### 営業日データを取得

高速化のため，既存のholidayと照らし合わせる方法でとる

In [54]:
def extract_workdays_jp_index(dt_index, return_as="index"):
    """
    pd.DatetimeIndexから，営業日のデータのものを抽出
    dt_index: pd.DatetimeIndex
        入力するDatetimeIndex，すでにdatetimeでソートしていることが前提
    return_as: str
        出力データの形式
        - "index": 引数としたdfの対応するインデックスを返す
        - "bool": 引数としたdfに対応するboolインデックスを返す
    """
    # 返り値の形式の指定
    return_as_set = {"index", "bool"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
        
    # すでにtimestampでソートさてている前提
    start_datetime = dt_index[0].to_pydatetime()
    end_datetime = dt_index[-1].to_pydatetime()
    
    start_datetime, end_datetime = check_jst_datetimes_to_naive(start_datetime, end_datetime)  # 二つのdatetimeのタイムゾーンをチェック・naiveに変更
    
    # 期間内のholidayを取得
    holidays_in_span_index = ((start_datetime-datetime.timedelta(days=1))<option.holidays_datetimeindex)&\
    (option.holidays_datetimeindex<=end_datetime)  # DatetimeIndexを使うことに注意, 当日を含めるため，startから1を引いている．
    holidays_in_span_array = option.holidays_date_array[holidays_in_span_index]  # ndarrayを使う
    
    # 休日に含まれないもの，さらに土日に含まれないもののboolインデックスを取得    
    holiday_bool_array = np.in1d(dt_index.date, holidays_in_span_array)  # 休日
    holiday_weekday_each_bool_arrays = [dt_index.weekday==weekday for weekday in option.holiday_weekdays]  # inを使うのを回避
    holiday_weekday_bool_array = np.logical_or.reduce(holiday_weekday_each_bool_arrays)  # 休日曜日
    
    workdays_bool_array = (~holiday_bool_array)&(~holiday_weekday_bool_array)  # 休日でなく休日曜日でない
    if return_as=="bool":  # boolで返す場合
        return workdays_bool_array
    
    elif return_as=="index":  # indexで返す場合
        workdays_df_indice = dt_index[workdays_bool_array]
        return workdays_df_indice

In [55]:
def extract_workdays_jp(df, return_as="df"):
    """
    データフレームから，営業日のデータのものを抽出．出力データ形式をreturn_asで指定する．
    df: pd.DataFrame(インデックスとしてpd.DatetimeIndex)
        入力データ
    return_as: str
        出力データの形式
        - "df": 抽出した新しいpd.DataFrameを返す
        - "index": 引数としたdfの対応するインデックスを返す
        - "bool": 引数としたdfに対応するboolインデックスを返す
    """
    
    # 返り値の形式の指定
    return_as_set = {"df", "index", "bool"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
    
    workdays_bool_array = extract_workdays_jp_index(df.index, return_as="bool")
    if return_as=="bool":
        return workdays_bool_array
    
    workdays_df_indice = df.index[workdays_bool_array]
    if return_as=="index":
        return workdays_df_indice

    out_df = df.loc[workdays_df_indice].copy()
    return out_df

In [56]:
extracted_stock_df = extract_workdays_jp(aware_stock_df, return_as="df")
extracted_stock_df.at_time(datetime.time(9,0))

Unnamed: 0_level_0,Open_6502,High_6502,Low_6502,Close_6502,Volume_6502
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-11-04 09:00:00+09:00,2669.0,2670.0,2658.0,2664.0,93000.0
2020-11-05 09:00:00+09:00,2631.0,2641.0,2630.0,2639.0,55000.0
2020-11-06 09:00:00+09:00,2615.0,2644.0,2609.0,2617.0,90700.0
2020-11-09 09:00:00+09:00,2711.0,2719.0,2707.0,2710.0,0.0
2020-11-10 09:00:00+09:00,2780.0,2782.0,2770.0,2777.0,142700.0
2020-11-11 09:00:00+09:00,,,,,
2020-11-12 09:00:00+09:00,2788.0,2800.0,2781.0,2789.0,108400.0
2020-11-13 09:00:00+09:00,2750.0,2754.0,2741.0,2750.0,71500.0
2020-11-16 09:00:00+09:00,2714.0,2716.0,2705.0,2710.0,111000.0
2020-11-17 09:00:00+09:00,2748.0,2748.0,2729.0,2734.0,124400.0


In [57]:
def temp_func():
    extracted_stock_df = extract_workdays_jp(aware_stock_df, return_as="df")
    
from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(extract_workdays_jp)
prf.add_function(extract_workdays_jp_index)
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.118979 s
File: <ipython-input-54-2a9ab9585719>
Function: extract_workdays_jp_index at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def extract_workdays_jp_index(dt_index, return_as="index"):
     2                                               """
     3                                               pd.DatetimeIndexから，営業日のデータのものを抽出
     4                                               dt_index: pd.DatetimeIndex
     5                                                   入力するDatetimeIndex，すでにdatetimeでソートしていることが前提
     6                                               return_as: str
     7                                                   出力データの形式
     8                                                   - "index": 引数としたdfの対応するインデックスを返す
     9                                                   - "bool": 引数としたdfに対応するboolインデックスを返す
    10                                               """
  

In [58]:
start_datetime = aware_stock_df.index[0].to_pydatetime()
end_datetime = aware_stock_df.index[-1].to_pydatetime()
start_datetime, end_datetime = check_jst_datetimes_to_naive(start_datetime, end_datetime)  # 二つのdatetimeのタイムゾーンをチェック・naiveに変更

holidays_in_span_index = ((start_datetime-datetime.timedelta(days=1))<option.holidays_datetimeindex)&(option.holidays_datetimeindex<=end_datetime)  # DatetimeIndexを使うことに注意, 当日を含めるため，startから1を引いている．
holidays_in_span_array = option.holidays_date_array[holidays_in_span_index]  # ndarrayを使う
holidays_in_span_datetimeindex = option.holidays_datetimeindex[holidays_in_span_index]

こちらはobjectのndarrayのin1dである．これは遅いことが知られている

In [59]:
holiday_bool_array1 = np.in1d(aware_stock_df.index.date, holidays_in_span_array)

In [69]:
%%timeit
holiday_bool_array1 = np.in1d(aware_stock_df.index.date, holidays_in_span_array)

72.8 ms ± 15 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


一方int(unix時間)に変更してしまえば，in1dは高速になる．しかし変更するのに多少時間がかかる．

In [72]:
%%timeit
df_index_date_int = aware_stock_df.index.floor("D").astype(int).values
holidays_in_span_int = holidays_in_span_datetimeindex.astype(int).values
holiday_bool_array2 = np.in1d(df_index_date_int, holidays_in_span_int)

19 ms ± 3.15 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [73]:
%timeit np.in1d(df_index_date_int, holidays_in_span_int)

237 µs ± 41.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [66]:
np.array_equal(holiday_bool_array1, holiday_bool_array2)

True

もちろんもとのarrayと同じである．

他の選択肢としてpd.DatetimeIndexのisinメソッドを利用してみる．こっちの方がintに変換しない分早い?

In [76]:
%%timeit 
holiday_bool_array3 = aware_stock_df.index.floor("D").isin(holidays_in_span_datetimeindex)

18.8 ms ± 2.66 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [78]:
np.array_equal(holiday_bool_array1, holiday_bool_array3)

True

### 日中データを取得

おそらく使わない，以下の営業日＋日中内で使う．

In [79]:
def extract_intraday_jp_index(dt_index, return_as="index"):
    """
    pd.DatetimeIndexから，日中のデータのものを抽出．出力データ形式をreturn_asで指定する．
    dt_index: pd.DatetimeIndex
        入力するDatetimeIndex
    return_as: str
        出力データの形式
        - "index": 引数としたdfの対応するインデックスを返す
        - "bool": 引数としたdfに対応するboolインデックスを返す
    """
    
    # 返り値の形式の指定
    return_as_set = {"index", "bool"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))    
  
    bool_array = np.full(len(dt_index), False)
    
    # ボーダー内のboolをTrueにする
    for borders in option.intraday_borders:
        start_time, end_time = borders[0], borders[1]  # 開始時刻と終了時刻
        in_border_indice = dt_index.indexer_between_time(start_time=start_time, end_time=end_time, include_end=False)
        bool_array[in_border_indice] = True
    
    if return_as=="bool":
        return bool_array

    elif return_as=="index":
        intraday_indice = dt_index[bool_array]
        return intraday_indice

In [80]:
def extract_intraday_jp(df, return_as="df"):
    """
    データフレームから，日中のデータのものを抽出．出力データ形式をreturn_asで指定する．
    df: pd.DataFrame(インデックスとしてpd.DatetimeIndex)
        入力データ
    return_as: str
        出力データの形式
        - "df": 抽出した新しいpd.DataFrameを返す
        - "index": 引数としたdfの対応するインデックスを返す
        - "bool": 引数としたdfに対応するboolインデックスを返す
    """
    
    # 返り値の形式の指定
    return_as_set = {"df", "index", "bool"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))    
  
    intraday_bool_array = extract_intraday_jp_index(df.index, return_as="bool")
    if return_as=="bool":
        return intraday_bool_array
    
    intraday_indice = df.index[intraday_bool_array]
    if return_as=="index":
        return intraday_indice
    
    out_df = df.loc[intraday_indice].copy()
    return out_df

以下の例では，朝8時のデータは存在しない

In [81]:
extracted_df = extract_intraday_jp(naive_stock_df, return_as="df")
extracted_df.at_time(datetime.time(8,0))

Unnamed: 0_level_0,Open_6502,High_6502,Low_6502,Close_6502,Volume_6502
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1


In [82]:
def temp_func():
    extracted_df = extract_intraday_jp(naive_stock_df, return_as="df")
    
from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(extract_intraday_jp)
prf.add_function(extract_intraday_jp_index)
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0132707 s
File: <ipython-input-79-6cd31f91260a>
Function: extract_intraday_jp_index at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def extract_intraday_jp_index(dt_index, return_as="index"):
     2                                               """
     3                                               pd.DatetimeIndexから，日中のデータのものを抽出．出力データ形式をreturn_asで指定する．
     4                                               dt_index: pd.DatetimeIndex
     5                                                   入力するDatetimeIndex
     6                                               return_as: str
     7                                                   出力データの形式
     8                                                   - "index": 引数としたdfの対応するインデックスを返す
     9                                                   - "bool": 引数としたdfに対応するboolインデックスを返す
    10                                               """
   

ボトルネックはないようだ

### 営業日＋日中データを取得 

In [33]:
def extract_workdays_intraday_jp_index(dt_index, return_as="index"):
    """
    pd.DatetimeIndexから，営業日+日中のデータのものを抽出．出力データ形式をreturn_asで指定する．
    dt_index: pd.DatetimeIndex
        入力するDatetimeIndex
    return_as: str
        出力データの形式
        - "index": 引数としたdfの対応するインデックスを返す
        - "bool": 引数としたdfに対応するboolインデックスを返す
    """

    # 返り値の形式の指定
    return_as_set = {"index", "bool"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
        
    workday_bool_array = extract_workdays_jp_index(dt_index, return_as="bool")
    intraday_bool_array = extract_intraday_jp_index(dt_index, return_as="bool")
    
    workday_intraday_bool_array = workday_bool_array & intraday_bool_array
    if return_as=="bool":
        return workday_intraday_bool_array
    elif return_as=="index":
        workday_intraday_indice = dt_index[workday_intraday_bool_array]
        return workday_intraday_indice

In [34]:
def extract_workdays_intraday_jp(df, return_as="df"):
    """
    データフレームから，営業日+日中のデータのものを抽出．出力データ形式をreturn_asで指定する．
    df: pd.DataFrame(インデックスとしてpd.DatetimeIndex)
        入力データ
    return_as: str
        出力データの形式
        - "df": 抽出した新しいpd.DataFrameを返す
        - "index": 引数としたdfの対応するインデックスを返す
        - "bool": 引数としたdfに対応するboolインデックスを返す
    """
    
    # 返り値の形式の指定
    return_as_set = {"df", "index", "bool"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))    
       
    workday_intraday_bool_array = extract_workdays_intraday_jp_index(df.index, return_as="bool")
    
    if return_as=="bool":
        return workday_intraday_bool_array
    
    workday_intraday_indice = df.index[workday_intraday_bool_array]
    
    if return_as=="index":
        return workday_intraday_indice
    
    out_df = df.loc[workday_intraday_indice].copy()
    return out_df

In [35]:
extracted_df = extract_workdays_intraday_jp(aware_stock_df, return_as="df")
extracted_df.at_time(datetime.time(9,0))

Unnamed: 0_level_0,Open_6502,High_6502,Low_6502,Close_6502,Volume_6502
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-11-04 09:00:00+09:00,2669.0,2670.0,2658.0,2664.0,93000.0
2020-11-05 09:00:00+09:00,2631.0,2641.0,2630.0,2639.0,55000.0
2020-11-06 09:00:00+09:00,2615.0,2644.0,2609.0,2617.0,90700.0
2020-11-09 09:00:00+09:00,2711.0,2719.0,2707.0,2710.0,0.0
2020-11-10 09:00:00+09:00,2780.0,2782.0,2770.0,2777.0,142700.0
2020-11-11 09:00:00+09:00,,,,,
2020-11-12 09:00:00+09:00,2788.0,2800.0,2781.0,2789.0,108400.0
2020-11-13 09:00:00+09:00,2750.0,2754.0,2741.0,2750.0,71500.0
2020-11-16 09:00:00+09:00,2714.0,2716.0,2705.0,2710.0,111000.0
2020-11-17 09:00:00+09:00,2748.0,2748.0,2729.0,2734.0,124400.0


In [36]:
extracted_df.at_time(datetime.time(8,0))

Unnamed: 0_level_0,Open_6502,High_6502,Low_6502,Close_6502,Volume_6502
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1


## 以上のプロファイリングを考慮した新しいバージョン 

## 営業日の取得 

### 指定した期間の営業日のndarrayを取得する

In [87]:
def get_workdays_jp(start_date, end_date, return_as="date", end_include=False):
    """
    営業日を取得
    
    start_date: datetime.date
        開始時刻のdate
    end_datetime: datetime.date
        終了時刻のdate
    return_as: str, defalt: 'dt'
        返り値の形式
        - 'dt':pd.DatetimeIndex
        - 'date': datetime.date array
    end_include: bool
        最終日も含めて出力するか
    """
    assert isinstance(start_date, datetime.date) and isinstance(end_date, datetime.date)
    # 返り値の形式の指定
    return_as_set = {"dt", "date"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
    
    # datetime.dateをpd.Timestampに変換(datetime.dateは通常pd.DatetimeIndexと比較できないため)
    start_timestamp = pd.Timestamp(start_date)
    end_timestamp = pd.Timestamp(end_date)
    
    # 期間中のholidayを取得
    holidays_in_span_index = (start_timestamp<=option.holidays_datetimeindex)&(option.holidays_datetimeindex<end_timestamp)  # DatetimeIndexを使うことに注意
    holidays_in_span_datetimeindex = option.holidays_datetimeindex[holidays_in_span_index]  # ndarrayを使う

    # 期間中のdatetimeのarrayを取得
    if end_include:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date, freq="D")  # 最終日も含める
    else:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date-datetime.timedelta(days=1), freq="D")  # 最終日は含めない
    
    
    # 休日に含まれないもの，さらに土日に含まれないもののboolインデックスを取得
    holiday_bool_array = days_datetimeindex.isin(holidays_in_span_datetimeindex)  # 休日であるかのブール
    
    days_weekday_array = days_datetimeindex.weekday.values
    holiday_weekday_each_bool_arrays = [days_weekday_array==weekday for weekday in option.holiday_weekdays]  # inを使うのを回避
    holiday_weekday_bool_array = np.logical_or.reduce(holiday_weekday_each_bool_arrays)  # 休日曜日
    
    workdays_bool_array = (~holiday_bool_array)&(~holiday_weekday_bool_array)  # 休日でなく休日曜日でない
    
    workdays_datetimeindex = days_datetimeindex[workdays_bool_array].copy()
    if return_as=="dt":
        return workdays_datetimeindex
    elif return_as=="date":
        return workdays_datetimeindex.date

In [88]:
def temp_func():
    start_date = datetime.datetime(2021, 1, 1)
    end_date = datetime.datetime(2021, 12, 31)
    workdays = get_workdays_jp(start_date, end_date, return_as="date", end_include=False)

from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(get_workdays_jp)                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0135881 s
File: <ipython-input-87-6acf899d182f>
Function: get_workdays_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def get_workdays_jp(start_date, end_date, return_as="date", end_include=False):
     2                                               """
     3                                               営業日を取得
     4                                               
     5                                               start_date: datetime.date
     6                                                   開始時刻のdate
     7                                               end_datetime: datetime.date
     8                                                   終了時刻のdate
     9                                               return_as: str, defalt: 'dt'
    10                                                   返り値の形式
    11                                                   - 'dt':pd.DatetimeIn

### 営業日であるかどうか 

In [146]:
def check_workday_jp(select_date):
    """
    与えられたdatetime.dateが営業日であるかどうかを出力する
    select_date: datetime.date
        入力するdate
    """
    assert isinstance(select_date, datetime.date)
    # 休日であるかどうか
    is_holiday = (option.holidays_date_array==select_date).sum() > 0
    
    # 休日曜日であるかどうか
    is_holiday_weekday = select_date.weekday() in set(option.holiday_weekdays)
    
    is_workday = not any([is_holiday, is_holiday_weekday])
    
    return is_workday

In [148]:
def temp_func():
    select_date = datetime.date(2021, 1, 3)
    check_workday_jp(select_date)
    
from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(check_workday_jp)                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0017455 s
File: <ipython-input-146-32034ee082ba>
Function: check_workday_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def check_workday_jp(select_date):
     2                                               """
     3                                               与えられたdatetime.dateが営業日であるかどうかを出力する
     4                                               select_date: datetime.date
     5                                                   入力するdate
     6                                               """
     7         1        450.0    450.0      2.6      assert isinstance(select_date, datetime.date)
     8                                               # 休日であるかどうか
     9         1      16474.0  16474.0     94.4      is_holiday = (option.holidays_date_array==select_date).sum() > 0
    10                                               
    11                                         

In [149]:
def temp_func():
    next_workday = get_next_workday_jp(select_datetime, days=43, return_as="date")

from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(check_workday_jp) 
prf.add_function(get_next_workday_jp)
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0126762 s
File: <ipython-input-135-eab34ca25371>
Function: get_next_workday_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def get_next_workday_jp(select_date, days=1, select_include=False, return_as="date"):
     2                                               """
     3                                               select_date: datetime.date
     4                                                   指定する日時
     5                                               return_as: str, defalt: 'dt'
     6                                                   返り値の形式
     7                                                   - 'dt':pd.Timstamp
     8                                                   - 'datetime': datetime.datetime array
     9                                               """
    10         1        150.0    150.0      0.1      assert isinstance(select_date, datetime.date)
   

### 非営業日を取得 

In [99]:
def get_not_workdays_jp(start_date, end_date, return_as="date", end_include=False):
    """
    非営業日を取得(土日or祝日)
    
    start_date: datetime.date
        開始時刻のdate
    end_datetime: datetime.date
        終了時刻のdate
    return_as: str, defalt: 'dt'
        返り値の形式
        - 'dt':pd.DatetimeIndex
        - 'date': datetime.date array
    end_include: bool
        最終日も含めて出力するか
    """
    assert isinstance(start_date, datetime.date) and isinstance(end_date, datetime.date)
    # 返り値の形式の指定
    return_as_set = {"dt", "date"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
    
    # datetime.dateをpd.Timestampに変換(datetime.dateは通常pd.DatetimeIndexと比較できないため)
    start_timestamp = pd.Timestamp(start_date)
    end_timestamp = pd.Timestamp(end_date)
    
    # 期間中のholidayを取得
    holidays_in_span_index = (start_timestamp<=option.holidays_datetimeindex)&(option.holidays_datetimeindex<end_timestamp)  # DatetimeIndexを使うことに注意
    holidays_in_span_datetimeindex = option.holidays_datetimeindex[holidays_in_span_index]  # pd.DatetimeIndexを使う

    # 期間中のdatetimeのarrayを取得
    if end_include:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date, freq="D")  # 最終日も含める
    else:
        days_datetimeindex = pd.date_range(start=start_date, end=end_date-datetime.timedelta(days=1), freq="D")  # 最終日は含めない
    
    # 休日に含まれないもの，さらに休日曜日に含まれないもののboolインデックスを取得
    holiday_bool_array = days_datetimeindex.isin(holidays_in_span_datetimeindex)  # 休日であるかのブール
    
    days_weekday_array = days_datetimeindex.weekday.values
    holiday_weekday_each_bool_arrays = [days_weekday_array==weekday for weekday in option.holiday_weekdays]  # inを使うのを回避
    holiday_weekday_bool_array = np.logical_or.reduce(holiday_weekday_each_bool_arrays)  # 休日曜日
    
    not_workdays_bool_array = holiday_bool_array | holiday_weekday_bool_array  # 休日あるいは休日曜日
    
    not_workdays_datetimeindex = days_datetimeindex[not_workdays_bool_array].copy()
    if return_as=="dt":
        return not_workdays_datetimeindex
    elif return_as=="date":
        return not_workdays_datetimeindex.date

## 指定した日数後の営業日を取得 

In [185]:
def get_next_workday_jp(select_date, days=1, select_include=False, return_as="date"):
    """
    指定した日数後の営業日を取得
    select_date: datetime.date
        指定する日時
    days: int
        日数
    return_as: str, defalt: 'dt'
        返り値の形式
        - 'dt':pd.Timstamp
        - 'datetime': datetime.datetime array
    """
    assert isinstance(select_date, datetime.date)
    # 返り値の形式の指定
    return_as_set = {"dt", "date"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
        
    day_counter = 0
    holiday_weekdays_set = set(option.holiday_weekdays)  #setにした方が高速？

    holiday_bigger_select_index = (option.holidays_date_array<=select_date).sum()
    # 祝日イテレータ
    holiday_iter = iter(option.holidays_date_array[holiday_bigger_select_index:])
    def days_gen(select_date):
        add_days = 1  # select_dateを含まない
        while True:
            yield select_date + datetime.timedelta(days=add_days)
            add_days += 1
    # 日にちイテレータ
    days_iter = days_gen(select_date)

    # 以下二つのイテレーターを比較し，one_dayが休日に含まれる場合，カウントしカウントが指定に達した場合終了する
    one_day = next(days_iter)

    one_holiday = next(holiday_iter)

    while True:
        if one_day==one_holiday:  #その日が祝日である
            one_holiday = next(holiday_iter)
        else:
            if not one_day.weekday() in holiday_weekdays_set:  #その日が休日曜日である
                #print(one_day)
                day_counter += 1  # カウンターをインクリメント

        if day_counter >= days:
            break

        one_day = next(days_iter)
        
    if return_as=="date":
        return one_day
    elif return_as=="dt":
        return pd.Timestamp(one_day)

In [186]:
def temp_func():
    select_date = datetime.date(2021, 1, 1)
    next_workday = get_next_workday_jp(select_date, days=43, return_as="date")
    
from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(get_next_workday_jp)                                                                                      
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0031899 s
File: <ipython-input-185-4a0fcab84af8>
Function: get_next_workday_jp at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def get_next_workday_jp(select_date, days=1, select_include=False, return_as="date"):
     2                                               """
     3                                               select_date: datetime.date
     4                                                   指定する日時
     5                                               return_as: str, defalt: 'dt'
     6                                                   返り値の形式
     7                                                   - 'dt':pd.Timstamp
     8                                                   - 'datetime': datetime.datetime array
     9                                               """
    10         1        214.0    214.0      0.7      assert isinstance(select_date, datetime.date)
   

## データフレームに関しての処理 

### 営業日データを取得 

In [103]:
def extract_workdays_jp_index(dt_index, return_as="index"):
    """
    pd.DatetimeIndexから，営業日のデータのものを抽出
    dt_index: pd.DatetimeIndex
        入力するDatetimeIndex，すでにdatetimeでソートしていることが前提
    return_as: str
        出力データの形式
        - "index": 引数としたdfの対応するインデックスを返す
        - "bool": 引数としたdfに対応するboolインデックスを返す
    """
    # 返り値の形式の指定
    return_as_set = {"index", "bool"}
    if not return_as in return_as_set:
        raise Exception("return_as must be any in {}".format(return_as_set))
        
    # すでにtimestampでソートさてている前提
    start_datetime = dt_index[0].to_pydatetime()
    end_datetime = dt_index[-1].to_pydatetime()
    
    start_datetime, end_datetime = check_jst_datetimes_to_naive(start_datetime, end_datetime)  # 二つのdatetimeのタイムゾーンをチェック・naiveに変更
    
    # 期間内のholidayを取得
    holidays_in_span_index = ((start_datetime-datetime.timedelta(days=1))<option.holidays_datetimeindex)&\
    (option.holidays_datetimeindex<=end_datetime)  # DatetimeIndexを使うことに注意, 当日を含めるため，startから1を引いている．
    holidays_in_span_datetimeindex = option.holidays_datetimeindex[holidays_in_span_index]  # pd.DatetimeIndexを使う
    
    # 休日に含まれないもの，さらに土日に含まれないもののboolインデックスを取得    
    holiday_bool_array = dt_index.floor("D").isin(holidays_in_span_datetimeindex)  # 休日
    
    dt_index_weekday = dt_index.weekday
    holiday_weekday_each_bool_arrays = [dt_index_weekday==weekday for weekday in option.holiday_weekdays]  # inを使うのを回避
    holiday_weekday_bool_array = np.logical_or.reduce(holiday_weekday_each_bool_arrays)  # 休日曜日
    
    workdays_bool_array = (~holiday_bool_array)&(~holiday_weekday_bool_array)  # 休日でなく休日曜日でない
    if return_as=="bool":  # boolで返す場合
        return workdays_bool_array
    
    elif return_as=="index":  # indexで返す場合
        workdays_df_indice = dt_index[workdays_bool_array]
        return workdays_df_indice

In [105]:
def temp_func():
    extracted_stock_df = extract_workdays_jp(aware_stock_df, return_as="df")
    
from line_profiler import LineProfiler
prf = LineProfiler()                                                                                         
#prf.add_module()                                                                                          
prf.add_function(extract_workdays_jp)
prf.add_function(extract_workdays_jp_index)
prf.runcall(temp_func)                                                                                          
prf.print_stats()

Timer unit: 1e-07 s

Total time: 0.0806603 s
File: <ipython-input-103-560262c406e5>
Function: extract_workdays_jp_index at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def extract_workdays_jp_index(dt_index, return_as="index"):
     2                                               """
     3                                               pd.DatetimeIndexから，営業日のデータのものを抽出
     4                                               dt_index: pd.DatetimeIndex
     5                                                   入力するDatetimeIndex，すでにdatetimeでソートしていることが前提
     6                                               return_as: str
     7                                                   出力データの形式
     8                                                   - "index": 引数としたdfの対応するインデックスを返す
     9                                                   - "bool": 引数としたdfに対応するboolインデックスを返す
    10                                               """
