## Why Dask?

In [1]:
import numpy as np

In [2]:
# The following numpy array will give MemoryError because of it's size
# images = np.ones((10000,10000,10000)) # Creates a 3d array with given dimensions
# images

## Basic Dask

In [3]:
import dask.array as da

In [4]:
 # Creates a 3d array with given dimensions
images = da.ones((10000,10000,10000))
images

Unnamed: 0,Array,Chunk
Bytes,7.28 TiB,126.51 MiB
Shape,"(10000, 10000, 10000)","(255, 255, 255)"
Dask graph,64000 chunks in 1 graph layer,64000 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.28 TiB 126.51 MiB Shape (10000, 10000, 10000) (255, 255, 255) Dask graph 64000 chunks in 1 graph layer Data type float64 numpy.ndarray",10000  10000  10000,

Unnamed: 0,Array,Chunk
Bytes,7.28 TiB,126.51 MiB
Shape,"(10000, 10000, 10000)","(255, 255, 255)"
Dask graph,64000 chunks in 1 graph layer,64000 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [5]:
# Create dask array with user defined chunks
x = da.ones((100,), chunks=(10,))
x

Unnamed: 0,Array,Chunk
Bytes,800 B,80 B
Shape,"(100,)","(10,)"
Dask graph,10 chunks in 1 graph layer,10 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 800 B 80 B Shape (100,) (10,) Dask graph 10 chunks in 1 graph layer Data type float64 numpy.ndarray",100  1,

Unnamed: 0,Array,Chunk
Bytes,800 B,80 B
Shape,"(100,)","(10,)"
Dask graph,10 chunks in 1 graph layer,10 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [6]:
# Dask has lazy-computation until compute() call it will not execute
result = x.sum()
print(result.compute())

100.0


In [7]:
# Create a Dask array from an predefined numpy array
data = np.arange(100000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))
a

Unnamed: 0,Array,Chunk
Bytes,781.25 kiB,78.12 kiB
Shape,"(200, 500)","(100, 100)"
Dask graph,10 chunks in 1 graph layer,10 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 781.25 kiB 78.12 kiB Shape (200, 500) (100, 100) Dask graph 10 chunks in 1 graph layer Data type int64 numpy.ndarray",500  200,

Unnamed: 0,Array,Chunk
Bytes,781.25 kiB,78.12 kiB
Shape,"(200, 500)","(100, 100)"
Dask graph,10 chunks in 1 graph layer,10 chunks in 1 graph layer
Data type,int64 numpy.ndarray,int64 numpy.ndarray


In [8]:
# show the chunk structure
a.chunks

((100, 100), (100, 100, 100, 100, 100))

In [9]:
# Access a specific chunk
a.blocks[0, 4]

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,78.12 kiB
Shape,"(100, 100)","(100, 100)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 78.12 kiB 78.12 kiB Shape (100, 100) (100, 100) Dask graph 1 chunks in 2 graph layers Data type int64 numpy.ndarray",100  100,

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,78.12 kiB
Shape,"(100, 100)","(100, 100)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray


In [23]:
# Create a Dask array
a = da.from_array([[0, 20, 30], [40, 50, 60], [70, 80, 90]], chunks=(2, 2))
print("Array:\n", a.compute())
print("\nChunk Structure:\n", a.chunks)

Array:
 [[ 0 20 30]
 [40 50 60]
 [70 80 90]]

Chunk Structure:
 ((2, 1), (2, 1))


## Indexing

In [24]:
# Indexing to get a single element
element = a[1, 2].compute()
print("Single element:", element)

Single element: 60


In [25]:
# Slicing along the first dimension
slice_result = a[:1, 2].compute()
print("Sliced along the first dimension:\n", slice_result)

Sliced along the first dimension:
 [30]


In [26]:
# Slicing along the second dimension
slice_result = a[:, 1:].compute()
print("Sliced along the second dimension:\n", slice_result)

Sliced along the second dimension:
 [[20 30]
 [50 60]
 [80 90]]


## Statistics

In [11]:
# Finding average of the whole array
a.mean().compute()

48.888888888888886

In [12]:
# Finding sum of all elements in array
result = a.sum()
result.compute()

440

In [13]:
# Compute sin value of each element
np.sin(a).compute()

array([[ 0.        ,  0.91294525, -0.98803162],
       [ 0.74511316, -0.26237485, -0.30481062],
       [ 0.77389068, -0.99388865,  0.89399666]])

In [14]:
# Transpose
a.T.compute()

array([[ 0, 40, 70],
       [20, 50, 80],
       [30, 60, 90]])

