In [2]:
from croniter import croniter

In [None]:
from croniter import croniter
from datetime import datetime
base = datetime.now()
iter = croniter('*/5 * * * *', base)  # every 5 minutes


2024-12-22 11:30:00


In [85]:
wc = WorkingDayCroniter("0 0 * * 0 *", datetime.now())

ValueError: Cron expression must have exactly 5 parts.

In [84]:
wc.get_next()

datetime.datetime(2024, 12, 29, 0, 0)

In [None]:
from datetime import datetime, timedelta
import calendar
from typing import List, Dict, Optional
from collections import Counter
from croniter import croniter


class WorkingDayCroniter:
    def __init__(self, expr: str, base: datetime, holidays: Optional[List[datetime]] = None):
        """
        Initialize the WorkingDayCroniter instance.

        :param expr: A cron expression, supporting 'W' for the nth working days of the month.
        :param base: The base datetime to calculate from.
        :param holidays: A list of dates that are considered holidays.
        """
        self.expr = expr
        self.base = base
        self.holidays = set(holidays or [])  # Use a set for faster lookups
        self.last_date = None  # Track the last returned working day
        self._validate_expression(expr)

        self._is_working_day_expr = "W" in expr
        self._cron_iter = None if self._is_working_day_expr else croniter(expr, base)

    def _validate_expression(self, expr: str):
        """
        Validate the cron expression.
        """
        parts = expr.split()
        if len(parts) != 5:
            raise ValueError("Cron expression must have exactly 5 parts.")

        for part in parts[2].split(","):
            if "W" in part:
                try:
                    int(part.replace("W", ""))
                except ValueError:
                    raise ValueError(f"Invalid working day number in expression: {part}")

    def get_next(self, date_class=datetime) -> datetime:
        """
        Get the next occurrence based on the custom cron expression.
        If not dealing with working days, delegate to croniter's `get_next`.
        """
        if not self._is_working_day_expr:
            return self._cron_iter.get_next(date_class)

        # Custom logic for 'W'
        parts = self.expr.split()
        day_of_month = parts[2]
        if "W" in day_of_month:
            parts[2] = "*"  # Replace 'W' with '*' for croniter to iterate normally.
        cron_expr = " ".join(parts)

        working_days = self._parse_working_days(day_of_month)
        normal_days = self._parse_normal_days(day_of_month)

        iter_base = croniter(cron_expr, self.last_date or self.base)
        while True:
            candidate_date = iter_base.get_next(date_class)
            if self._is_valid_working_day(candidate_date, working_days):
                self.last_date = candidate_date
                return candidate_date
            elif self._is_valid_normal_day(candidate_date, normal_days):
                self.last_date = candidate_date
                return candidate_date

    def _parse_working_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract multiple working days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of working day numbers (e.g., [3, 5] for "3W,5W").
        """
        working_days = []
        for part in day_of_month.split(","):
            if "W" in part:
                working_days.append(int(part.replace("W", "")))
        return working_days
    
    def _parse_normal_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract multiple working days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of working day numbers (e.g., [3, 5] for "3W,5W").
        """
        normal_days = []
        for part in day_of_month.split(","):
            if part != "*" and "W" not in part:
                normal_days.append(int(part))
        return normal_days

    def _is_valid_working_day(self, date: datetime, working_days: List[int]) -> bool:
        """
        Check if the given date matches the 'W' logic for any specified working day.

        :param date: The date to check.
        :param working_days: A list of nth working days to match against.
        :return: True if the date matches any of the specified working days, False otherwise.
        """
        if date.weekday() >= 5 or date in self.holidays:
            return False

        nth_working_day = self._get_nth_working_day(date)
        return nth_working_day in working_days
    
    def _is_valid_normal_day(self, date: datetime, normal_days: List[int]) -> bool:
        """
        Check if the given date matches the normal day logic.

        :param date: The date to check.
        :param normal_days: A list of days of the month to match against.
        :return: True if the date matches any of the specified normal days, False otherwise.
        """
        return date.day in normal_days

    def _get_nth_working_day(self, date: datetime) -> int:
        """
        Determine the nth working day of the month for a given date.

        :param date: The date to evaluate.
        :return: The nth working day of the month.
        """
        month_start = date.replace(day=1)
        nth_working_day = 0

        for day in range(1, date.day + 1):
            try:
                candidate = month_start.replace(day=day)
            except ValueError:
                break  # Reached the end of the month

            if candidate.weekday() < 5 and candidate not in self.holidays:
                nth_working_day += 1
                if candidate == date:
                    return nth_working_day
        return 0


