# Cython
<img src="../images/cython.jpg" 
     align="right"
     width="20%">
## Build Cython code
**Two stages:**
- use Cython compiler to compile cython source file (```.pyx```) into C code (```.c```)
- use C compiler to compile ```.c``` file into ```.so``` file

**Several ways:**
- setup tools (more flexible, powerful)
- jupyter notebook: Cython compilation interactively (more interactive)

In [None]:
import numpy as np
from numba import njit

#### using setup tools
- write a ```setup.py``` file
- ```python setup.py build_ext --inplace```

In [None]:
def f(x):
    return x**2-x

def integrate_f_py(a, b, N):
    s = 0
    dx = (b-a)/N
    for i in range(N):
        s += f(a+i*dx)
    return s * dx

In [None]:
from integrate import integrate_f_cy

In [None]:
integrate_f_cy(1, 100, 1000)

In [None]:
integrate_f_py(1, 100, 1000)

#### jupyter notebook

In [None]:
del integrate_f_cy

In [None]:
%load_ext Cython

In [None]:
%%cython --annotate
def f_cy(x):
    return x**2-x

def integrate_f_cy(a, b, N):
    s = 0
    dx = (b-a)/N
    for i in range(N):
        s += f_cy(a+i*dx)
    return s * dx

In [None]:
%timeit integrate_f_py(1, 100, 1000)

In [None]:
%timeit integrate_f_cy(1, 100, 1000)

Speed up by **30%** without doing anything: removal of interpreter overhead.

## Static typing:
#### typing variables

In [None]:
%%cython -a
def f_static(double x):
    return x**2-x

def integrate_f_static(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b-a)/N
    for i in range(N):
        s += f_static(a+i*dx)
    return s * dx

In [None]:
%timeit integrate_f_static(1, 100, 1000)

**5 times speed up!**

#### typing function

In [None]:
%%cython -a
cdef double f_static2(double x):
    return x**2-x

def integrate_f_static2(double a, double b, int N):
    cdef int i
    cdef double s
    s = 0
    dx = (b-a)*N**(-1)
    for i in range(N):
        s += f_static2(a+i*dx)
    return s * dx

In [None]:
%timeit integrate_f_static2(1, 100, 1000)

**200 times speed up!**

- ```cdef``` function can only be called within cython: fast supporting functions
- ```def``` functions can be called in python session: function that you want to import in python
- ```cpdef``` function can be called both within cython (as C functions) and python (python wrapper)

In [None]:
f_static2

#### No need to type everything:
- cython enables automatic type inference during assignmnet
- Unnecessary typing may even slow things down (unnecessary type checks or conversions)
- Must type in performance critical part of the code (such as ```for``` loop: ```for``` loop needs to be white! ) 

#### compare with Numba ```@njit```

In [None]:
@njit
def f_njit(x):
    return x**2-x
@njit
def integrate_f_njit(a,b, N):
    s = 0
    dx = (b-a)/N
    for i in range(N):
        s += f_njit(a+i*dx)
    return s * dx

In [None]:
%timeit integrate_f_njit(1, 100, 1000)

## Using C libaries

- Cython makes calling external C library functions easier! 
- Cython already defines many standard C libarary functions for us!

```python
from libc.math cimport sin
cdef double f(double x):
    return sin(x * x)
```

https://github.com/cython/cython/blob/master/Cython/Includes/libc/math.pxd

## Cython for Numpy user

In [None]:
a=np.random.randn(100,100)
b=np.random.randn(100,100)

In [None]:
def add_numpy(array_1,array_2):
    return array_1+array_2

In [None]:
%timeit add_numpy(a,b)

In [None]:
%%cython -a
def add_cy(array_1,array_2):
    return array_1+array_2

In [None]:
%timeit add_cy(a,b)

In [None]:
def add_py(array_1, array_2):
    x_max = array_1.shape[0]
    y_max = array_1.shape[1]
    result = np.zeros((x_max, y_max), dtype=array_1.dtype)
    for x in range(x_max):
        for y in range(y_max):
            result[x, y] = array_1[x, y]+array_2[x, y]
    return result

In [None]:
%timeit add_py(a,b)

In [None]:
%%cython -a
import numpy as np
def add_cy1(array_1, array_2):
    x_max = array_1.shape[0]
    y_max = array_1.shape[1]
    result = np.zeros((x_max, y_max), dtype=array_1.dtype)
    for x in range(x_max):
        for y in range(y_max):
            result[x, y] = array_1[x, y]+array_2[x, y]
    return result

In [None]:
%timeit add_cy1(a,b)

### Typed memoryview
- **Memoryview:** memoryviews are C structures that can hold a pointer to the data of a NumPy array and all the necessary buffer metadata to provide efficient and safe access: dimensions, strides, item size, item type information, etc… They can be indexed by C integers, thus allowing fast access to the NumPy array data.

