In [1]:
import sys

sys.path.insert(0, '..')

import shutil
import uuid
from verification_service.worker.jobs import Supervisor, Worker
from verification_service.storage.database import MongoDbConnector
from pymongo.mongo_client import MongoClient
from dotenv import load_dotenv
import os 
from functools import partial
from verification_service import unique_id


_outs = './test_outputs'
if os.path.exists(_outs):
    shutil.rmtree(_outs)


def jobid(): return str(uuid.uuid4())


load_dotenv("../verification_service/.env")
uri = os.getenv("MONGO_DB_URI")
omex_source_dir = './examples/sbml-core'
omex_name = 'Elowitz-Nature-2000-Repressilator.omex'
omex_fp = os.path.join(omex_source_dir, omex_name)
out_dir = './test_outputs'
simulators = ['amici', 'copasi', 'tellurium']
spec_name = 'cI mRNA'
job_id = jobid()

db_connector = MongoDbConnector(connection_uri=uri, database_id="service_requests")

Smoldyn is not properly installed in this environment and thus its process implementation cannot be registered. Please consult smoldyn documentation.
Cannot register SimpleNeuron. Error:
**
No module named 'pyneuroml'
**


In [2]:
supervisor = Supervisor(db_connector=db_connector)

In [3]:
supervisor.jobs

{'completed_jobs': [{'_id': ObjectId('667a45c9e50fa55eb77a45f7'),
   'job_id': '053b2c1b-51a6-4a10-8252-594fce1efa8a',
   'status': 'COMPLETED',
   'timestamp': '2024-06-25 04:21:29.634498',
   'comparison_id': 'test',
   'results': None}],
 'in_progress_jobs': [{'_id': ObjectId('667a45c9e50fa55eb77a45f6'),
   'job_id': 'fa70243e-3602-4a23-84f9-ed44ea868f0b',
   'status': 'IN_PROGRESS',
   'timestamp': '2024-06-25 04:21:29.282452',
   'comparison_id': 'test',
   'worker_id': '72e9ffa0-e9bc-4d30-9e41-29a549ae36d2'}],
 'pending_jobs': [{'_id': ObjectId('6679fb9ef0a7c3bb9d2ab155'),
   'status': 'PENDING',
   'job_id': '0fb705b6-4b63-4348-94aa-27d37ba9e60f',
   'omex_path': '../tmp/Elowitz-Nature-2000-Repressilator.omex',
   'simulators': ['amici', 'copasi', 'tellurium'],
   'comparison_id': 'test',
   'timestamp': '2024-06-24 23:05:02.045342',
   'ground_truth_report_path': '../tmp/reports.h5',
   'include_outputs': True}]}

In [4]:
# re-create loop here

import asyncio
from verification_service import load_arrows


db_connector = MongoDbConnector(connection_uri=uri, database_id="service_requests")
supervisor = Supervisor(db_connector=db_connector)
        

async def fetch_jobs(supervisor: Supervisor, max_retries=5, delay=5):
    pending_jobs = [job for job in supervisor.db_connector.db['pending_jobs'].find()]
    async def _run_check():
        if len(pending_jobs):
            print('There are pending jobs')
            result = await check_jobs(supervisor)
            print('There are no pending jobs')
            return result
        else:
            return None 
            
    n_retries = 0
    run = True
    while run:
        check = await _run_check()
        if check is None:
            n_retries += 1
            await asyncio.sleep(delay)
        if n_retries == max_retries:
            run = False
        else:
            continue 
    return 0 
        
async def check_jobs(supervisor, max_retries=5, delay=5) -> int:
    job_queue = supervisor.pending_jobs
    n_tries = 0
    n_retries = 0
    while True:
        # count tries
        n_tries += 1
        if n_tries == max_retries + 1:
            print(f'Max retries {max_retries} reached!')
            break
        
        # if x is greater than 1 then it is a retry
        if n_tries > 1:
            print(f'{n_retries} is a retry. Running sim again.')
            n_retries += 1

        if len(job_queue):
            print('There are pending jobs.')
            for i, job in enumerate(job_queue):
                # get the next job in the queue based on the preferred_queue_index
                job_doc = supervisor.pending_jobs.pop(supervisor.preferred_queue_index)
                job_comparison_id = job_doc['comparison_id']
                unique_id_query = {'comparison_id': job_comparison_id}
                in_progress_job = supervisor.db_connector.db.in_progress_jobs.find_one(unique_id_query) or None
                
                _job_exists = partial(supervisor._job_exists, comparison_id=job_comparison_id)
                if not _job_exists(collection_name='in_progress_jobs'):
                    print(f"In progress job does not yet exist for {job_comparison_id}")
                    in_progress_job_id = unique_id()
                    worker_id = unique_id()
                    id_kwargs = ['job_id', 'worker_id']
                    in_prog_kwargs = dict(zip(
                        id_kwargs,
                        list(map(lambda k: unique_id(), id_kwargs))
                    ))
                    in_prog_kwargs['comparison_id'] = job_comparison_id
                    
                    supervisor.db_connector.insert_in_progress_job(**in_prog_kwargs)
                    print(f"Successfully created new progress job for {job_comparison_id}")
                    # await supervisor.async_refresh_jobs()
                    
                if not _job_exists(collection_name='completed_jobs'):
                    print(f"Completed job does not yet exist for {job_comparison_id}")
                    # pop in-progress job from internal queue and use it parameterize the worker
                    in_prog_id = [job for job in db_connector.in_progress_jobs.find()].pop(supervisor.preferred_queue_index)['job_id']
                    
                    # double-check and verify doc
                    in_progress_doc = supervisor.db_connector.db.in_progress_jobs.find_one({'job_id': in_prog_id})
                    
                    # generate new worker
                    workers_id = in_progress_doc['worker_id']
                    worker = supervisor.call_worker(job_params=job_doc, worker_id=workers_id)
                    
                    # add the worker to the list of workers (for threadsafety)
                    supervisor.workers.insert(supervisor.preferred_queue_index, worker.worker_id)
                    
                    # the worker returns the job result to the supervisor who saves it as part of a new completed job in the database
                    completed_doc = supervisor.db_connector.insert_completed_job(job_id=unique_id(), comparison_id=job_comparison_id, results=worker.job_result)
                    
                    # release the worker from being busy and refresh jobs
                    supervisor.workers.pop(supervisor.preferred_queue_index)
                    print(f"Successfully created new completed job for {job_comparison_id}")
                    # await supervisor.async_refresh_jobs()
            
                # remove the job from queue
                supervisor.pending_jobs.pop(i)
        # sleep
        print(f'Sleeping for {delay} seconds...')
        await load_arrows(delay)
                 
    return 0