class ProcessExecutionAnalyzer:
    def __init__(self, historical_data: List[datetime], holidays: List[datetime] = None):
        self.historical_data = sorted(set(historical_data))
        self.holidays = set(holidays) if holidays else set()

    def detect_pattern(self) -> Dict[str, any]:
        self.weekday_count = self._count_by_weekday()
        self.day_of_month_count = self._count_by_day_of_month()
        self.working_day_count = self._count_by_working_day()

        # Generate all possible cron expressions
        cron_expressions = self._generate_cron_expressions(
            self.weekday_count, self.day_of_month_count, self.working_day_count
        )

        # Evaluate accuracy for each cron expression
        best_cron = None
        highest_accuracy = 0
        excludes_holidays = True

        for cron_expr in cron_expressions:
            accuracy = self._evaluate_prediction_accuracy(cron_expr, holiday=False)
            print(f"cron: {cron_expr}, accuracy: {accuracy}")
            if accuracy > highest_accuracy:
                best_cron = cron_expr
                highest_accuracy = accuracy

        for cron_expr in cron_expressions:
            accuracy = self._evaluate_prediction_accuracy(cron_expr, holiday=True)
            print(f"cron: {cron_expr}, accuracy: {accuracy}")
            if accuracy > highest_accuracy:
                best_cron = cron_expr
                highest_accuracy = accuracy
                excludes_holidays  = False

        return {
            "best_cron_expression": best_cron,
            "highest_accuracy": highest_accuracy,
            "excludes_holidays": excludes_holidays,
        }

    def _count_by_weekday(self) -> Dict[int, int]:
        weekday_count = Counter(date.weekday() for date in self.historical_data)
        max_count = max(weekday_count.values(), default=0)
        for weekday in list(weekday_count):
            if weekday_count[weekday] * 1.5 < max_count:
                del weekday_count[weekday]
        return weekday_count

    def _count_by_day_of_month(self) -> Dict[int, int]:
        day_of_month_count = Counter(date.day for date in self.historical_data)
        max_count = max(day_of_month_count.values(), default=0)
        for day_of_month in list(day_of_month_count):
            if day_of_month_count[day_of_month] * 1.5 < max_count:
                del day_of_month_count[day_of_month]
        return day_of_month_count

    def _count_by_working_day(self) -> Dict[int, int]:
        working_day_count = Counter()
        for date in self.historical_data:
            year, month = date.year, date.month
            working_days = self._get_working_days(year, month)
            if date in working_days:
                index = working_days.index(date) + 1
                working_day_count[index] += 1

        max_count = max(working_day_count.values(), default=0)
        for working_day in list(working_day_count):
            if working_day_count[working_day] * 1.5 < max_count:
                del working_day_count[working_day]

        return working_day_count

    def _generate_cron_expressions(self, weekday_count: Dict[int, int], day_of_month_count: Dict[int, int], working_day_count: Dict[int, int]) -> List[str]:
        crons = []

        # Daily execution
        crons.append("0 0 * * *")

        # Weekly execution based on weekdays
        weekdays = [str(day) for day, count in weekday_count.items() if count > 0]
        if weekdays:
            crons.append(f"0 0 * {','.join(weekdays)} *")

        # Monthly execution based on specific days of the month
        days = [str(day) for day, count in day_of_month_count.items() if count > 0]
        if days:
            crons.append(f"0 0 {','.join(days)} * *")

        # Monthly execution based on working days
        working_days = [f"{day}W" for day, count in working_day_count.items() if count > 0]
        if working_days:
            crons.append(f"0 0 {','.join(working_days)} * *")

        return crons

    def _evaluate_prediction_accuracy(self, cron_expr: str, holiday) -> float:
        if holiday == True:
            cron = WorkingDayCroniter(cron_expr, base=self.historical_data[0] - timedelta(hours=1), holidays=self.holidays)
        else:
            cron = WorkingDayCroniter(cron_expr, base=self.historical_data[0] - timedelta(hours=1))

        predicted_dates = set()
        for _ in range(len(self.historical_data)):
            next_date = cron.get_next(datetime)
            if next_date in self.holidays:
                continue  # Skip holidays if they are excluded
            predicted_dates.add(next_date)

        return len(predicted_dates.intersection(self.historical_data)) / len(self.historical_data)

    def _get_working_days(self, year: int, month: int) -> List[datetime]:
        working_days = []
        _, last_day = calendar.monthrange(year, month)
        for day in range(1, last_day + 1):
            try:
                date = datetime(year, month, day)
                if date.weekday() < 5 and date not in self.holidays:
                    working_days.append(date)
            except ValueError:
                continue
        return working_days


In [76]:
from datetime import datetime, timedelta
import calendar
from typing import List, Dict, Optional
from collections import Counter
from croniter import croniter
from functools import lru_cache


class WorkingDayCroniter:
    def __init__(self, expr: str, base: datetime, holidays: Optional[List[datetime]] = None):
        """
        Initialize the WorkingDayCroniter instance.

        :param expr: A cron expression, supporting 'W' for the nth working days of the month.
        :param base: The base datetime to calculate from.
        :param holidays: A list of dates that are considered holidays.
        """
        self.expr = expr
        self.base = base
        self.holidays = set(holidays or [])  # Use a set for faster lookups
        self.last_date = None  # Track the last returned working day
        self._validate_expression(expr)

        self._is_working_day_expr = "W" in expr
        self._cron_iter = None if self._is_working_day_expr else croniter(expr, base)

    def _validate_expression(self, expr: str):
        """
        Validate the cron expression.
        """
        parts = expr.split()
        if len(parts) != 5:
            raise ValueError("Cron expression must have exactly 5 parts.")

        for part in parts[2].split(","):
            if "W" in part:
                try:
                    int(part.replace("W", ""))
                except ValueError:
                    raise ValueError(f"Invalid working day number in expression: {part}")

    def get_next(self, date_class=datetime) -> datetime:
        """
        Get the next occurrence based on the custom cron expression.
        Handles both standard cron logic and 'W' working day logic.
        """
        if not self._is_working_day_expr:
            return self._cron_iter.get_next(date_class)

        cron_expr = self._get_base_cron_expr()
        working_days = self._parse_working_days(self.expr.split()[2])
        normal_days = self._parse_normal_days(self.expr.split()[2])
        iter_base = croniter(cron_expr, self.last_date or self.base)

        max_iterations = 1500  # To prevent infinite loops in case of invalid expressions
        for _ in range(max_iterations):
            candidate_date = iter_base.get_next(date_class)
            if self._matches_working_day(candidate_date, working_days) or \
               self._matches_normal_day(candidate_date, normal_days):
                self.last_date = candidate_date
                return candidate_date

        raise RuntimeError("Exceeded maximum iterations while finding the next valid date.")

    def _get_base_cron_expr(self) -> str:
        """
        Replace 'W' in the cron expression with '*' to get a base expression for iteration.
        """
        parts = self.expr.split()
        if "W" in parts[2]:
            parts[2] = "*"
        return " ".join(parts)

    def _parse_working_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract multiple working days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of working day numbers (e.g., [3, 5] for "3W,5W").
        """
        return [int(part.replace("W", "")) for part in day_of_month.split(",") if "W" in part]

    def _parse_normal_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract regular days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of regular day numbers.
        """
        return [int(part) for part in day_of_month.split(",") if part.isdigit()]

    def _matches_working_day(self, date: datetime, working_days: List[int]) -> bool:
        """
        Check if the date matches the working day criteria.
        """
        if date.weekday() >= 5 or date in self.holidays:
            return False
        return self._get_nth_working_day(date) in working_days

    def _matches_normal_day(self, date: datetime, normal_days: List[int]) -> bool:
        """
        Check if the date matches the normal day criteria.
        """
        return date.day in normal_days

    def _get_nth_working_day(self, date: datetime) -> int:
        """
        Determine the nth working day of the month for a given date.
        """
        month_start = date.replace(day=1)
        nth_working_day = 0

        for day in range(1, date.day + 1):
            try:
                candidate = month_start.replace(day=day)
            except ValueError:
                break  # Reached the end of the month

            if candidate.weekday() < 5 and candidate not in self.holidays:
                nth_working_day += 1
                if candidate == date:
                    return nth_working_day
        return 0