```python
cdef int [:] foo         # 1D memoryview
cdef int [:, :] foo      # 2D memoryview
cdef int [:, :, :] foo   # 3D memoryview
...                      # You get the idea.
```

In [None]:
%%cython -a
import numpy as np
cimport numpy as np
def add_cy2(double[:, :] array_1, double[:, :] array_2):
    # Py_ssize_t is the proper C type for Python array indices.
    cdef Py_ssize_t x_max = array_1.shape[0]
    cdef Py_ssize_t y_max = array_1.shape[1]
    result = np.zeros((x_max, y_max))
    cdef double[:, :] result_view = result
    cdef Py_ssize_t x, y
    for x in range(x_max):
        for y in range(y_max):
            result_view[x, y] = array_1[x, y]+array_2[x, y]
    return result

In [None]:
%timeit add_cy2(a,b)

In [None]:
%%cython -a
import numpy as np
cimport cython
@cython.boundscheck(False)  # Deactivate bounds checking
@cython.wraparound(False)   # Deactivate negative indexing.
def add_cy3(double[:, ::1] array_1, double[:, ::1] array_2): #C contiguous memoryview
    cdef Py_ssize_t x_max = array_1.shape[0]
    cdef Py_ssize_t y_max = array_1.shape[1]
    result = np.zeros((x_max, y_max))
    cdef double[:, ::1] result_view = result
    cdef Py_ssize_t x, y
    for x in range(x_max):
        for y in range(y_max):
            result_view[x, y] = array_1[x, y]+array_2[x, y]
    return result

In [None]:
%timeit add_cy3(a,b)

still slower than Numpy

## Using Parallelism: ```prange```

In [None]:
%%cython -a --compile-args=-fopenmp --link-args=-fopenmp
from cython.parallel import prange
import numpy as np
cimport cython
@cython.boundscheck(False)  # Deactivate bounds checking
@cython.wraparound(False)   # Deactivate negative indexing.
def add_cy4(double[:, ::1] array_1, double[:, ::1] array_2):
    cdef Py_ssize_t x_max = array_1.shape[0]
    cdef Py_ssize_t y_max = array_1.shape[1]
    result = np.zeros((x_max, y_max))
    cdef double[:, ::1] result_view = result
    cdef Py_ssize_t x, y
    for x in prange(x_max,nogil=True):
        for y in range(y_max):
            result_view[x, y] = array_1[x, y]+array_2[x, y]
    return result

In [None]:
%timeit add_cy4(a,b)

In [None]:
%timeit add_numpy(a,b)

In [None]:
a=np.random.randn(10000,10000)
b=np.random.randn(10000,10000)

## When should we use Cython?

- definitely not for ```a+b```; most useful for speeding up operations that can't be easily vectorized
- Use Cython for bottlenecks, rather than re-writing everything in Cython
- Numba ```@njit``` is an alternative, but sometimes not feasible, an example: https://docs.scipy.org/doc/scipy/reference/optimize.cython_optimize.html

<img src="../images/dask-horizontal.svg" 
     align="left"
     width="20%">

In [None]:
import xarray as xr
import numpy as np
from numba import njit
import dask
import dask.array as da
import matplotlib.pyplot as plt

#### A flexible library for parallel computing in Python

- Dynamic task scheduling optimized for computation
- “Big Data” collections like dask arrays (extending NumPy interfaces to larger-than-memory or distributed environments). These parallel collections run on top of dynamic task schedulers.

## Dask Array

<img src="../images/dask-array-black-text.svg" 
     align="left"
     width="40%"
     alt="Dask arrays are blocked numpy arrays">

- Dask arrays coordinate many Numpy arrays, arranged into chunks within a grid.
- Dask arrays support most of the NumPy interface
  - Arithmetic and scalar mathematics: ```+, *, exp, log, ...```
  - Reductions along axes: ```sum(), mean(), std(), sum(axis=0), ...```
  - Tensor contractions / dot products / matrix multiply: ```tensordot```
  - Axis reordering / transpose: ```transpose```
  - Slicing: ```x[:100, 500:100:-2]```
  - Fancy indexing along single axes with lists or NumPy arrays: ```x[:, [10, 1, 5]]```
  - Array protocols like ```__array__``` and ```__array_ufunc__```
  - Some linear algebra: ```svd, qr, solve, solve_triangular, lstsq```

In [None]:
wbgt=xr.open_dataset('/scratch/brown/kong97/WBGT/cmip6/ACCESS_CM2/WBGT_2000.nc')
wbgt

In [None]:
wbgt.WBGT.data

In [None]:
wbgt=wbgt.chunk({'time':200})
wbgt

In [None]:
wbgt_mean=wbgt.WBGT.mean('time')
wbgt_mean

In [None]:
%time wbgt_mean=wbgt_mean.load()
wbgt_mean.plot()

