In [1]:
%reload_ext Cython

In [90]:
%%cython -f
# cython: profile=True
# cython: linetrace=True
# cython: binding=True
# distutils: define_macros=CYTHON_TRACE_NOGIL=1

cimport cython
from cython.parallel import parallel, prange
from libc.stdlib cimport rand, RAND_MAX

import time
import cupy as cp

from random import random
from cpython cimport array
import numpy as np
cimport numpy as np

ctypedef np.int_t DTYPE_t
cdef bint DEBUG


## DATA GENERATION
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef generateData(int i, int j):
    
    cdef long startTime = time.time() 
    
    cdef double[:,:] output = np.zeros((i, j))
    cdef int i_n, j_n
    
    with nogil, parallel():
        for i_n in prange(i):
            for j_n in prange(j):
                output[i_n, j_n] = j_n + 1
    
    for row in output:
        np.random.shuffle(row)

    print("Data generation took", 1000*(time.time() - startTime), "ms")
    return cp.array(output, dtype=np.int)

        
cdef class AbstractSMA:
    
    cdef public:
        
        # Timestep (t), size (a, b)
        int t, a, b
        
        # Preferences (PA, PB), Matches (MA, MB), History (HA, HB)
        PA, PB
        MA, MB
        HA, HB
        
    def __init__(self, PA, PB):
        
        self.a, self.b = PA.shape[0], PB.shape[1]
        self.PA, self.PB = PA, PB
        self.MA = self.MB = self.HA = self.HB = cp.zeros((self.a, self.b), dtype=np.int)
        
        if(DEBUG): print("\nPA (*)\n", self.PA, "\n\nPB\n", self.PB, "\n")
    
    @cython.boundscheck(False)
    @cython.wraparound(False)
    cdef AbstractSMA nextState(self):
        
        cdef:
            # proposalMask: where pref is selected rank
            # singleMask: where row is all zeros
            singleMask = cp.all(self.MA == 0, axis = 1, keepdims = True)
#             proposalMask = self.PA >= max(self.a - self.t, 1)
            proposalMask = self.PA >= 1
            feasibleProposalMask = proposalMask * singleMask
            
            # proposerScores: reviewers' ratings of proposers
            # reviewerScores: proposers' ratings of reviewers
            AScores = self.PB * feasibleProposalMask
            BScores = self.PA * feasibleProposalMask
            
        # track past proposals
        self.HA = self.HA | feasibleProposalMask
            
        # start @ t = 0
        if DEBUG:
            print("------------\nt =", self.t, "\n")
            print("Single Mask\n", singleMask.astype(np.int), "\n\n")
            print("Proposal Mask\n", proposalMask.astype(np.int), "\n\n")
            print("Feas. Proposal Mask\n", feasibleProposalMask.astype(np.int), "\n\n")
            print("MA (pre-mutate)\n", self.MA, "\n\n")
            print("MB (pre-mutate)\n", self.MB, "\n\n")
            print("A's scores with B\n", AScores, "\n\n")
            print("B's scores with A\n", BScores, "\n\n")
            print("History A\n", self.HA, "\n\n")
        
        # update a match if proposal score is higher than existing match
        self.MB = cp.maximum(AScores, self.MB)
        
        # eliminate non-max matches from matrix (only keep highest)
#         cp.place(self.MB, self.MB == cp.amax())
        self.MB *= self.MB == cp.amax(self.MB, axis = 1, keepdims = True)
    
        # A's matches = A preferences masked by mask of B
        self.MA = self.PA * (self.MB != 0)
        
        if DEBUG:
            print("final MA\n", self.MA, "\n\n")
            print("final MB\n", self.MB, "\n\n")

        self.t += 1
        return self
    
    cdef bint everyoneIsContent(self):
#         return cp.sum(self.HA) == self.HA.size
        return self.t >= self.a
    
    cdef public AbstractSMA run(self):
        while not self.everyoneIsContent():
            self.nextState()
        
        return self

cdef:
    PA_small = cp.array([
        [2, 1, 3],
        [1, 2, 3],
        [1, 3, 2]
    ], dtype=np.int)
    
    # change to size a x b
    PB_small = cp.array([
        [1, 3, 2],
        [2, 2, 1],
        [3, 1, 3]
    ], dtype=np.int)
    
    int N = 4
    PA_big = generateData(N, N)
    PB_big = generateData(N, N)

cpdef test():
    return AbstractSMA(PA_big if BIG else PA_small, PB_big if BIG else PB_small).run()

print("Group A is making all proposals for this test case.")

# DEBUG = True
# BIG = False

DEBUG = True
BIG = True

myTest = test()
print("Done. Finished in", myTest.t, "steps.\n")
print("MA\n", myTest.MA, "\n\n")
print("MB\n", myTest.MB, "\n\n")
print("HA\n", myTest.HA, "\n\n")

Data generation took 310.7447624206543 ms
Data generation took 311.5255832672119 ms
Group A is making all proposals for this test case.

PA (*)
 [[1 4 3 2]
 [4 3 1 2]
 [3 4 1 2]
 [2 1 4 3]] 

PB
 [[4 3 1 2]
 [1 2 3 4]
 [4 2 1 3]
 [2 4 1 3]] 

------------
t = 0 

Single Mask
 [[1]
 [1]
 [1]
 [1]] 


Proposal Mask
 [[1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]] 


Feas. Proposal Mask
 [[1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]] 