class ProcessExecutionAnalyzer:
    def __init__(self, historical_data: List[datetime], holidays: List[datetime] = None):
        self.historical_data = sorted(set(historical_data))
        self.holidays = set(holidays) if holidays else set()

    def detect_pattern(self) -> Dict[str, any]:
        weekday_count = self._count_by_weekday()
        day_of_month_count = self._count_by_day_of_month()
        working_day_count = self._count_by_working_day()

        cron_expressions = self._generate_cron_expressions(weekday_count, day_of_month_count, working_day_count)

        best_cron = None
        highest_accuracy = 0
        includes_holidays = False

        for cron_expr in cron_expressions:
            accuracy = self._evaluate_prediction_accuracy(cron_expr, holiday=False)
            print(f"cron: {cron_expr}, accuracy: {accuracy}")
            if accuracy > highest_accuracy:
                best_cron = cron_expr
                highest_accuracy = accuracy

        for cron_expr in cron_expressions:
            accuracy = self._evaluate_prediction_accuracy(cron_expr, holiday=True)
            print(f"cron: {cron_expr}, accuracy: {accuracy}")
            if accuracy > highest_accuracy:
                best_cron = cron_expr
                highest_accuracy = accuracy
                includes_holidays = True

        return {
            "best_cron_expression": best_cron,
            "highest_accuracy": highest_accuracy,
            "includes_holidays": includes_holidays,
        }

    def _count_by_weekday(self) -> Dict[int, int]:
        weekday_count = Counter(date.weekday() for date in self.historical_data)
        max_count = max(weekday_count.values(), default=0)
        return {day: count for day, count in weekday_count.items() if count * 1.5 >= max_count}

    def _count_by_day_of_month(self) -> Dict[int, int]:
        day_of_month_count = Counter(date.day for date in self.historical_data)
        max_count = max(day_of_month_count.values(), default=0)
        return {day: count for day, count in day_of_month_count.items() if count * 1.5 >= max_count}

    def _count_by_working_day(self) -> Dict[int, int]:
        working_day_count = Counter()
        for date in self.historical_data:
            year, month = date.year, date.month
            working_days = self._get_working_days(year, month)
            if date in working_days:
                index = working_days.index(date) + 1
                working_day_count[index] += 1

        max_count = max(working_day_count.values(), default=0)
        return {day: count for day, count in working_day_count.items() if count * 1.5 >= max_count}

    def _generate_cron_expressions(self, weekday_count: Dict[int, int], day_of_month_count: Dict[int, int], working_day_count: Dict[int, int]) -> List[str]:
        crons = []

        crons.append("0 0 * * *")  # Daily execution

        weekdays = [str(day) for day, count in weekday_count.items() if count > 0]
        if weekdays:
            crons.append(f"0 0 * {','.join(weekdays)} *")

        days = [str(day) for day, count in day_of_month_count.items() if count > 0]
        if days:
            crons.append(f"0 0 {','.join(days)} * *")

        working_days = [f"{day}W" for day, count in working_day_count.items() if count > 0]
        if working_days:
            crons.append(f"0 0 {','.join(working_days)} * *")

        return crons

    def _evaluate_prediction_accuracy(self, cron_expr: str, holiday: bool) -> float:
        try:
            cron = WorkingDayCroniter(
                cron_expr, base=self.historical_data[0] - timedelta(hours=1), holidays=self.holidays if holiday else None
            )
            
            predicted_dates = set()
            for _ in range(len(self.historical_data)):
                next_date = cron.get_next(datetime)
                if holiday and next_date in self.holidays:
                    continue  # Skip holidays if they are excluded
                predicted_dates.add(next_date)

            return len(predicted_dates.intersection(self.historical_data)) / len(self.historical_data)
        except Exception as e:
            return 0.0

    @lru_cache(None)
    def _get_working_days(self, year: int, month: int) -> List[datetime]:
        working_days = []
        _, last_day = calendar.monthrange(year, month)
        for day in range(1, last_day + 1):
            try:
                date = datetime(year, month, day)
                if date.weekday() < 5 and date not in self.holidays:
                    working_days.append(date)
            except ValueError:
                continue
        return working_days


In [None]:
import unittest
from datetime import datetime, timedelta


class TestWorkingDayCroniter(unittest.TestCase):
    def setUp(self):
        self.holidays = [
            datetime(2024, 1, 1),  # New Year's Day
            datetime(2024, 12, 25),  # Christmas
            datetime(2024, 7, 4)  # Independence Day
        ]
        self.base_date = datetime(2024, 1, 1)

    def test_get_next_working_day(self):
        cron = WorkingDayCroniter("1W", self.base_date, holidays=self.holidays)
        results = []
        for _ in range(5):
            results.append(cron.get_next(datetime))
        self.assertEqual(len(results), 5)
        self.assertTrue(all(res.weekday() < 5 and res not in self.holidays for res in results))

    def test_get_next_normal_day(self):
        cron = WorkingDayCroniter("15", self.base_date, holidays=self.holidays)
        results = []
        for _ in range(5):
            results.append(cron.get_next(datetime))
        self.assertEqual(len(results), 5)
        self.assertTrue(all(res.day == 15 for res in results))

    def test_combined_working_and_normal_days(self):
        cron = WorkingDayCroniter("15,1W", self.base_date, holidays=self.holidays)
        results = []
        for _ in range(10):
            results.append(cron.get_next(datetime))
        self.assertEqual(len(results), 10)
        self.assertTrue(all(
            (res.day == 15 or (res.weekday() < 5 and res not in self.holidays)) for res in results
        ))

