In [1]:
using Distributed

In [2]:
addprocs(4)

4-element Vector{Int64}:
 2
 3
 4
 5

In [3]:
@everywhere begin
    import Pkg
    Pkg.activate(joinpath(@__DIR__, "SlurmCLI"))
    Pkg.instantiate()
end

      From worker 5:	[32m[1m  Activating[22m[39m project at `/global/u1/b/blaschke/analyze-iris/SlurmCLI`
      From worker 3:	[32m[1m  Activating[22m[39m project at `/global/u1/b/blaschke/analyze-iris/SlurmCLI`
      From worker 4:	[32m[1m  Activating[22m[39m project at `/global/u1/b/blaschke/analyze-iris/SlurmCLI`
      From worker 2:	[32m[1m  Activating[22m[39m project at `/global/u1/b/blaschke/analyze-iris/SlurmCLI`


[32m[1m  Activating[22m[39m project at `/global/u1/b/blaschke/analyze-iris/SlurmCLI`


In [4]:
@everywhere using SlurmCLI

In [5]:
@everywhere begin
    using Base: @kwdef
    using CodecZlib
    using LMDB
    using Chain
    using JSON
    using Dates
end

In [6]:
@everywhere @kwdef struct Job
    id::Int64                    = 0
    admin_comment::Vector{UInt8} = UInt8[]
    valid::Bool                  = false
end

In [7]:
fn_ret_type(fn, in_type::DataType) = Base.return_types(fn, (in_type,))[1]

function launch_monitor(processor; buffer_size=32)
    function remote_monitor(fn, jobs, results)
        @sync while true
            job = try
                take!(jobs)
            catch y
                break
            end
            t = @async fn(job)
            @async put!(results, fetch(t))
        end
    end

    jobs    = RemoteChannel(()->Channel{Job}(buffer_size))
    results = RemoteChannel(
            ()->Channel{fn_ret_type(processor, Job)}(buffer_size)
        )

    for p in workers()
        remote_do(remote_monitor, p, processor, jobs, results)
    end

    return jobs, results
end


function collect!(
        results::A; collect_time=1,
    ) where {
             T <: Job,
             S <: AbstractChannel{T},
             A <: Union{S, RemoteChannel{S}}
            }

    collected = Vector{Job}()

    t = @async while true
        fd = take!(results)
        push!(collected, fd)
    end

    sleep(collect_time)
    schedule(t, InterruptException(), error=true)

    return collected
end

collect! (generic function with 1 method)

In [8]:
@everywhere function lmdb_add(lmdb_d, key, value)
    LMDB.txn_dbi_do(lmdb_d) do txn, dbi
        if !( key in keys(lmdb_d) )
            put!(txn, dbi, key, SlurmCLI.Bits.to_bits([0]))
        end

        idx = SlurmCLI.Bits.from_bits(get(txn, dbi, key, Vector{UInt8}))[1] + 1
        put!(txn, dbi, "$(key):$(idx)", value)
        put!(txn, dbi, key, SlurmCLI.Bits.to_bits([idx]))
    end
end

@everywhere function lmdb_get(lmdb_d, key)
    vals = []
    
    if !( key in keys(lmdb_d) )
        return vals
    end
    
    LMDB.txn_dbi_do(lmdb_d) do txn, dbi
        idx = SlurmCLI.Bits.from_bits(get(txn, dbi, key, Vector{UInt8}))[1]
        for i in 1:idx
            push!(vals, get(txn, dbi, "$(key):$(i)", Vector{UInt8}))
        end
    end
    return vals
end

In [9]:
lmdb_d = LMDBDict{Int64, Vector{UInt8}}(
    joinpath(ENV["CFS"], "nstaff", "blaschke", "slurm", "cori")
)

LMDBDict{Int64, Vector{UInt8}}(Environment is opened
DB path: /global/cfs/cdirs/nstaff/blaschke/slurm/cori
Size of the data memory map: 1048576
ID of the last used page: 1
ID of the last committed transaction: 0
Max reader slots in the environment: 126
Max reader slots used in the environment: 0, DBI(0x00000001, ""))

In [10]:
lmdb_d.env[:MapSize] = 1024^3

1073741824

In [11]:
lmdb_d

LMDBDict{Int64, Vector{UInt8}}(Environment is opened
DB path: /global/cfs/cdirs/nstaff/blaschke/slurm/cori
Size of the data memory map: 1073741824
ID of the last used page: 1
ID of the last committed transaction: 0
Max reader slots in the environment: 126
Max reader slots used in the environment: 0, DBI(0x00000001, ""))

In [12]:
jobs, admin_comments = launch_monitor(
    x->begin
        sacct = SlurmCLI.shell(`sacct -nPXo AdminComment -j $(x.id)`)
        valid = length(sacct.stdout)>0 && length(sacct.stderr)==0
        sacct_compressed = transcode(ZlibCompressor, sacct.stdout)
        Job(id=x.id, admin_comment=sacct_compressed, valid=valid)
    end
    ; buffer_size=1024
)

(RemoteChannel{Channel{Job}}(1, 1, 78), RemoteChannel{Channel{Job}}(1, 1, 79))

In [13]:
# db, results = launch_monitor(
#     x->begin
#         lmdb_d = LMDBDict{String, Vector{UInt8}}(
#            joinpath(ENV["CFS"], "nstaff", "blaschke", "slurm", "cori")
#         )
#         if ! x.valid
#             return x
#         end
#         lmdb_add(lmdb_d, "$(x.id)", x.admin_comment)
#         close(lmdb_d)
#         x
#     end
# )

In [14]:
@async for i=1:100000000
    put!(jobs, Job(id=i))
end

Task (runnable) @0x0000155529ad6120

In [15]:
using ProgressMeter
ProgressMeter.ijulia_behavior(:clear)

false

In [16]:
p  = Progress(
    100; desc="Collected: ", showspeed=true, enabled=true
)
update!(p, 0)

while true
    admn = collect!(admin_comments)
    next!(p)

    if length(admn) == 0
        sleep(10)
        continue
    end
    
    # last = admn[end]
    # if last.valid
    #     dt = @chain last.admin_comment begin
    #         transcode(ZlibDecompressor, _)
    #         String
    #         JSON.parse(_; dicttype=Dict{Symbol, Any})
    #         _[:submitTime]
    #         unix2datetime
    #     end
    #     println(dt)
    # end
    
    # for job in admn
    #     put!(db, job)
    # end
    
    for job in admn
        if ! job.valid
            continue
        end
        lmdb_d[job.id] = job.admin_comment
        # lmdb_add(lmdb_d, "$(job.id)", job.admin_comment)
    end
    
    # next!(p, showvalues=[("length(keys) = ", length(keys(lmdb_d)))])
end

[32mCollected: 100%|██████████████████████████| Time: 0:03:53 ( 2.34  s/it)[39m


LoadError: Code[-30792]: MDB_MAP_FULL: Environment mapsize limit reached

In [31]:
keys(lmdb_d)

136516-element Vector{Int64}:
  65536
 131072
    256
  65792
 131328
    512
 131584
    768
  66304
 131840
   1024
  66560
 132096
      ⋮
  64255
 129791
  64511
 130047
  64767
 130303
  65023
 130559
  65279
 130815
  65535
 131071

In [28]:
length(keys(lmdb_d))

136516