## py_collector

Generic file, API, and data monitoring system. Platform- and database-agnostic.



In [3]:
from datetime import datetime, timezone, timedelta
from threading import Thread, Event
import threading
import signal

class Scheduler(Thread):
    '''Controls scheduling for py_collector.

    Based heavily on the Timer implementation here
    https://github.com/python/cpython/blob/3.9/Lib/threading.py

    A scheduler repeatedly waits ``delta`` seconds and then calls a job
    ``count`` times, pausing ``separator`` seconds after each call. The class
    derives from ``Thread`` but defines no ``run()``; the loop is driven by
    calling ``schedule()`` directly (typically from a worker thread).
    '''
    # Class-level defaults; both are overwritten per instance once the
    # scheduler runs (init_run) or is cancelled (cancel).
    initial_run = True
    alive = True

    def __init__(self, start_time=None, milliseconds=0, seconds=0, days=0, minutes=0, weeks=0,
                 timezone=None, count=1, separator=1, hours=0):
        '''Configure the interval and burst behaviour.

        Args:
            start_time: datetime of the first run; defaults to "now".
            milliseconds, seconds, minutes, hours, days, weeks: components of
                the interval between bursts; they are summed into ``delta``.
            timezone: accepted for interface compatibility; not used here.
            count: number of times the job is called per burst.
            separator: seconds to wait after each call inside a burst.
        '''
        super().__init__()
        # signal.signal() may only be called from the main thread and raises
        # ValueError elsewhere, so the SIGINT hook is installed only there.
        # Only one handler exists per signal: the most recently constructed
        # Scheduler replaces any handler installed earlier.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGINT, self.cancel)
        self.finished = Event()
        self.count = count
        self.separator_delta = separator

        # Use the same awareness (naive vs. tz-aware) as start_time so the
        # subtraction below cannot raise TypeError for aware datetimes.
        now = datetime.now() if start_time is None else datetime.now(start_time.tzinfo)
        if start_time is None:
            start_time = now

        # NOTE(review): abs() means a start_time in the past delays the first
        # run by the same amount as one in the future -- confirm this is wanted.
        self.start_diff = abs(round((now - start_time).total_seconds(), 3))
        self.delta = timedelta(weeks=weeks,
                               days=days,
                               hours=hours,
                               minutes=minutes,
                               seconds=seconds,
                               milliseconds=milliseconds).total_seconds()

    def cancel(self, signum=None, frame=None):
        '''Stop the scheduling loop.

        Usable both as a plain method (``scheduler.cancel()``) and as a signal
        handler, which Python invokes with ``(signum, frame)``.
        '''
        print('Killing...')
        self.alive = False
        # The event is not set here, so a wait already in progress runs to its
        # timeout; the loop then sees ``alive`` is False and stops.

    def init_run(self, func):
        '''Perform the one-off first run: wait ``start_diff`` seconds, then call ``func``.'''
        if not self.initial_run:
            return
        self.initial_run = False
        self.finished.wait(self.start_diff)
        # Skip the call if the scheduler was cancelled during the wait.
        if self.alive:
            func()

    def schedule(self, func):
        '''Run ``func`` on the configured schedule until ``cancel()`` is called.

        Each cycle waits ``delta`` seconds and then calls ``func`` ``count``
        times, waiting ``separator_delta`` seconds after every call. The loop
        is iterative, so it can run indefinitely without growing the stack.
        '''
        while self.alive:
            self.init_run(func)

            self.finished.wait(self.delta)
            for _ in range(self.count):
                # Do not start another call once cancelled.
                if not self.alive:
                    break
                func()
                self.finished.wait(self.separator_delta)

