## Reading + Loading + Processing

### Reading w/ skimage

In [1]:
from skimage.io import imread
import glob
import time

# Collect the slice files to benchmark.
image_paths = glob.glob('/nobackup/kp276129/test/1*/slice_010*') # 10 4k tif

# Time how long skimage needs to decode every file into an in-memory array.
t1 = time.perf_counter()
imgs = list(map(imread, image_paths))
t2 = time.perf_counter()

print(f"Time taken to read {len(image_paths)} images: {t2 - t1} seconds")

Time taken to read 10 images: 0.0955035351216793 seconds


### Reading w/ nvImageCodec

In [2]:
# WAITING FOR NVIDIA TO FIX THE ISSUE https://github.com/NVIDIA/nvImageCodec/issues/5
# from nvidia import nvimgcodec
# import time
# import glob

# params = nvimgcodec.DecodeParams(color_spec=nvimgcodec.ColorSpec.UNCHANGED, allow_any_depth=True)
# dec = nvimgcodec.Decoder()

# t1 = time.perf_counter()
# imgs=dec.read(glob.glob('/nobackup/kp276129/test/1*/slice_010*'), params)
# t2 = time.perf_counter()
# print(f"Time taken to read images: {t2 - t1} seconds")

### Loading into GPU (CuPy)

In [3]:
import cupy as cp

# Copy every host image (`imgs`, read in the first cell) to the GPU.

# Drain the default stream before starting the clock so that pending or one-off
# start-up work is not attributed to the transfer.
# NOTE(review): assumes this is the first GPU call of the session, in which case
# this call also absorbs CUDA initialisation — confirm.
cp.cuda.Stream.null.synchronize()

t3 = time.perf_counter()
# squeeze() drops any length-1 axes of the uploaded array.
imgs_cp = [cp.asarray(img).squeeze() for img in imgs]
# CuPy calls can return before the device has finished; wait for the GPU so the
# measured interval covers the complete transfer.
cp.cuda.Stream.null.synchronize()
t4 = time.perf_counter()

print(f"Time taken to convert {len(imgs)} images to CuPy arrays: {t4 - t3} seconds")

Time taken to convert 10 images to CuPy arrays: 0.24748434871435165 seconds


### Processing vsnr2d

In [4]:
import sys
sys.path.append('..')
from src.pyvsnr import vsnr2d

# Single Dirac filter used for this benchmark.
filters = [{'name': 'Dirac', 'noise_level': 0.35}]

# Apply vsnr2d to the GPU images one by one and time the whole run.
# `imgs_cp` and `cp` come from the "Loading into GPU" cell above.
t5 = time.perf_counter()
for img in imgs_cp:
    vsnr2d(img, filters)
# GPU kernels are queued asynchronously: without this barrier the clock could
# stop while work is still running on the device.
cp.cuda.Stream.null.synchronize()
t6 = time.perf_counter()

print(f"Time taken to apply vsnr2d to {len(imgs)} images: {t6 - t5} seconds")

------------------- cuFFT plan cache (device 0) -------------------
cache enabled? True
current / max size   : 0 / 16 (counts)
current / max memsize: 0 / (unlimited) (bytes)
hits / misses: 0 / 0 (counts)

cached plans (most recently used first):

------------------- cuFFT plan cache (device 0) -------------------
cache enabled? True
current / max size   : 1 / 16 (counts)
current / max memsize: 109100032 / (unlimited) (bytes)
hits / misses: 0 / 1 (counts)

cached plans (most recently used first):
key: ((2885, 2363), (2885, 2363), 1, 1, (2885, 1182), 1, 1, 42, 1, 'C', 1, 1182), plan type: PlanNd, memory usage: 109100032

------------------- cuFFT plan cache (device 0) -------------------
cache enabled? True
current / max size   : 1 / 16 (counts)
current / max memsize: 109100032 / (unlimited) (bytes)
hits / misses: 1 / 1 (counts)

cached plans (most recently used first):
key: ((2885, 2363), (2885, 2363), 1, 1, (2885, 1182), 1, 1, 42, 1, 'C', 1, 1182), plan type: PlanNd, memory usage: 1091

### Time Distribution

In [None]:
import matplotlib.pyplot as plt

# Stage labels and their wall-clock durations, taken from the timers set by the
# reading (t1/t2), loading (t3/t4) and processing (t5/t6) cells.
activities = ['Reading', 'Loading', 'Processing']
time_taken = [t2-t1, t4-t3, t6-t5]

