# Parallel Julia (internals)
```


```

###### Amit Murthy, @amitmurthy
###### JuliaCon 2015

Today's talk
- RemoteRefs
- addprocs and ClusterManagers
- AWS, MessageUtils
- MPI ClusterManager

In [None]:
# Simplified view of a remote reference, for illustration.
# A RemoteRef is only a handle; the data it refers to is a RemoteValue kept
# in Base.PGRP.refs on process `where`, keyed by (whence, id).
type RemoteRef
    where::Int       # pid where data exists
    whence::Int      # pid where this ref was instantiated
    id::Int          # one-up identifier. tuple (whence, id) is 
                     # unique across all processors
end

# The `x::T` lines below are type assertions; they only show the container types.
Base.PGRP.refs::Dict # Refs whose "where" is current pid.
                     # (whence, id) -> RemoteValue.

Base.client_refs::WeakKeyDict  # single instance/key in local process

In [None]:
# Simplified view of the storage behind a RemoteRef. Instances live in
# Base.PGRP.refs on the process where the data is held.
type RemoteValue
    done::Bool        # NOTE(review): presumably true while a value is stored — confirm
    result            # the stored value (untyped)
    full::Condition   # waiting for a value
    empty::Condition  # waiting for value to be removed
    clientset::IntSet # clients holding a reference to this value 
    waitingfor::Int   # processor we need to hear from to fill this
end


In [None]:

# Start 4 local workers and create a RemoteRef whose data will live on pid 2.
# Per the following cells, nothing is stored on pid 2 until a value is put.
addprocs(4) # creates workers with pids 2,3...
rr = RemoteRef(2) # create a reference on pid 2

In [None]:
# See if anything has actually been created on worker 2:
# print the (whence, id) keys of the RemoteValues stored in pid 2's Base.PGRP.refs.
remotecall_fetch(2, ()->println(keys(Base.PGRP.refs)))

In [None]:
# Nope, nothing. Put a value and test again.
# put! stores :OK on pid 2, so pid 2's refs should now contain rr's key.
put!(rr, :OK)
remotecall_fetch(2, ()->println(keys(Base.PGRP.refs)))

In [None]:
# Let's print the RemoteValue object that pid 2 holds for `rr`.
# On the owning process, refs are keyed by the (whence, id) pair.
rrid = (rr.whence, rr.id)
for fld in (:done, :result, :clientset)
    remotecall_fetch(2, (key, f) -> println(getfield(Base.PGRP.refs[key], f)), rrid, fld)
end


In [None]:
# Let's take that value and see what happens to the RemoteValue store:
# remove the value, then print the same three fields of pid 2's RemoteValue.
take!(rr)

for fld in (:done, :result, :clientset)
    remotecall_fetch(2, (key, f) -> println(getfield(Base.PGRP.refs[key], f)), rrid, fld)
end

In [None]:
# Let's send the reference to a different worker and see what happens.
# Worker 3 puts :FROM_3 into rr. remote_do does not wait for the put! to
# complete, so the fields printed here may still show the earlier state.
Base.remote_do(3, x->put!(x, :FROM_3), rr)

for fld in (:done, :result, :clientset)
    remotecall_fetch(2, (key, f) -> println(getfield(Base.PGRP.refs[key], f)), rrid, fld)
end

In [None]:
# Print the same RemoteValue fields again, once worker 3's put! has had time to land.
for fld in (:done, :result, :clientset)
    remotecall_fetch(2, (key, f) -> println(getfield(Base.PGRP.refs[key], f)), rrid, fld)
end


In [None]:
# Remove the value that worker 3 put into rr, leaving rr empty.
take!(rr)

In [None]:
# Snapshot the ref keys held by this process (pid 1). Then start a local task
# that asks pid 2 to take! from rr. rr is empty, so that take! should block and
# "DONE!" is not printed yet; the next cell compares the refs with this snapshot.
k1 = collect(keys(Base.PGRP.refs))
@schedule (remotecall_fetch(2, x->take!(x), rr); println("DONE!"))
k1

