In [1]:
import threading
from datetime import datetime
from datetime import timedelta
from datetime import timezone as tzone
import time


In [2]:
class Scheduler:
    """
    A class to represent a schedule for jobs with start and end dates.
    
    Attributes:
    -----------
    threading : boolean 
        Whether or not to use threading while running the jobs 
    
    Methods:
    --------
    job():
        Method to create and configure a job
    run_all():
        Runs all jobs with or without threading
    """

    def __init__(self, threading = False, start_date = None, time_zone=None):
        
        # ensuring that we get appropriate type of parameters for Scheduler 
        if not isinstance(time_zone, int) and time_zone is not None:
            raise ValueError("Timezone must be an integer")

        if not isinstance(start_date, str) and start_date is not None:
            raise ValueError("Date must be an str format %Y-%m-%d %H:%M")

        self.threads = threading   #threading 
        self.scheduler_timezone = tzone(timedelta(hours=time_zone)) if time_zone  else datetime.now().astimezone().tzinfo # timezone of scheduler
        self.scheduler_startdate = (datetime.strptime(start_date, '%Y-%m-%d %H:%M') if start_date else datetime.now().replace(second=0, microsecond=0)).astimezone(self.scheduler_timezone) # job running start date passed to scheduler


        # validating that passed startdate isn't more than current time
        if  self.scheduler_startdate.strftime('%Y-%m-%d %H:%M') < datetime.now().strftime('%Y-%m-%d %H:%M'):
            raise ValueError("Start date cannot be less than the current date")

        self.jobs = []  # empty list for adding jobs later


    def job(self, start_date=None, time_zone=None):

        """
        Creates and configures a job with a specified start date and time zone.
        
        Args:
        -----
        startdate : str, optional
            The start date of the job in the format '%Y-%m-%d %H:%M'. If not provided, the scheduler's default start date is used.
        
        timezone : int, optional
            The timezone offset in hours. If not provided, the scheduler's default timezone is used.
        
        Raises:
        -------
        ValueError
            If the timezone is not an integer or if the start date is not in the correct string format.
            If the job start date is set to a time earlier than the current date and time.
        
        Returns:
        --------
        JobWrapper
            A JobWrapper object that allows further configuration of the job.
        """
        # ensuring that we get appropriate type of parameters for job
        if not isinstance(time_zone, int) and time_zone is not None:
            raise ValueError("Timezone must be an integer")

        if not isinstance(start_date, str) and start_date is not None:
            raise ValueError("Date must be an str fromat %Y-%m-%d %H:%M")

        job_timezone = tzone(timedelta(hours=time_zone)) if time_zone else  self.scheduler_timezone # timezone of job
        job_startdate = datetime.strptime(start_date, '%Y-%m-%d %H:%M').astimezone(job_timezone) if start_date else self.scheduler_startdate # job running start date passed to job

        # ensuring that start date isn't in the past
        if  job_startdate.strftime('%Y-%m-%d %H:%M') < datetime.now().strftime('%Y-%m-%d %H:%M'):
            raise ValueError("Job start date cannot be less than the current date")

        # creating the job configuration as a dictionary with default values for job parameters
        job = {
            'startdate': job_startdate,
            'time_zone': job_timezone,
            'end_date': None,
            'func': None,
            'name': None,
            'unit': None,
            'interval': None,
            'next_run': None,
            'repeats': None,  # New attribute for repeats
            'repeat_count': 0,  # Counter for completed runs
            'args': (),
            'kwargs': {}
        }


        class JobWrapper:
            """
            A wrapper class to represent an individual job within the scheduler.
            
            Attributes:
            -----------
            job : dict
                Dictionary containing configuration parameters of the job.
            
            Methods:
            --------
            second(every=1):
                Sets the job to run every specified number of seconds. (code runs in every 1 second as default)
            minute(every=1):
                Sets the job to run every specified number of minutes. (code runs in every 1 minute as default)
            hour(every=1):
                Sets the job to run every specified number of hours. (code runs in every 1 hour as default)
            day(every=1, hour=None):
                Sets the job to run every specified number of days at an optional specific time. (code runs in every 1 day as default)
            week(every=1, week_day=None, hour=None):
                Sets the job to run every specified number of weeks on specified weekdays and time. (code runs in every 1 week as default)
            do(func, name):
                Configures the function to run with the job and assigns a name to the job.
            until(end date):
                Sets the end date for the job.
            repeat(times):
                Sets the number of times the job should repeat.
            calculate_next_run(current_time):
                Calculates the next run time of the job based on its schedule.
            run(*args, **kwargs):
                Executes the job at its scheduled times with the provided arguments.
            """
            
            def __init__(self, job):
                self.job = job  # passes the job that should get executed
                
                
            def second(self, every=1):
                """
                Sets the job to run every `every` seconds and defines unit as a second
                
                Args:
                -----
                every : Sets the job to run every specified number of seconds.
                """
                
                self.job['unit'] = 'second'  # time unit for the job
                self.job['interval'] = every # interval 
                return self

            def minute(self, every=1):
                """Set the job to run every `every` minutes."""
                self.job['unit'] = 'minute'
                self.job['interval'] = every
                return self

            def hour(self, every=1):
                """Set the job to run every `every` hours."""
                self.job['unit'] = 'hour'
                self.job['interval'] = every
                return self

            def day(self, every=1, hour=None):
                """Set the job to run every `every` days, optionally at a specific time."""
                self.job['unit'] = 'day'
                self.job['interval'] = every
                self.job['at'] = hour
                return self
            
            def week(self, every=1, week_day=None, hour=None):
                """
                Set the job to run every `every` weeks on specific weekdays at a specific time.
                If `week_day` is None, the job will every week. 
                
                Parameters:
                -----------
                every : int
                    Interval in weeks.
                week_day : int, list of int, or None
                    Single integer or list of weekdays where 0 = Monday, ..., 6 = Sunday. If None, run every day.
                hour : str
                    Time in HH:MM format at which the job should run.
                """
                if  week_day and  type(week_day) != int and type(week_day)!=list:
                    raise ValueError("Weekday must be an integer or a list of integers")
                elif week_day and isinstance(week_day, list):
                    # Check if all elements are integers
                    if not all(isinstance(i, int) for i in week_day):
                        raise ValueError('List of Weekdays must contain integers only')
                    
                    # Check if the list length is more than 6
                    if len(week_day) > 6:
                        raise ValueError('List of Weekdays cannot contain more than 6 items')
                    
                    # Check if each number in the list is between 0 and 6
                    if not all(0 <= i <= 6 for i in week_day):
                        raise ValueError('Each Weekday number must be between 0 and 6')
                elif week_day and type(week_day)==int and week_day>6:
                    raise ValueError('Weekday is Specified to be between 0 and 6')

                self.job['unit'] = 'week'
                self.job['interval'] = every

                # passed week days are saved in list then turned into sets to avoid repeating values without raising errror
                # then sets are once again turned into lists
                self.job['week_day'] = [datetime.today().weekday()] if week_day is None else [week_day] if isinstance(week_day, int) else list(set(week_day))
                print( self.job['week_day'])
                # defining at what hour of the day code should run
                self.job['at'] = hour
                return self
                                


            def do(self, func, name):
                self.job['func'] = func
                self.job['name'] = name
                return self


            def until(self, end_date):
                end_date = datetime.strptime(end_date, '%Y-%m-%d %H:%M')
                end_date = end_date.astimezone(self.job['time_zone'])
                if end_date < self.job['startdate']:
                    raise ValueError('Startdate must be earlier than End date')
                self.job['end_date'] = end_date
                return self

            def repeat(self, times):
                """Set the number of times the job should repeat."""
                self.job['repeats'] = times
                return self

            def calculate_next_run(self, current_time):
                # Truncate the current_time to remove milliseconds
                current_time = current_time.replace(microsecond=0)

                if self.job['unit'] == 'second':
                    self.job['next_run'] = current_time + timedelta(seconds=self.job['interval'])
                elif self.job['unit'] == 'minute':
                    current_time = current_time.replace(second=0)
                    self.job['next_run'] = current_time + timedelta(minutes=self.job['interval'])
                elif self.job['unit'] == 'hour':
                    current_time = current_time.replace(minute=0, second=0)
                    self.job['next_run'] = current_time + timedelta(hours=self.job['interval'])
                elif self.job['unit'] == 'day':
                    # Handling the 'at' parameter for the day interval
                    if 'at' in self.job and self.job['at']:
                        # Parse the 'at' time from the job settings
                        hour, minute = map(int, self.job['at'].split(':'))
                        next_run = current_time.replace(hour=hour, minute=minute, second=0, microsecond=0)

                        # Ensure next_run is set on the correct day interval
                        if next_run <= current_time:
                            next_run += timedelta(days=self.job['interval'])
                        self.job['next_run'] = next_run
                    else:
                        # Default behavior without 'at' attribute
                        current_time = current_time.replace(hour=0, minute=0, second=0)
                        self.job['next_run'] = current_time + timedelta(days=self.job['interval'])
                elif self.job['unit'] == 'week':
                    # Set up the initial next run time based on the current time and interval
                    runlist = []
                    next_run = current_time + timedelta(weeks=self.job['interval'])
                    print(f'Added next run{next_run.weekday()}')
                    # Default to current weekday if no week_day is specified
                    weekdays = self.job['week_day'] if self.job['week_day'] else [current_time.weekday()]
                    for day in weekdays:
                        print( current_time.weekday(),day)
                        days_difference = current_time.weekday() - day
                        next_run = next_run - timedelta(days=days_difference)
                        print(next_run)

                        if 'at' in self.job and self.job['at']:
                            hour, minute = map(int, self.job['at'].split(':'))
                            next_run = next_run.replace(hour=hour, minute=minute, second=0, microsecond=0)

                        # If the next run time is in the past, add the weekly interval
                        if next_run <= current_time:
                            next_run += timedelta(weeks=self.job['interval'])

                        runlist.append(next_run)
                        next_run = current_time + timedelta(weeks=self.job['interval'])
            
                    self.job['next_run'] = min(runlist)
                else:
                    raise ValueError("Unsupported unit. Please extend the `calculate_next_run` method to support other units.")
                print(f"calc next run: {self.job['next_run']}")

                return [self.job['next_run']].pop(0)
                

            def run(self, *args, **kwargs):
                # Wait until jstartdate occurs
                now = datetime.now(self.job['time_zone']).replace(microsecond=0)
                if now < self.job['startdate']:
                    wait_time = (self.job['startdate'] - now).total_seconds()
                    time.sleep(wait_time)

                # Check if end date is today and at time is in the past
                if self.job['end_date'] and self.job['end_date'].date() == now.date() and 'at' in self.job and self.job['at']:
                    hour, minute = map(int, self.job['at'].split(':'))
                    at_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
                    if at_time < now.replace(second=0, microsecond=0):
                        raise ValueError("The specified hour is earlier than the current time for today's end date.")

                while True:
                    # defining now 1st time so we can run the function at the very first place
                    now = datetime.now(self.job['time_zone']).replace(microsecond=0)
                    
                    # ensuring that code runs for the first time or at the time that it has to run next
                    if self.job['next_run'] is None or now >= self.job['next_run']:
                        # if code reaches it's upper limit time it stops running 
                        if self.job['end_date'] and now.replace(second=0,microsecond=0) >= self.job['end_date']:
                            print(self.job['end_date'], now.replace(second=0,microsecond=0))
                            print('in first break')
                            break
                        
                        # ensuring there is function to run 
                        if self.job['func']:
                            try:
                                self.job['func'](*self.job['args'], **self.job['kwargs'])  # Run the job
                                self.job['repeat_count'] += 1
                                self.calculate_next_run(now)
                            except Exception as e:
                                print(f"Job {self.job['name']} failed with exception: {e}")
                                self.calculate_next_run(now)
                                
                            # # executing the function itself
                            # self.job['func'](*self.job['args'], **self.job['kwargs'])
                            # self.job['repeat_count'] += 1
                            
                            # overwriting on now variable so we can define it as the time when function ends running 
                            now = datetime.now(self.job['time_zone']).replace(microsecond=0)
                            
                            # if code reaches it's upper limit time it stops running 
                            if self.job['end_date'] and now.replace(second=0,microsecond=0) >= self.job['end_date']:
                                print(self.job['end_date'], now.replace(second=0,microsecond=0))
                                print('in mid break')
                                break
                            
                            # calculating nex time run after function gets executed
                            # self.calculate_next_run(now)
                            if self.job['repeats'] is not None and self.job['repeat_count'] >= self.job['repeats']:
                                print('repeatbreak?')
                                break
                        
                    if self.job['end_date'] and now >= self.job['end_date']:
                        print('in second break')
                        break


        job_wrapper = JobWrapper(job)
        self.jobs.append(job_wrapper)
        return job_wrapper

    def run_all(self):
        if self.threads:
            threads = []
            for job in self.jobs:
                thread = threading.Thread(target=job.run())
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()
        else:
            for job in self.jobs:
                job.run()