# Pie chart of the share of time spent in each stage.
fig, ax = plt.subplots()
ax.pie(time_taken, labels=activities, autopct='%1.1f%%', startangle=140)
ax.set_title('Time Distribution')
ax.legend(activities, loc="best")

plt.show()

## VRAM Usage

### VRAM Usage for Batch Processing FFT

In [None]:
import cupy as cp
import matplotlib.pyplot as plt 
import pynvml
import os

def image_generator(nb_img, batch_size):
    """Yield batches of random float32 images until ``nb_img`` images were produced.

    Args:
        nb_img: total number of images to generate.
        batch_size: number of images per batch.

    Yields:
        CuPy arrays of shape ``(n, 4224, 4224)``. ``n`` equals ``batch_size``
        except for the last batch, which is smaller when ``batch_size`` does
        not divide ``nb_img`` (previously a full extra batch was produced).
    """
    for start in range(0, nb_img, batch_size):
        # Clamp the final batch so the total never exceeds nb_img.
        current_size = min(batch_size, nb_img - start)
        yield cp.random.rand(current_size, 4224, 4224, dtype=cp.float32)

def get_vram_usage():
    """Return the GPU memory used by the current process on device 0, in MiB.

    NVML is initialised for the duration of the call and always shut down
    again, including on the early-return path (which previously leaked the
    NVML session).

    Returns:
        float: memory attributed to this process, in MiB. ``0.0`` when NVML
        does not list the process among the compute processes of device 0, or
        when the driver reports no value for it (previously ``None`` was
        returned, which broke the arithmetic and plotting done on the result).
    """
    pynvml.nvmlInit()
    try:
        pid = os.getpid()
        handle = pynvml.nvmlDeviceGetHandleByIndex(0)
        for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):
            if proc.pid == pid:
                # usedGpuMemory can be None when the value is not available.
                return (proc.usedGpuMemory or 0) / 1024**2
        return 0.0
    finally:
        pynvml.nvmlShutdown()

nb_img = 50
batch_sizes = [5, 10]
vram_usage = []

# Run a plain batched 2-D FFT for every batch size and record the process VRAM
# once all batches of that size have been processed.
# NOTE(review): CuPy's default memory pool keeps freed blocks, so each reading is
# presumably a high-water mark rather than the live footprint — confirm.
for batch_size in batch_sizes:
    for batch_imgs in image_generator(nb_img, batch_size):
        # The FFT result is discarded on purpose; only its memory cost matters.
        cp.fft.fft2(batch_imgs)
    vram_usage.append(get_vram_usage())

# Marginal VRAM cost per additional image between consecutive batch sizes.
for i in range(1, len(batch_sizes)):
    vram_per_image = (vram_usage[i] - vram_usage[i-1]) / (batch_sizes[i] - batch_sizes[i-1])
    print(f"VRAM usage per image for batch size {batch_sizes[i]} is: {vram_per_image}")

plt.xlabel('Batch Size')
plt.ylabel('VRAM Usage (MB)')
plt.plot(batch_sizes, vram_usage)
plt.show()

### VRAM Usage Sequential vs Batch

In [None]:
import sys
sys.path.append('..')
from pyvsnr.vsnr2d import vsnr2d
import cupy as cp
import os
import pynvml
import matplotlib.pyplot as plt

def get_vram_usage():
    """Return the GPU memory used by the current process on device 0, in GiB.

    NVML is initialised for the duration of the call and always shut down
    again, including on the early-return path (which previously leaked the
    NVML session).

    Returns:
        float: memory attributed to this process, in GiB. ``0.0`` when NVML
        does not list the process among the compute processes of device 0, or
        when the driver reports no value for it (previously ``None`` was
        returned, which cannot be plotted or compared).
    """
    pynvml.nvmlInit()
    try:
        pid = os.getpid()
        handle = pynvml.nvmlDeviceGetHandleByIndex(0)
        for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):
            if proc.pid == pid:
                # usedGpuMemory can be None when the value is not available.
                return (proc.usedGpuMemory or 0) / 1024**3
        return 0.0
    finally:
        pynvml.nvmlShutdown()

