Skip to content

Commit

Permalink
implement affinity for unread FileRefs
Browse files Browse the repository at this point in the history
return processor affinity (as given out by MemPool.jl) and size in `affinity` method if `FileRef` has not been read anywhere yet
  • Loading branch information
tanmaykm committed Apr 30, 2018
1 parent 25d6e00 commit 8cec439
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/chunks.jl
Expand Up @@ -76,7 +76,7 @@ function affinity(r::FileRef)
if haskey(MemPool.who_has_read, r.file)
Pair{OSProc, UInt64}[OSProc(dref.owner) => r.size for dref in MemPool.who_has_read[r.file]]
else
return Pair{OSProc, UInt64}[]
Pair{OSProc, UInt64}[OSProc(MemPool.get_worker_at(r.host)) => r.size]
end
end

Expand Down
9 changes: 6 additions & 3 deletions test/array.jl
Expand Up @@ -173,7 +173,7 @@ end
end

@testset "sort" begin
@show x = shuffle(1:10)
x = shuffle(1:10)
X = distribute(x, 4)
@test collect(sort(X)) == sort(x)

Expand Down Expand Up @@ -201,8 +201,11 @@ end
@test aff[1][1] == Dagger.OSProc(myid())
@test aff[1][2] == sizeof(Int)*10
@test Dagger.tochunk(x) === x
f = MemPool.FileRef("/tmp/d", 1)
@test isempty(Dagger.affinity(f))
f = MemPool.FileRef("/tmp/d", aff[1][2])
aff = Dagger.affinity(f)
@test length(aff) == 1
@test (aff[1][1]).pid in procs()
@test aff[1][2] == sizeof(Int)*10
end

@testset "show_plan" begin
Expand Down

0 comments on commit 8cec439

Please sign in to comment.