class TestProcessExecutionAnalyzer(unittest.TestCase):
    def setUp(self):
        self.historical_data = [
            datetime(2024, 1, 2), datetime(2024, 1, 3), datetime(2024, 1, 4), datetime(2024, 1, 5),
            datetime(2024, 2, 1), datetime(2024, 2, 2), datetime(2024, 2, 5), datetime(2024, 2, 6),
            datetime(2024, 2, 22), # Added trash
            datetime(2024, 3, 1), datetime(2024, 3, 4), datetime(2024, 3, 5), # Removed entry so simulate failed execution
            datetime(2024, 3, 24), # Added trash
            datetime(2024, 4, 1), datetime(2024, 4, 2), datetime(2024, 4, 3), datetime(2024, 4, 4),
        ]
        self.holidays = [
            datetime(2024, 1, 1), datetime(2024, 12, 25), datetime(2024, 7, 4)
        ]

    def test_detect_pattern(self):
        analyzer = ProcessExecutionAnalyzer(self.historical_data, self.holidays)
        pattern = analyzer.detect_pattern()
        self.assertIn("best_cron_expression", pattern)
        self.assertGreater(pattern["highest_accuracy"], 0.5)

    def test_evaluate_prediction_accuracy(self):
        analyzer = ProcessExecutionAnalyzer(self.historical_data, self.holidays)
        cron_expr = "0 0 2 * *"  # 2nd day of every month
        accuracy = analyzer._evaluate_prediction_accuracy(cron_expr, holiday=True)
        self.assertGreater(accuracy, 0.5)

    def test_count_by_weekday(self):
        analyzer = ProcessExecutionAnalyzer(self.historical_data, self.holidays)
        weekday_count = analyzer._count_by_weekday()
        self.assertGreater(len(weekday_count), 0)
        self.assertTrue(all(isinstance(key, int) for key in weekday_count))

    def test_count_by_day_of_month(self):
        analyzer = ProcessExecutionAnalyzer(self.historical_data, self.holidays)
        day_of_month_count = analyzer._count_by_day_of_month()
        self.assertGreater(len(day_of_month_count), 0)
        self.assertTrue(all(isinstance(key, int) for key in day_of_month_count))

    def test_count_by_working_day(self):
        analyzer = ProcessExecutionAnalyzer(self.historical_data, self.holidays)
        working_day_count = analyzer._count_by_working_day()
        self.assertGreater(len(working_day_count), 0)
        self.assertTrue(all(isinstance(key, int) for key in working_day_count))




In [None]:
historical_data = [
    datetime(2024, 1, 2), datetime(2024, 1, 3), datetime(2024, 1, 4), datetime(2024, 1, 5),
    datetime(2024, 2, 1), datetime(2024, 2, 2), datetime(2024, 2, 5), datetime(2024, 2, 6),
    datetime(2024, 2, 22), # Added trash
    datetime(2024, 3, 1), datetime(2024, 3, 4), datetime(2024, 3, 5), # Removed entry so simulate failed execution
    datetime(2024, 3, 24), # Added trash
    datetime(2024, 4, 1), datetime(2024, 4, 2), datetime(2024, 4, 3), datetime(2024, 4, 4),
]
holidays = [
    datetime(2024, 1, 1), datetime(2024, 12, 25), datetime(2024, 7, 4)
]


analyzer = ProcessExecutionAnalyzer(historical_data, holidays)
pattern = analyzer.detect_pattern()


cron: 0 0 * * *, accuracy: 0.23529411764705882
cron: 0 0 * 1,3,4,0 *, accuracy: 0.0
cron: 0 0 2,3,4,5,1 * *, accuracy: 0.7647058823529411
cron: 0 0 1W,2W,3W,4W * *, accuracy: 0.8235294117647058
cron: 0 0 * * *, accuracy: 0.23529411764705882
cron: 0 0 * 1,3,4,0 *, accuracy: 0.0
cron: 0 0 2,3,4,5,1 * *, accuracy: 0.7647058823529411
cron: 0 0 1W,2W,3W,4W * *, accuracy: 0.8823529411764706


In [74]:
pattern

{'best_cron_expression': '0 0 1W,2W,3W,4W * *',
 'highest_accuracy': 0.8823529411764706,
 'includes_holidays': True}

In [47]:
historical_data = [
            datetime(2025, 1, 1), datetime(2025, 1, 15),
            datetime(2025, 2, 1), datetime(2025, 2, 15),
            datetime(2025, 3, 1), datetime(2025, 3, 15)
        ]
analyzer = ProcessExecutionAnalyzer(historical_data)
result = analyzer.detect_pattern()


cron: 0 0 * * *, accuracy: 0.16666666666666666
cron: 0 0 * 5 *, accuracy: 0.0
cron: 0 0 1,15 * *, accuracy: 1.0
cron: 0 0 1W,11W * *, accuracy: 0.3333333333333333
cron: 0 0 * * *, accuracy: 0.16666666666666666
cron: 0 0 * 5 *, accuracy: 0.0
cron: 0 0 1,15 * *, accuracy: 1.0
cron: 0 0 1W,11W * *, accuracy: 0.3333333333333333


In [43]:
result

{'best_cron_expression': '0 0 1,15 * *',
 'highest_accuracy': 1.0,
 'excludes_holidays': True}

In [199]:
import unittest
from datetime import datetime