#### Dask has two families of task schedulers:
* **Single machine scheduler:** Default scheduler, can only be used on a single machine. If you import Dask, set up a computation, and then call compute, then you will use the single-machine scheduler by default. ***We use single-machine scheduler above by default!***


* **Distributed scheduler:** can run on a single machine or distributed across a cluster, **should be preferred even on a single machine** (offer more diagnostic features). To use the dask.distributed scheduler you must set up a Client

In [None]:
from dask.distributed import Client
client = Client(processes=False)
#client = Client(n_workers=4)
client

- **Threads**: One process, multiple threads; good for numeric code that releases the GIL (like NumPy, Pandas, Scikit-Learn, Numba, …)

- **Processes**: several processes (maybe also multiple threads in one process); good for pure Python objects like strings or JSON-like dictionary data that holds onto the GIL; expensive inter-process communication

## use ```xr.open_mfdataset``` to open multiple files parallelly

In [None]:
wbgt2=xr.open_mfdataset('/scratch/brown/kong97/WBGT/cmip6/ACCESS_CM2/WBGT*.nc',parallel=True)
wbgt2

## Apply customized function to dask arrays chunk by chunk

In [None]:
@njit
def wbgtlim(M):
    return 56.7-11.5*np.log10(M)

@njit
def workability(WBGT,M):
    WBGTlimrest=wbgtlim(117)
    WBGTlim=wbgtlim(M)
    return np.maximum(0,np.minimum(1,(WBGTlimrest-WBGT)/(WBGTlimrest-WBGTlim)))
def labor(WBGT,M):
    lc=xr.apply_ufunc(workability,WBGT-273.15,M,dask="parallelized",output_dtypes=[float])
    lc=lc.rename({'WBGT':'labor_capacity'})
    return lc

In [None]:
lc=labor(wbgt,400)
lc

In [None]:
%time lc=lc.load()
#or write to disk directly
#lc.to_netcdf('/scratch/brown/kong97/labor.nc')

## Some tips

- select good chunksize
  - small enough so that many chunks can fit in memory at once
  - large enough to avoid overhead (rare to see chunk size below 100MB)
- orient your chunk
  - the way we chunk matters; if we often slice along 'time' dimension, it's better to chunk along it.
- avoid too many tasks
  - every task comes with overhead (200us ~ 1ms);  millions of tasks lead to overhead of 10 minutes ~ hours
  - easy to create too many tasks: ```array_a+1``` can create many new tasks
  - avoid too small chunks
  - Fusing operations together and use ```xr.apply_ufunc()```

```python
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
cluster = SLURMCluster(
    queue="huberm",
    cores=24,
    processes=1,
    local_directory='/tmp',
    project="huberm",
    memory="80 GB",
    walltime="00:30:00",
    interface='ib0' # choose the faster network
)

client=Client(cluster)
cluster.scale(5)
cluster.adapt(minimum=2, maximum=10)
```

# combine Cython and Dask

In [None]:
import fsspec
import intake
import zarr
import pandas as pd
from wetbulb import wetbulb
import xarray as xr
import numpy as np
from numba import njit
import dask
from dask.distributed import Client
client = Client(processes=False)

In [None]:
df = pd.read_csv('https://storage.googleapis.com/cmip6/cmip6-zarr-consolidated-stores.csv')
df_data=df.query("source_id=='GFDL-CM4'& experiment_id=='historical' & table_id=='3hr' & variable_id==['tas', 'huss','ps'] & grid_label=='gr1'")
def drop_all_bounds(ds):
    """Drop coordinates like 'time_bounds' from datasets,
    which can lead to issues when merging."""
    drop_vars = [vname for vname in ds.coords
                 if (('_bounds') in vname ) or ('_bnds') in vname or ('height') in vname]
    return ds.drop(drop_vars)
def open_dsets(df):
    """Open datasets from cloud storage and return xarray dataset."""
    dsets = [xr.open_zarr(fsspec.get_mapper(ds_url), consolidated=True)
             .pipe(drop_all_bounds)
             for ds_url in df.zstore]
    return dsets
dsets = dict() 
for group, df in df_data.groupby(by=['variable_id']):
    dsets[group] = open_dsets(df)
dsets

In [None]:
dsets['tas'][0]

In [None]:
huss=dsets['huss'][0].huss.sel(time='2001') #specific humidity
tas=dsets['tas'][0].tas.sel(time='2001') # 2m air temperature
ps=dsets['ps'][0].ps.sel(time='2001') # surface air pressure

In [None]:
huss

In [None]:
wb=xr.apply_ufunc(wetbulb,tas.astype('float64'),huss.astype('float64'), ps.astype('float64'),dask="parallelized",output_dtypes=[float])
wb

In [None]:
%time wb=wb.load()

In [None]:
wb