MA (pre-mutate)
 [[0 0 0 0]
 [0 0 0 0]
 [0 0 0 0]
 [0 0 0 0]] 


MB (pre-mutate)
 [[0 0 0 0]
 [0 0 0 0]
 [0 0 0 0]
 [0 0 0 0]] 


A's scores with B
 [[4 3 1 2]
 [1 2 3 4]
 [4 2 1 3]
 [2 4 1 3]] 


B's scores with A
 [[1 4 3 2]
 [4 3 1 2]
 [3 4 1 2]
 [2 1 4 3]] 


History A
 [[1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]] 


final MA
 [[1 0 0 0]
 [0 0 0 2]
 [3 0 0 0]
 [0 1 0 0]] 


final MB
 [[4 0 0 0]
 [0 0 0 4]
 [4 0 0 0]
 [0 4 0 0]] 


------------
t = 1 

Single Mask
 [[0]
 [0]
 [0]
 [0]] 


Proposal Mask
 [[1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]] 


Feas. P

In [10]:
%timeit -r 3 -n 1 test()

1.02 s ± 43.9 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [119]:
%timeit -r 3 -n 1 test()

1.59 s ± 10.4 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [8]:
%%cython
import cupy as cp

cdef:
    long iterations = 1000000
    long loops = 50, stacks = iterations // loops
    a, b

cpdef naiveMultiplication():
    a = cp.random.rand(44, 44)
    b = cp.random.rand(44, 44)
    
    for i in range(iterations):
        a * b
        
    cp.cuda.Device(0).synchronize()
    
cpdef stackMultiplication():
    a = cp.random.rand(stacks, 44, 44)
    b = cp.random.rand(stacks, 44, 44)
    
    for i in range(loops):
        a * b
        
    cp.cuda.Device(0).synchronize()

UsageError: Cell magic `%%cython` not found.


In [7]:
%timeit -r 1 -n 1 naiveMultiplication()

NameError: name 'naiveMultiplication' is not defined

In [7]:
%timeit -r 1 -n 1 stackMultiplication()

328 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [1]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask.array.utils import assert_eq

import dask.array as da
import cupy

add_broadcast_kernel = cupy.RawKernel(
    r'''
    extern "C" __global__
    void add_broadcast_kernel(
        const float* x, const float* y, float* z,
        const int xdim0, const int zdim0)
    {
        int idx0 = blockIdx.x * blockDim.x + threadIdx.x;
        int idx1 = blockIdx.y * blockDim.y + threadIdx.y;
        z[idx1 * zdim0 + idx0] = x[idx1 * xdim0 + idx0] + y[idx0];
    }
    ''',
    'add_broadcast_kernel'
)


def dispatch_add_broadcast(x, y):
    block_size = (32, 32)
    grid_size = (x.shape[1] // block_size[1], x.shape[0] // block_size[0])

    z = cupy.empty(x.shape, x.dtype)

    xdim0 = x.strides[0] // x.strides[1]
    zdim0 = z.strides[0] // z.strides[1]

    add_broadcast_kernel(grid_size, block_size, (x, y, z, xdim0, zdim0))
    return z

cluster = LocalCUDACluster()
client = Client(cluster)
    
x = cupy.arange(4096 * 1024, dtype=cupy.float32).reshape((4096, 1024))
y = cupy.arange(1024, dtype=cupy.float32).reshape(1, 1024)

res_cupy = x + y
res_add_broadcast = dispatch_add_broadcast(x, y)

# assert_eq(res_cupy, res_add_broadcast)

# dx = da.from_array(x, chunks=(1024, 512), asarray=False)
# dy = da.from_array(y, chunks=(1, 512), asarray=False)

# res = da.map_blocks(dispatch_add_broadcast, dx, dy, dtype=dx.dtype)
# res = res.compute()

# assert_eq(res, res_cupy)



In [2]:
dx = da.from_array(x, chunks=(1024, 512), asarray=False)
dy = da.from_array(y, chunks=(1, 512), asarray=False)
res = da.map_blocks(dispatch_add_broadcast, dx, dy, dtype=dx.dtype)
res.compute()

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/usr/local/lib/python3.7/dist-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/usr/local/lib/python3.7/dist-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
  File "cupy/core/core.pyx", line 1366, in cupy.core.core.ndarray.__str__
  File "cupy/core/core.pyx", line 1456, in cupy.core.core.ndarray.get
  File "cupy/core/core.pyx", line 1462, in cupy.core.core.ndarray.get
  File "cupy/cuda/memory.pyx", line 449, in cupy.cuda.memory.MemoryPointer.copy_to_host
  File "cupy/cuda/runtime.pyx", line 378, in cupy.cuda.runtime.memcpy
  File "cupy/cuda/runtime.pyx", line 201, in cupy.cuda.runtime.check_status
cupy.cuda.runtime.CUDARuntimeError: cudaErrorIl

KeyboardInterrupt: 

In [54]:
%%cython
import time
import numpy as np

cdef double[:,:] a = np.zeros((1000,1000))
cdef double[:,:] b = np.zeros((1000,1000))
cdef double[:,:] c = np.empty((1000,1000))

def timeme():
    cdef long i, j
    
    for i in range(1000):
        for j in range(i):
            c[i,j] = a[i,j] + b[i,j]
    
    return c
        
print(timeme())

<MemoryView of 'ndarray' object>


In [61]:
%timeit -n 10 -r 1 timeme()

1.15 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 10 loops each)
