In [1]:
using DataStructures
using CPUTime
using DelimitedFiles
using Base

In [2]:
numVertex=4
Edges = [(4,1,2), (1,2,1), (2,3,1), (3,4,1), (4,2,3), (1,3,5)]

6-element Array{Tuple{Int64,Int64,Int64},1}:
 (4, 1, 2)
 (1, 2, 1)
 (2, 3, 1)
 (3, 4, 1)
 (4, 2, 3)
 (1, 3, 5)

In [3]:
numVertex=6
Edges = [
(6, 1, 1),
(6, 2, 3),
(1, 2, 1),
(1, 3, 7),
(2, 4, 1),
(1, 4, 3),
(3, 4, 1),
(3, 5, 1),
(4, 5, 6)]
# source, destionation, weight

9-element Array{Tuple{Int64,Int64,Int64},1}:
 (6, 1, 1)
 (6, 2, 3)
 (1, 2, 1)
 (1, 3, 7)
 (2, 4, 1)
 (1, 4, 3)
 (3, 4, 1)
 (3, 5, 1)
 (4, 5, 6)

In [2]:
Edges = Vector{}()
function readDataFromFile(filename="in_n_10e5_m_5e6.in")
    open(filename, "r") do f 
        n, m = split(readline(f))
        n = parse(Int, n)
        sizehint!(Edges, parse(Int, m))
#         println(n," ",m)
        for ln in eachline(f)
#             a,b,c = split(ln)
            a,b,c = readdlm(IOBuffer(ln), Int)
            if(a==0)
                a=n
            end
            if(b==0)
                b=n
            end
            push!(Edges, (a,b,c))
#             println(a," ",b," ",c)
        end
        return n
    end
end
numVertex = readDataFromFile()

100000

In [53]:
uf = IntDisjointSets(numVertex+1)

weights = Vector{}()
sizehint!(weights, length(Edges))

mst= Vector{}()
sizehint!(mst, numVertex)

0-element Array{Any,1}

In [54]:
for e in Edges
    push!(weights, (e[3], (src=e[1],dst= e[2])))
end

In [5]:
Base.@pure function kruskal(EdgeWeights, MST, UF)
    if isempty(EdgeWeights)
        return 0
    end
    sortedEdgeWeights = sort(EdgeWeights)
    mst_weight=0
    for weight in sortedEdgeWeights
        c, e = weight
        if !in_same_set(uf, e.src, e.dst)
            union!(uf, e.src, e.dst)
            push!(mst, e)    
            mst_weight+=c
        end
    end
    return mst_weight
end    

kruskal (generic function with 1 method)

In [55]:
using Random
rng = MersenneTwister();

In [56]:
Base.@pure function filterKruskal(EdgeWeights, MST, UF)
    if isempty(EdgeWeights)
        return 0
    end
    if kruskalThreshold(length(weights), length(MST))
        # println("pre-start kruskal ",length(EdgeWeights) ," at", CPUtime_us())
        tmp =  @async kruskal(EdgeWeights, MST, UF)
        println("allocated kruskal ",length(EdgeWeights) ," at", CPUtime_us())
        res = fetch(tmp)
        println("finished kruskal ",length(EdgeWeights) ," at", CPUtime_us())
        return res
    else
        sampler = Random.Sampler(rng, EdgeWeights)
        pivot = rand(rng, sampler)
        
        # Partition
        EdgeWeightsLeft = Vector{}()
        EdgeWeightsRight = Vector{}()
        for EdgeWeight in EdgeWeights
            if EdgeWeight[1] < pivot[1]
                push!(EdgeWeightsLeft, EdgeWeight)
            else
                push!(EdgeWeightsRight, EdgeWeight)
            end
        end
#         println("pivot = ",pivot)
#         println("left = ",length(EdgeWeightsLeft))
#         println("right = ",length(EdgeWeightsRight))
        println("Partition ",length(EdgeWeightsLeft)," ",length(EdgeWeightsRight) ," going to start at ", CPUtime_us())
        leftMST = @async filterKruskal(EdgeWeightsLeft, MST, UF)
        FilteredEdgeWeightsRight = filter(EdgeWeightsRight, UF)
        rightMST = @async filterKruskal(fetch(FilteredEdgeWeightsRight), MST, UF)
#         println(Threads.nthreads())
        println("Partition ",length(EdgeWeightsLeft)," ",length(EdgeWeightsRight) ," allocated at ", CPUtime_us())
        res = fetch(leftMST) + fetch(rightMST)
        println("Partition ",length(EdgeWeightsLeft)," ",length(EdgeWeightsRight) ," finished at ", CPUtime_us())
        return res
    end
end

filterKruskal (generic function with 1 method)

In [57]:
Base.@pure function kruskalThreshold(EdgesLength, MSTLength)
    sp = Random.Sampler(rng, 1:EdgesLength) # or Random.Sampler(MersenneTwister, 1:20)
#     return EdgesLength <=10000
    return EdgesLength/1.25<=rand(rng, sp) || EdgesLength <=1000
end

kruskalThreshold (generic function with 1 method)

In [58]:
Base.@pure function filter(EdgeWeights, UF)
    FilteredEdgeWeights = Vector{}()
    for EdgeWeight in EdgeWeights
        if !in_same_set(UF, EdgeWeight[2].src, EdgeWeight[2].dst)
            push!(FilteredEdgeWeights, EdgeWeight)
        end
    end
    return FilteredEdgeWeights
end

filter (generic function with 1 method)

In [59]:
@time @CPUtime filterKruskal(weights, mst, uf)

Partition 268535 4731465 going to start at 279439827
Partition 268535 4731465 allocated at 284032377
Partition 27117 241418 going to start at 284185731
Partition 27117 241418 allocated at 286948188
Partition 3453851 1277614 going to start at 286747129
Partition 3453851 1277614 allocated at 288240890
Partition 7397 19720 going to start at 286965884
Partition 7397 19720 allocated at 288260161
Partition 7750 233668 going to start at 287253727
Partition 7750 233668 allocated at 288464044
pre-start kruskal 3453851 at288240980
allocated kruskal 3453851 at288604181
finished kruskal 3453851 at297514893
pre-start kruskal 1277614 at288241011
allocated kruskal 1277614 at297515060
finished kruskal 1277614 at300403324
Partition 2246 5151 going to start at 288265047
Partition 2246 5151 allocated at 300408219
Partition 11500 8220 going to start at 288277194
Partition 11500 8220 allocated at 300416877
Partition 5056 2694 going to start at 288469458
Partition 5056 2694 allocated at 300426809
Partition 

32762056626

In [14]:
@time @CPUtime kruskal(weights, mst, uf)


elapsed CPU time: 12.026884 seconds
 12.189014 seconds (71.03 M allocations: 1.725 GiB, 9.60% gc time)


5996149158

In [None]:
Threads.nthreads()