def batch_generator(img, num_img, batch_size):
    """Yield stacked copies of ``img`` until ``num_img`` images were produced.

    Args:
        img: the image replicated in every batch.
        num_img: total number of images to yield across all batches.
        batch_size: number of images per batch.

    Yields:
        ``cp.stack`` of ``n`` references to ``img``. ``n`` equals ``batch_size``
        except for the last batch, which is smaller when ``batch_size`` does
        not divide ``num_img`` (previously a full extra batch was produced).
    """
    for start in range(0, num_img, batch_size):
        # Clamp the final batch so the total never exceeds num_img.
        current_size = min(batch_size, num_img - start)
        yield cp.stack([img] * current_size)

# Benchmark inputs: one random float32 image reused for every call.
filters = [{'name': 'Dirac', 'noise_level': 0.35}]
img = cp.random.rand(2048, 2048).astype(cp.float32)
num_img = 60
batch_sizes = [5, 10, 20]

# Reading taken before any vsnr2d call.
init_vram = get_vram_usage()

# Image-by-image processing with the 'cuda' algorithm, then a second reading.
for _ in range(num_img):
    vsnr2d(img, filters, algo='cuda')
single_img_vram = get_vram_usage()

# Batched processing with the 'cupy' algorithm: one reading per batch size.
batch_vram = []
for batch_size in batch_sizes:
    for batch in batch_generator(img, num_img, batch_size):
        vsnr2d(batch, filters, algo="cupy")
    batch_vram.append(get_vram_usage())

# VRAM against batch size, with the two reference readings as horizontal lines.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(batch_sizes, batch_vram, 'o-', label='Batch Processing')
ax.axhline(y=single_img_vram, color='b', linestyle='--', label='Single Image')
ax.axhline(y=init_vram, color='r', linestyle='--', label='Initial VRAM')
ax.set_xticks(batch_sizes)  # one tick per measured batch size
ax.set_xlabel('Batch Size')
ax.set_ylabel('VRAM Usage (GB)')
ax.legend()
ax.grid(True)
plt.show()

## Speed Tests

#### Pyvsnr Profiling

In [None]:
import sys
import pstats
import cProfile
import io

sys.path.append('../')
import src.pyvsnr as pyvsnr
import cupy as cp

# Filter description and input image for the profiled call.
# NOTE(review): a CuPy array is passed together with algo='numpy' — confirm that
# vsnr2d accepts this combination.
filters = [{'name': 'Dirac', 'noise_level': 0.35}]
img = cp.random.rand(2000, 2000)

# Profile exactly one vsnr2d call.
pr = cProfile.Profile()
pr.enable()

pyvsnr.vsnr2d(img, filters, algo='numpy')

pr.disable()

# Write the statistics, sorted by internal time, into a text buffer and print it.
s = io.StringIO()
ps = pstats.Stats(pr, stream=s)
ps.sort_stats('time').print_stats()

print(s.getvalue())

#### Pyvsnr Average Time

In [None]:
import sys
sys.path.append('..')
from src.pyvsnr import vsnr2d
# import cupy as cp
import numpy as np
import time

# Two Gabor filters with different noise levels and sigma values.
filters = [{ "name":"Gabor", "noise_level" : 30, "sigma" : (0.5, 80), "theta" : 0 },
           { "name":"Gabor", "noise_level" : 80, "sigma" : (0.5, 200), "theta" : 0 }]

nit = 5

# One independent random uint16 image (values in [0, 255)) per iteration.
# (The float image previously allocated here was never used: the loop variable
# below overwrote it before any read.)
imgs = [np.random.randint(0, 255, (2000, 2000)).astype(np.uint16) for _ in range(nit)]

t1 = time.perf_counter()
for img in imgs:
    vsnr2d(img, filters, maxit=50, cvg_threshold=0, norm=False, algo="numpy")
t2 = time.perf_counter()

# Mean wall-clock time per image.
print(f"Average time to apply vsnr2d: {(t2-t1)/nit} seconds")

In [None]:
import sys
sys.path.append('..')
from src.pyvsnr import vsnr2d
# import cupy as cp
import numpy as np
from tifffile import TiffFile
import time

# Two Gabor filters with different noise levels and sigma values.
filters = [{ "name":"Gabor", "noise_level" : 30, "sigma" : (0.5, 80), "theta" : 0 },
           { "name":"Gabor", "noise_level" : 80, "sigma" : (0.5, 200), "theta" : 0 }]

nit = 5

# Read one ESB slice from disk and keep a 2000 x 2000 crop of it.
with TiffFile('/home/kp276129/Documents/pystack3d/assets/stacks/stack_1/ESB/slice_00100_z=1.0894um.tif') as tif:
    img_esb = tif.asarray()[1224:3224, 1000:3000]

