# Using DArrays for reading and manipulating data
## Parallel Workshop JuliaCon 2016
### DArrays basics

In [1]:
# Add processes
addprocs(8)
# Use package for distributed arrays
using DistributedArrays

In [12]:
# Create vector of process IDs
C = procs()

9-element Array{Int64,1}:
 1
 2
 3
 4
 5
 6
 7
 8
 9

In [13]:
# Apply a map to the vector
map(t -> t*t, C)

9-element Array{Int64,1}:
  1
  4
  9
 16
 25
 36
 49
 64
 81

In [14]:
# Make the vector distributed
D = distribute(C)

9-element DistributedArrays.DArray{Int64,1,Array{Int64,1}}:
 1
 2
 3
 4
 5
 6
 7
 8
 9

In [16]:
# Show how the vector is distributed across the workers
D.indexes

8-element Array{Tuple{UnitRange{Int64}},1}:
 (1:1,)
 (2:2,)
 (3:3,)
 (4:5,)
 (6:6,)
 (7:7,)
 (8:8,)
 (9:9,)

In [19]:
# apply map to distributed vector (looks identical to non-distributed case)
map(t -> t*t, D)

9-element DistributedArrays.DArray{Int64,1,Array{Int64,1}}:
  1
  4
  9
 16
 25
 36
 49
 64
 81

In [20]:
# Distributed vectors are not restricted to numerical types
map(t -> Dates.monthname((t - 1) % 12 + 1), D)

9-element DistributedArrays.DArray{UTF8String,1,Array{UTF8String,1}}:
 "January"  
 "February" 
 "March"    
 "April"    
 "May"      
 "June"     
 "July"     
 "August"   
 "September"

In [27]:
# A slightly more complicated example of map and reduce:
# map every element of `D` to a sentence about a month, gather the distributed
# result into a local Array, and concatenate the sentences with `*`.
monthString = let
    sentences = map(D) do t
        Dates.monthname((t - 1) % 12 + 1) * " is my favorite month.\n"
    end
    reduce(*, Array(sentences))
end
println(monthString)

January is my favorite month.
February is my favorite month.
March is my favorite month.
April is my favorite month.
May is my favorite month.
June is my favorite month.
July is my favorite month.
August is my favorite month.
September is my favorite month.



In [30]:
# Distributed comprehension
D55 = @DArray [randn(5,5) for i = 1:32]

32-element DistributedArrays.DArray{Array{Float64,2},1,Array{Array{Float64,2},1}}:
 5x5 Array{Float64,2}:
 -1.06675   -0.862483   0.068994  -0.954383   0.612892 
  1.23371    0.470195  -0.569822   1.97442   -0.357816 
 -0.678979   0.283267  -0.719494  -0.321645   0.30929  
  0.334916  -1.22503    0.745611  -2.33004    0.0618629
  0.63521   -1.19402   -1.89202   -1.41228   -0.399258                
 5x5 Array{Float64,2}:
 -0.263565   0.792395   1.4206     -1.04345   -1.3819  
 -0.774798  -2.12411    0.233554    0.480621  -1.07603 
  0.106657  -0.635091   1.4687      1.32517   -0.53843 
 -0.109394  -0.351786  -1.76389    -1.26364    0.923438
 -1.03854    0.719064  -0.0939638  -0.795515  -1.6328                 
 5x5 Array{Float64,2}:
  0.552984   0.771221  -1.13863   -0.508559  -0.0113824
 -0.933323   0.108645  -0.156966   0.245402  -1.24307  
 -1.71768   -0.63347   -1.1948    -0.513568   0.102846 
  1.00582    0.5017     0.254344  -0.459188  -1.24642  
  0.578157  -0.955268  -0.486167  

In [31]:
# Compute singular values of the distributed vector of matrices
Dsvd = map(svdvals, D55)

