In [1]:
using InMemoryDatasets
import InMemoryDatasets as IMD
using BenchmarkTools
using TimerOutputs

# Data

In [5]:
# Small hand-written example datasets. `@show` echoes each dataset after it is built.
@show dsl = Dataset(xid = [111,222,333,444,555], 
                    x1 = [1,2,1,4,5], 
                    x2 = [-1.2,-3,2.1,-3.5,2.8],
                    x3 = [Date("2019-10-03"), Date("2019-09-30"), Date("2019-10-04"), Date("2019-10-03"), Date("2019-10-03")],
                    x4 = ["abcd","efgh","ijkl","mnop","qrst"]);

@show dsr = Dataset(yid = [111,111,222,444,333],
                    y1 = [3,3,3,3,3],
                    y2 = [0,0, -3,1,2],
                    y3 = [Date("2019-10-01"),Date("2019-10-01"), Date("2019-09-30"), Date("2019-10-05"), Date("2019-10-05")],
                    y4 = ["abc","abcd","efg","mnop","qrst"]);

# Replace every 'x' in a column name with 'y'; passed to `rename!` for the
# right-hand datasets. Defined once and reused for both of them.
function x2y(str)
  replace(str,'x' => 'y')
end

# 10^3-row random datasets. Columns x1..x3 are scaled by 100, rounded and
# converted to Int; the remaining columns keep their `rand` values.
dsl_big = Dataset(rand(10^3, 5), :auto);
modify!(dsl_big,[:x1,:x2,:x3] .=> byrow(x -> (x*100)));
modify!(dsl_big,[:x1,:x2,:x3] .=> byrow(round), [:x1,:x2,:x3] .=> byrow(Int));

dsr_big = Dataset(rand(10^3, 5), :auto);
modify!(dsr_big,[:x1,:x2,:x3] .=> byrow(x -> (x*100)));
modify!(dsr_big,[:x1,:x2,:x3] .=> byrow(round), [:x1,:x2,:x3] .=> byrow(Int));
rename!(x2y,dsr_big);

# Same construction with 10^4 rows.
dsl_big2 = Dataset(rand(10^4, 5), :auto);
modify!(dsl_big2,[:x1,:x2,:x3] .=> byrow(x -> (x*100)));
modify!(dsl_big2,[:x1,:x2,:x3] .=> byrow(round), [:x1,:x2,:x3] .=> byrow(Int));

dsr_big2 = Dataset(rand(10^4, 5), :auto);
modify!(dsr_big2,[:x1,:x2,:x3] .=> byrow(x -> (x*100)));
modify!(dsr_big2,[:x1,:x2,:x3] .=> byrow(round), [:x1,:x2,:x3] .=> byrow(Int));
rename!(x2y,dsr_big2);

dsl = Dataset(xid = [111, 222, 333, 444, 555], x1 = [1, 2, 1, 4, 5], x2 = [-1.2, -3, 2.1, -3.5, 2.8], x3 = [Date("2019-10-03"), Date("2019-09-30"), Date("2019-10-04"), Date("2019-10-03"), Date("2019-10-03")], x4 = ["abcd", "efgh", "ijkl", "mnop", "qrst"]) = 5×5 Dataset
 Row │ xid       x1        x2        x3          x4
     │ identity  identity  identity  identity    identity
     │ Int64?    Int64?    Float64?  Date?       String?
─────┼────────────────────────────────────────────────────
   1 │      111         1      -1.2  2019-10-03  abcd
   2 │      222         2      -3.0  2019-09-30  efgh
   3 │      333         1       2.1  2019-10-04  ijkl
   4 │      444         4      -3.5  2019-10-03  mnop
   5 │      555         5       2.8  2019-10-03  qrst