In [5]:
result = await check_jobs(supervisor, max_retries=5, delay=3)

There are pending jobs.
In progress job does not yet exist for test
Successfully created new progress job for test
Completed job does not yet exist for test
Successfully created new completed job for test


IndexError: pop from empty list

In [16]:
async def f():
    timer = 1
    n_tries = 0
    max_retries = 10
    n_retries = 0
    y = list(range(10))
    x = 0
    while True:
        print(f'n retries: {n_retries}')
        n_tries += 1
        
        # if x is greater than 1 then it is a retry
        if n_tries > 1:
            print(f'{x} is a retry. Running sim again.')
            n_retries += 1
         
        # break if reached max tries
        if n_retries == max_retries:
            print('done')
            break
            
        # exec 'simulation'
        if len(y):
            x += 1
            z = y.pop(0)
            print(f'Popped {z} out of y which has a len of {len(y)}')
        
        # regardless, sleep
        print(f'sleeping at {x}')
        await load_arrows(timer)
    return x

await f()

n retries: 0
Popped 0 out of y which has a len of 9
sleeping at 1
=>|
n retries: 0
1 is a retry. Running sim again.
Popped 1 out of y which has a len of 8
sleeping at 2
=>|
n retries: 1
2 is a retry. Running sim again.
Popped 2 out of y which has a len of 7
sleeping at 3
=>|
n retries: 2
3 is a retry. Running sim again.
Popped 3 out of y which has a len of 6
sleeping at 4
=>|
n retries: 3
4 is a retry. Running sim again.
Popped 4 out of y which has a len of 5
sleeping at 5
=>|
n retries: 4
5 is a retry. Running sim again.
Popped 5 out of y which has a len of 4
sleeping at 6
=>|
n retries: 5
6 is a retry. Running sim again.
Popped 6 out of y which has a len of 3
sleeping at 7
=>|
n retries: 6
7 is a retry. Running sim again.
Popped 7 out of y which has a len of 2
sleeping at 8
=>|
n retries: 7
8 is a retry. Running sim again.
Popped 8 out of y which has a len of 1
sleeping at 9
=>|
n retries: 8
9 is a retry. Running sim again.
Popped 9 out of y which has a len of 0
sleeping at 10
=>|
n 

10

In [None]:
from time import sleep


async def f():
    y = list(range(10))
    z = []
    n_tries = 0
    max_try = 5
    run = True 
    while run:
        n_tries += 1
        print(f'This is try number: {n_tries}')
        if len(y):
            v = y.pop(0)
            z.append(v)
        else:
            if n_tries == max_try:
                print('Max tries reached.')
                run = False 
                print('exiting')
                break
            
            
            
        
            
result = await f()

result

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [None]:
supervisor.pending_jobs

In [None]:
supervisor.check_jobs()

In [None]:
# 1. get an unassigned pending job by id
job_id = supervisor.jobs['pending_jobs'].pop(0)

In [None]:
# 2. fetch the respective document/job
job_doc = supervisor.db_connector.db.pending_jobs.find_one({'job_id': job_id})

In [None]:
job_doc

In [None]:
job_params = job_doc.copy()

In [None]:
import os 
from verification_service import unique_id

os.path.exists(job_params['omex_path'])

In [None]:
# 3. Create a new in process job for the pending job we just picked up
worker_id = unique_id()
in_progress_job_id = unique_id()
in_progress_doc = supervisor.db_connector.insert_in_progress_job(
    job_id=in_progress_job_id,
    worker_id=worker_id,
    comparison_id=job_doc['comparison_id'],
)

in_progress_doc

In [None]:
# 4. Call the worker who will automatically process the job
worker = Worker(job_params=job_params)

In [None]:
# 4. Get the result from the worker and insert the new completed job for that comparison_id
from verification_service import unique_id

comparison_id = job_doc['comparison_id']

completed_doc = supervisor.db_connector.insert_completed_job(
    job_id=unique_id(),
    comparison_id=comparison_id,
    results=worker.job_result
)

In [None]:
completed_doc

### Test `Supervisor.initialize()`

In [None]:
from functools import partial

# case: uncompleted/pending jobs exist
jobs_to_complete = pending
if len(pending):
    in_progress_jobs = supervisor.jobs['in_progress_jobs']
    preferred_queue_index = supervisor.preferred_queue_index  # TODO: How can we make this more robust/dyn
    

In [None]:
supervisor.get_jobs()