class Collector:
    '''Base class for collectors.

    Subclasses implement ``is_new`` and ``upload`` and provide a ``scheduler``
    attribute exposing ``schedule(func)``; ``monitor`` hands ``orchestrate``
    to that scheduler.
    '''

    def upload(self) -> None:
        '''Push the collected data; must be overridden.

        TODO: decide which errors implementations should raise here.
        '''
        raise NotImplementedError('Please see documentation for implementation example')

    def is_new(self) -> bool:
        '''Report whether there is anything new to upload; must be overridden.'''
        raise NotImplementedError('Please see documentation for implementation example')

    def orchestrate(self):
        '''Upload when ``is_new()`` is truthy and stamp ``last_run`` afterwards.'''
        if not self.is_new():
            return
        self.upload()
        self.last_run = datetime.now()

    def monitor(self) -> None:
        '''Pass ``orchestrate`` to ``self.scheduler.schedule``.'''
        job = self.orchestrate
        self.scheduler.schedule(job)

In [4]:
class test(Collector):
    '''Demo collector: always reports new data and prints a timestamp on upload.'''
    # Sample fixed start moment kept for reference:
    # datetime(year=2021, month=6, day=28, hour=21, minute=39, second=0)
    # The scheduler is a class attribute, so it is shared by every instance.
    scheduler = Scheduler(count=1, separator=1, seconds=3)

    def is_new(self):
        '''Always signal that there is something to upload.'''
        return True

    def upload(self):
        '''Print the current time followed by this collector's label.'''
        stamp = datetime.now()
        print(stamp)
        print('OG Collector')
    
class test1(Collector):
    '''Second demo collector: always reports new data and prints its label.'''
    # The scheduler is a class attribute, so it is shared by every instance.
    scheduler = Scheduler(count=1, separator=1, seconds=3)

    def is_new(self):
        '''Always signal that there is something to upload.'''
        return True

    def upload(self):
        '''Print this collector's label.'''
        print('Collector 1')

In [5]:
# One instance of each demo collector.
t0, t1 = test(), test1()

In [6]:
# One daemon thread per collector, each running that collector's monitor loop.
th1, th2 = (
    threading.Thread(target=collector.monitor, daemon=True)
    for collector in (t0, t1)
)

In [7]:
# Launch both monitor loops; their printed output below is interleaved.
th1.start()
th2.start()

2021-06-28 21:30:23.272921
OG Collector
Collector 1
2021-06-28 21:30:26.277837
OG Collector
Collector 1
2021-06-28 21:30:30.282809
OG Collector
Collector 1
2021-06-28 21:30:34.286503
OG Collector
Collector 1


In [None]:
# Thread.start() may be called at most once per thread object; running this
# cell after the threads have already been started raises RuntimeError.
th1.start()
th2.start()
# Separate statement: it was fused onto the previous line, which is a SyntaxError.
for i in range(2):
    print(i)

In [None]:
# Block until th1's target returns.
# NOTE(review): th1 runs a scheduler loop that only ends when the scheduler is
# cancelled, so this join may never return -- confirm that is intended.
th1.join()
# NOTE(review): start() raises RuntimeError if th2 was already started in an
# earlier cell; possibly th2.join() was meant -- confirm.
th2.start()

In [6]:
# List the attribute names available on the thread object.
dir(th1)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_args',
 '_bootstrap',
 '_bootstrap_inner',
 '_daemonic',
 '_delete',
 '_exc_info',
 '_ident',
 '_initialized',
 '_is_stopped',
 '_kwargs',
 '_name',
 '_reset_internal_locks',
 '_set_ident',
 '_set_tstate_lock',
 '_started',
 '_stderr',
 '_stop',
 '_target',
 '_tstate_lock',
 '_wait_for_tstate_lock',
 'daemon',
 'getName',
 'ident',
 'isAlive',
 'isDaemon',
 'is_alive',
 'join',
 'name',
 'run',
 'setDaemon',
 'setName',
 'start']

2021-06-28 21:13:53.682080
OG Collector
Collector 1
2021-06-28 21:13:57.689147
OG Collector
Collector 1
2021-06-28 21:14:01.690788
OG Collector
Collector 1
