# More efficient broadcast of arrays with memmap

Data movement is where IPython's naïve model suffers the most.
But knowing about your cluster lets you make smarter decisions about data movement than a simple `rc[:].push` movement

In [3]:
import socket
import os, sys, re

import numpy as np

import ipyparallel as parallel

In [4]:
rc = parallel.Client()
eall = rc[:]

In [5]:
engine_hosts = eall.apply_async(socket.gethostname).get_dict()
engine_hosts

{0: 'laic-ws1',
 1: 'laic-ws1',
 2: 'laic-ws1',
 3: 'laic-ws1',
 4: 'laic-ws1',
 5: 'laic-ws1',
 6: 'laic-ws1',
 7: 'laic-ws1',
 8: 'laic-ws1',
 9: 'laic-ws1',
 10: 'laic-ws1',
 11: 'laic-ws1',
 12: 'laic-ws1',
 13: 'laic-ws1',
 14: 'laic-ws1',
 15: 'laic-ws1',
 16: 'laic-ws1',
 17: 'laic-ws1',
 18: 'laic-ws1',
 19: 'laic-ws1',
 20: 'laic-ws1'}

In [6]:
host_engines = {}

for eid, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(eid)

host_engines

{'laic-ws1': [0,
  1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  10,
  11,
  12,
  13,
  14,
  15,
  16,
  17,
  18,
  19,
  20]}

In [7]:
data = np.asarray(np.memmap('/tmp_data/pcie.bin', np.int32).reshape(-1, 32))

In [8]:
data.shape

(30150587, 32)

In [9]:
data

array([[-117876, -139612, -154660, ..., -163856,  -97812, -112860],
       [-210310, -257566, -277806, ..., -288410, -164374, -195482],
       [-310386, -373090, -401378, ..., -417504, -237366, -268382],
       ..., 
       [-158754,  -14456,  -69958, ..., -168930, -450698, -207364],
       [-103492,    1918,  -50450, ...,  131322, -238316,  -88426],
       [ -46712,   27374,     978, ...,  540872,  108922,  145756]], dtype=int32)

In [10]:
fp = np.memmap('/mnt/ramdisk/temp.bin', dtype='int32', mode='w+', shape=data.shape)

In [11]:
fp[:] = data[:]

In [12]:
fp.flush()

In [13]:
rc[:].scatter('col',np.arange(32))

<AsyncResult: scatter>

In [14]:
%px print col

[stdout:0] [0]
[stdout:1] [1]
[stdout:2] [2]
[stdout:3] [3]
[stdout:4] [4]
[stdout:5] [5]
[stdout:6] [6]
[stdout:7] [7]
[stdout:8] [8]
[stdout:9] [9]
[stdout:10] [10]
[stdout:11] [11]
[stdout:12] [12]
[stdout:13] [13]
[stdout:14] [14]
[stdout:15] [15]
[stdout:16] [16]
[stdout:17] [17]
[stdout:18] [18]
[stdout:19] [19]
[stdout:20] [20]
[stdout:21] [21]
[stdout:22] [22]
[stdout:23] [23]
[stdout:24] [24]
[stdout:25] [25]
[stdout:26] [26]
[stdout:27] [27]
[stdout:28] [28]
[stdout:29] [29]
[stdout:30] [30]
[stdout:31] [31]


In [27]:
%%px 
import numpy as np
npmm = np.memmap('/mnt/ramdisk/temp.bin', dtype='int32').reshape(-1,32)
data = np.asarray(npmm[:,col])

In [None]:
%%px
from scipy.signal import resample
data = resample(data, len(data)*2)

In [15]:
def array_to_file(A):
    """write an array to a temporary file, return its filename"""
    import tempfile
    with tempfile.NamedTemporaryFile(suffix='.np', delete=False) as tf:
        np.save(tf, data)
        data_path = tf.name
    return data_path

In [16]:
@parallel.interactive
def load_memmap(name, path, mode='r+'):
    """load a file on disk into the interactive namespace as a memmapped array"""
    globals()[name] = np.memmap(path, mode=mode)

In [17]:
def bcast_memmap(data, name, client, host_engines):
    """broadcast a numpy array efficiently
    
    - sends data to each remote host only once
    - loads with memmap everywhere
    """

    # actually push the data, just once to each machine

    local_filename = None
    filenames_ars = {}
    for host, engines in host_engines.items():
        h0 = engines[0]
        if host == socket.gethostname():
            # Don't push at all to local engines
            local_filename = array_to_file(data)
        else:
            filenames_ars[host] = rc[h0].apply_async(array_to_file, data)

    # load the data on all engines into a memmapped array
    msg_ids = []
    for host, engines in host_engines.items():
        if host == socket.gethostname():
            filename = local_filename
        else:
            filename = filenames_ars[host].get()
        ar = rc[engines].apply_async(load_memmap, name, filename)
        msg_ids.extend(ar.msg_ids)
    
    return parallel.AsyncResult(client, msg_ids=msg_ids)

In [20]:
%%time
ar = bcast_memmap(data, 'data', rc, host_engines)
ar.wait_interactive()

TypeError: __init__() got an unexpected keyword argument 'msg_ids'

In [21]:
%px np.linalg.norm(data, 2)

[0;31mOut[0:2]: [0m95689.659153954562

[0;31mOut[1:2]: [0m95689.659153954562

[0;31mOut[2:2]: [0m95689.659153954562

[0;31mOut[3:2]: [0m95689.659153954562

[0;31mOut[4:2]: [0m95689.659153954562

[0;31mOut[5:2]: [0m95689.659153954562

[0;31mOut[6:2]: [0m95689.659153954562

[0;31mOut[7:2]: [0m95689.659153954562

[0;31mOut[8:2]: [0m95689.659153954562

[0;31mOut[9:2]: [0m95689.659153954562

[0;31mOut[10:2]: [0m95689.659153954562

[0;31mOut[11:2]: [0m95689.659153954562

[0;31mOut[12:2]: [0m95689.659153954562

[0;31mOut[13:2]: [0m95689.659153954562

[0;31mOut[14:2]: [0m95689.659153954562

[0;31mOut[15:2]: [0m95689.659153954562

[0;31mOut[16:2]: [0m95689.659153954562

[0;31mOut[17:2]: [0m95689.659153954562

[0;31mOut[18:2]: [0m95689.659153954562

[0;31mOut[19:2]: [0m95689.659153954562

[0;31mOut[20:2]: [0m95689.659153954562

[0;31mOut[21:2]: [0m95689.659153954562

[0;31mOut[22:2]: [0m95689.659153954562

[0;31mOut[23:2]: [0m95689.659153954562

In [23]:
eall.scatter('ind', np.arange(48))

<AsyncResult: scatter>

In [32]:
%%px --target : --noblock
print ind

<AsyncResult: execute>

In [35]:
%pxresult

[stdout:0] [0 1]
[stdout:1] [2 3]
[stdout:2] [4 5]
[stdout:3] [6 7]
[stdout:4] [8 9]
[stdout:5] [10 11]
[stdout:6] [12 13]
[stdout:7] [14 15]
[stdout:8] [16 17]
[stdout:9] [18 19]
[stdout:10] [20 21]
[stdout:11] [22 23]
[stdout:12] [24 25]
[stdout:13] [26 27]
[stdout:14] [28 29]
[stdout:15] [30 31]
[stdout:16] [32 33]
[stdout:17] [34 35]
[stdout:18] [36 37]
[stdout:19] [38 39]
[stdout:20] [40 41]
[stdout:21] [42 43]
[stdout:22] [44 45]
[stdout:23] [46 47]