class TestProcessExecutionAnalyzer(unittest.TestCase):
    def test_daily_execution(self):
        # Test case: Process runs every day
        historical_data = [
            datetime(2023, 1, 1), datetime(2023, 1, 2), datetime(2023, 1, 3),
            datetime(2023, 1, 4), datetime(2023, 1, 5)
        ]
        analyzer = ProcessExecutionAnalyzer(historical_data)
        result = analyzer.detect_pattern()
        self.assertEqual(result["pattern"], "daily")
        self.assertFalse(result["excludes_holidays"])

    def test_weekly_execution(self):
        # Test case: Process runs every Monday, Wednesday, and Friday
        historical_data = [
            datetime(2023, 1, 2), datetime(2023, 1, 4), datetime(2023, 1, 6),
            datetime(2023, 1, 9), datetime(2023, 1, 11)
        ]
        analyzer = ProcessExecutionAnalyzer(historical_data)
        result = analyzer.detect_pattern()
        self.assertEqual(result["pattern"], "weekly")

    def test_monthly_execution(self):
        # Test case: Process runs on the 1st and 15th of each month
        historical_data = [
            datetime(2023, 1, 1), datetime(2023, 1, 15),
            datetime(2023, 2, 1), datetime(2023, 2, 15),
            datetime(2023, 3, 1), datetime(2023, 3, 15)
        ]
        analyzer = ProcessExecutionAnalyzer(historical_data)
        result = analyzer.detect_pattern()
        self.assertEqual(result["pattern"], "monthly")

    def test_working_days_execution(self):
        # Test case: Process runs on the first working day of the month
        historical_data = [
            datetime(2023, 1, 2), datetime(2023, 2, 1), datetime(2023, 3, 1),
            datetime(2023, 4, 3), datetime(2023, 5, 1)
        ]
        analyzer = ProcessExecutionAnalyzer(historical_data)
        result = analyzer.detect_pattern()
        self.assertEqual(result["pattern"], "working_days")

    def test_excludes_holidays(self):
        # Test case: Process avoids holidays
        historical_data = [
            datetime(2023, 1, 2), datetime(2023, 1, 3), datetime(2023, 1, 4),
            datetime(2023, 1, 5), datetime(2023, 1, 6)
        ]
        holidays = [datetime(2023, 1, 6)]
        analyzer = ProcessExecutionAnalyzer(historical_data, holidays)
        result = analyzer.detect_pattern()
        self.assertEqual(result["pattern"], "daily")
        self.assertTrue(result["excludes_holidays"])

    def test_no_pattern(self):
        # Test case: Process has no discernible pattern
        historical_data = [
            datetime(2023, 1, 2), datetime(2023, 1, 5), datetime(2023, 1, 10),
            datetime(2023, 1, 20)
        ]
        analyzer = ProcessExecutionAnalyzer(historical_data)
        result = analyzer.detect_pattern()
        self.assertNotEqual(result["pattern"], "daily")
        self.assertNotEqual(result["pattern"], "weekly")
        self.assertNotEqual(result["pattern"], "monthly")
        self.assertNotEqual(result["pattern"], "working_days")

if __name__ == "__main__":
    unittest.main()


usage: ipykernel_launcher.py [-h] [-v] [-q] [--locals] [--durations N] [-f]
                             [-c] [-b] [-k TESTNAMEPATTERNS]
                             [tests ...]
ipykernel_launcher.py: error: argument -f/--failfast: ignored explicit argument 'c:\\Users\\marco_lmx85at\\AppData\\Roaming\\jupyter\\runtime\\kernel-v3ed3908620e19d44fd10d353df34df47fff83d97f.json'


AttributeError: 'tuple' object has no attribute 'tb_frame'

In [163]:
from datetime import datetime, timedelta
import calendar
from typing import List, Dict, Optional, Any
from collections import Counter
from croniter import croniter
from functools import lru_cache

class WorkingDayCroniter:
    def __init__(self, expr: str, base: datetime, holidays: Optional[List[datetime]] = None):
        """
        Initialize the WorkingDayCroniter instance.

        :param expr: A cron expression, supporting 'W' for the nth working days of the month.
        :param base: The base datetime to calculate from.
        :param holidays: A list of dates that are considered holidays.
        """
        self.expr = expr
        self.base = base
        self.holidays = set(holidays or [])  # Use a set for faster lookups
        self.last_date = None  # Track the last returned working day
        self._validate_expression(expr)

        self._is_working_day_expr = "W" in expr
        self._cron_iter = None if self._is_working_day_expr else croniter(expr, base)

    def _validate_expression(self, expr: str):
        """
        Validate the cron expression.
        """
        parts = expr.split() 
        if len(parts) != 5:
            raise ValueError("Cron expression must have exactly 5 parts.")

        for part in parts[2].split(","):
            if "W" in part:
                try:
                    int(part.replace("W", ""))
                except ValueError:
                    raise ValueError(f"Invalid working day number in expression: {part}")

    def get_next(self, date_class=datetime) -> datetime:
        """
        Get the next occurrence based on the custom cron expression.
        Handles both standard cron logic and 'W' working day logic.
        """
        if not self._is_working_day_expr:
            return self._cron_iter.get_next(date_class)

        cron_expr = self._get_base_cron_expr()
        working_days = self._parse_working_days(self.expr.split()[2])
        normal_days = self._parse_normal_days(self.expr.split()[2])
        iter_base = croniter(cron_expr, self.last_date or self.base)

        max_iterations = 1500  # To prevent infinite loops in case of invalid expressions
        for _ in range(max_iterations):
            candidate_date = iter_base.get_next(date_class)
            if self._matches_working_day(candidate_date, working_days) or \
               self._matches_normal_day(candidate_date, normal_days):
                self.last_date = candidate_date
                return candidate_date

        raise RuntimeError("Exceeded maximum iterations while finding the next valid date.")

    def _get_base_cron_expr(self) -> str:
        """
        Replace 'W' in the cron expression with '*' to get a base expression for iteration.
        """
        parts = self.expr.split()
        if "W" in parts[2]:
            parts[2] = "*"
        return " ".join(parts)

    def _parse_working_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract multiple working days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of working day numbers (e.g., [3, 5] for "3W,5W").
        """
        return [int(part.replace("W", "")) for part in day_of_month.split(",") if "W" in part]

    def _parse_normal_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract regular days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of regular day numbers.
        """
        return [int(part) for part in day_of_month.split(",") if part.isdigit()]

    def _matches_working_day(self, date: datetime, working_days: List[int]) -> bool:
        """
        Check if the date matches the working day criteria.
        """
        if date.weekday() >= 5 or date in self.holidays:
            return False
        return self._get_nth_working_day(date) in working_days

    def _matches_normal_day(self, date: datetime, normal_days: List[int]) -> bool:
        """
        Check if the date matches the normal day criteria.
        """
        return date.day in normal_days

    def _get_nth_working_day(self, date: datetime) -> int:
        """
        Determine the nth working day of the month for a given date.
        """
        month_start = date.replace(day=1)
        nth_working_day = 0

        for day in range(1, date.day + 1):
            try:
                candidate = month_start.replace(day=day)
            except ValueError:
                break  # Reached the end of the month

            if candidate.weekday() < 5 and candidate not in self.holidays:
                nth_working_day += 1
                if candidate == date:
                    return nth_working_day
        return 0