In [18]:
# Sum
result_sum = a.sum()
print("Sum of Elements:", result_sum.compute())

# Mean
result_mean = a.mean()
print("\nMean of Elements:", result_mean.compute())

# Maximum
result_max = a.max()
print("\nMaximum Element:", result_max.compute())

Sum of Elements: 45

Mean of Elements: 5.0

Maximum Element: 9


In [None]:
# Dot product
result_dot = da.dot(a, b)
print("Dot Product:\n", result_dot.compute())

# Singular value decomposition
# result_svd = da.linalg.svd(a)
# print("\nSingular Value Decomposition:\n", result_svd)

In [None]:
# Dot product
result_dot = da.dot(a, b)
print("Dot Product:\n", result_dot.compute())

# Singular value decomposition (Not implemented in Dask)
# Use NumPy for SVD
import numpy as np
u, s, vt = np.linalg.svd(a.compute())
print("\nSingular Value Decomposition (NumPy):\n", u, s, vt)

In [None]:
# Random normal distribution
result_random_normal = da.random.normal(size=(3, 3), chunks=(2, 2))
print("Random Normal Distribution:\n", result_random_normal.compute())

# Random uniform distribution
result_random_uniform = da.random.uniform(size=(3, 3), chunks=(2, 2))
print("\nRandom Uniform Distribution:\n", result_random_uniform.compute())

In [39]:
# Compute the standard deviation.
result_std = a.std()
print(result_std.compute())

# Compute the variance.
result_var = a.var()
print(result_var.compute())

# Indices of the minimum and maximum elements.
result_argmin = a.argmin()
result_argmax = a.argmax()
print(result_argmin.compute())
print(result_argmax.compute())

2.581988897471611
6.666666666666667
0
8


## Create random array

In [15]:
result_random = da.random.normal(size=(3, 3), chunks=(2, 2))
print("\nRandom Array:\n", result_random.compute())


Random Array:
 [[-0.1303064  -2.11158956  1.04618388]
 [ 0.35214228  0.69922617  0.96844703]
 [-1.51867736 -0.33832875  0.35890898]]


## Arithmetic Operations

In [30]:
# Create Dask arrays
a = da.from_array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2))
b = da.from_array([[9, 8, 7], [6, 5, 4], [3, 2, 1]], chunks=(2, 2))
print("First Array:\n", a.compute())
print("\nSecond Array:\n", b.compute())

First Array:
 [[1 2 3]
 [4 5 6]
 [7 8 9]]

Second Array:
 [[9 8 7]
 [6 5 4]
 [3 2 1]]


In [31]:
# Addition
result_add = a + b
print("Addition:\n", result_add.compute())

Addition:
 [[10 10 10]
 [10 10 10]
 [10 10 10]]


In [32]:
# Element-wise multiplication
result_multiply = a * b
print("\nElement-wise Multiplication:\n", result_multiply.compute())


Element-wise Multiplication:
 [[ 9 16 21]
 [24 25 24]
 [21 16  9]]


In [33]:
# Square root
result_sqrt = da.sqrt(a)
print("\nSquare Root:\n", result_sqrt.compute())


Square Root:
 [[1.         1.41421356 1.73205081]
 [2.         2.23606798 2.44948974]
 [2.64575131 2.82842712 3.        ]]


In [34]:
# Matrix multiplication
c = da.matmul(a,b)
c.compute()

array([[ 30,  24,  18],
       [ 84,  69,  54],
       [138, 114,  90]])

## Reshaping Concatenate and Transpose

In [35]:
# Reshape
result_reshape = a.reshape((1, 9))
print("Reshaped Array:\n", result_reshape.compute())

Reshaped Array:
 [[1 2 3 4 5 6 7 8 9]]


In [36]:
# Transpose
result_transpose = a.transpose()
print("\nTransposed Array:\n", result_transpose.compute())


Transposed Array:
 [[1 4 7]
 [2 5 8]
 [3 6 9]]


In [37]:
# Concatenate along rows
result_concatenate_rows = da.concatenate([a, b], axis=0)
print("Concatenated along Rows:\n", result_concatenate_rows.compute())


Concatenated along Rows:
 [[1 2 3]
 [4 5 6]
 [7 8 9]
 [9 8 7]
 [6 5 4]
 [3 2 1]]


In [38]:
# Concatenate along columns
result_concatenate_columns = da.concatenate([a, b], axis=1)
print("Concatenated along Columns:\n", result_concatenate_columns.compute())


Concatenated along Columns:
 [[1 2 3 9 8 7]
 [4 5 6 6 5 4]
 [7 8 9 3 2 1]]


## Persist and Distributed computing

