# Improve craco.prepare.freqavg speed

We get data from ASKAP correlator "cards". Each card contains 6 FPGAs. The FPGAs send data using the RoCE protocol, which means each FPGA sends its data independently into its own numpy buffer (1 buffer per FPGA).

The numpy buffer is laid out in an unusual way

Each FPGA sends all 36 beams  and 4 channels. The 4 channels are *not* contiguous.

We want to average the correct channels from each of the 6 FPGAs to produce 4 output channels per card.

(In future we might like to do some dedispersion here too, but that's another story).

The FPGAs produce int16 numbers. Ideally we'd average in a larger number of bits (float32 or int32) and then rescale back to int16, as we have limited network bandwidth for the next step (an MPI allToall).

The frequency averaging step must complete in much less than 110 milliseconds. You can use up to 4 cores.

My finding is that numpy is OK at doing the frequency averaging, but the rescaling is hopelessly slow.

Cache misses are probably the cause of poor performance.  Re-ordering the order of summation would likely help.

Pavan: I'd like to make this optimised. It's currently too slow, especially the rescaling.

I'd prefer not to use FORTRAN as no-one else knows how to maintain it.

We have NUMBA running on the cluster, so don't be afraid to try that. I'm not concerned about packaging.

You can try pythran or other packages.

Something that lets you easily use multiple cores is fine. I strongly suspect a lot can be gained with careful use of the cache.



In [1]:
from craco.prepare import *

In [2]:
import craco.prepare
craco.prepare?

In [3]:
# The input always carries 36 beams; the output keeps only a subset (usually 20).
# Bit b of the beam mask is 1 when beam b should be kept.
# 0xfffff selects the first 20 beams.
beam_mask = 0xfffff

# Number of output beams = number of set bits among the 36 possible beams.
nbeams_out = sum((beam_mask >> ibeam) & 1 for ibeam in range(36))

nprocbeams = nbeams_out
ncards_total = 6*12  # total number of cards we need to receive from 20 MPI processes
ncards = ncards_total // nprocbeams  # number of cards *this* process is receiving data from
nfpga = 6*ncards  # 6 FPGAs per card

nt = 64  # number of samples per beamformer frame
nbl = 435  # baselines for 30 antennas (30*29/2)
# Seconds needed to transmit nt samples. WE NEED TO COMPLETE THE PROCESSING IN MUCH LESS THAN THIS!
tframe = 0.110

# Shape of the buffer received from one FPGA, and its size in elements / bytes.
dshape = (nbeams*ncoarse_channels, nt, nbl, 2)
nel = np.prod(dshape)
nbytes = nel*2  # int16 samples are 2 bytes each

print(f'About to make {nfpga} times {dshape} data')

# Blocks arrive over ROCE one FPGA at a time, so the input is a list of
# separate arrays rather than one contiguous numpy array.
data = [np.arange(nel, dtype=np.int16).reshape(dshape) for _ in range(nfpga)]

print(f'Mask 0x{beam_mask:x} has {nbeams_out} beams')

# The estimate below counts 2 operations per input element per frame.
print(f'Total Mbytes={nbytes*nfpga/1e6} Requires GFlops={nel*nfpga*(2)/1e9/tframe}')


About to make 18 times (144, 64, 435, 2) data
Mask 0xfffff has 20 beams
Total Mbytes=288.64512 Requires GFlops=2.6240465454545454


In [4]:
# One output channel per (FPGA, coarse channel) pair; the input count carries
# an extra factor of nfine_channels.
output_nchan = nfpga*ncoarse_channels
input_nchan = output_nchan*nfine_channels

# Example channel map. A real one will be more complicated.
# Row = FPGA, column = coarse channel, value = output channel index.
channel_map = np.arange(output_nchan, dtype=np.int32).reshape(nfpga, ncoarse_channels)


In [5]:
list(coarse_beam_gen(beam_mask))

[(0, 0, 0, 0),
 (0, 1, 1, 1),
 (0, 2, 2, 2),
 (0, 3, 3, 3),
 (0, 4, 4, 4),
 (0, 5, 5, 5),
 (0, 6, 6, 6),
 (0, 7, 7, 7),
 (0, 8, 8, 8),
 (0, 9, 9, 9),
 (0, 10, 10, 10),
 (0, 11, 11, 11),
 (0, 12, 12, 12),
 (0, 13, 13, 13),
 (0, 14, 14, 14),
 (0, 15, 15, 15),
 (0, 16, 16, 16),
 (0, 17, 17, 17),
 (0, 18, 18, 18),
 (0, 19, 19, 19),
 (1, 0, 0, 32),
 (1, 1, 1, 33),
 (1, 2, 2, 34),
 (1, 3, 3, 35),
 (1, 4, 4, 36),
 (1, 5, 5, 37),
 (1, 6, 6, 38),
 (1, 7, 7, 39),
 (1, 8, 8, 40),
 (1, 9, 9, 41),
 (1, 10, 10, 42),
 (1, 11, 11, 43),
 (1, 12, 12, 44),
 (1, 13, 13, 45),
 (1, 14, 14, 46),
 (1, 15, 15, 47),
 (1, 16, 16, 48),
 (1, 17, 17, 49),
 (1, 18, 18, 50),
 (1, 19, 19, 51),
 (2, 0, 0, 64),
 (2, 1, 1, 65),
 (2, 2, 2, 66),
 (2, 3, 3, 67),
 (2, 4, 4, 68),
 (2, 5, 5, 69),
 (2, 6, 6, 70),
 (2, 7, 7, 71),
 (2, 8, 8, 72),
 (2, 9, 9, 73),
 (2, 10, 10, 74),
 (2, 11, 11, 75),
 (2, 12, 12, 76),
 (2, 13, 13, 77),
 (2, 14, 14, 78),
 (2, 15, 15, 79),
 (2, 16, 16, 80),
 (2, 17, 17, 81),
 (2, 18, 18, 82),
 (2, 19,

In [6]:
freqavg?

In [7]:
# Divisor/scale handed to rescale() in the timing cell below.
scale = 6

# int32 accumulator plus an int16 result buffer of the same shape:
# (nbeams_out, output_nchan, nt, nbl, 2).
accum = np.zeros((nbeams_out, output_nchan, nt, nbl, 2), dtype=np.int32)
output_data = np.zeros(accum.shape, dtype=np.int16)

# freqavg seems fast enough if we accumulate into np.int32.
# On my setup it takes 79 ms for 20 beams. It needs to be < 110 ms.
%timeit _ = freqavg(data, channel_map, accum, beam_mask=beam_mask)

84 ms ± 5.71 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
# Rescaling takes *MUCH* longer!

%timeit rescale(accum, output_data, scale)

501 ms ± 9.43 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
# Even resetting to zero takes a long time
%timeit accum[:] = 0

33.4 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [10]:
# Total size of the accumulator in bytes. The rate divides by 0.0334 s, the
# time measured above for `accum[:] = 0`.
nbytes = accum.nbytes
print(f'Output size is {nbytes/1e6} MByte Achieved data rate is roughtly {nbytes/0.0334/1e9:.2f} GByte/sec')

Output size is 320.7168 MByte Achieved data rate is roughtly 9.60 GByte/sec


In [11]:
# Just allocating and zeroing the array doesn't take much longer
%timeit np.zeros_like(accum)

79.9 ms ± 238 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [12]:
coarse_beam_gen?

In [13]:
# play with coarse_beam_gen
list(coarse_beam_gen(0x1))

[(0, 0, 0, 0), (1, 0, 0, 32), (2, 0, 0, 64), (3, 0, 0, 96)]

In [14]:
list(coarse_beam_gen(0x3))

[(0, 0, 0, 0),
 (0, 1, 1, 1),
 (1, 0, 0, 32),
 (1, 1, 1, 33),
 (2, 0, 0, 64),
 (2, 1, 1, 65),
 (3, 0, 0, 96),
 (3, 1, 1, 97)]

In [15]:
list(coarse_beam_gen(0x180000000))

[(0, 0, 31, 31),
 (1, 0, 31, 63),
 (2, 0, 31, 95),
 (3, 0, 31, 127),
 (0, 1, 32, 128),
 (1, 1, 32, 132),
 (2, 1, 32, 136),
 (3, 1, 32, 140)]

In [16]:
from numba import njit, config, threading_layer
import numba
#config.THREADING_LAYER = 'threadsafe'
@njit(parallel=True, fastmath=True)
def freqavg_numba(fpga_data: list, channel_map: np.ndarray, output_data: np.ndarray, beam_mask: int = 0xfffffffff):
    '''
    Accumulate and re-order the data coming directly from the FPGAs into output_data, which can
    then be corner-turned over MPI. Each card comprises 6 FPGAs.

    The input is *added* onto whatever output_data already contains, so zero it first if a
    fresh sum is wanted. No rescaling is done here.

    @param fpga_data list of numpy arrays from the FPGAs. length=multiple of 6.
    Each array is indexed as [coarse_channel/beam slot, t, ibl, 2]
    @param channel_map numpy array shape[nfpga, ncoarse_channels] that maps the channel from the given FPGA into the
    coarse channel index of the output
    @param output_data accumulator, suitable for transposing. shape = [nbeams_out, nfpga*ncoarse_channels, nt, nbl, 2]
    @param beam_mask bit mask of the input beams to keep, passed straight to coarse_beam_gen
    @returns output_data (the same array that was passed in)
    '''
    # NOTE(review): parallel=True only parallelises array expressions and prange loops;
    # the plain range loops below are not split across threads by themselves.

    # Loop bounds come from the output array, whose axes 2 and 3 are time and baseline.
    nt = output_data.shape[2]
    nbl = output_data.shape[3]

    for ifpga, indata in enumerate(fpga_data):
        for coarse_channel, outbeam, inbeam, cbslot in coarse_beam_gen(beam_mask):
            # Output channel that this FPGA's coarse channel contributes to.
            outchan = channel_map[ifpga, coarse_channel]
            for t in range(nt):
                for ibl in range(nbl):
                    output_data[outbeam, outchan, t, ibl, :] += indata[cbslot, t, ibl, :]

    return output_data

In [17]:
#freqavg_numba(data, channel_map, accum, beam_mask=beam_mask)
#print("Threading layer chosen: %s" % threading_layer())
#numba.set_num_threads(4)
#%timeit freqavg_numba(data, channel_map, accum, beam_mask=beam_mask)

In [18]:
%load_ext cython

In [19]:
%%cython --annotate

# Smoke test for the %%cython cell magic: add up the integers 0..9 and print the total.
a: cython.int = 0
for i in range(10):
    a += i
print(a)  # prints 45


45


In [20]:
inarray = np.array(data)
inarray.shape

(18, 144, 64, 435, 2)

In [21]:
accum.shape

(20, 72, 64, 435, 2)

In [22]:
output_data.shape

(20, 72, 64, 435, 2)

In [23]:
# Zero-filled int16 buffers shaped like inarray / output_data with every axis
# after the first two collapsed into one. These are new arrays, not views, so
# they do not contain the values held in inarray or output_data.
inflat = np.zeros(inarray.shape[:2] + (int(np.prod(inarray.shape[2:])),), dtype=np.int16)
outflat = np.zeros(output_data.shape[:2] + (int(np.prod(output_data.shape[2:])),), dtype=np.int16)
print(f'inar {inarray.shape} out {output_data.shape}')

inar (18, 144, 64, 435, 2) out (20, 72, 64, 435, 2)


In [32]:
%%cython --annotate
cimport numpy as np
np.import_array()
import cython


cdef int ncoarse_channels  =4
cdef int nfine_channels =6
cdef int nbeams =36

def coarse_beam_gen(beam_mask=0xfffffffff):
    """Yield (coarse_channel, outbeam, inbeam, slot) for every beam selected by beam_mask.

    Slots are numbered in two groups: first input beams 0-31 for each of the
    4 coarse channels (32 consecutive slots per channel), then input beams
    32-35 for each of the 4 coarse channels (4 consecutive slots per channel).
    The slot counter advances for every beam, selected or not.

    outbeam numbers the selected beams consecutively. It restarts for each
    coarse channel, and the second group continues from where the first
    group's count ended.

    beam_mask: integer whose bit b is 1 when input beam b should be produced.
    """
    slot = 0
    group_first_outbeam = 0
    for beam_start, beam_stop in ((0, 32), (32, 36)):
        for coarse in range(4):
            outbeam = group_first_outbeam
            for inbeam in range(beam_start, beam_stop):
                if (beam_mask >> inbeam) & 0x1:
                    yield (coarse, outbeam, inbeam, slot)
                    outbeam += 1
                slot += 1
        # The next group carries on numbering after this group's selected beams.
        group_first_outbeam = outbeam
            
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def freqavg_cython(np.ndarray[np.int16_t, ndim=3] fpga_data,
                   np.ndarray[np.int32_t, ndim=2] channel_map,
                   np.ndarray[np.int16_t, ndim=3] output_data,
                   long long beam_mask):
    """Add the selected beams/channels of every FPGA onto output_data.

    fpga_data:   int16, indexed [ifpga, coarse-channel/beam slot, sample]
    channel_map: int32, indexed [ifpga, coarse_channel], gives the output channel
    output_data: int16, indexed [outbeam, output channel, sample]; accumulated
                 in place and also returned
    beam_mask:   bit b set = include input beam b. Declared as a 64-bit integer
                 because the mask needs up to 36 bits; a C `int` rejects any
                 mask that selects beams 31-35 with an OverflowError.

    The sample axis length is taken from output_data.shape[2]. Bounds checking
    is disabled, so the caller must pass arrays with consistent shapes.
    The sum is accumulated in int16 and is not protected against overflow.
    """
    cdef int nfpga = fpga_data.shape[0]
    cdef int nrun = output_data.shape[2]

    cdef int coarse_channel
    cdef int outbeam
    cdef int inbeam
    cdef int cbslot
    cdef int ifpga
    cdef int i
    cdef int outchan

    # The (coarse_channel, outbeam, inbeam, slot) schedule depends only on
    # beam_mask, so run the Python generator once instead of once per FPGA.
    beam_slots = list(coarse_beam_gen(beam_mask))

    for ifpga in range(nfpga):
        for coarse_channel, outbeam, inbeam, cbslot in beam_slots:
            outchan = channel_map[ifpga, coarse_channel]
            for i in range(nrun):
                output_data[outbeam, outchan, i] += fpga_data[ifpga, cbslot, i]

    return output_data


Error compiling Cython file:
------------------------------------------------------------
...
    cdef int inbeam
    cdef int cbslot
    cdef int ifpga
    cdef int i
    cdef int outchan
    for ifpga in prange(nfpga):
                ^
------------------------------------------------------------

/home/ban115/.cache/ipython/cython/_cython_magic_d339e2dc0530ea73f7374e5a31e0ee93.pyx:53:17: undeclared name not builtin: prange


In [33]:
%timeit _= freqavg_cython(inflat, channel_map, outflat, beam_mask)

59.9 ms ± 590 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [31]:
output_data

array([[[[[     0,      1],
          [     2,      4],
          [     5,      6],
          ...,
          [  1152,   1153],
          [  1154,   1156],
          [  1157,   1158]],

         [[  1160,   1161],
          [  1162,   1164],
          [  1165,   1166],
          ...,
          [  2312,   2313],
          [  2314,   2316],
          [  2317,   2318]],

         [[  2320,   2321],
          [  2322,   2324],
          [  2325,   2326],
          ...,
          [  3472,   3473],
          [  3474,   3476],
          [  3477,   3478]],

         ...,

         [[-16622, -16620],
          [-16619, -16618],
          [-16616, -16615],
          ...,
          [-15470, -15468],
          [-15467, -15466],
          [-15464, -15463]],

         [[-15462, -15460],
          [-15459, -15458],
          [-15456, -15455],
          ...,
          [-14310, -14308],
          [-14307, -14306],
          [-14304, -14303]],

         [[-14302, -14300],
          [-14299, -14298],
    