# jobs

> Create, schedule, and run jobs that fetch data from registered views and pass it to registered actions.

In [None]:
#| default_exp jobs

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export

import schedule
from triggerkit import util
import time
import json
from triggerkit.snowflake import get_view_data
from triggerkit.actions import run, register
from datetime import datetime, time as datetime_time
from typing import Dict, List, Callable, Optional, Any, Union

2025-05-14 15:36:31,286 - numexpr.utils - INFO - NumExpr defaulting to 10 threads.
2025-05-14 15:36:31,502 - triggerkit - INFO - Registered action: Register Views
2025-05-14 15:36:31,503 - triggerkit - INFO - Registered action: Get DDL and Register Views


In [None]:
#| export

def create(view_name: str, action_names: Union[str, List[str]], job_name: Optional[str] = None):
    """
    Build a zero-argument job callable that pulls rows from a view and hands them to one or more actions.
    
    **Args:**
        \n • `view_name`: Registered view whose data is fetched each time the job runs
        \n • `action_names`: A single action name, or a list of action names, run in order on the fetched data
        \n • `job_name`: Optional label for the job; when empty it defaults to `<view>_<actions joined by _>_job`
        
    **Returns:**
        Job function (its `__name__` is set to the job name). Calling it returns a dict mapping
        each action name to that action's result; any exception is logged and re-raised.
    """
    util.logger.info(f"Creating Job for '{view_name}'.")
    actions = [action_names] if isinstance(action_names, str) else action_names
    label = job_name or f"{view_name}_{'_'.join(actions)}_job"

    def job():
        util.logger.info(f"Running job '{label}' ...")
        print(f"🕚 Running job '{label}' ...")
        try:
            rows = get_view_data(view_name)
            # Every action receives the same fetched rows; results are keyed by action name.
            outcome = {action: run(action, rows) for action in actions}
            util.logger.info(f"Job '{label}' completed successfully")
            print(f"✅ '{label}' Finished!")
            return outcome
        except Exception as exc:
            # Log for the scheduler's benefit, then let the caller see the original error.
            util.logger.error(f"Job '{label}' failed: {exc!s}")
            raise

    job.__name__ = label
    util.logger.info(f"Job Function Created for '{view_name}'.")
    return job

In [None]:
#| export
# Registry of jobs scheduled so far, keyed by job name.
util.SCHEDULED_JOBS = {}

def schedule_jobs(config: Dict[str, Any]):
    """
    Schedule jobs based on TOML configuration.
    
    Each entry of `config['jobs']` is read for `name`, `view`, `actions`, `schedule`,
    `enabled` (default `True`) and `run_at`. Disabled or incomplete entries are logged
    and skipped. `schedule` must look like `"every <N> minute(s)|hour(s)|day(s)"` with a
    positive integer `N`. `run_at` (an `"HH:MM"` string or a `datetime.time`, e.g. a TOML
    local-time value) is only applied to day-based schedules; an invalid value is logged
    and the job is scheduled without a fixed time. Every scheduled job is recorded in
    `util.SCHEDULED_JOBS` under its name.
    
    **Args:**
        \n • `config`: Dictionary containing configuration sections
        
    **Returns:**
        None
    """
    jobs = config.get('jobs', [])
    
    for job_config in jobs:
        name = job_config.get('name')
        view_name = job_config.get('view')
        actions = job_config.get('actions', [])
        schedule_str = job_config.get('schedule')
        enabled = job_config.get('enabled', True)
        run_at = job_config.get('run_at')
        
        if not enabled:
            util.logger.info(f"Job '{name}' is disabled, skipping")
            continue
            
        if not all([view_name, actions, schedule_str]):
            util.logger.warning(f"Skipping invalid job configuration: {job_config}")
            continue
        
        job = create(view_name, actions, name)
        
        # Parse schedule string ("every <N> <unit>") and set up schedule
        schedule_parts = schedule_str.split()
        if len(schedule_parts) < 3 or schedule_parts[0] != "every":
            util.logger.warning(f"Unsupported schedule format: {schedule_str}")
            continue

        try:
            interval = int(schedule_parts[1])
        except ValueError:
            util.logger.warning(f"Invalid schedule format: {schedule_str}")
            continue
        # Zero or negative intervals are rejected rather than handed to `schedule`.
        if interval <= 0:
            util.logger.warning(f"Invalid schedule format: {schedule_str}")
            continue

        unit = schedule_parts[2].lower()
        if unit in ('minute', 'minutes'):
            scheduler = schedule.every(interval).minutes
        elif unit in ('hour', 'hours'):
            scheduler = schedule.every(interval).hours
        elif unit in ('day', 'days'):
            scheduler = schedule.every(interval).days
        else:
            util.logger.warning(f"Unsupported schedule unit: {unit}")
            continue
        
        # Add specific time if provided (only meaningful for day-based schedules)
        at_time = None
        if run_at and unit in ('day', 'days'):
            try:
                if isinstance(run_at, datetime_time):
                    run_time = run_at
                else:
                    hour, minute = map(int, run_at.split(':'))
                    run_time = datetime_time(hour=hour, minute=minute)
            except (ValueError, AttributeError):
                util.logger.warning(f"Invalid run_at time format: {run_at}, expected HH:MM. Using default.")
            else:
                # Hand `schedule` the validated, zero-padded time rather than the raw string.
                # Its .at() rejects e.g. "9:30" with ScheduleValueError, which is not a
                # ValueError and would otherwise escape this function.
                at_time = run_time.strftime('%H:%M:%S' if run_time.second else '%H:%M')
                scheduler = scheduler.at(at_time)
        
        scheduler.do(job)
        util.logger.info(
            f"Scheduled job '{name}' for view '{view_name}' with actions {actions} to run {schedule_str}"
            + (f" at {at_time}" if at_time else "")
        )
        util.SCHEDULED_JOBS[name] = {
            'actions': actions,
            'view_name': view_name,
            'schedule_str': schedule_str,
            'run_at': run_at
            }