dsr = Dataset(yid = [111, 111, 222, 444, 333], y1 = [3, 3, 3, 3, 3], y2 = [0, 0, -3, 1, 2], y3 = [Date("2019-10-01"), Date("2019-10-01"), Date("2019-09-30"), Date("2019-10-05"), Date("2019-10-05")], y4 = ["abc", "abcd", "efg", "mnop",

# Function source

In [50]:
"""
    my_cartesianjoin_v5(dsl, dsr; on, threads=true, flag=ones(Bool, nrow(dsl) * nrow(dsr)))

Join `dsl` and `dsr` by testing every (left row, right row) pair against the
conditions listed in `on`, and return a new dataset built from the pairs that
satisfy all of them.

# Keywords
- `on`: collection of pairs, one per condition.
  `:l => :r` compares the two columns with `isequal` and leaves the right
  column out of the result; `:l => :r => f` keeps a pair when `f(left, right)`
  is true and keeps the right column in the result.
- `threads`: whether the comparison and column-filling loops run threaded.
- `flag`: working vector with one entry per row pair
  (`nrow(dsl) * nrow(dsr)` entries). It is updated in place; entries that are
  already false are skipped.

# Throws
- `ArgumentError` if `on` is not supplied.
- `DimensionMismatch` if `flag` does not have `nrow(dsl) * nrow(dsr)` entries.

# Side effects
Resets the default TimerOutputs timer on entry and prints it before returning.
"""
function my_cartesianjoin_v5(dsl::AbstractDataset, dsr::AbstractDataset;
  on=nothing, threads::Bool=true, flag=ones(Bool, nrow(dsl) * nrow(dsr)))

  # Validate at the API boundary so failures are not opaque errors deep inside.
  on === nothing &&
    throw(ArgumentError("`on` must be specified, e.g. `on = [:a => :b, :c => :d => isless]`"))
  length(flag) == nrow(dsl) * nrow(dsr) ||
    throw(DimensionMismatch("`flag` must have nrow(dsl) * nrow(dsr) = $(nrow(dsl) * nrow(dsr)) elements, got $(length(flag))"))

  reset_timer!()

  @timeit "Step-0 Syntax" begin
    dsr_cols = Symbol[]
    equalon_names = Symbol[]
    conditions = Function[]

    for spec in on
      rhs = spec.second
      if rhs isa Pair
        # :l => :r => f  -- custom comparison function
        push!(dsr_cols, rhs.first)
        push!(conditions, rhs.second)
      else
        # :l => :r  -- plain equality condition
        push!(dsr_cols, rhs)
        push!(equalon_names, rhs)
        push!(conditions, isequal)
      end
    end

    onleft = IMD.multiple_getindex(IMD.index(dsl), map(x -> x.first, on))
    onright = IMD.multiple_getindex(IMD.index(dsr), dsr_cols)

    # Right columns used in an equality condition duplicate the left column,
    # so they are excluded from the output.
    equalon_idx = IMD.multiple_getindex(IMD.index(dsr), equalon_names)
    right_cols = setdiff(1:length(IMD.index(dsr)), equalon_idx)

    l_len = nrow(dsl)
    r_len = nrow(dsr)
  end

  # Clear the flag of every row pair that violates a condition.
  @timeit "Step-1 Flag vector" cross_compare_vec(dsl, dsr, flag, conditions, onleft, onright, l_len, r_len, threads)

  # Forward `threads` so that `threads=false` is honoured when filling columns too.
  @timeit "Step-2 New ds" newds = generate_newds_v5(flag, dsl, dsr, l_len, r_len, right_cols, threads)

  print_timer()
  return newds
end

"""
    cross_compare_vec(dsl, dsr, flag, conditions, onleft, onright, l_len, r_len, threads)

Evaluate each join condition on every (left row, right row) pair and clear the
matching entry of `flag` when a condition fails.

The pair (left row `j`, right row `k`) lives at `flag[(j - 1) * r_len + k]`,
the same layout `find_count_for_left` and `generate_newds_v5` read back.

# Arguments
- `dsl`, `dsr`: left and right datasets.
- `flag`: vector of `l_len * r_len` entries, updated in place.
- `conditions`: one two-argument function per condition, called as
  `fun(left_value, right_value)`.
- `onleft`, `onright`: column positions in `dsl` / `dsr` for each condition.
- `l_len`, `r_len`: number of rows of `dsl` and `dsr`.
- `threads`: passed to `IMD.@_threadsfor` for the loop over left rows.
"""
function cross_compare_vec(dsl, dsr,
  flag, conditions, onleft, onright, l_len, r_len, threads)

  for i in eachindex(conditions)  # one pass per condition
    fun = conditions[i]
    # Look the two columns up once per condition instead of once per left row.
    l_col = IMD._columns(dsl)[onleft[i]]
    r_col = IMD._columns(dsr)[onright[i]]

    IMD.@_threadsfor threads for j in 1:l_len  # each row in dsl
      # Each left row owns a block of r_len flags. The stride must be r_len
      # (not l_len); otherwise blocks overlap or run past the end of `flag`
      # whenever the two datasets have different row counts.
      cur_index = (j - 1) * r_len
      _op_for_dsrcol(flag, fun, cur_index, l_col[j], r_col, r_len)
    end

  end

end

"""
    _op_for_dsrcol(flag, fun, cur_index, x, r_col, r_len)

Compare the single left value `x` with the first `r_len` entries of `r_col`.
For each `k`, `flag[cur_index + k]` is and-ed with `fun(x, r_col[k])`; entries
that are already zero are left alone and `fun` is not called for them.
"""
function _op_for_dsrcol(flag, fun, cur_index, x, r_col, r_len)
  for offset in 1:r_len
    pos = cur_index + offset
    # Only pairs that are still candidates need to be evaluated.
    if flag[pos] != 0
      flag[pos] = flag[pos] & fun(x, r_col[offset])
    end
  end
end

# Build the joined dataset from the per-pair match vector `flag`.
#
# Arguments, as used below:
#   flag         - one entry per (left row, right row) pair; left row i owns
#                  positions (i - 1) * r_len + 1 through i * r_len, and entries
#                  equal to one mark matching pairs.
#   dsl, dsr     - left and right datasets.
#   l_len, r_len - number of left rows iterated / block length per left row in `flag`.
#   right_cols   - positions of the dsr columns appended to the result.
#   threads      - forwarded to fill_left_res and fill_right_res.
#
# Returns a new dataset holding all dsl columns followed by the selected dsr
# columns, with one row per matching pair.
#
# NOTE(review): `new_ends[end]` throws a BoundsError when dsl has no rows.
# NOTE(review): counts and their cumulative sum are Int32 — presumably this
# assumes fewer than 2^31 matching pairs; confirm.
function generate_newds_v5(flag, dsl, dsr, l_len, r_len, right_cols, threads=true)
  T = Int32

  ## new left
  ### step-1
  @timeit "2-1 left row count" begin
    dsl_count = Vector{T}(undef, nrow(dsl))  # number of matching right rows for each left row

    find_count_for_left(flag, dsl_count, l_len, r_len)

    new_ends = cumsum(dsl_count)  # cumulative sum: last output row of each left row
    total_length = new_ends[end]
  end

  ### step-2
  @timeit "2-2 left row _res" begin
    res = []  # TODO: the number of columns is known up front — is push! needed?
    for j in 1:length(IMD.index(dsl))  # each column of left
      @timeit "2.1 init _res" _res = IMD.allocatecol(IMD._columns(dsl)[j], total_length, addmissing=false)  # output column of total_length rows

      @timeit "2.2 fill _res" fill_left_res(_res, IMD._columns(dsl)[j], dsl_count, new_ends, threads)

      @timeit "2.3 push _res" push!(res, _res)
    end
  end

  ### step-3
  @timeit "2-3 newds dsl" begin
    # A SubDataset's index is copied directly; otherwise a fresh Index is
    # assembled from copies of the lookup, names and format fields.
    if dsl isa SubDataset
      newds = Dataset(res, copy(IMD.index(dsl)), copycols=false)
    else
      newds = Dataset(res, IMD.Index(copy(IMD.index(dsl).lookup), copy(IMD.index(dsl).names), copy(IMD.index(dsl).format)), copycols=false)
    end
  end

  ## new right
  #println("Cerating right")
  @timeit "2-4 findall dsr idx" begin
    ### step-4
    @timeit "4.1 findall func" dsr_idx = findall(isone, flag)  # global positions in `flag`; turned into right-row numbers below
    for i in 1:l_len
      dsl_count[i] == 0 && continue

      # Output rows lo:hi belong to left row i.
      i == 1 ? lo = 1 : lo = new_ends[i-1] + 1
      hi = new_ends[i]

      # Subtract the block offset of left row i to obtain the right-row number.
      dsr_idx[lo:hi] .-= (i - 1) * r_len
      #for k in lo:hi
      #    all[k] -= (i-1)*r_len
      #end
    end
  end

  @timeit "2-5 newds dsr" begin
    ### step-5
    #println("Nesting")
    for j in eachindex(right_cols) #1:length(right_cols)   # each selected column of right
      @timeit "5.1 init _res" _res = IMD.allocatecol(IMD._columns(dsr)[right_cols[j]], total_length, addmissing=false)  # empty output column for this dsr column

      @timeit "5.2 fill _res" fill_right_res(_res, IMD._columns(dsr)[right_cols[j]], dsr_idx, threads)

      #println(_res)
      # Append the column, then register its name and format on the new dataset.
      push!(IMD._columns(newds), _res)
      # The candidate name list is the dsl names plus this one dsr name; the last
      # entry of the result is used (presumably de-duplicated against the left names).
      new_var_name = IMD.make_unique([IMD._names(dsl); IMD._names(dsr)[right_cols[j]]], makeunique=true)[end]
      push!(IMD.index(newds), new_var_name)
      setformat!(newds, IMD.index(newds)[new_var_name], getformat(dsr, IMD._names(dsr)[right_cols[j]]))
    end
  end

  newds
end

"""
    find_count_for_left(flag, dsl_count, l_len, r_len)

Store in `dsl_count[i]` the number of true entries in the `i`-th block of
`flag`, where block `i` spans positions `(i - 1) * r_len + 1` through
`i * r_len`. Processes `l_len` blocks and writes into `dsl_count` in place.
"""
function find_count_for_left(flag, dsl_count, l_len, r_len)
  block_start = 1
  for row in 1:l_len
    block_stop = block_start + r_len - 1
    # A view avoids copying the block just to count it.
    @inbounds dsl_count[row] = count(view(flag, block_start:block_stop))
    block_start = block_stop + 1
  end
end

"""
    fill_left_res(_res, l_col, dsl_count, new_ends, threads)

Fill the output column `_res` from the left column `l_col`: the value of row
`i` is written to output rows `new_ends[i-1] + 1` through `new_ends[i]`
(starting at 1 for the first row). Rows with `dsl_count[i] == 0` are skipped.
`threads` is passed to `IMD.@_threadsfor` for the row loop.
"""
function fill_left_res(_res, l_col, dsl_count, new_ends, threads)
  IMD.@_threadsfor threads for row in eachindex(l_col)
    if dsl_count[row] != 0
      first_out = row == 1 ? 1 : new_ends[row-1] + 1
      last_out = new_ends[row]
      IMD._fill_val_join!(_res, first_out:last_out, l_col[row])
    end
  end
end

"""
    fill_right_res(_res, r_col, dsr_idx, threads)

Fill the output column `_res` from the right column `r_col`: output row `i`
receives `r_col[dsr_idx[i]]`. `threads` is passed to `IMD.@_threadsfor` for the
loop over output rows.
"""
function fill_right_res(_res, r_col, dsr_idx, threads)
  IMD.@_threadsfor threads for out_row in eachindex(dsr_idx)
    src_row = dsr_idx[out_row]
    _res[out_row] = r_col[src_row]
  end
end



fill_right_res (generic function with 1 method)

In [45]:
"""
    fun1(x, y)

Return `x <= y`. Used below as a custom join condition.
"""
fun1(x, y) = x <= y

fun1 (generic function with 1 method)

In [52]:
# Small example: equality on xid/yid combined with the custom condition
# fun1 (x2 <= y2). The trailing `nds` displays the joined dataset.
nds = my_cartesianjoin_v5(dsl,dsr,
  on = [:xid=>:yid, :x2=>:y2=>fun1]); # :x1=>:y1=>isless, 
nds

[0m[1m ────────────────────────────────────────────────────────────────────────────────[22m
[0m[1m                               [22m         Time                    Allocations      
                               ───────────────────────   ────────────────────────
       Tot / % measured:           2.87ms /  99.5%           68.1KiB /  96.6%    

 Section               ncalls     time    %tot     avg     alloc    %tot      avg
 ────────────────────────────────────────────────────────────────────────────────
 Step-2 New ds              1   2.37ms   83.0%  2.37ms   55.8KiB   84.8%  55.8KiB
   2-2 left row _res        1   1.26ms   44.3%  1.26ms   21.5KiB   32.7%  21.5KiB
     2.2 fill _res          5   1.05ms   36.7%   210μs   18.4KiB   28.0%  3.69KiB
     2.1 init _res          5    185μs    6.5%  37.0μs      784B    1.2%     157B
     2.3 push _res          5   3.88μs    0.1%   777ns     80.0B    0.1%    16.0B
   2-5 newds dsr            1   1.04ms   36.3%  1.04ms   27.0KiB   41.0

Unnamed: 0_level_0,xid,x1,x2,x3,x4,y1,y2,y3,y4
Unnamed: 0_level_1,Int64?,Int64?,Float64?,Date?,String?,Int64?,Int64?,Date?,String?
Unnamed: 0_level_2,identity,identity,identity,identity,identity,identity,identity,identity,identity
1,111,1,-1.2,2019-10-03,abcd,3,0,2019-10-01,abc
2,111,1,-1.2,2019-10-03,abcd,3,0,2019-10-01,abcd
3,222,2,-3.0,2019-09-30,efgh,3,-3,2019-09-30,efg
4,444,4,-3.5,2019-10-03,mnop,3,1,2019-10-05,mnop


In [48]:
# 10^3 x 10^3 rows: equality on x1/y1 plus fun1 (x2 <= y2); the timer report
# is printed by the function itself.
nds = my_cartesianjoin_v5(dsl_big,dsr_big,
  on = [:x1 => :y1, :x2 => :y2 => fun1],threads=true);

[0m[1m ────────────────────────────────────────────────────────────────────────────────[22m
[0m[1m                               [22m         Time                    Allocations      
                               ───────────────────────   ────────────────────────
       Tot / % measured:           92.6ms / 100.0%           5.88MiB / 100.0%    

 Section               ncalls     time    %tot     avg     alloc    %tot      avg
 ────────────────────────────────────────────────────────────────────────────────
 Step-2 New ds              1   62.0ms   67.0%  62.0ms   4.26MiB   72.4%  4.26MiB
   2-5 newds dsr            1   57.2ms   61.8%  57.2ms   3.74MiB   63.6%  3.74MiB
   2-2 left row _res        1   2.89ms    3.1%  2.89ms    252KiB    4.2%   252KiB
     2.2 fill _res          5   2.59ms    2.8%   517μs   18.5KiB    0.3%  3.71KiB
     2.1 init _res          5    271μs    0.3%  54.2μs    231KiB    3.8%  46.2KiB
     2.3 push _res          5   4.19μs    0.0%   837ns     80.0B    0.0

In [53]:
# Same conditions on the 10^4 x 10^4 row datasets.
nds = my_cartesianjoin_v5(dsl_big2,dsr_big2,
  on = [:x1 => :y1, :x2 => :y2 => fun1],threads=true);

[0m[1m ────────────────────────────────────────────────────────────────────────────────[22m
[0m[1m                               [22m         Time                    Allocations      
                               ───────────────────────   ────────────────────────
       Tot / % measured:            451ms / 100.0%           65.8MiB / 100.0%    

 Section               ncalls     time    %tot     avg     alloc    %tot      avg
 ────────────────────────────────────────────────────────────────────────────────
 Step-2 New ds              1    336ms   74.4%   336ms   63.4MiB   96.4%  63.4MiB
   2-1 left row count       1    182ms   40.4%   182ms    117KiB    0.2%   117KiB
   2-5 newds dsr            1   88.5ms   19.6%  88.5ms   21.0MiB   32.0%  21.0MiB
     5.2 fill _res          4   87.3ms   19.4%  21.8ms   3.55MiB    5.4%   908KiB
     5.1 init _res          4    852μs    0.2%   213μs   17.5MiB   26.6%  4.37MiB
   2-4 findall dsr idx      1   61.5ms   13.6%  61.5ms   20.4MiB   31.0

In [74]:
# Three conditions on the 10^4-row datasets: equality on x1/y1, isless on
# x2/y2 and fun1 (x3 <= y3). `@time` measures the whole call, and `nrow`
# reports the size of the result.
@time nds = my_cartesianjoin_v5(dsl_big2,dsr_big2,
  on = [:x1 => :y1, :x2 => :y2 => isless, :x3 => :y3 => fun1],threads=true);
nrow(nds)

[0m[1m ────────────────────────────────────────────────────────────────────────────────[22m
[0m[1m                               [22m         Time                    Allocations      
                               ───────────────────────   ────────────────────────
       Tot / % measured:            274ms / 100.0%           37.4MiB / 100.0%    

 Section               ncalls     time    %tot     avg     alloc    %tot      avg
 ────────────────────────────────────────────────────────────────────────────────
 Step-2 New ds              1    158ms   57.6%   158ms   36.1MiB   96.4%  36.1MiB
   2-1 left row count       1    124ms   45.2%   124ms    117KiB    0.3%   117KiB
   2-4 findall dsr idx      1   25.5ms    9.3%  25.5ms   16.4MiB   43.7%  16.4MiB
     4.1 findall func       1   24.2ms    8.8%  24.2ms   13.9MiB   37.0%  13.9MiB
   2-2 left row _res        1   4.53ms    1.7%  4.53ms   10.9MiB   29.0%  10.9MiB
     2.2 fill _res          5   3.57ms    1.3%   715μs   18.5KiB    0.0

252920

In [69]:
# Reference timing with the built-in innerjoin on the same datasets.
# NOTE: all three conditions here are equalities, so the row count is not
# comparable with the mixed-condition call to my_cartesianjoin_v5 above.
@time  nds2 = innerjoin(dsl_big2,dsr_big2,
        on = [:x1 => :y1, :x2 => :y2, :x3 => :y3]);
nrow(nds2)

  0.006194 seconds (1.69 k allocations: 683.359 KiB)


103