In [None]:
# Find the ref key that appeared since the snapshot and print which pid it is
# waiting to hear from (the `waitingfor` field).
# NOTE(review): presumably this entry belongs to the pending remotecall_fetch — confirm.
k2 = collect(keys(Base.PGRP.refs))
new_rrid = setdiff(k2, k1)[1]
println(k2)
println(Base.PGRP.refs[new_rrid].waitingfor)

In [None]:
# Fill rr. The take! blocked on pid 2 can now return, and the task prints "DONE!".
put!(rr, :OK)

In [None]:
# Show this process's ref keys again, now that the fetch has completed.
keys(Base.PGRP.refs)

### API
- execute tasks concurrently in the current process
    - `@async`, `@sync`

- remote function execution 
    - `remotecall`, `remotecall_fetch`, etc
        - not message passing
    
- execute expressions remotely 
    - `@spawn`, `@spawnat`, `@everywhere` etc


In [None]:
# Start one task per worker. Task n blocks on a remote sleep of 2n seconds,
# then reports which worker and which local task it ran in.
# @sync waits for every @async task started inside it.
@sync begin
    for (n, pid) in enumerate(workers())
        @async begin
            remotecall_wait(pid, secs -> sleep(secs*2), n)
            println("Executed on worker $pid in task ", current_task())
        end
    end
    # @sync tracks its children in task-local storage under :SPAWNS;
    # the first element appears to hold the tasks spawned so far.
    spawns = task_local_storage()[:SPAWNS]
    for child in spawns[1]
        println(child)
    end
end
println("SPAWNS : ", task_local_storage()[:SPAWNS])


- `addprocs`
    - launches workers
    - connects all workers to each other
        - 1000 workers -> 500K connections
    - support other topologies
    - route messages via intermediate nodes
    - optimize `@everywhere`


adding workers
----------------
- adding a worker
    - launch a julia process
    - listen on a free port, print (host,port) 
    - connect master to new worker
    - connect the new julia worker to existing workers

- `launch` and (optionally) `connect` are implemented by a ClusterManager

- adding 1000 workers can be slow!
    - perform launch and connections in parallel
    - given a node, detect and launch as many workers as cores


In [None]:
Cluster Manager
--------------

Required
- `launch(m::ClusterManager, params::Dict, wrkrs::Array, c::Condition)`
- `manage(m::ClusterManager, id, cfg::WorkerConfig, op)`

Optional
- `connect(m::ClusterManager, pid::Int, cfg::WorkerConfig)`
- `kill(m::ClusterManager, pid::Int, cfg::WorkerConfig)`

In [None]:
# Let's look at a ClusterManager using unix domain sockets
type UnixDomainCM <: ClusterManager
    np::Integer    # number of workers `launch` starts
end

# Launch `manager.np` local worker processes. Each runs `udcm.jl <sockname>`
# with its own temporary socket path.
#
# For every process that starts, a WorkerConfig is pushed onto `launched` and
# `c` is notified. The config's `userdata` records the socket path (:sockname),
# the process output stream (:io) and the process handle (:process).
# A failed launch is printed and the loop continues with the next worker.
function Base.launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Condition)
    println("launch $(manager.np)")
    for i in 1:manager.np
        sockname = tempname()   # unique unix domain socket path for this worker
        try
            # NOTE(review): assumes udcm.jl is in the current directory — confirm
            cmd = `$(params[:exename]) udcm.jl $sockname`
            # No space before `(`: `open (cmd, "r")` is deprecated/invalid call syntax.
            io, pobj = open(cmd, "r")

            wconfig = WorkerConfig()
            wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj)
            push!(launched, wconfig)
            notify(c)
        catch e
            println(e)
        end
    end
end