32-element DistributedArrays.DArray{Array{Float64,1},1,Array{Array{Float64,1},1}}:
 [3.999836455760208,2.5044582011232213,1.7051737717267952,0.6770905297265457,0.2298652678344431]    
 [3.5269810373142625,2.9504161034893985,2.3501373295253316,0.904445051394586,0.036825704100338044]  
 [2.6238799978968435,2.1866508742328703,1.707233356369122,1.2765590827198168,0.5839922950173981]    
 [4.105123344980141,1.9854356889225975,1.6245519564067503,1.4201918512438185,0.03971714564736037]   
 [3.8691762964769665,2.8147930897207667,2.352397425888243,0.9912135906748117,0.2145582563848968]    
 [4.2015580368111705,2.2515617032836204,1.5737011557308977,0.44782935379067657,0.28057510281268183] 
 [2.7386924486715265,2.328167427456983,1.674341982152823,1.0797402982138045,0.4057750025903677]     
 [2.8585407690602858,2.4226851529761357,1.9197908080706862,0.7444017726046709,0.3663540362476281]   
 [3.6701118732695526,2.6341723275159223,1.7506859088988276,1.362974720086374,1.1825383948594033]    
 [3.4742

### Reading data in parallel

In [32]:
# Save the path to the data directory and load a list of subdirectories with the data
pth = "/data/MIMICII"
dirs = filter(isdir, map(t -> joinpath(pth, t), readdir(pth)))

LoadError: LoadError: SystemError: unable to read directory /data/MIMICII: No such file or directory
while loading In[32], in expression starting on line 3

In [None]:
# Extract the data files
fls = mapreduce(t -> map(s -> joinpath(t, s), readdir(t)), vcat, dirs)

In [None]:
# Size in GB
@time sum(map(filesize, fls))/1024^3

In [33]:
# Create smaller subset of the files to avoid waiting
flsSmall = fls[1:div(length(fls), 10)]
@time sum(map(filesize, flsSmall))/1024^3

LoadError: LoadError: UndefVarError: fls not defined
while loading In[33], in expression starting on line 2

In [None]:
# Use package for reading binary files (written in Julia)
using MAT

In [None]:
# Map matread to all the file paths to read in all the files in parallel
@time dt = map(matread, distribute(flsSmall));

In [None]:
# The result is a distributed vector of dictionaries of vectors (and a dictionary)
@show typeof(dt)
dt[1]

In [None]:
# Plot a signal
using PlotlyJS
plot(PlotlyJS.scattergl(;y = dt[1]["signal"][:]))

In [None]:
# Custom cleaner functions are fast in Julia.
# In this example, replace NaNs with zeros for a single signal. `zero(t)` is used
# instead of the integer literal `0` so that the anonymous function returns the
# same type on both branches (type-stable) rather than mixing Int and float values.
x = map(t -> (isnan(t) ? zero(t) : t), dt[1]["signal"][:]);

# Fourier transform of the cleaned signal
xfft = fft(x)

In [None]:
# size in GB
@time mapreduce(Base.summarysize, +, dt)/1024^3

In [None]:
# Compute the lengths for the signals
@time lngs = map(t -> length(t["signal"]), dt)

In [None]:
# Plot the distribution of the lengths of the signals
plot(histogram(x = lngs), Layout(yaxis=Dict("type" => "log")))

In [None]:
# My own small package for iterative SVD and a few convenience methods

# Pkg.clone("https://github.com/andreasnoack/TSVD.jl")
using TSVD

# Define method for converting distributed vector of vectors to distributed matrix.
#
# Element `j` of `A` becomes column `j` of the result; the number of rows is taken
# from the length of the first element (all elements are assumed to have that
# length -- TODO confirm at the call site). The result is created on `A.pids` with
# `[1, length(A.pids)]` passed as the distribution argument, which presumably
# splits the matrix along columns only -- confirm against the DistributedArrays docs.
#
# Throws an `ArgumentError` when `A` is empty, matching the `hcat` method for
# vectors of distributed vectors, instead of failing on `A[1]`.
function Base.hcat{T}(A::DistributedArrays.DArray{Vector{T}})
    if isempty(A)
        throw(ArgumentError("cannot flatten empty vector"))
    end
    n = length(A[1])
    D = DArray((n, length(A)), A.pids, [1, length(A.pids)]) do I
        mB, nB = map(length, I)
        # Create new DArray from the distributed vector of vectors.
        # For now, we assume that each vector is only located on a single
        # worker. Eventually, we'd like to find a more flexible solution.
        # The element type is already known from the signature, so there is no
        # need to index into `A` just to ask for it.
        B = Array(T, mB, nB)
        for i = 1:nB
            # Column i of this chunk is rows I[1] of the I[2][i]-th vector in A
            B[:,i] = A[I[2][i]][I[1]]
        end
        B
    end
    return D
end

# Convenience method: concatenate distributed vectors collected in a (local)
# vector into one distributed matrix. Defined on every process via `@everywhere`.
# Throws an `ArgumentError` when `x` is empty.
@everywhere function Base.hcat{T<:DistributedArrays.DVector}(x::Vector{T})
    nvecs = length(x)
    # Guard clause: with no element there is nothing to take the shape from
    nvecs == 0 && throw(ArgumentError("cannot flatten empty vector"))

    head  = x[1]
    nrows = size(head, 1)
    ncols = size(head, 2)
    return DArray((nrows, nvecs*ncols)) do I
        rows, cols = I[1], I[2]
        block = Array(eltype(head), map(length, I))
        # Column j of this block is rows `rows` of the cols[j]-th distributed vector
        for (j, c) in enumerate(cols)
            block[:, j] = x[c][rows]
        end
        block
    end
end

# Make `procs` work for a plain (non-distributed) `Array`: the answer is a 1x1
# matrix holding the id of the process on which the call is evaluated.
# Defined on every process via `@everywhere`.
@everywhere function Base.procs(A::Array)
    return fill(myid(), 1, 1)
end

# Conversion of a view (`SubArray`) into a `DArray` to a local `Array{S,N}`.
#
# The view's index ranges `I` are compared with the index ranges of the chunk
# found by `DistributedArrays.locate` for the view's first indices. When they are
# equal, the view covers exactly that chunk and the chunk itself is returned.
#
# NOTE(review): the general fallback at the bottom is commented out, so when the
# view does NOT coincide with a chunk the `if` is the last evaluated expression
# and this method returns `nothing` instead of an array. The element-type check
# (`S<:T && T<:S`) is disabled as well, so `S` is never compared with `T` here.
# Confirm that every caller only passes chunk-aligned views.
Base.convert{S,T,N,D<:DArray}(::Type{Array{S,N}}, s::SubArray{T,N,D}) = begin
    I = s.indexes
    d = s.parent
# Disabled debugging output and type/index-kind check:
#     println("Hej", isa(I,Tuple{Vararg{UnitRange{Int}}}))
#     if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S
    # presumably the grid coordinates of the chunk holding the view's first element -- verify
    l = DistributedArrays.locate(d, map(first, I)...)
        if isequal(d.indexes[l...], I)
            # SubDArray corresponds to a chunk
        return DistributedArrays.chunk(d, l...)
        end
#     end
# Disabled fallback that would have copied the view element by element into a new array:
#     a = Array(S, size(s))
#     a[[1:size(a,i) for i=1:N]...] = s
#     return a
end

In [None]:
# Necessary to define our own rep function because Julia's repeat is a bit slow.
#
# rep(x, l) returns a vector of length `l` (created with `similar(x, l)`) that
# cycles through the elements of `x`: x[1], ..., x[end], x[1], ...
# Every NaN met on the way is replaced by zero.
#
# Throws an `ArgumentError` when `x` is empty and `l > 0`, because there is
# nothing to repeat (previously this read out of bounds under `@inbounds`).
@everywhere function rep(x::Vector, l)
    n = length(x)
    if n == 0 && l > 0
        throw(ArgumentError("cannot repeat an empty vector"))
    end
    y = similar(x, l)
    # Walk through `x` with a wrap-around counter. The counter is carried from one
    # iteration to the next, so the loop is not annotated with `@simd`, which
    # requires iterations that can be reordered freely.
    k = 0
    @inbounds for i = 1:l
        k = ifelse(k == n, 1, k + 1)
        xk = x[k]
        y[i] = ifelse(isnan(xk), zero(xk), xk)
    end
    return y
end

In [None]:
# Compute a distributed vector of vectors of equal lengths
@time dt1 = let n = 50000
    map(t -> rep(vec(t["signal"]), n), dt);
end;

In [None]:
# Convert to a distributed matrix
@time A = hcat(dt1);

In [None]:
# Apply the fft along the first dimension of the matrix
@time B = mapslices(fft, A, 1)

In [None]:
# Similar to 
map(fft, dt1)

In [None]:
# Create the initial vector. It has to be distributed.
v0 = DArray(I -> rand(Complex64, length(I[1])), (size(A, 1),), A.pids[:,1])

# Compute the SVD
@time U, s, V = TSVD.tsvd(A, 5, initVec = v0, stepSize = 5, debug = true);