In [2]:
#!/usr/bin/env python

# Copyright (c) 2021- The University of Notre Dame.
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# This program is a demonstration of using Work Queue with PyTask
# PyTask creates a Work Queue task using a python function and its argument and returns the output
# This example program finds prime numbers in a range
# Checking the primality of each number is submitted as a separate task
# The tasks for all numbers in the given range are sent out to workers and then computed and returned

# How to run this program:
# Run this cell to execute the entire Work Queue program: creating workers, generating tasks, and then receiving the output
# It is recommended that the default values are left for the first run through
# Real time output, as well as some informative diagnostic graphs, will be shown at the very bottom of the notebook
# Grey cells are tasks that have been submitted, yellow cells are tasks that are currently being worked on,
# and green cells are completed tasks returned by workers. Light green means the number is prime, dark green means it is composite
# After the Workqueue Status textbox displays "All tasks complete!", the program is finished! 
# Afterwards, the four variables below (min_workers, max_workers, starting_task, ending_task) can be adjusted to test out different combinations
# Then feel free to play around with the number range and number of workers

import work_queue as wq
import ipywidgets as widgets
import time
import widget_control 

# User-adjustable settings live on the display object; they are set before the widgets are created below.
display = widget_control.Display() # display object that handles all the output widgets
display.min_workers = 1 # minimum number of workers allowed
display.max_workers = 30 # maximum number of workers allowed to connect (also used as the worker factory's max_workers further down)
display.starting_task = 2 # first number whose primality is tested
display.ending_task = 100 # last number whose primality is tested (inclusive: the submit loop iterates up to ending_task + 1)

# create the widget display shown at the bottom of the notebook
display.create_output_widgets()

# The function below takes in a number as an input and returns 0 if the number is composite or 1 if the number is prime
# PyTask uses this function to create the tasks given to workers to determine the primality of numbers in a range
def is_prime(number):
    """Return 1 if *number* is prime, otherwise 0.

    Uses trial division by every integer from 2 up to the integer square
    root of *number*. Values below 2 (0, 1 and negative numbers) are not
    prime and return 0.

    The result is an int (0/1) rather than a bool so that callers relying
    on the original return values keep working.
    """
    import math # PyTask requires that all libraries used in the function are imported inside the function
    if number < 2:
        # Without this guard 0 and 1 fall through the empty loop and are
        # reported as prime, and negative input makes math.sqrt raise.
        return 0
    for divisor in range(2, int(math.sqrt(number)) + 1):
        if number % divisor == 0:
            return 0 # found a divisor, so the number is composite
    return 1 # no divisor found, so the number is prime

# create the Work Queue manager and the factory that launches the workers which connect to it
q = wq.WorkQueue(0) # port the manager listens on. 0 selects a random unused port
workers = wq.Factory('local', manager_host_port=f'localhost:{q.port}') # create the worker factory which handles the workers
# specify the resources of each worker as well as the minimum and maximum number of workers
workers.cores = 1
workers.memory = 100
workers.disk = 100
workers.min_workers = display.min_workers # use the user-configured minimum rather than a hard-coded value
workers.max_workers = display.max_workers

# using the factory as a context manager starts the workers on entry and cleans them up on exit, even if an error occurs
with workers:
    # loop through the desired range of numbers (ending_task inclusive) and create one PyTask per number
    for i in range(display.starting_task, display.ending_task + 1):
        p_task = wq.PythonTask(is_prime, i) # create the task using PyTask, supplying the function and its argument(s)
        # Because the function uses the math library and PyTask requires the dill library,
        # we must specify an environment for the workers that contains these elements
        p_task.specify_environment("worker-env.tar.gz")
        q.submit(p_task) # submit the task into the queue

    while not q.empty(): # continue until all the tasks have been completed
        t = q.wait(5) # give control to Work Queue so that it can talk to workers; returns a finished task or nothing on timeout
        if t:
            if t.return_status != 0: # the task failed
                # report the failure instead of dropping it silently; the display is not updated for failed tasks
                print(f"Task {t.id} failed with return status {t.return_status}")
                continue
            display.update_output_widgets(q, t) # update output widgets

HBox(children=(Text(value='Initializing workers', description='Workqueue Status', disabled=True, style=Descrip…

HBox(children=(IntProgress(value=0, description='Percent of tasks complete:', max=98, style=ProgressStyle(desc…

VBox(children=(HBox(children=(ToggleButton(value=False, description='2', layout=Layout(height='20px', width='2…

VBox(children=(Figure(fig_margin={'top': 60, 'bottom': 60, 'left': 60, 'right': 60}, marks=[Pie(colors=['tomat…