In [None]:
# Return a (read, write) stream pair connecting this process to worker `pid`
# over the unix domain socket recorded for that worker.
#
# On the master (pid 1) the socket path is read from the userdata stored by
# `launch` and copied into `config.connect_at`. The master also starts a task
# that echoes the worker's output. On any other process the path arrives in
# `config.connect_at` and is stored into `userdata`.
#
# Connecting is retried every 0.1 s, running gc() between attempts. Once more
# than 10 s have passed, the last error is rethrown.
function Base.connect(manager::UnixDomainCM, pid::Int, config::WorkerConfig)
    println("connect $(myid()) -> $pid")
    if myid() != 1
        config.userdata = Dict{Symbol, Any}(:sockname=>get(config.connect_at))
    else
        udata = get(config.userdata)
        config.connect_at = udata[:sockname]
        wout = udata[:io]
        @schedule while !eof(wout)
            print("\tFrom worker $(pid):\t$(readline(wout))")
        end
    end

    started = time()
    while true
        try
            sock = connect(ascii(get(config.userdata)[:sockname]))
            return (sock, sock)
        catch err
            (time() - started) > 10.0 && rethrow(err)
            gc()
            sleep(0.1)
        end
    end
end


In [None]:
# ClusterManager `manage` hook for UnixDomainCM: it only logs the worker id
# and the operation it was called with.
Base.manage(manager::UnixDomainCM, id::Int, config::WorkerConfig, op) =
    println("manage id $id op $op")


In [None]:
# On the worker
# Worker-side entry point. Initialise this process as a worker managed by
# UnixDomainCM, then accept connections on the unix domain socket at
# `sockname` forever.
function worker_main(sockname)
    Base.init_worker(UnixDomainCM(0))

    server = listen(ascii(sockname))
    while true
        conn = accept(server)
        # One task for processing incoming messages per connection
        Base.process_messages(conn, conn)
    end
end

if !isempty(ARGS)
    # Run as `julia udcm.jl <sockname>`: act as a worker serving that socket.
    worker_main(ARGS[1])
end



In [None]:
# Replace the current workers with 4 workers started by UnixDomainCM.
rmprocs(workers())
addprocs(UnixDomainCM(4))

In [None]:
# Remove the UnixDomainCM workers.
rmprocs(workers())

In [None]:
# Simplified excerpt of the Worker record kept for each remote process
# (`....` marks elided code, so this cell is illustrative, not runnable).
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
type Worker
    id::Int                 # pid of the remote process
    del_msgs::Array{Any,1}  # NOTE(review): presumably queued ref-deletion messages — confirm
    add_msgs::Array{Any,1}  # NOTE(review): presumably queued ref-addition messages — confirm
    gcflag::Bool            # NOTE(review): presumably set when those queues need flushing — confirm
    state::WorkerState
    c_state::Condition      # wait for state changes
    ct_time::Float64        # creation time

    r_stream::AsyncStream
    w_stream::AsyncStream
    manager::ClusterManager
    config::WorkerConfig

    function Worker(id, r_stream, w_stream, manager, config)
        ....
        # outgoing writes are wrapped by buffer_writes
        w.w_stream = buffer_writes(w_stream)
        ....
    end

    ....
    
    # a new Worker takes the next pid from get_next_pid()
    Worker() = Worker(get_next_pid())
end

Base.PGRP.workers   # Array of workers