# vsnr2d(img, filters, algo='cupy') # warm up for GPU Only

# Every iteration works on its own copy of the same crop.
imgs = [img_esb.copy() for _ in range(nit)]

t1 = time.perf_counter()
for img in imgs:
    vsnr2d(img, filters, maxit=50, cvg_threshold=0, norm=False, algo="numpy")
t2 = time.perf_counter()

# Mean wall-clock time per image.
print(f"Average time to apply vsnr2d: {(t2-t1)/nit} seconds")

#### Pyvsnr Batch Average Time

In [None]:
import sys
sys.path.append('..')
from pyvsnr.vsnr2d import vsnr2d
import cupy as cp
import time

def batch_generator(images, batch_size):
    """Yield consecutive slices of ``images`` holding at most ``batch_size`` items.

    The last slice is shorter when ``len(images)`` is not a multiple of
    ``batch_size``; nothing is yielded for an empty input.
    """
    starts = range(0, len(images), batch_size)
    yield from (images[start:start + batch_size] for start in starts)


nb_img = 200
batch_size = 10
filters = [{'name': 'Dirac', 'noise_level': 0.35}]
# Sample float32 directly instead of generating a float64 array and casting it,
# which needed a temporary twice the size of the final array.
imgs = cp.random.rand(nb_img, 2048, 2048, dtype=cp.float32)

# Finish generating the input before starting the clock.
cp.cuda.Stream.null.synchronize()

t1 = time.perf_counter()
for batch in batch_generator(imgs, batch_size):
    vsnr2d(batch, filters, algo='cupy')
# GPU work is queued asynchronously; wait for it so the timing is complete.
cp.cuda.Stream.null.synchronize()
t2 = time.perf_counter()

# Mean wall-clock time per image.
print(f"Average time to apply vsnr2d_batch: {(t2-t1)/nb_img} seconds")


#### Time Sequential vs Batch

In [None]:
import sys
sys.path.append('..')
from pyvsnr.vsnr2d import vsnr2d
import numpy as np
import cupy as cp
import matplotlib.pyplot as plt
import time

def batch_generator(img, num_img, batch_size):
    """Yield stacked copies of ``img`` until ``num_img`` images were produced.

    Args:
        img: NumPy image replicated in every batch.
        num_img: total number of images to yield across all batches.
        batch_size: number of images per batch.

    Yields:
        Arrays of shape ``(n, *img.shape)``. ``n`` equals ``batch_size`` except
        for the last batch, which is smaller when ``batch_size`` does not
        divide ``num_img`` (previously a full extra batch was produced, so more
        than ``num_img`` images were processed and timed).
    """
    for start in range(0, num_img, batch_size):
        # Clamp the final batch so the total never exceeds num_img.
        current_size = min(batch_size, num_img - start)
        yield np.stack([img] * current_size)

filters = [{'name': 'Dirac', 'noise_level': 0.35}]
# Host-side image: use the NumPy dtype for a NumPy array.
img = np.random.rand(2048, 2048).astype(np.float32)
num_img = 200
batch_sizes = [5, 10, 20]

# Total time needed to process num_img images one call at a time.
single_processing_time = 0
for _ in range(num_img):
    t1 = time.perf_counter()
    vsnr2d(img, filters, algo='cupy')
    # Wait for any GPU work still in flight before reading the clock.
    cp.cuda.Stream.null.synchronize()
    t2 = time.perf_counter()
    single_processing_time += t2 - t1


# Total time needed to process the same number of images in batches.
batch_times = []
for batch_size in batch_sizes:
    t1 = time.perf_counter()
    for batch in batch_generator(img, num_img, batch_size):
        vsnr2d(batch, filters, algo='cupy')
    cp.cuda.Stream.null.synchronize()
    t2 = time.perf_counter()
    print(f"Time taken to process {num_img} images with batch size {batch_size}: {t2 - t1} seconds")
    batch_times.append(t2 - t1)  # total time for this batch size

# Plot total batch time per batch size against the one-by-one total.
fig, ax = plt.subplots(figsize=(10, 6))
ax.axhline(y=single_processing_time, color='r', linestyle='--', label='Single Processing')
ax.plot(batch_sizes, batch_times, 'o', label='Batch Processing')
ax.set_xlabel('Batch Size')
ax.set_ylabel('Time (s)')
ax.legend()
ax.grid(True)
plt.show()

#### Multiprocessing on pyvsnr