class MonthlyExecutionAnalyzer:
    def __init__(self, historical_data: List[datetime], threshold: float = 0.8, deviation: int = 3):
        """
        Initializes the analyzer with historical execution data.

        :param historical_data: List of datetime objects representing execution dates.
        :param threshold: Proportion of intervals required to determine a consistent pattern.
        :param deviation: Allowed deviation in days for interval matching.
        """
        if not historical_data:
            raise ValueError("Historical data cannot be empty.")
        
        self.historical_data = sorted(
            dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) for dt in historical_data
        )
        self.threshold = threshold
        self.deviation = deviation

    def detect_pattern(self) -> Dict[str, Any]:
        """
        Detects the frequency pattern from the historical data.

        :return: Dictionary containing the detected pattern and frequencies.
        """
        intervals = self._calculate_intervals()
        frequencies = self._count_frequencies(intervals)
        starting_month = self._get_most_frequent_month()
        pattern = self._detect_frequency(frequencies, starting_month)
        
        return {
            "pattern": pattern,
            "frequencies": dict(frequencies)  # Convert Counter to a standard dictionary for output
        }

    def _count_by_month(self) -> Dict[int, int]:
        """
        Counts occurrences of dates by month.

        :return: Dictionary of months and their counts.
        """
        month_count = Counter(date.month for date in self.historical_data)
        max_count = max(month_count.values(), default=0)
        
        # Keep only months with counts close to the max count
        return {month: count for month, count in month_count.items() if count * 2 >= max_count}

    def _calculate_intervals(self) -> List[int]:
        """
        Calculates the day intervals between consecutive dates, filtering by frequent months.

        :return: List of intervals in days.
        """
        filtered_months = set(self._count_by_month().keys())
        filtered_data = [date for date in self.historical_data if date.month in filtered_months]
        
        return [
            (filtered_data[i] - filtered_data[i - 1]).days
            for i in range(1, len(filtered_data))
        ]

    def _count_frequencies(self, intervals: List[int]) -> Counter:
        """
        Counts the occurrence of intervals that match predefined monthly patterns.

        :param intervals: List of day intervals.
        :return: Counter of matching interval frequencies.
        """
        monthly_intervals = [30, 60, 90, 120, 180]
        frequencies = Counter()

        for interval in intervals:
            for base in monthly_intervals:
                if abs(interval - base) <= self.deviation:  # Allow small deviation
                    frequencies[base] += 1
                    break

        return frequencies

    def _get_most_frequent_month(self) -> int:
        """
        Finds the most frequent month in the historical data.

        :return: The most frequent month as an integer (1 for January, 12 for December).
        """
        months = [dt.month for dt in self.historical_data]
        month_frequencies = Counter(months)
        return month_frequencies.most_common(1)[0][0]

    def _detect_frequency(self, frequencies: Counter, starting_month: int) -> str:
        """
        Detects the most common frequency pattern.

        :param frequencies: Counter of interval frequencies.
        :param starting_month: The most frequent starting month.
        :return: A string representing the detected frequency pattern.
        """
        if not frequencies:
            return "*"

        most_common_interval, count = frequencies.most_common(1)[0]
        total_intervals = sum(frequencies.values())

        # Return '*' if the most common interval is 30, regardless of the threshold.
        if most_common_interval == 30:
            return "*"

        # Check if the most common interval meets the threshold.
        if count / total_intervals >= self.threshold:
            return self._generate_pattern(most_common_interval, starting_month)
        
        return "*"

    def _generate_pattern(self, interval: int, starting_month: int) -> str:
        """
        Generates a monthly pattern string based on the interval and starting month.

        :param interval: The detected interval in days.
        :param starting_month: The most frequent starting month.
        :return: A string representing the months in the pattern.
        """
        months = []

        for i in range(0, 12, interval // 30):  # Convert interval to months
            months.append((starting_month + i - 1) % 12 + 1)

        return ",".join(map(str, sorted(set(months))))


class DailyExecutionAnalyzer:
    def __init__(self, historical_data: List[datetime], holidays: List[datetime] = None, monthly_pattern: str = "*"):
        self.historical_data = sorted(set(dt.replace(hour=0, minute=0, second=0, microsecond=0) for dt in historical_data))
        self.monthly_pattern = monthly_pattern
        self.holidays = set(dt.replace(hour=0, minute=0, second=0, microsecond=0) for dt in holidays) if holidays else set()

    def detect_pattern(self) -> Dict[str, any]:
        weekday_count = self._count_by_weekday()
        day_of_month_count = self._count_by_day_of_month()
        working_day_count = self._count_by_working_day()

        cron_expressions = self._generate_cron_expressions(weekday_count, day_of_month_count, working_day_count, self.monthly_pattern)

        best_cron = None
        highest_accuracy = 0
        includes_holidays = False

        for cron_expr in cron_expressions:
            accuracy = self._evaluate_prediction_accuracy(cron_expr, holiday=False)
            print(f"cron: {cron_expr}, accuracy: {accuracy}")
            if accuracy > highest_accuracy:
                best_cron = cron_expr
                highest_accuracy = accuracy

        for cron_expr in cron_expressions:
            accuracy = self._evaluate_prediction_accuracy(cron_expr, holiday=True)
            print(f"cron: {cron_expr}, accuracy: {accuracy}")
            if accuracy > highest_accuracy:
                best_cron = cron_expr
                highest_accuracy = accuracy
                includes_holidays = True

        return {
            "best_cron_expression": best_cron,
            "highest_accuracy": highest_accuracy,
            "includes_holidays": includes_holidays,
        }

    def _count_by_weekday(self) -> Dict[int, int]:
        weekday_count = Counter(date.isoweekday() for date in self.historical_data)
        print(weekday_count)
        max_count = max(weekday_count.values(), default=0)
        return {day: count for day, count in weekday_count.items() if count * 1.5 >= max_count}

    def _count_by_day_of_month(self) -> Dict[int, int]:
        day_of_month_count = Counter(date.day for date in self.historical_data)
        max_count = max(day_of_month_count.values(), default=0)
        return {day: count for day, count in day_of_month_count.items() if count * 1.5 >= max_count}

    def _count_by_working_day(self) -> Dict[int, int]:
        working_day_count = Counter()
        for date in self.historical_data:
            year, month = date.year, date.month
            working_days = self._get_working_days(year, month)
            if date in working_days:
                index = working_days.index(date) + 1
                working_day_count[index] += 1

        max_count = max(working_day_count.values(), default=0)
        return {day: count for day, count in working_day_count.items() if count * 1.5 >= max_count}

    def _generate_cron_expressions(self, weekday_count: Dict[int, int], day_of_month_count: Dict[int, int], working_day_count: Dict[int, int], monthly_pattern: str) -> List[str]:
        crons = []

        crons.append(f"0 0 * {monthly_pattern} *")  # Daily execution

        weekdays = [str(day) for day, count in weekday_count.items() if count > 0]
        if weekdays:
            crons.append(f"0 0 * {monthly_pattern} {','.join(weekdays)}")

        days = [str(day) for day, count in day_of_month_count.items() if count > 0]
        if days:
            crons.append(f"0 0 {','.join(days)} {monthly_pattern} *")

        working_days = [f"{day}W" for day, count in working_day_count.items() if count > 0]
        if working_days:
            crons.append(f"0 0 {','.join(working_days)} {monthly_pattern} *")

        return crons

    def _evaluate_prediction_accuracy(self, cron_expr: str, holiday: bool) -> float:
        try:
            cron = WorkingDayCroniter(
                cron_expr, base=self.historical_data[0] - timedelta(hours=1), holidays=self.holidays if holiday else None
            )
            
            predicted_dates = set()
            for _ in range(len(self.historical_data)):
                next_date = cron.get_next(datetime)
                predicted_dates.add(next_date)

            return len(predicted_dates.intersection(self.historical_data)) / len(self.historical_data)
        except Exception as e:
            return 0.0

    @lru_cache(None)
    def _get_working_days(self, year: int, month: int) -> List[datetime]:
        working_days = []
        _, last_day = calendar.monthrange(year, month)
        for day in range(1, last_day + 1):
            try:
                date = datetime(year, month, day)
                if date.weekday() < 5 and date not in self.holidays:
                    working_days.append(date)
            except ValueError:
                continue
        return working_days


In [None]:
from datetime import datetime
from typing import List, Optional
from croniter import croniter
import threading
from copy import deepcopy

class WorkingDayCroniter:
    def __init__(self, expr: str, base: datetime, holidays: Optional[List[datetime]] = None):
        """
        Initialize the WorkingDayCroniter instance.

        :param expr: A cron expression, supporting 'W' for the nth working days of the month.
        :param base: The base datetime to calculate from.
        :param holidays: A list of dates that are considered holidays.
        """
        self.expr = expr
        self.base = base
        self.holidays = set(holidays or [])  # Use a set for faster lookups
        self._state = threading.local()  # Thread-local storage for state
        self._state.last_date = None  # Initialize thread-local last_date
        self._validate_expression(expr)

        self._is_working_day_expr = "W" in expr
        self._cron_iter = None if self._is_working_day_expr else croniter(expr, base)

    def _validate_expression(self, expr: str):
        """
        Validate the cron expression.
        """
        expr_parts = deepcopy(expr).split()

        if len(expr_parts) != 5:
            raise ValueError("Cron expression must have exactly 5 parts.")

        days_expr_part = expr_parts[2].split(",")
        for day in days_expr_part:
            if day.endswith("W"):
                try:
                    int(day[:-1])
                except ValueError:
                    raise ValueError(f"Invalid working day number in expression: {day}")

        expr_parts[2].replace("W","")

        cron_expression_to_validade = " ".join(expr_parts)
        croniter.is_valid(cron_expression_to_validade)


    def get_next(self, date_class=datetime) -> datetime:
        """
        Get the next occurrence based on the custom cron expression.
        Handles both standard cron logic and 'W' working day logic.
        """
        if not self._is_working_day_expr:
            return self._cron_iter.get_next(date_class)

        cron_expr = self._get_base_cron_expr()
        working_days = self._parse_working_days(self.expr.split()[2])
        normal_days = self._parse_normal_days(self.expr.split()[2])
        iter_base = croniter(cron_expr, self._state.last_date or self.base)

        max_iterations = 1500  # To prevent infinite loops in case of invalid expressions
        for _ in range(max_iterations):
            candidate_date = iter_base.get_next(date_class)
            if self._matches_working_day(candidate_date, working_days) or \
               self._matches_normal_day(candidate_date, normal_days):
                self._state.last_date = candidate_date
                return candidate_date

        raise RuntimeError("Exceeded maximum iterations while finding the next valid date.")

    def _get_base_cron_expr(self) -> str:
        """
        Replace 'W' in the cron expression with '*' to get a base expression for iteration.
        """
        parts = self.expr.split()
        if "W" in parts[2]:
            parts[2] = "*"
        return " ".join(parts)

    def _parse_working_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract multiple working days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of working day numbers (e.g., [3, 5] for "3W,5W").
        """
        return [int(part.replace("W", "")) for part in day_of_month.split(",") if "W" in part]

    def _parse_normal_days(self, day_of_month: str) -> List[int]:
        """
        Parse the day-of-month field to extract regular days.

        :param day_of_month: The day-of-month field from the cron expression.
        :return: A list of regular day numbers.
        """
        return [int(part) for part in day_of_month.split(",") if part.isdigit()]

    def _matches_working_day(self, date: datetime, working_days: List[int]) -> bool:
        """
        Check if the date matches the working day criteria.
        """
        if date.weekday() >= 5 or date in self.holidays:
            return False
        return self._get_nth_working_day(date) in working_days

    def _matches_normal_day(self, date: datetime, normal_days: List[int]) -> bool:
        """
        Check if the date matches the normal day criteria.
        """
        return date.day in normal_days

    def _get_nth_working_day(self, date: datetime) -> int:
        """
        Determine the nth working day of the month for a given date.
        """
        month_start = date.replace(day=1)
        nth_working_day = 0

        for day in range(1, date.day + 1):
            try:
                candidate = month_start.replace(day=day)
            except ValueError:
                break  # Reached the end of the month

            if candidate.weekday() < 5 and candidate not in self.holidays:
                nth_working_day += 1
                if candidate == date:
                    return nth_working_day
        return 0


In [290]:
wc = WorkingDayCroniter("0 0 1W * *", datetime.now())

In [320]:
wc.get_next().isoformat()

'2027-06-01T00:00:00'

In [321]:
from collections import Counter
from datetime import datetime
from typing import List, Dict, Any


class MonthlyExecutionAnalyzer:
    def __init__(self, historical_data: List[datetime], threshold: float = 0.8, deviation: int = 3):
        """
        Initializes the analyzer with historical execution data.

        :param historical_data: List of datetime objects representing execution dates.
        :param threshold: Proportion of intervals required to determine a consistent pattern.
        :param deviation: Allowed deviation in days for interval matching.
        """
        if not historical_data:
            raise ValueError("Historical data cannot be empty.")
        
        self.historical_data = sorted(
            dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) for dt in historical_data
        )
        self.threshold = threshold
        self.deviation = deviation

    def detect_pattern(self) -> Dict[str, Any]:
        """
        Detects the frequency pattern from the historical data.

        :return: Dictionary containing the detected pattern and frequencies.
        """
        intervals = self._calculate_intervals_between_executions()
        frequencies = self._count_intervals_frequencies(intervals)
        starting_month = self._get_most_frequent_month()
        pattern = self._gerate_monthly_pattern(frequencies, starting_month)
        
        return {
            "pattern": pattern,
            "frequencies": dict(frequencies)  # Convert Counter to a standard dictionary for output
        }

    def _count_by_month_and_filter_noise(self) -> Dict[int, int]:
        """
        Counts occurrences of dates by month.

        :return: Dictionary of months and their counts.
        """
        month_count = Counter(date.month for date in self.historical_data)
        max_count = max(month_count.values(), default=0)
        
        # Keep only months with counts close to the max count
        return {month: count for month, count in month_count.items() if count * 2 >= max_count}

    def _calculate_intervals_between_executions(self) -> List[int]:
        """
        Calculates the day intervals between consecutive dates, filtering by frequent months.

        :return: List of intervals in days.
        """
        filtered_months = set(self._count_by_month_and_filter_noise().keys())
        filtered_data = [date for date in self.historical_data if date.month in filtered_months]
        
        return [
            (filtered_data[i] - filtered_data[i - 1]).days
            for i in range(1, len(filtered_data))
        ]

    def _count_intervals_frequencies(self, intervals: List[int]) -> Counter:
        """
        Counts the occurrence of intervals that match predefined monthly patterns.

        :param intervals: List of day intervals.
        :return: Counter of matching interval frequencies.
        """
        monthly_intervals = [30, 60, 90, 120, 180]
        frequencies = Counter()

        for interval in intervals:
            for base in monthly_intervals:
                if abs(interval - base) <= self.deviation:  # Allow small deviation
                    frequencies[base] += 1
                    break

        return frequencies

    def _get_most_frequent_month(self) -> int:
        """
        Finds the most frequent month in the historical data.

        :return: The most frequent month as an integer (1 for January, 12 for December).
        """
        months = [dt.month for dt in self.historical_data]
        month_frequencies = Counter(months)
        return month_frequencies.most_common(1)[0][0]

    def _gerate_monthly_pattern(self, frequencies: Counter, starting_month: int) -> str:
        """
        Detects the most common frequency pattern.

        :param frequencies: Counter of interval frequencies.
        :param starting_month: The most frequent starting month.
        :return: A string representing the detected frequency pattern.
        """
        if not frequencies:
            return "*"
       
        most_common_interval = frequencies.most_common(1)[0][0]
        pattern = []

        if most_common_interval == 30:
            return "*"
        
        for i in range(0, 12, most_common_interval // 30):  # Convert interval to months
            pattern.append((starting_month + i - 1) % 12 + 1)

        return ",".join(map(str, sorted(set(pattern))))

In [323]:
# Example usage:
historical_data = [
            datetime(2023, 12, 2), datetime(2024, 1, 3), datetime(2024, 2, 4), datetime(2024, 3, 5),
            datetime(2024, 4, 1), datetime(2024, 5, 2), datetime(2024, 6, 3), datetime(2024, 7, 4),
            datetime(2024, 8, 1), datetime(2024, 7, 2), datetime(2024, 11, 3), datetime(2024, 12, 4),
        ]

analyzer = MonthlyExecutionAnalyzer(historical_data)
result = analyzer.detect_pattern()
print(result)

{'pattern': '*', 'frequencies': {30: 9, 90: 1}}


In [147]:
analyzer.historical_data

for date in analyzer.historical_data:
    print(date.month)

1
1
1
2
5
5
5
6
9
9
9
10