In [None]:
# Excerpt of the per-worker configuration passed to ClusterManager methods
# (`....` marks elided members). Every field is Nullable, so a manager sets
# only the fields it uses. For example, UnixDomainCM in this notebook keeps its
# socket path and process handles in `userdata` and reads `connect_at` on workers.
type WorkerConfig
    # Common fields relevant to all cluster managers
    io::Nullable{IO}
    host::Nullable{AbstractString}
    port::Nullable{Integer}

    # Used when launching additional workers at a host
    count::Nullable{Union{Int, Symbol}}
    exename::Nullable{AbstractString}
    exeflags::Nullable{Cmd}

    # External cluster managers can use this to store information at a per-worker level
    # Can be a dict if multiple fields need to be stored.
    userdata::Nullable{Any}

    # SSHManager / SSH tunnel connections to workers
    tunnel::Nullable{Bool}
    bind_addr::Nullable{AbstractString}
    sshflags::Nullable{Cmd}
    max_parallel::Nullable{Integer}

    # Used by Local/SSH managers
    connect_at::Nullable{Any}

    process::Nullable{Process}
    ospid::Nullable{Integer}

    # Private dictionary used to store temporary information by Local/SSH managers.
    environ::Nullable{Dict}

    ....
end


- `message_handler_loop` - one task per worker connection


| message | action |
|---------|--------|
| `:call` | execute and store result |
| `:call_fetch` | execute, send result |
| `:call_wait` | execute, notify completion | 
| `:do` | remote @schedule |
| `:result` | recv result from remote |
| `:identify_socket` | peer connection from worker |
| `:join_pgrp` | connection request from master |
| `:join_complete` | worker setup complete |
    


<img src="files/tasks_in_process.png">

## An AWS example

In [None]:
using AWS.EC2

# The cluster was presumably launched beforehand with:
#instances = ec2_launch("ami-3b946f50", "jublr"; insttype="r3.8xlarge", n=10, clustername="juliacon_demo")
# Look up the instances by their "ClusterName" tag. There is no space before
# `(`, because `f (args)` is deprecated/invalid call syntax.
instances = ec2_instances_by_tag("ClusterName", "juliacon_demo")


In [None]:
# Add workers on the EC2 instances and time it. The .pem key file suggests
# SSH access (not shown here — confirm). `dir` and `exename` are the working
# directory and the julia executable on the instances.
@time ec2_addprocs(instances, "/Users/amitm/keys/jublr.pem";
                        dir="/home/ubuntu",
                        exename="/home/ubuntu/julia/julia")


In [None]:
# Count heads over nworkers()*10^9 random coin flips. @parallel splits the
# range across workers, and (+) combines their partial sums.
@time @parallel (+) for i = 1:nworkers()*10^9
    Int(rand(Bool))
end


In [None]:
# Remove all workers, then terminate the EC2 instances.
rmprocs(workers())
ec2_terminate(instances)

### MessageUtils.jl

- Coordinate work between Julia "services"

ctasks
------
- tasks communicate via messages
- long-running function with two channels
    - incoming queue
    - outbound queue
- named
- local state

In [None]:


# Make sure at least 4 workers are running. nworkers() returns 1 when only the
# master exists (it counts itself), so `4 - nworkers()` would add only 3.
# Count workers as nprocs() - 1 instead.
nprocs() < 5 && addprocs(5 - nprocs())
@everywhere using MessageUtils

@everywhere begin
    # ctask body: read each message from the task's incoming queue with
    # take!() and write (:echo, this pid, message) to its outbound queue.
    function echo()
        while true
            put!((:echo, myid(), take!()))
        end
    end
end

# NOTE(review): assumes a worker with pid 2 exists. Pids come from an
# incrementing counter, so this holds only in a fresh session — confirm.
echo2 = ctask(echo; pid=2, name="echo_on_2");



In [None]:
# Send two messages to echo2: one with put!, one with the |> pipe.
# NOTE(review): presumably MessageUtils makes `x |> ctask` equivalent to put! — confirm.
put!(echo2, ("Hello from 1",))
("Hello again!",) |> echo2


In [None]:
# Read the next (:echo, pid, message) reply from echo2.
take!(echo2)

In [None]:
# A ctask handle can be passed to another process: read the next reply from pid 3.
remotecall_fetch(3, ct->take!(ct), echo2)