In [None]:
# Actually slower than processing in batch
import sys
from multiprocessing import Pool
sys.path.append('..')
from src.pyvsnr import vsnr2d
import numpy as np
import time

filters = [{'name': 'Dirac', 'noise_level': 0.35}]
img = np.random.rand(2048, 2048)  # CuPy usage leads to CUDA initialization error
nit = 2


def task(i):
    """Run vsnr2d once on the shared image; the task index ``i`` is not used."""
    vsnr2d(img, filters, algo="numpy")


# Wall-clock time to map `nit` tasks over a pool of 10 worker processes.
# Creating and tearing down the pool happens inside the timed region.
t1 = time.perf_counter()
with Pool(processes=10) as pool:
    pool.map(task, range(nit))
t2 = time.perf_counter()

# Wall-clock time divided by the number of tasks.
print(f"Average time to apply vsnr2d: {(t2-t1)/nit} seconds")


### Cupy Streams

#### Overlap data transfer and computation

In [None]:
import cupy as cp
from pyvsnr.vsnr2d import vsnr2d
from tifffile import imread
import glob
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


# Image files to process, sorted so that the batch composition is reproducible.
image_paths = sorted(glob.glob('/nobackup/kp276129/test/1*/*'))

# Number of images and batch size
num_img = 100
batch_size = 10
# Start offset of every batch inside image_paths.
batch_starts = range(0, num_img, batch_size)


def process_image(image):
    """Apply vsnr2d with a single Dirac filter to one image and return its result."""
    return vsnr2d(image, [{'name':'Dirac', 'noise_level':0.35}])


def load_batch(paths):
    """Read every file in ``paths`` and return the images as a list of CuPy arrays."""
    return [cp.array(imread(path)) for path in paths]


def process_batch(batch):
    """Apply ``process_image`` to every image of ``batch``; returns a list of results."""
    return [process_image(image) for image in batch]


# --- Threaded run: worker threads load batches while the main thread processes ---
t1 = time.perf_counter()

# The context manager joins the worker threads on exit, so the pool is always
# released (it was previously left running).
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit each batch exactly once. The earlier version submitted every batch
    # here and then again inside the consuming loop; those extra futures were
    # never read because as_completed() only tracks the futures it was given
    # when called, so their loading work was wasted.
    load_futures = [
        executor.submit(load_batch, image_paths[start:start + batch_size])
        for start in batch_starts
    ]
    # Consume in submission order so results[k] always belongs to batch k.
    # Loading of later batches keeps running in the background meanwhile.
    results = [process_batch(future.result()) for future in load_futures]

# Wait for queued GPU work before stopping the clock.
cp.cuda.Stream.null.synchronize()
t2 = time.perf_counter()
print(f"Time taken with threads: {t2 - t1} seconds")

# --- Sequential run (no threads) for comparison ---
sequential_time = 0.0
for batch_index, start in enumerate(batch_starts):
    t_start = time.perf_counter()
    batch = load_batch(image_paths[start:start + batch_size])
    result = process_batch(batch)
    cp.cuda.Stream.null.synchronize()
    sequential_time += time.perf_counter() - t_start

    # Verification is kept out of the timed section so that it does not
    # penalise the sequential figure. Images are compared one by one because
    # each batch result is a Python list of arrays.
    expected = results[batch_index]
    assert len(expected) == len(result)
    for threaded_img, sequential_img in zip(expected, result):
        assert cp.allclose(threaded_img, sequential_img)

print(f"Time taken without threads: {sequential_time} seconds")


#### Time using streams

In [None]:
import sys
sys.path.append('..')
from pyvsnr.vsnr2d import vsnr2d
import cupy as cp
import time

def calcul(arr, stream=None):
    """Run vsnr2d (single Dirac filter) on ``arr`` inside the given stream context.

    Args:
        arr: image passed to vsnr2d.
        stream: object used as the ``with`` context for the call; when None,
            ``cp.cuda.Stream.null`` is used.

    Returns:
        Whatever vsnr2d returns for ``arr``.
    """
    active_stream = cp.cuda.Stream.null if stream is None else stream
    with active_stream:
        # Alternative workload tried here: cp.fft.fft2(arr) followed by cp.fft.ifft2.
        return vsnr2d(arr, [{'name':'Dirac', 'noise_level':0.35}])


img = cp.random.rand(2048, 2048).astype(cp.float32)
imgs = cp.stack([img]*10)

