In [2]:
using Distributed,ClusterManagers

# Launch `cpus_per_task` local worker processes; each one runs Julia with 5 threads
# and the same active project as this session.
cpus_per_task = 5;
addprocs(cpus_per_task; exeflags=["--threads=5", "--project=$(Base.active_project())"])

# Report each worker's Distributed id, OS process id and host name.
for w in workers()
    wid, wpid, whost = @fetchfrom w (myid(), getpid(), gethostname())
    println(wid, " ", wpid, " ", whost)
end

# Print which thread of *this* process handles each iteration (one iteration per
# worker id); the loop body runs locally, not on the workers.
Threads.@threads for _ in workers()
    println(Threads.threadid())
end

2 829618 della-vis1.princeton.edu
3 829620 della-vis1.princeton.edu
4 829621 della-vis1.princeton.edu
5 829622 della-vis1.princeton.edu
6 829624 della-vis1.princeton.edu
2
4
3
1
5


In [None]:
using Distributed,ClusterManagers
using Base.Threads
using Dates

# Per-process initialization hook, defined on every process via `@everywhere`.
# NOTE(review): no initialization logic has been written yet. The function is closed
# with an explicit `end` so that the rest of this cell parses; it currently does
# nothing and returns `nothing`.
@everywhere function init_proc()
    return nothing
end

# Multithreaded producer/consumer demo that runs on the calling process.
#
# Starts one consumer task per Julia thread (`Threads.nthreads()`). All consumers read
# closures from one shared channel. The producer enqueues `ntasks` closures and pauses
# 0.1 s after each one. Every closure prints the id of the thread running it plus a
# timestamp (`now`, from Dates), then sleeps `rand()` seconds to simulate work.
#
# Once every closure has been enqueued, the channel is closed. Each consumer's loop
# therefore ends after the queue drains, and the final `wait`s return. Without the
# close, the consumers would block on an empty channel forever, and so would this
# function.
#
# Keywords (the defaults reproduce the original demo):
#   ntasks   -- number of closures to enqueue (default 5)
#   capacity -- buffer size of the task channel (default 32)
#
# Returns `nothing`. Rethrows (as a TaskFailedException) if a consumer task fails.
@everywhere function do_proc_work_multithreaded(; ntasks::Integer=5, capacity::Integer=32)
    # Producer and consumers all live on this process, so a plain (thread-safe)
    # `Channel` is enough; no global RemoteChannel is needed.
    task_queue = Channel{Function}(capacity)

    # Consumers: iterating a Channel stops once it is closed and empty.
    worker_tasks = map(1:Threads.nthreads()) do _
        Threads.@spawn for func in task_queue
            func()
        end
    end

    # Producer: close the channel even if enqueueing fails, so the consumers can exit.
    try
        for _ in 1:ntasks
            put!(task_queue, () -> begin
                thread_id = Threads.threadid()
                println("Task running on thread $thread_id at time $(now())")
                sleep(rand())  # Random delay to simulate work
            end)
            sleep(0.1)  # Throttle task creation
        end
    finally
        close(task_queue)
    end

    foreach(wait, worker_tasks)
    return nothing
end

# Run the multithreaded task-queue demo on the current process.
do_proc_work_multithreaded()

Task running on thread 2 at time 2024-05-13T18:26:18.082
Task running on thread 5 at time 2024-05-13T18:26:18.173
Task running on thread 4 at time 2024-05-13T18:26:18.275
Task running on thread 3 at time 2024-05-13T18:26:18.376
Task running on thread 4 at time 2024-05-13T18:26:18.478


In [None]:
using Distributed,ClusterManagers
addprocs(4);  # start 4 local worker processes

# Job ids go out on `jobs`; `(job_id, exec_time, worker_id)` tuples come back on
# `results`. Both are 32-slot channels owned by the process that runs this cell.
jobs = RemoteChannel(() -> Channel{Int}(32));
results = RemoteChannel(() -> Channel{Tuple}(32));

n = 12;        # number of jobs submitted per run of mono_mga
exec_time = 1  # NOTE(review): no visible code reads this global (do_work/mono_mga use locals)

# Worker loop, defined on every process via `@everywhere`.
#
# Repeatedly takes a job id from `jobs`, sleeps for `rand()` seconds to stand in for
# real work, and pushes `(job_id, exec_time, myid())` onto `results`. Returns `nothing`
# as soon as `take!(jobs)` throws (e.g. once `jobs` is closed).
# NOTE(review): the bare `catch` also ends the loop on any other error raised by `take!`.
@everywhere function do_work(jobs, results)
    while true
        local job
        try
            job = take!(jobs)
        catch
            return  # channel can no longer be read: stop this worker loop
        end
        elapsed = rand()
        sleep(elapsed)  # simulated work
        put!(results, (job, elapsed, myid()))
    end
