In [12]:
from IPython.display import Image
import contexttimer
import time
import math
from numba import jit
import multiprocessing
import threading
from joblib import Parallel
import logging
import numpy as np

In [2]:
def wait_loop(n):
    """
    Function under test.
    """
    for m in range(n):
        for l in range(m):
            for j in range(l):
                for i in range(j):
                    i=i+4
                    out=math.sqrt(i)
                    out=out**2.
    return out

In [3]:
@jit('float64(int64)', nopython=True, nogil=True)
def wait_loop_nogil(n):
    """
    Function under test.
    """
    for m in range(n):
        for l in range(m):
            for j in range(l):
                for i in range(j):
                    i=i+4
                    out=math.sqrt(i)
                    out=out**2.
    return out

In [4]:
@jit('float64(int64)', nopython=True, nogil=False)
def wait_loop_withgil(n):
    """
    Function under test.
    """
    for m in range(n):
        for l in range(m):
            for j in range(l):
                for i in range(j):
                    i=i+4
                    out=math.sqrt(i)
                    out=out**2.
    return out

In [5]:
nloops=200
with contexttimer.Timer(time.perf_counter) as pure_wall:
    with contexttimer.Timer(time.process_time) as pure_cpu:
        result=wait_loop(nloops)
print('pure python wall time {} and cpu time {}'.format(pure_wall.elapsed,pure_cpu.elapsed))

pure python wall time 31.15257935775645 and cpu time 31.09375


In [6]:
nloops=200
with contexttimer.Timer(time.perf_counter) as numba_wall:
    with contexttimer.Timer(time.process_time) as numba_cpu:
        result=wait_loop_nogil(nloops)
print('numba wall time {} and cpu time {}'.format(numba_wall.elapsed,numba_cpu.elapsed))
print("numba speed-up factor {}".format((pure_wall.elapsed - numba_wall.elapsed)/numba_wall.elapsed))

numba wall time 0.009586053992229893 and cpu time 0.015625
numba speed-up factor 3248.7813368261436


In [7]:
nloops=200
with contexttimer.Timer(time.perf_counter) as numba_wall:
    with contexttimer.Timer(time.process_time) as numba_cpu:
        result=wait_loop_withgil(nloops)
print('numba wall time {} and cpu time {}'.format(numba_wall.elapsed,numba_cpu.elapsed))
print("numba speed-up factor {}".format((pure_wall.elapsed - numba_wall.elapsed)/numba_wall.elapsed))

numba wall time 0.030602775366951107 and cpu time 0.03125
numba speed-up factor 1016.9658212110753


In [8]:
logging.basicConfig(level=logging.DEBUG,
                    format='%(message)s %(threadName)s %(processName)s',
                    )

def find_ids():
    logging.debug('debug logging: ')

In [9]:
njobs=6
nprocs=3
thread_id_jobs =[(find_ids,[],{}) for i in range(nprocs)]
nloops=1250
calc_jobs=[(wait_loop_nogil,[nloops],{}) for i in range(njobs)]
print(calc_jobs)

[(CPUDispatcher(<function wait_loop_nogil at 0x0000025F8B1327B8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x0000025F8B1327B8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x0000025F8B1327B8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x0000025F8B1327B8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x0000025F8B1327B8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x0000025F8B1327B8>), [1250], {})]


In [10]:
with contexttimer.Timer(time.perf_counter) as wall:
    with contexttimer.Timer(time.process_time) as cpu:
        with Parallel(n_jobs=nprocs,backend='threading') as parallel:
            parallel(thread_id_jobs)
            results=parallel(calc_jobs)
        print(results)
print('wall time {} and cpu time {}'.format(wall.elapsed,cpu.elapsed))

debug logging:  Thread-6 MainProcess
debug logging:  Thread-7 MainProcess
debug logging:  Thread-8 MainProcess


[1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0]
wall time 5.509568361227654 and cpu time 16.171875


In [14]:
import numpy as np
import zarr
import time
import datetime
import pytz
from zarr.util import human_readable_size
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize

In [15]:
wvel_data = np.random.normal(2000, 1000, size=[8000,7500]).astype(np.float32)
human_readable_size(wvel_data.nbytes)

'228.9M'

In [17]:
a2 = np.zeros([8000,7500],dtype=np.float32)
item='disk2_data'
store = zarr.DirectoryStore(item)
group=zarr.hierarchy.group(store=store,overwrite=True,synchronizer=zarr.ThreadSynchronizer())
the_var='wvel'
out_zarr2=group.zeros(the_var,shape=a2.shape,dtype=a2.dtype,chunks=[2000,7500])
out_zarr2

Array(/wvel, (8000, 7500), float32, chunks=(2000, 7500), order=C)
  nbytes: 228.9M; nbytes_stored: 273; ratio: 879120.9; initialized: 0/4
  compressor: Zlib(level=1)
  store: DirectoryStore; synchronizer: ThreadSynchronizer

In [18]:
da_input = da.from_array(out_zarr2, chunks=out_zarr1.chunks)
da_input

dask.array<array-a..., shape=(8000, 7500), dtype=float32, chunksize=(2000, 7500)>

In [19]:
result=(da_input**2. + da_input**3.).mean(axis=0)
result

dask.array<mean_ag..., shape=(7500,), dtype=float32, chunksize=(7500,)>

In [25]:
from dask.dot import dot_graph
#dot_graph(result.dask)

In [27]:
with Profiler() as prof, ResourceProfiler(dt=0.1) as rprof,\
              CacheProfiler() as cprof:
    result = result.compute()

In [29]:
%matplotlib inline
visualize([prof, rprof,cprof], min_border_top=15, min_border_bottom=15)

Session output file 'profile.html' already exists, will be overwritten. MainThread MainProcess
