# Make Python go Brrrrr - Joseph Long 03/22/2022
I want to parallel process some optical physics
- Gaussian Beamlet Decomposition
- Polarization Ray Tracing

### Vocab


Numba allows you to write your own kernels that operate on Cupy arrays!

Cupy uses pre-compiled functions

## "Embarrassingly Parallel" Problems
Things where you don't have to do anything to re-structure the problem to parallelize it
- Divide into units that don't require communication between units!
- Units have the input information
- Units don't need to ask their neighbor
- Units can write their result all at once

Astro Examples
- Testing orbit fits with different parameters
- calibrating the contrast at many points w/ fake planet injections
- dark/bias/flat correcting a whole night of observations

## Non-"Embarrassingly Parallel" Problems
Units that need to communicate to each other
- Common when writing out results
- Need synchronization points

Astro Examples
- N body simulations
- fluid dynamics simulations
- file compression

## Functional Purity
_pure functions:_ operate identically to math functions e.g. f(x) = 2x
_impure functions_: have side effects e.g. 

`def f(x):`

`    print('called f(x)')`

`    return 2x`

Pure functions are better for parallelization! 

## The first problem
- your code is slow!
- your program maxes out 1 core!
- your python code uses `multiprocessing` to use all cores on your computer but NOT on multiple computers in parallel!

## The `Ray` python package
Built for parallelism. Ignore most of what goes on under the hood.

In [1]:
# The slow version!
import time
import random

def time_waster(min_delay: float = 1, max_delay: float=1) -> float:
    
    delay = (max_delay-min_delay)*random.random() + min_delay
    time.sleep(delay)
    return delay

def main():
    for _ in range(10):
        result = time_waster(min_delay=2,max_delay=3)
        print(f"Delay was {result} sec")
        
if __name__ == "main":
    start = time.time()
    main()
    print(f"Took {time.time() - start}")
    
main()

Delay was 2.7271072245597465 sec
Delay was 2.659958782576988 sec
Delay was 2.3898726772562426 sec
Delay was 2.4741810141576788 sec
Delay was 2.1322021014016648 sec
Delay was 2.071398089942027 sec
Delay was 2.1619966366261036 sec
Delay was 2.6365671212551285 sec
Delay was 2.7687874992795596 sec
Delay was 2.3871740651692637 sec


In [2]:
# The ray version!
# It takes > 1s to start up, so if you plan to go faster this isn't worth it
import time
import random
import ray

@ray.remote # added decorator
def time_waster(min_delay: float = 1, max_delay: float=1) -> float:
    
    delay = (max_delay-min_delay)*random.random() + min_delay
    time.sleep(delay)
    return delay

def main():
    pending = []
    for _ in range(10):
        # This will return a reference to a process that isn't necesarilly finished computing
        ref = time_waster.remote(min_delay=2,max_delay=3) # change how it's called
        pending.append(ref) # hold on to results until they are done
        
        # submission is happening one at a time, process happens in parallel
        
    for ref in pending: # loop over results
        result = ray.get(ref) # get references
        print(f"Delay was {result} sec")
        
if __name__ == "main":
    start = time.time()
    main()
    print(f"Took {time.time() - start}")
    
main()

Delay was 2.5393779920580535 sec
Delay was 2.8071518478152004 sec
Delay was 2.135721757691998 sec
Delay was 2.974236252636493 sec
Delay was 2.099829523551635 sec
Delay was 2.407166373533177 sec
Delay was 2.234198802008069 sec
Delay was 2.052082425725765 sec
Delay was 2.892786093701405 sec
Delay was 2.4402318467233934 sec


## The Drawback
The process is limited by the slowest operation. You can actually just return in the order the results are ready!

## Caveats
- Cluster-scale ray is hard to set up but can be really useful!
- Ray functions *only* run in the executor, making automatic tests awk
- Reccomend your analysis is regular python, and intensive functions are decorated with @ray.remote
- Visibility grows unavoidable complexity, test at small scales first

You can access the dashboard through the IP it prints in the terminal!

In [3]:
# Calling a non-wrapped version of the function
# Non-wrapped is _time_waster
# wrapped is time_waster

def _time_waster():
    return 1

time_waster = ray.remote(_time_waster)

# The Second kind of Problem
- try not to use for loops
- array broadcast when you can
- But you NEED to do loops for your process
- If there are so many network calls then ray is probably not meaningful

## Numba
A _just-in-time compiler:_ Built simmilarly to ahead-of-time compilers. Compile code into machine code. Python is an interpreted language, so it doesn't compile. Numba is kind of amazing! 

Apply one decorator, get instant performance*
Terms and conditions apply

@jit turns the function into just in time. But there are situations where it doesn't optimize well - but it'll default to do something correct!

@njit Nopython = True jit! If you can't compile this completely into something ignoring python objects, DON'T BOTHER and throw an error. Just get me to machine code! Only works with primitive data types.

In [16]:
import numba
import time
import numpy as np
import random

def demo(arr):
    width,height = arr.shape
    out = np.zeros_like(arr)
    for i in range(height):
        for j in range(width):
            out[j,i] = random.random()
            
@numba.jit # just in time compilation
def demo_faster(arr):
    width,height = arr.shape
    out = np.zeros_like(arr)
    for i in range(height):
        for j in range(width):
            out[j,i] = random.random()
            
@numba.njit(parallel=True) # 
def demo_fastest(arr):
    planes,width,height = arr.shape
    outcube = np.zeros_like(arr)
    for p in numba.prange(planes): # only parallelize the top level! You can break your problem up into smaller bits
        for i in range(height):
            for j in range(width):
                outcube[p,j,i] = random.random()
            
# Calling jit takes a while for the first time, but then it's faster!

def main():
    N = 100
    cube = np.arange(N**3,dtype=float).reshape(N,N,N)
    outcube = np.zeros_like(cube)
    
    #warmup
    for impl in (demo,demo_faster,demo_fastest):
        impl(cube[0])
        
    for impl in (demo,demo_faster,demo_fastest):
        start = time.time()
        for i in range(N):
            outcube[i] = impl(cube[i])
        print(f"time per iteration: {(time.time()-start)/N}")
        
main()

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
Internal error at <numba.core.typeinfer.ExhaustIterConstraint object at 0x7ff730203430>.
[1m[1mwrong tuple length for '$4load_attr.1': expected 3, got 2[0m
[0m[1mDuring: typing of exhaust iter at <ipython-input-16-8f79f43d97cc> (23)[0m
Enable logging at debug level for details.
[1m
File "<ipython-input-16-8f79f43d97cc>", line 23:[0m
[1mdef demo_fastest(arr):
[1m    planes,width,height = arr.shape
[0m    [1m^[0m[0m


In [None]:
# You can replace range() with numba.prange()