In [None]:
#| export

@register('Create Job From View','Creates scheduled jobs for views that have job configuration.')
def create_job_from_view(data):
    """
    Create new jobs from views that have job configuration.
    
    For every view record in `data`, the view's data is fetched. Its first row is then
    checked for a job configuration in a `CONFIG` (preferred) or `TK_CONFIG` column, as a
    JSON string or an already-decoded dict. The configuration may set `name` and `view`
    (both default to the record's `TABLE_NAME`), `actions`, `schedule`
    (`"every <N> minute(s)|hour(s)|day(s)"`), `enabled` (default `True`) and `run_at`
    (`"HH:MM"`, applied to day-based schedules only). A config with `run_at` but no
    `schedule` is treated as `"every 1 day"`. A job whose name, view, actions, schedule
    and run_at all match an existing `util.SCHEDULED_JOBS` entry is skipped. Otherwise it
    is scheduled and recorded. Problems with one view are logged and do not stop the others.
    
    **Args:**
        \n • `data`: Iterable of view records, each a mapping with a `TABLE_NAME` key
        
    **Returns:**
        None
    """
    util.logger.info("Creating jobs from views...")
    util.logger.debug(f'Data: {data}')
    for view in data:
        util.logger.info(f"Processing view: {view}")
        table_name = view['TABLE_NAME']
        view_data = get_view_data(table_name)
        if not view_data:
            util.logger.warning(f"No data returned from view: {view}")
            continue

        first_row = view_data[0]
        if 'CONFIG' in first_row:
            config_col = 'CONFIG'
        elif 'TK_CONFIG' in first_row:
            config_col = 'TK_CONFIG'
        else:
            util.logger.warning(f"No job configuration found in view: {view}")
            continue
        util.logger.debug(f"Found job configuration in first row: {first_row}")

        raw_config = first_row[config_col]
        try:
            view_config = raw_config if isinstance(raw_config, dict) else json.loads(raw_config)
        except (TypeError, ValueError) as e:
            # One malformed view must not abort processing of the remaining views.
            util.logger.warning(f"Invalid job configuration in view {view}: {e}")
            continue
        if not isinstance(view_config, dict):
            util.logger.warning(f"Job configuration in view {view} is not a JSON object, skipping")
            continue

        util.logger.debug(f'Config data: {view_config}')
        name = view_config.get('name', table_name)
        view_name = view_config.get('view', table_name)
        actions = view_config.get('actions', [])
        schedule_str = view_config.get('schedule')
        enabled = view_config.get('enabled', True)
        run_at = view_config.get('run_at')
        
        if not enabled:
            util.logger.info(f"Job '{name}' is disabled, skipping")
            continue

        # A config with `run_at` but no `schedule` used to pass validation and then crash on
        # `schedule_str.split()`; treat it as a daily job at `run_at`.
        if not schedule_str and run_at:
            schedule_str = 'every 1 day'
            
        if not all([view_name, actions, schedule_str]) or not isinstance(schedule_str, str):
            util.logger.info(f"Skipping invalid job configuration: {view}")
            util.logger.info("See debug for missing info.")
            util.logger.debug("Config Values:")
            util.logger.debug(f' - name: {name}')
            util.logger.debug(f' - view_name: {view_name}')
            util.logger.debug(f' - actions: {actions}')
            util.logger.debug(f' - schedule_str: {schedule_str}')
            util.logger.debug(f' - enabled: {enabled}')
            util.logger.debug(f' - run_at: {run_at}')
            continue

        # Skip jobs that are already scheduled with an identical configuration.
        existing = util.SCHEDULED_JOBS.get(name)
        if existing is not None and (
            existing.get('actions') == actions
            and existing.get('view_name') == view_name
            and existing.get('schedule_str') == schedule_str
            and existing.get('run_at') == run_at
        ):
            util.logger.info(f"Job '{name}' already exists, skipping")
            continue
        # NOTE(review): when the config for an existing name changed, a new job is scheduled
        # but the previously scheduled one is not cancelled — confirm whether it should be.

        job = create(view_name, actions, name)

        util.logger.info(f"Scheduling Job for '{name}'")
        # Parse schedule string ("every <N> <unit>") and set up schedule
        schedule_parts = schedule_str.split()
        if len(schedule_parts) < 3 or schedule_parts[0] != "every":
            util.logger.warning(f"Unsupported schedule format: {schedule_str}")
            continue

        try:
            interval = int(schedule_parts[1])
        except ValueError:
            util.logger.warning(f"Invalid schedule format: {schedule_str}")
            continue
        # Zero or negative intervals are rejected rather than handed to `schedule`.
        if interval <= 0:
            util.logger.warning(f"Invalid schedule format: {schedule_str}")
            continue

        unit = schedule_parts[2].lower()
        if unit in ('minute', 'minutes'):
            scheduler = schedule.every(interval).minutes
        elif unit in ('hour', 'hours'):
            scheduler = schedule.every(interval).hours
        elif unit in ('day', 'days'):
            scheduler = schedule.every(interval).days
        else:
            util.logger.warning(f"Unsupported schedule unit: {unit}")
            continue
        
        # Add specific time if provided (only meaningful for day-based schedules)
        at_time = None
        if run_at and unit in ('day', 'days'):
            try:
                if isinstance(run_at, datetime_time):
                    run_time = run_at
                else:
                    hour, minute = map(int, run_at.split(':'))
                    run_time = datetime_time(hour=hour, minute=minute)
            except (ValueError, AttributeError):
                util.logger.warning(f"Invalid run_at time format: {run_at}, expected HH:MM. Using default.")
            else:
                # Hand `schedule` the validated, zero-padded time rather than the raw string.
                # Its .at() rejects e.g. "9:30" with ScheduleValueError, which is not a
                # ValueError and would otherwise abort processing of all remaining views.
                at_time = run_time.strftime('%H:%M:%S' if run_time.second else '%H:%M')
                scheduler = scheduler.at(at_time)
        
        scheduler.do(job)
        util.logger.info(
            f"AUTO SCHEDULED job '{name}' based on view '{view_name}' with actions {actions} to run {schedule_str}"
            + (f" at {at_time}" if at_time else "")
        )
        util.SCHEDULED_JOBS[name] = {
            'actions': actions,
            'view_name': view_name,
            'schedule_str': schedule_str,
            'run_at': run_at
            }

2025-05-14 15:36:31,522 - triggerkit - INFO - Registered action: Create Job From View


In [None]:
#| export

def run_scheduler():
    """
    Run the scheduler loop, blocking forever.

    Repeatedly runs whatever `schedule` jobs are due, sleeping one second between
    checks. This function never returns.
    """
    util.logger.info("Starting scheduler")
    poll_seconds = 1
    while True:
        schedule.run_pending()
        time.sleep(poll_seconds)

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()