In [None]:
from workalendar.europe import *
import requests
from datetime import datetime, timedelta, date
import json
import itertools
from typing import List, Union
from typing_extensions import TypedDict, TypeAlias
from enum import Enum
import calendar

In [None]:
calendarific_api_key = "2a7fb01b189f3d6735a2ab8d797b7a12dcba03d9"

def get_public_holidays_by_country(country_code):
    current_year = datetime.now().year
    current_year_str = [current_year - 1, current_year, current_year + 1]
    public_holidays = []

    try:
        for year in current_year_str:
            url = f"https://calendarific.com/api/v2/holidays?&api_key={calendarific_api_key}&country={country_code}&year={year}&type=national"
            res = requests.get(url)
            if (res.status_code == 200):
                public_holidays += [{'date': holiday['date']['datetime'], 'country': holiday['country']['id'], 'holiday_name': holiday['name'], 'holiday_description': holiday['description']} for holiday in res.json()['response']['holidays']]
            else:
                raise Exception(res.reason)
    except Exception as error:
        print(error)

    return public_holidays

In [None]:
class DateDict(TypedDict, total=True):
  year: int
  month: int
  day: int

class ApiObject(TypedDict):
  date: DateDict
  country: str
  holiday_name: str
  holiday_description: str


class EventFrequency(Enum):
    Monthly = 'Monthly'
    Annually = 'Annually'
    Quaterly = 'Quarterly'
    SemiAnually = 'SemiAnually'


class SkipDateReason(TypedDict):
   holiday_name: str
   country: str
   description: str
   date: datetime

class FinalDate(TypedDict):
  date: datetime
  reasons: List[SkipDateReason]

class IsBusinessDayReturn(TypedDict): 
  is_business_day: bool
  reason: SkipDateReason

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        return super().default(obj)


ApiObjectCollection: TypeAlias = List[ApiObject]

In [None]:
def is_business_day(date: datetime, holidays: ApiObjectCollection) -> IsBusinessDayReturn:
    if date.weekday() >= 5:
        return {
            'is_business_day': False,
            'reason': {
                'holiday_name': 'Weekend',
                'country': 'Global',
                'date': date
            }
        }
    for holiday in holidays:
        if date.date() == datetime(**holiday['date']).date():
            return {
                'is_business_day': False,
                'reason': {
                    'holiday_name': holiday['holiday_name'],
                    'country': holiday['country'],
                    'date': date.date(),
                    'description': holiday['holiday_description']
                }
            }
    return {
        'is_business_day': True
    }

def find_business_date(date: datetime, public_holidays: ApiObjectCollection, days_before: int) -> FinalDate:
    count = 0
    reasons: List[SkipDateReason] = []
    while count < days_before:
        date -= timedelta(days=1)
        is_business_day_info = is_business_day(date, public_holidays)
        if is_business_day_info['is_business_day']:
            count += 1
        else:
            reasons.append(is_business_day_info['reason'])

    return {
        'date': date,
        'reasons': reasons 
    }

def find_common_business_date(event_cut_off_date: FinalDate, global_holidays: ApiObjectCollection) -> FinalDate:
    reasons: List[SkipDateReason] = []
    while True:
        is_business_day_info = is_business_day(event_cut_off_date['date'], global_holidays)
        if (is_business_day_info['is_business_day']):
            return {
                'date': event_cut_off_date['date'],
                'reasons': event_cut_off_date['reasons']
            }
        event_cut_off_date['date'] -= timedelta(days=1)
        event_cut_off_date['reasons'].append(is_business_day_info['reason'])

def find_valid_cut_off_date(cut_off_date: datetime, days_before: int, fund_origin_country: str, countries_involved: List[str] ):
    country_holidays_map = {}
    all_countries = [fund_origin_country] + countries_involved

    for country_code in all_countries:
        country_holidays_map[country_code] = get_public_holidays_by_country(country_code)

    countries_involved_holidays = list(itertools.chain(*[country_holidays_map[it] for it in all_countries]))

    fund_cut_off_date = find_business_date(cut_off_date, country_holidays_map[fund_origin_country], days_before)
    fund_common_cut_off_date = find_common_business_date(fund_cut_off_date['date'], countries_involved_holidays)
    return fund_common_cut_off_date

def get_interval_dates(year: int, interval: EventFrequency, first_date=True):
    start_date = datetime(year, 1 ,1)
    _, end_day = calendar.monthrange(year, 12)
    end_date = datetime(year, 12, end_day)

    match interval:
        case EventFrequency.Quaterly:
            dates = []
            current_date = start_date
            while current_date <= end_date:
                if first_date:
                    dates.append(current_date)
                current_date += timedelta(days=91)
                if current_date <= end_date:
                    dates.append(current_date) if not first_date else None
            return dates

        case EventFrequency.Monthly:
            dates = [datetime(year, month, 1) for month in range(1, 13)]
            return dates if first_date else [datetime(year, month, calendar.monthrange(year, month)[1]) for month in range(1, 13)]

        case EventFrequency.Annually:
            return [start_date] if first_date else [end_date]

        case EventFrequency.SemiAnually:
            dates = []
            current_date = start_date
            while current_date <= end_date:
                if first_date:
                    dates.append(current_date)
                current_date += timedelta(days=182)
                if current_date <= end_date:
                    dates.append(current_date) if not first_date else None
            return dates
        case _:
            return []


def get_countries_holidays_map(countries: List[str]):
    countries_holidays_map = {}

    for country_code in countries:
        countries_holidays_map[country_code] = get_public_holidays_by_country(country_code)

    return countries_holidays_map
    # result = list(itertools.chain(*[countries_holidays_map[it] for it in countries]))
    # return result

def find_available_dates(
        days_before: int, 
        countries_involved: List[str], 
        subscription_frequency: EventFrequency, 
        first_date: bool,
        year: int
):
    countries_involved_holidays = get_countries_holidays_map(countries=countries_involved)
    countries_holidays_map = get_countries_holidays_map(countries=countries_involved)
    countries_involved_holidays = list(itertools.chain(*[countries_holidays_map[it] for it in countries_involved]))
    dates = get_interval_dates(year=year, interval=subscription_frequency, first_date=first_date)
    
    available_dates = []
    for date in dates:
        fund_cut_off_date = find_business_date(date=date, public_holidays=countries_holidays_map[countries_involved[0]], days_before=days_before)
        available_dates.append(find_common_business_date(event_cut_off_date=fund_cut_off_date, global_holidays=countries_involved_holidays))
    print(available_dates)

    return available_dates


In [None]:
test_date = datetime(2023, 11, 13)
date = find_valid_cut_off_date(
    cut_off_date=test_date,
    days_before=1,
    fund_origin_country='HK',
    countries_involved=['US']
)
print('date lei')
print(date)

In [None]:
datessss = find_available_dates(
    days_before=7, 
    countries_involved=['HK', 'US'], 
    subscription_frequency=EventFrequency.Monthly, 
    first_date=True,
    year=2023   
)
print('date lei')
print(datessss)

In [None]:
# json_data = json.dumps(datessss, indent=2, cls=DateTimeEncoder)
from IPython.display import JSON
datessss

for date in datessss:
  if not isinstance(date['date'], str):
    date['date'] = date['date'].isoformat()
  for reason in date['reasons']:
    if not isinstance(reason['date'], str):
      reason['date'] = reason['date'].isoformat()
    
json_data = json.dumps(datessss)
JSON(json_data)
json_data
# display(JSON(json_data))