In [1]:
# dask
import dask
from dask.distributed import Client
from dask.distributed import deploy
from dask_jobqueue import PBSCluster

# standard lib
from pathlib import Path
import io

# image processing
from PIL import Image

# numerical libraries
import numpy

In [2]:
# Each scheduler job gets 24 cores split over 6 worker processes that share
# the "10GB" dask memory budget; the job script printed further down renders
# this as `--nthreads 4 --nprocs 6 --memory-limit 1.55GiB`.
# NOTE(review): resource_spec asks the batch system for mem_free=240G while
# the dask-side budget is only 10GB -- confirm this mismatch is intended.
cluster = PBSCluster(
    project="SIP",
    cores=24,
    processes=6,
    memory="10GB",
    walltime=None,
    # Both of the following end up as directive lines in the job script header.
    resource_spec="h_vmem=10G,mem_free=240G",
    job_extra=("-pe serial 24", "-j y", "-o ~/logs/dask_workers.out"),
)

In [14]:
# Print the generated worker submission script so its line breaks render
# (a bare string expression would be shown as a single escaped repr).
print(cluster.job_script())

#!/usr/bin/env bash

#$ -N dask-worker
#$ -A SIP
#$ -l h_vmem=10G,mem_free=240G
#$ -pe serial 24
#$ -j y
#$ -o ~/logs/dask_workers.out

/home/maximl/personal/miniconda3/envs/dask/bin/python -m distributed.cli.dask_worker tcp://172.24.4.77:33946 --nthreads 4 --nprocs 6 --memory-limit 1.55GiB --name dummy-name --nanny --death-timeout 60 --protocol tcp://



In [15]:
# Ask for 4 scheduler jobs; per the printed job script each job launches
# 6 worker processes (`--nprocs 6`).
cluster.scale(jobs=4)

In [16]:
# Client attached to the job-queue cluster created above; it is closed again
# in the final cell of the notebook.
client = Client(cluster)

In [17]:
@dask.delayed
def load_multiframe_tiff(p):
    """Load every frame of a multi-frame TIFF into a single array.

    The function is wrapped in ``dask.delayed``, so calling it only creates a
    lazy task; the file is read when that task is computed.

    Parameters
    ----------
    p : str or path-like
        Location of the TIFF file, handed to ``PIL.Image.open``.

    Returns
    -------
    numpy.ndarray
        Float array of shape ``(n_frames, height, width)``.
    """
    # The context manager closes the underlying file as soon as all frames are
    # copied; without it the handle stayed open until garbage collection.
    with Image.open(p) as im:
        arr = numpy.empty(shape=(im.n_frames, im.height, im.width), dtype=float)
        for i in range(im.n_frames):
            im.seek(i)
            # Each frame has to convert to a (height, width) array to fit its slot.
            arr[i] = numpy.array(im)
    return arr

In [20]:
# Path.glob yields files in arbitrary order; sorting makes runs reproducible
# and keeping the list lets result i be traced back to tiff_paths[i].
tiff_paths = sorted(Path("/home/maximl/personal/data/vulcan_pbmc/").glob("*.tiff"))
# One delayed loading task per file, in the same order as tiff_paths.
images = [load_multiframe_tiff(str(p)) for p in tiff_paths]

In [21]:
%time
pixels = dask.compute(*images)

CPU times: user 8 µs, sys: 2 µs, total: 10 µs
Wall time: 24.8 µs


In [12]:
# Shut down in order: close the client connection first, then the cluster.
client.close()
cluster.close()