end

# Push the job ids 1, 2, …, n (in order) onto the global `jobs` channel.
# `put!` blocks while the channel is full. Returns `nothing`.
function make_jobs(n)
    foreach(job_id -> put!(jobs, job_id), 1:n)
end;

# Start one long-running `do_work` loop on every worker process. `remote_do` is
# fire-and-forget: it returns immediately and keeps no result to wait on.
foreach(pid -> remote_do(do_work, pid, jobs, results), workers())

# Submit `njobs` job ids to the global `jobs` channel, then take and print one result
# per submitted job from the global `results` channel.
#
# `njobs` defaults to the global `n`, so `mono_mga()` keeps working as before. Exactly
# one result is taken per job submitted. Taking fewer (previously a fixed 5 of `n`)
# leaves results queued in `results`, and the next call would then report stale
# entries from an earlier run.
#
# If taking from `results` throws, the error is printed and the function returns early.
# NOTE(review): `make_jobs` finishes before any result is taken; with 32-slot `jobs`
# and `results` channels, a very large `njobs` could block.
# Returns `nothing`.
function mono_mga(njobs::Integer=n)
    make_jobs(njobs)

    # Take results: one per submitted job
    try
        for _ in 1:njobs
            job_id, exec_time, worker_id = take!(results)
            println("Job $job_id finished in $exec_time seconds on worker $worker_id")
        end
    catch e
        println("Failed to take from results: $e")
    end
    return nothing
end

# Run the job/result demo once; evaluates to its wall-clock duration in seconds.
@elapsed mono_mga()

In [3]:
# Build the distributed operational subproblems and collect their master variables.
#
# Steps:
#   1. Create one helper `Dict` per CPU reported by `SLURM_CPUS_PER_TASK` and
#      `distribute` them.
#   2. Initialize each helper on worker `i` with `init_local_helper!`.
#   3. Fetch `get_local_master_vars` from the first `num_sub` workers and merge the
#      results into a single `Dict`.
#   4. Print the elapsed wall time.
#
# Returns `(helpers_all, master_vars_sub)`.
# Throws an `ArgumentError` if `SLURM_CPUS_PER_TASK` exceeds the number of workers.
#
# NOTE(review): relies on names defined outside this cell: `num_sub`, `inputs_decomp`,
# `setup`, `master_vars`, `master_cons`, `configure_benders_subprob_solver`,
# `init_local_helper!`, `get_local_master_vars`, and `distribute` / `localindices` /
# `localpart` (presumably DistributedArrays). Names used inside the `@spawnat` /
# `@fetchfrom` blocks are evaluated on the worker, so they must be defined there too.
function initialize_dist_helpers()
    ## Start pre-solve timer
    subproblem_generation_time = time()

    # ENV values are strings: parse so the count can be used in `1:num_procs` below.
    num_procs = parse(Int, ENV["SLURM_CPUS_PER_TASK"])
    workers_all = workers()
    if num_procs > length(workers_all)
        throw(ArgumentError("SLURM_CPUS_PER_TASK=$num_procs exceeds the " *
                            "$(length(workers_all)) available worker processes"))
    end

    ##### Initialize a distributed array of helpers (one Dict per CPU)
    helpers_all = distribute([Dict() for _ in 1:num_procs])

    # A bare `@spawnat` inside `@sync` is tracked by the `@sync` block itself. This
    # waits until every remote initialization has finished and surfaces any remote
    # failure here.
    @sync for i in 1:num_procs
        p = workers_all[i]
        @spawnat p begin
            W_local = localindices(helpers_all)[1]
            inputs_local = [inputs_decomp[k] for k in W_local]
            SUBPROB_OPTIMIZER = configure_benders_subprob_solver(setup["Solver"],
                                                                 setup["settings_path"])
            init_local_helper!(setup, inputs_local, localpart(helpers_all),
                               master_vars, master_cons, SUBPROB_OPTIMIZER)
        end
    end

    # NOTE(review): `num_sub` is a global defined elsewhere; presumably it equals
    # `num_procs` — confirm.
    p_id = workers_all[1:num_sub]

    # Fetch each worker's master variables concurrently, then merge them. The `init`
    # keeps `reduce` well-defined when `p_id` is empty (`merge()` alone would throw).
    local_master_vars = asyncmap(p_id) do p
        @fetchfrom p get_local_master_vars(localpart(helpers_all))
    end
    master_vars_sub = reduce(merge, local_master_vars; init=Dict())

    ## Record pre-solver time
    subproblem_generation_time = time() - subproblem_generation_time
    println("Distributed operational subproblems generation took $subproblem_generation_time seconds")

    return helpers_all, master_vars_sub
end

4-element Vector{Int64}:
 2
 3
 4
 5