# Client Graph
This notebook shows how to intercept and manipulate a Dask task graph on the client side by replacing methods of the `Client` object.

## Example 1: Edit submit method

In [1]:
from dask.distributed import Client

# Create a Dask client (no arguments; presumably this starts a local cluster — confirm for your setup)
client = Client()

# Keep a reference to the original bound method so that modified_submit can
# still delegate to it after client.submit has been replaced.
original_submit = client.submit

In [2]:
def modified_submit(*args, **kwargs):
    """Announce that ``submit`` was intercepted, then forward the call.

    Every positional and keyword argument is passed through untouched to the
    saved ``original_submit``; its return value is handed back unchanged.
    """
    for banner in ("Entered Submit", "Party if it works"):
        print(banner)
    outcome = original_submit(*args, **kwargs)
    return outcome

In [3]:
# Replace submit on this client instance only; later client.submit calls go through the wrapper.
client.submit = modified_submit

In [4]:
def neg(x):
    """Return the arithmetic negation of ``x``."""
    negated = -x
    return negated


# Goes through modified_submit, so the two log lines are printed before the task is submitted.
client.submit(neg, 3)

Entered Submit
Party if it works


In [5]:
# Shut the client down so that the next example starts from a fresh one.
client.shutdown()



## Example 2: Edit collections_to_dsk method

In [6]:
from dask.distributed import Client
import dask
import dask.array as da

from datetime import datetime

# New client for this example (the previous one was shut down above).
client = Client()

# Save original collections_to_dsk method
# so the replacement defined below can delegate to it.
original_collections_to_dsk = client.collections_to_dsk

In [7]:
def modified_collections_to_dsk(collections, *args, **kwargs):
    """Log the collections being converted, then delegate to the original method.

    Prints a timestamp followed by the ``name`` of each collection, and
    returns whatever the saved ``original_collections_to_dsk`` returns for
    the same arguments.
    """
    timestamp = datetime.now().strftime("%H:%M:%S")
    print("Inside myFunc. Time = " + timestamp)
    print("Dask Graph Tasks")
    for item in collections:
        print(f"Task Name: {item.name}")
    print("-----------------\n")

    converted = original_collections_to_dsk(collections, *args, **kwargs)
    return converted


# Install the logging wrapper on this client instance and confirm it in the output.
client.collections_to_dsk = modified_collections_to_dsk
print("collections_to_dsk method has been replaced")

collections_to_dsk method has been replaced


In [8]:
# 1000x1000 array of ones in 100x100 chunks, plus 1, then the overall mean.
# This only defines the collection; the work is submitted by client.compute in the next cell.
x = da.ones((1000, 1000), chunks=(100, 100))
y = x + 1
z = y.mean()

In [9]:
# Invokes the patched collections_to_dsk, which prints the trace shown below this cell.
result = client.compute(z)

Inside myFunc. Time = 19:38:18
Dask Graph Tasks
Task Name: mean_agg-aggregate-c50263a0ad47c330577b3058195ad928
-----------------



In [10]:
# End of Example 2: shut the client down before starting the next example.
client.shutdown()



## Example 3: Do some basic caching

### Example 3.1: Keep client running
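Reusing the same running client causes an issue on the second compute: the cluster still remembers the task and its key (hash) from the first compute, so the edited graph conflicts with the stored task (see the debugging information printed after the second compute).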

In [11]:
from dask.distributed import Client
import dask
import dask.array as da
from dask.highlevelgraph import HighLevelGraph
from datetime import datetime

# New client for Example 3.1; it is deliberately kept running across both computes.
client = Client()

# Keep the original method so the patched version can delegate to it.
original_collections_to_dsk = client.collections_to_dsk

In [12]:
# Names of the collections that modified_collections_to_dsk has already seen (starts empty).
existing_functions = []


def add_to_three(number):
    """Return ``number`` increased by three."""
    increased = number + 3
    return increased


def modified_collections_to_dsk(collections, *args, **kwargs):
    """Track collections by name and swap the graph of ones seen before.

    For each collection this prints its name and current graph. A name not
    yet in ``existing_functions`` is appended to that list. A name that is
    already there has the collection's ``dask`` attribute overwritten with a
    single-layer ``HighLevelGraph`` whose only task is ``add_to_three(20)``.
    Finally the call is forwarded, with all arguments, to the saved
    ``original_collections_to_dsk`` and its result is returned.

    Note: the collections passed in are mutated in place.
    """
    print("Inside myFunc. Time = " + datetime.now().strftime("%H:%M:%S"))

    print("Functions already processed")
    # The list is only read and appended to, never rebound.
    for seen_name in existing_functions:
        print(f"{seen_name}")

    print("Dask Graph Tasks")
    for collection in collections:
        name = collection.name
        print(f"Task Name: {name}")
        print("Dask Graph")
        print(collection.dask)

        if name not in existing_functions:
            # First sighting: remember the name and leave the graph alone.
            print(f"Adding {name} to existing_functions")
            existing_functions.append(name)
            continue

        print(f"{name} has already been processed")
        # TODO: Continue playing around with the dask graph so return is not a string
        # TODO: Use collection hash rather than collection name
        layer_name = f"add_to_three-{(name)}"
        replacement = HighLevelGraph(
            {layer_name: {(layer_name, 0): (add_to_three, 20)}},
            {layer_name: set()},
        )
        collection.dask = replacement
        print("New Dask Graph")
        print(collection.dask)

    print("-----------------")

    return original_collections_to_dsk(collections, *args, **kwargs)


# Install the caching experiment on this client instance.
client.collections_to_dsk = modified_collections_to_dsk

In [13]:
# Same pipeline as Example 2: ones -> +1 -> mean (the first compute below returns 2.0).
x = da.ones((1000, 1000), chunks=(100, 100))
y = x + 1
z = y.mean()

In [14]:
# First compute: z's name is not yet in existing_functions, so it is only recorded.
result = client.compute(z)

Inside myFunc. Time = 19:40:19
Functions already processed
Dask Graph Tasks
Task Name: mean_agg-aggregate-c50263a0ad47c330577b3058195ad928
Dask Graph
HighLevelGraph with 7 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f95fa07f3a0>
 0. ones_like-c53a97836143a00162e3470fef213e1e
 1. add-098526cf93d1ac0d3a8744f59d0814fb
 2. mean_chunk-4e5168d025885dfbdfd76df3f8083cda
 3. mean_combine-partial-0832826c732652f3243668d740a0d5f3
 4. mean_combine-partial-f33215f12dcb169d48ded3405ae5b4a2
 5. mean_combine-partial-f380b3499d120118c907e62d760350cc
 6. mean_agg-aggregate-c50263a0ad47c330577b3058195ad928

Adding mean_agg-aggregate-c50263a0ad47c330577b3058195ad928 to existing_functions
-----------------


In [15]:
# Display the object returned by client.compute.
result

In [16]:
# Fetch the computed value (2.0 for the unmodified graph).
result.result()

2.0

In [17]:
# Compute a second time
# z's name is now in existing_functions, so the patched method replaces z.dask
# with the single-layer add_to_three graph before delegating.
result = client.compute(z)

Inside myFunc. Time = 19:40:23
Functions already processed
mean_agg-aggregate-c50263a0ad47c330577b3058195ad928
Dask Graph Tasks
Task Name: mean_agg-aggregate-c50263a0ad47c330577b3058195ad928
Dask Graph
HighLevelGraph with 7 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f95fa07f3a0>
 0. ones_like-c53a97836143a00162e3470fef213e1e
 1. add-098526cf93d1ac0d3a8744f59d0814fb
 2. mean_chunk-4e5168d025885dfbdfd76df3f8083cda
 3. mean_combine-partial-0832826c732652f3243668d740a0d5f3
 4. mean_combine-partial-f33215f12dcb169d48ded3405ae5b4a2
 5. mean_combine-partial-f380b3499d120118c907e62d760350cc
 6. mean_agg-aggregate-c50263a0ad47c330577b3058195ad928

mean_agg-aggregate-c50263a0ad47c330577b3058195ad928 has already been processed
New Dask Graph
HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f95f8773c70>
 0. add_to_three-mean_agg-aggregate-c50263a0ad47c330577b3058195ad928

-----------------


Debugging information
---------------------
old task state: memory
old run_spec: (<function finalize at 0x7f96080ec280>, ([('mean_agg-aggregate-c50263a0ad47c330577b3058195ad928',)],), {})
new run_spec: (<function finalize at 0x7f96080ec280>, ([('mean_agg-aggregate-c50263a0ad47c330577b3058195ad928',)],), {})
old token: ('tuple', [('ae65af090ad79ae2e434bc4c7eb750244a1a40bb', []), ('tuple', [('list', [('tuple', ['mean_agg-aggregate-c50263a0ad47c330577b3058195ad928'])])]), ('dict', [])])
new token: ('tuple', [('ae65af090ad79ae2e434bc4c7eb750244a1a40bb', []), ('tuple', [('list', [('tuple', ['mean_agg-aggregate-c50263a0ad47c330577b3058195ad928'])])]), ('dict', [])])
old dependencies: {('mean_agg-aggregate-c50263a0ad47c330577b3058195ad928',)}
new dependencies: set()



In [18]:
# Shut down so that Example 3.2 runs against a new cluster.
client.shutdown()



### Example 3.2: Edit HighLevelGraph in a new cluster

In [19]:
from dask.distributed import Client
import dask
import dask.array as da
from dask.highlevelgraph import HighLevelGraph
from datetime import datetime

In [20]:
# New client for Example 3.2.
client = Client()

# Keep the original method so the patched version can delegate to it.
original_collections_to_dsk = client.collections_to_dsk

In [21]:
# Pre-seed the list with the name that z gets in this example, so the very first
# compute already takes the "has already been processed" branch.
existing_functions = ["mean_agg-aggregate-c50263a0ad47c330577b3058195ad928"]


def modified_collections_to_dsk(collections, *args, **kwargs):
    """Replace the graph of already-seen collections, then delegate.

    Args:
        collections: Iterable of Dask collections; each must expose ``name``
            and a writable ``dask`` attribute.
        *args, **kwargs: Forwarded unchanged to ``original_collections_to_dsk``.

    Returns:
        Whatever ``original_collections_to_dsk`` returns for the (possibly
        mutated) collections.

    Side effects:
        Appends unseen names to the module-level ``existing_functions`` list.
        For names already in it, overwrites ``collection.dask`` with a
        one-layer ``HighLevelGraph`` whose single task is ``add_to_three(20)``.
        ``add_to_three`` must be defined before this function is first called.
    """
    print("Inside myFunc. Time = " + datetime.now().strftime("%H:%M:%S"))

    print("Functions already processed")
    # existing_functions is only read and appended to (never rebound), so no
    # ``global`` declaration is required.
    for seen_name in existing_functions:
        print(f"{seen_name}")

    print("Dask Graph Tasks")
    for collection in collections:
        name = collection.name
        print(f"Task Name: {name}")
        print("Dask Graph")
        print(collection.dask)
        if name not in existing_functions:
            print(f"Adding {name} to existing_functions")
            existing_functions.append(name)
        else:
            print(f"{name} has already been processed")
            # TODO: Continue playing around with the dask graph so return is not a string
            # TODO: Use collection hash rather than collection name
            # NOTE(review): the replacement graph only defines the key
            # (layer_name, 0), not the collection's own output key; presumably
            # that is why result() comes back as the key string — confirm.
            layer_name = f"add_to_three-{(name)}"
            layers = {layer_name: {(layer_name, 0): (add_to_three, 20)}}
            dependencies = {layer_name: set()}
            collection.dask = HighLevelGraph(layers, dependencies)
            print("New Dask Graph")
            print(collection.dask)

    print("-----------------")

    return original_collections_to_dsk(collections, *args, **kwargs)

In [22]:
# Install the patched method on the new client.
client.collections_to_dsk = modified_collections_to_dsk

In [23]:
def add_to_three(number):
    """Return ``number`` plus three; used as the task in the replacement graph."""
    shifted = number + 3
    return shifted

In [24]:
# Same ones -> +1 -> mean pipeline as before. Its name is already listed in
# existing_functions, so the patched method replaces the graph on this first compute.
x = da.ones((1000, 1000), chunks=(100, 100))
y = x + 1
z = y.mean()
result = client.compute(z)

Inside myFunc. Time = 19:43:16
Functions already processed
mean_agg-aggregate-c50263a0ad47c330577b3058195ad928
Dask Graph Tasks
Task Name: mean_agg-aggregate-c50263a0ad47c330577b3058195ad928
Dask Graph
HighLevelGraph with 7 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f9608d4ba90>
 0. ones_like-c53a97836143a00162e3470fef213e1e
 1. add-098526cf93d1ac0d3a8744f59d0814fb
 2. mean_chunk-4e5168d025885dfbdfd76df3f8083cda
 3. mean_combine-partial-0832826c732652f3243668d740a0d5f3
 4. mean_combine-partial-f33215f12dcb169d48ded3405ae5b4a2
 5. mean_combine-partial-f380b3499d120118c907e62d760350cc
 6. mean_agg-aggregate-c50263a0ad47c330577b3058195ad928

mean_agg-aggregate-c50263a0ad47c330577b3058195ad928 has already been processed
New Dask Graph
HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f95fa07fa00>
 0. add_to_three-mean_agg-aggregate-c50263a0ad47c330577b3058195ad928

-----------------


In [25]:
# Display the object returned by client.compute.
result

In [26]:
# With the replaced graph this yields the key name as a string rather than a number
# (see the TODO in modified_collections_to_dsk).
result.result()

'mean_agg-aggregate-c50263a0ad47c330577b3058195ad928'

### Example 4.3: Try with blockwise

In [5]:
from dask.distributed import Client
import dask
import dask.array as da
import numpy as np
from dask.highlevelgraph import HighLevelGraph
from datetime import datetime
from dask.array.core import Array

# Client for the unpatched baseline run of the blockwise example.
client = Client()

# Define some simple functions to use in the graph
def add(x, y):
    """Return the sum of ``x`` and ``y``."""
    total = x + y
    return total


def multiply(x, y):
    """Return the product of ``x`` and ``y``."""
    product = x * y
    return product


# Build a Dask array with da.blockwise: apply (x + y) * y over two 1-D NumPy
# inputs that share the index "i". Only the output dtype is passed explicitly.
dask_array = da.blockwise(
    lambda x, y: multiply(add(x, y), y),
    "i",
    np.array([1, 2, 3]),
    "i",
    np.array([4, 5, 6]),
    "i",
    dtype=np.int64,
)

# Use client.compute to compute the result
future = client.compute(dask_array)
result = future.result()

# The value has already been fetched, so the client can be shut down before printing.
client.shutdown()

print("------------")
# Prints [20 35 54], i.e. (1+4)*4, (2+5)*5, (3+6)*6.
print(result)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 16985 instead


------------
[20 35 54]


In [6]:
from dask.distributed import Client
import dask
import dask.array as da
import numpy as np
from dask.highlevelgraph import HighLevelGraph
from datetime import datetime
from dask.array.core import Array


def modified_collections_to_dsk(collections, *args, **kwargs):
    """Swap every collection's graph for a fresh blockwise graph, then delegate.

    For each collection this prints its name and current graph, builds a new
    ``da.blockwise`` array over the inputs ``[5, 6, 7]`` and ``[8, 9, 10]``,
    and overwrites both the collection's ``dask`` attribute and its
    ``__dask_keys__`` with those of the new array, so graph and keys stay
    consistent. The call is then forwarded, with all arguments, to the saved
    ``original_collections_to_dsk``.

    Note: the collections passed in are mutated in place.
    """
    started_at = datetime.now()
    print("Inside myFunc. Time = " + started_at.strftime("%H:%M:%S"))

    print("Functions already processed")

    print("Dask Graph Tasks")
    for collection in collections:
        print(f"Task Name: {collection.name}")
        print("Dask Graph")
        print(collection.dask)
        print("-----------------")
        print("Edditing Dask Graph")

        left_input = np.array([5, 6, 7])
        right_input = np.array([8, 9, 10])
        replacement = da.blockwise(
            lambda x, y: multiply(add(x, y), y),
            "i",
            left_input,
            "i",
            right_input,
            "i",
            dtype=np.int64,
        )
        # Replace the graph and the key accessor together.
        collection.dask = replacement.dask
        collection.__dask_keys__ = replacement.__dask_keys__

        print("New Dask Graph")
        print(collection.dask)

    print("-----------------")

    return original_collections_to_dsk(collections, *args, **kwargs)


# New client for the patched blockwise run.
client = Client()

# Save the original method first; modified_collections_to_dsk delegates to it.
original_collections_to_dsk = client.collections_to_dsk

# Install the graph-replacing wrapper on this client instance.
client.collections_to_dsk = modified_collections_to_dsk

# Define some simple functions to use in the graph
def add(x, y):
    """Return ``x`` plus ``y``."""
    summed = x + y
    return summed


def multiply(x, y):
    """Return ``x`` times ``y``."""
    scaled = x * y
    return scaled


# Build the same blockwise array as in the baseline cell: (x + y) * y over two
# 1-D NumPy inputs sharing the index "i". Only the output dtype is passed explicitly.
dask_array = da.blockwise(
    lambda x, y: multiply(add(x, y), y),
    "i",
    np.array([1, 2, 3]),
    "i",
    np.array([4, 5, 6]),
    "i",
    dtype=np.int64,
)

# Use client.compute to compute the result
# (this goes through the patched collections_to_dsk, which swaps in a different graph).
future = client.compute(dask_array)
result = future.result()

# The value has already been fetched, so the client can be shut down before printing.
client.shutdown()

print("------------")
# Prints [104 135 170], the result for the replacement inputs [5, 6, 7] and
# [8, 9, 10], not the [20 35 54] of the array defined above.
print(result)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 25436 instead


Inside myFunc. Time = 22:41:44
Functions already processed
Dask Graph Tasks
Task Name: lambda-bd96b5ee7bd8b34908deaa71441f1d4a
Dask Graph
HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fae2833ff10>
 0. array-93899268a02d75f85755ccb59b755ad1
 1. array-08b8023b2db8bebf73813d166e3f6b63
 2. lambda-bd96b5ee7bd8b34908deaa71441f1d4a

-----------------
Edditing Dask Graph
New Dask Graph
HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fae481a7190>
 0. array-71a7394e5f14de7dc72bb0b1b251ce84
 1. array-e3622cdb87acdb3eed57eec6c2ef9fb7
 2. lambda-19e9145bca12a9b712321a40e73e64bd

-----------------




------------
[104 135 170]