In [None]:
@everywhere begin
    # ctask body for a small stateful service. It loops forever, taking
    # command tuples from the task's incoming queue with take!() and
    # dispatching on the first element:
    #   (:store, a, b)               -> remember a as v1 and b as v2
    #   (:generate, n)               -> write n tuples of random draws from
    #                                   1:v1 and 1:v2 to the outbound queue
    #   (:fetch, ch)                 -> put (v1, v2) on channel ch
    #   (:multiply_and_fetch, ch, v) -> put v1*v2*v on channel ch
    # Other commands are ignored. v1 and v2 are undefined until :store arrives.
    function store_and_compute()
        local v1
        local v2

        while true
            cmd = take!()
            tag = cmd[1]
            if tag == :store
                v1 = cmd[2]
                v2 = cmd[3]
            elseif tag == :generate
                reps = cmd[2]
                for k in 1:reps
                    put!(("from $(myid())", rand(1:v1), rand(1:v2)))
                end
            elseif tag == :fetch
                reply_to = cmd[2]
                put!(reply_to, (v1, v2))
            elseif tag == :multiply_and_fetch
                reply_to = cmd[2]
                factor = cmd[3]
                put!(reply_to, v1*v2*factor)
            end
        end
    end
end

# Start the service on pid 2, store the bounds 10 and 20, and request 5 random tuples.
snc = ctask(store_and_compute; pid=2, name="snc")
put!(snc, (:store, 10, 20))
put!(snc, (:generate, 5));


In [None]:
# Collect the 5 tuples written by the :generate request.
[take!(snc) for i in 1:5]



In [None]:
# Create a channel, ask snc to put (v1, v2) on it, then read the reply.
chnl = channel()
put!(snc, (:fetch, chnl))
take!(chnl)


In [None]:

# The reply channel can be read from any process: pid 3 takes v1*v2*200 from chnl.
put!(snc, (:multiply_and_fetch, chnl, 200))
remotecall_fetch(3, c->take!(c), chnl)


### Tuple spaces and KV spaces

In [None]:


rmprocs(workers())
addprocs(4)
@everywhere using MessageUtils

ts = tspace()          # tuple space holding the jobs
results_c = channel()  # job processors report (pid, key) pairs here

@everywhere begin
    # Job processor. Forever: take a tuple matching `key` from `ts`, sleep
    # for a random fraction of a second, then report (this worker's pid, the
    # tuple's first element) on `results_c`.
    function process_jobs(ts, key, results_c)
        while true
            job = take!(ts, key)
            sleep(rand())
            worker = myid()
            put!(results_c, (worker, job[1]))
        end
    end
end

In [None]:
# Start one job processor per key pattern: pid 2 handles "foo", pid 3 handles
# "bar", and pid 4 uses the pattern r".*".
# NOTE(review): the hard-coded pids assume the workers just added got pids
# 2-4. That holds only in a fresh session, since pids are not reused — confirm.
@spawnat 2 process_jobs(ts, "foo", results_c)
@spawnat 3 process_jobs(ts, "bar", results_c)
@spawnat 4 process_jobs(ts, r".*", results_c)

# Collect results as pid => list of the keys that pid processed.
const results=Dict()
@async begin
    while true
        pid, key = take!(results_c)
        # get! inserts an empty list the first time a pid reports, then we append.
        push!(get!(results, pid, []), key)
    end
end

In [None]:
# Put 50 jobs into the tuple space, each keyed "foo" or "bar" at random.
for j = 1:50
    put!(ts, (rand(Bool) ? "foo" : "bar", :some, :data))
end


In [None]:
# Pids that have reported at least one processed job so far.
collect(keys(results))

In [None]:
# Keys processed by pid 4, the processor started with the r".*" pattern.
results[4]

## MPI Cluster Manager

- MPI.jl wraps MPI
- includes an MPI cluster manager
    - use both MPI-style and Julia-style parallelism
    
##### Code walkthrough