# Create 10 CUDA streams
streams = [cp.cuda.Stream(non_blocking=True) for _ in range(10)]

# Non-blocking streams do not wait for the default stream, on which `imgs` was
# just produced. Finish that work first so the kernels launched below never
# read partially written input and the timing excludes data generation.
cp.cuda.Stream.null.synchronize()

t1 = time.perf_counter()
# Launch the computations, one image per stream
for i in range(10):
    calcul(imgs[i], stream=streams[i])
t2 = time.perf_counter()

# Synchronise every stream to make sure all computations have finished
for stream in streams:
    stream.synchronize()
t3 = time.perf_counter()

print(f"Submit tasks: {t2 - t1} seconds")
print(f"vsnr2d to 10 images in parallel: {t3 - t1} seconds")

#### Time for sequential

In [None]:
import sys
sys.path.append('..')
from pyvsnr.vsnr2d import vsnr2d
import cupy as cp
import time

def simple_calcul_sequential(arr):
    """Apply vsnr2d with a single Dirac filter and ``algo='cupy'`` to ``arr``; return its result."""
    return vsnr2d(arr, [{'name':'Dirac', 'noise_level':0.35}], algo='cupy')

img = cp.random.rand(2048, 2048).astype(cp.float32)
imgs = cp.stack([img]*10)

# Finish generating the input before starting the clock.
cp.cuda.Stream.null.synchronize()

# Run the computations one after the other
t1 = time.perf_counter()
for i in range(10):
    simple_calcul_sequential(imgs[i])
# Wait for the device so this figure covers completed work, like the
# stream-based measurement, which synchronises before reading the clock.
cp.cuda.Stream.null.synchronize()
t2 = time.perf_counter()

print(f"vsnr2d to 10 images in sequential: {t2 - t1} seconds")


#### Streams on smaller processing

In [None]:
import cupy as cp
import numpy as np
import time

rand = cp.random.RandomState(seed=1)

# Random column vector (created with CuPy, i.e. on the GPU).
y = cp.random.normal(size=(2**24, 1))

# --- Default stream only ---
cp.cuda.Stream.null.synchronize()  # input ready before the clock starts
t1 = time.perf_counter()
for _ in range(10):
    x = rand.normal(size=(1, 2**24))  # random row vector on the GPU
    z = cp.matmul(x, y)  # (1, n) @ (n, 1) product
# Without this barrier only the kernel launches would be timed, which is not
# comparable with the synchronised stream measurement below.
cp.cuda.Stream.null.synchronize()
t2 = time.perf_counter()

print(f"Time to execute operations without streams: {t2 - t1} seconds")

# --- One non-blocking stream per iteration ---
streams = [cp.cuda.Stream(non_blocking=True) for _ in range(10)]

y = cp.random.normal(size=(2**24, 1))  # fresh column vector on the GPU
# Non-blocking streams do not wait for the default stream that produced `y`.
cp.cuda.Stream.null.synchronize()

t1 = time.perf_counter()
for stream in streams:  # queue one generate-and-multiply step per stream
    with stream:
        x = rand.normal(size=(1, 2**24))
        z = cp.matmul(x, y)

for stream in streams:
    stream.synchronize()

t2 = time.perf_counter()

print(f"Time to execute operations asynchronously: {t2 - t1} seconds")

In [None]:
import time
import cupy as cp
import numpy as np

def some_gpu_operation(x):
    """Placeholder workload: return ``cp.sin(x)``.

    Swap the body for the real GPU operation to benchmark.
    """
    result = cp.sin(x)
    return result

def test_stream_vs_sequential():
    # Create some data
    x = cp.random.rand(1000000)

    # Sequential execution
    start = time.time()
    for _ in range(10):
        y = some_gpu_operation(x)
    sequential_time = time.time() - start

    # Execution with stream
    start = time.time()
    stream = cp.cuda.Stream.null
    with stream:
        for _ in range(10):
            y = some_gpu_operation(x)
    stream_time = time.time() - start

    # print absolute difference between times in percentage
    print(f"Stream time={stream_time}, sequential={sequential_time}")
    percentage = int((sequential_time-stream_time)/sequential_time*100)  
    if percentage > 0:
        print(f"Stream execution was {percentage}% faster")
    else:
        print(f"Stream execution was {abs(percentage)}% slower")

    # Check that stream execution was faster
    assert stream_time < sequential_time, f"Stream time={stream_time}, but sequential={sequential_time}"

test_stream_vs_sequential()