In [40]:
# Compute (force computation)
result_compute = a.compute()
print("Computed Array:\n", result_compute)

# Persist (persist data in memory for future computations)
a_persisted = a.persist()
print("\nArray Persisted:", a_persisted)

# Visualize (visualize the Dask computation graph)
da.visualize(a, filename='dask_graph.png')

Computed Array:
 [[1 2 3]
 [4 5 6]
 [7 8 9]]

Array Persisted: dask.array<array, shape=(3, 3), dtype=int64, chunksize=(2, 2), chunktype=numpy.ndarray>


AttributeError: module 'dask.array' has no attribute 'visualize'

In [None]:
from dask.distributed import Client

client = Client()
client.persist(a)  # Persist the array in memory
client.compute(a)  # Trigger computation

# Now you can view the Dask dashboard in your web browser
# The dashboard URL will be printed in the output
print(client.dashboard_link)

In [42]:
# High level view of array
a.dask
# a.visualize()

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  4  shape  (3, 3)  dtype  int64  chunksize  (2, 2)  type  dask.array.core.Array  chunk_type  numpy.ndarray",3  3

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,4
shape,"(3, 3)"
dtype,int64
chunksize,"(2, 2)"
type,dask.array.core.Array
chunk_type,numpy.ndarray


## Low level Interfaces

In [None]:
import dask
from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(x, y):
    return x + y

a = inc(1)       # Delayed object representing inc(1)
b = inc(2)       # Delayed object representing inc(2)
c = add(a, b)    # Delayed object representing add(inc(1), inc(2))

# At this point, no actual computation has happened.

# To trigger the computation, you can use dask.compute or call compute on the result
result = dask.compute(c)
# or
c_result = c.compute()

# The result now contains the computed value of c
print(result)


In [None]:
from dask.distributed import Client

client = Client()

def inc(x):
   return x + 1

def add(x, y):
   return x + y

a = client.submit(inc, 1)     # work starts immediately
b = client.submit(inc, 2)     # work starts immediately
c = client.submit(add, a, b)  # work starts immediately

c = c.result()                # block until work finishes, then gather result

In [None]:
from dask.distributed import Client
client = Client()
client

In [None]:
import dask.array as da
a = da.arange(101, chunks=(10,))
a.sum().compute()

In [None]:
a.chunks

In [45]:
b = a.rechunk(chunks = (1,20))
b.chunks

((1, 1, 1), (3,))

## Stack

In [58]:
# Stack two array
# Initialize two arrays
arr0 = da.from_array(np.zeros((3, 4), dtype='int16'), chunks=(1, 2))
print("Array1:\n", arr0.compute())
arr1 = da.from_array(np.ones((3, 4), dtype='int16'), chunks=(1, 2))
print("Array2:\n", arr1.compute())

Array1:
 [[0 0 0 0]
 [0 0 0 0]
 [0 0 0 0]]
Array2:
 [[1 1 1 1]
 [1 1 1 1]
 [1 1 1 1]]


In [59]:
# stack arr0 on top of arr1 horizontally
x = da.stack([arr0, arr1], axis=0)
x.compute()

array([[[0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0]],

       [[1, 1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 1]]], dtype=int16)

In [60]:
x.shape

(2, 3, 4)

In [65]:
# stack arr0 on top of arr1 horizontally
a = np.array([[1,2,3],[4,5,6],[7,8,9]])
b = np.array([[10,11,12],[13,14,15],[16,17,18]])
x = da.stack([a, b], axis=0)
x.compute()

array([[[ 1,  2,  3],
        [ 4,  5,  6],
        [ 7,  8,  9]],

       [[10, 11, 12],
        [13, 14, 15],
        [16, 17, 18]]])

In [66]:
# Each column is stack on the corresponding column and make one array
x = da.stack([a, b], axis=1)
x.compute()

array([[[ 1,  2,  3],
        [10, 11, 12]],

       [[ 4,  5,  6],
        [13, 14, 15]],

       [[ 7,  8,  9],
        [16, 17, 18]]])

In [62]:
x.shape

(3, 2, 4)

## Block

In [67]:
# Block of arrays
data = [
    [arr0, arr1],
    [arr1, arr0]
]
x = da.block(data)
x.compute()

array([[0, 0, 0, 0, 1, 1, 1, 1],
       [0, 0, 0, 0, 1, 1, 1, 1],
       [0, 0, 0, 0, 1, 1, 1, 1],
       [1, 1, 1, 1, 0, 0, 0, 0],
       [1, 1, 1, 1, 0, 0, 0, 0],
       [1, 1, 1, 1, 0, 0, 0, 0]], dtype=int16)