In [3]:


# Example functions
def report1(num1=1,num2 = 2):
    print(f"\nthis is starting time {datetime.now()}")
    if num1 == 1:
        raise ValueError ('num1 is invalid')
    # time.sleep(10)


def report2():
    print("Function report2 is running.")

def report3():
    print("Function report3 is running.")

In [4]:
# scheduler = Scheduler(threading=True, time_zone=5)
# 
# # scheduler.job(time_zone=4).do(report1, 'report1').day(every=1, hour='15:38').repeat(1000).until('2024-09-02 15:40')
# scheduler.job(time_zone=4).do(report1, 'report2').second().repeat(1000).until('2024-10-01 19:44')
# 
# scheduler.run_all()


## IDK YET BUT TEST CASES I GUESS

In [10]:
import unittest
from datetime import datetime, timedelta

class TestScheduler(unittest.TestCase):

    def setUp(self):
        """Set up a basic scheduler for testing"""
        self.scheduler = Scheduler(threading=False, time_zone=5)

    def test_scheduler_initialization(self):
        """Test if the Scheduler initializes with the correct timezone and start date"""
        # Create the scheduler instance
        scheduler = Scheduler(time_zone=3, start_date="2024-10-15 15:00")
    
        # Check that the timezone is correctly set
        expected_timezone = tzone(timedelta(hours=3))  # Create a timezone object with a 3-hour offset
        print('asserting')
        self.assertEqual(scheduler.scheduler_timezone, expected_timezone)
        print('asserting2')
    
        # Check that the start date is correctly set
        self.assertEqual(scheduler.scheduler_startdate.strftime('%Y-%m-%d %H:%M'), "2024-10-15 15:00")

    # 
    # def test_invalid_timezone(self):
    #     """Test invalid timezone input"""
    #     with self.assertRaises(ValueError):
    #         Scheduler(time_zone="invalid")
    # 
    # def test_invalid_start_date(self):
    #     """Test invalid start date input"""
    #     with self.assertRaises(ValueError):
    #         Scheduler(start_date="invalid_date")
    # 
    # def test_job_creation(self):
    #     """Test if a job can be created and initialized correctly"""
    #     job = self.scheduler.job(time_zone=2, start_date="2024-10-15 10:00")
    #     self.assertEqual(job.job['startdate'].strftime('%Y-%m-%d %H:%M'), "2024-10-15 10:00")
    #     self.assertEqual(job.job['time_zone'].utcoffset(None).total_seconds() / 3600, 2)
    # 
    # def test_job_repeat(self):
    #     """Test job repeat functionality"""
    #     job = self.scheduler.job().do(report2, "report2").repeat(3)
    #     self.assertEqual(job.job['repeats'], 3)
    # 
    # def test_job_until(self):
    #     """Test job until functionality"""
    #     job = self.scheduler.job().do(report2, "report2").until("2024-10-15 11:00")
    #     self.assertEqual(job.job['end_date'].strftime('%Y-%m-%d %H:%M'), "2024-10-15 11:00")
    # 
    # def test_job_second_interval(self):
    #     """Test job interval of every second"""
    #     job = self.scheduler.job().do(report2, "report2").second(every=5)
    #     job.calculate_next_run(datetime.now())
    #     self.assertEqual(job.job['interval'], 5)
    #     self.assertEqual(job.job['unit'], 'second')
    # 
    # def test_job_day_interval_with_at(self):
    #     """Test job interval of every day with specific hour"""
    #     job = self.scheduler.job().do(report2, "report2").day(every=1, hour="10:00")
    #     now = datetime.now()
    #     next_run = job.calculate_next_run(now)
    #     self.assertEqual(next_run.hour, 10)
    #     self.assertEqual(next_run.minute, 0)
    # 
    # def test_threaded_jobs(self):
    #     """Test if threaded jobs run without error"""
    #     scheduler = Scheduler(threading=True)
    #     scheduler.job().do(report2, "report2").second(every=1).repeat(3)
    #     scheduler.run_all()
    #     # No assertion here, just ensuring that threaded jobs run without error.
    # 
    # def test_job_with_exception_handling(self):
    #     """Test job exception handling"""
    #     job = self.scheduler.job().do(report1, "report1").second(every=2)
    #     try:
    #         job.run()
    #     except ValueError as e:
    #         self.assertTrue("num1 is invalid" in str(e))
    # 
    # def test_week_interval(self):
    #     """Test job interval of every week"""
    #     job = self.scheduler.job().do(report2, "report2").week(every=1, week_day=[1, 3], hour="10:00")
    #     now = datetime.now()
    #     next_run = job.calculate_next_run(now)
    #     self.assertIn(next_run.weekday(), [1, 3])
    # 
    # def test_multiple_jobs(self):
    #     """Test running multiple jobs"""
    #     scheduler = Scheduler(threading=False)
    #     scheduler.job().do(report2, "report2").second(every=1).repeat(2)
    #     scheduler.job().do(report3, "report3").minute(every=1)
    #     scheduler.run_all()
    #     # Ensures that multiple jobs are executed without errors.
    # 
    # def test_run_all_without_threading(self):
    #     """Test the run_all method without threading"""
    #     scheduler = Scheduler(threading=False)
    #     scheduler.job().do(report2, "report2").second(every=1).repeat(2)
    #     scheduler.run_all()
    #     # Ensures that jobs run sequentially without threading.


if __name__ == "__main__":
    unittest.main(argv=[''], exit=False)


F
FAIL: test_scheduler_initialization (__main__.TestScheduler.test_scheduler_initialization)
Test if the Scheduler initializes with the correct timezone and start date
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\kristinedzneladze\AppData\Local\Temp\ipykernel_11884\3711124835.py", line 22, in test_scheduler_initialization
    self.assertEqual(scheduler.scheduler_startdate.strftime('%Y-%m-%d %H:%M'), "2024-10-15 15:00")
AssertionError: '2024-10-15 14:00' != '2024-10-15 15:00'
- 2024-10-15 14:00
?             ^
+ 2024-10-15 15:00
?             ^


----------------------------------------------------------------------
Ran 1 test in 0.004s

FAILED (failures=1)


asserting
asserting2
