In [None]:
from dask_gateway import GatewayCluster
from dask.distributed import Client
import dask.array as da

# Start a new cluster through the Dask Gateway. `address` is the URL this
# kernel uses to reach the gateway, `public_address` is the browser-facing
# URL, and authentication is delegated to JupyterHub.
cluster = GatewayCluster(
    address="http://traefik-dask-gateway.dask-gateway.svc.cluster.local",
    public_address="http://dask.k8s.local",
    auth="jupyterhub",
)


In [None]:
# Enable adaptive scaling, bounded to between one and two workers.
cluster.adapt(minimum=1, maximum=2)
cluster

In [None]:
# Attach a distributed client to the gateway cluster. The bare name on the
# last line makes the notebook display the client as the cell output.
client = Client(cluster)
client

In [None]:
# Smoke test: a lazy 10,000 x 10,000 array of normal samples, split into
# 500 x 500 chunks (20 x 20 = 400 blocks), reduced to a single mean.
a = da.random.normal(size=(10_000, 10_000), chunks=(500, 500))
a.mean().compute()

---
#### Computation Graph

In [None]:
from dask import delayed


@delayed
def inc(x: int) -> int:
    """Return ``x`` plus one; ``@delayed`` makes each call a lazy task."""
    incremented = x + 1
    return incremented


@delayed
def double(x: int) -> int:
    """Return twice ``x``; ``@delayed`` makes each call a lazy task."""
    doubled = 2 * x
    return doubled


@delayed
def add(x: int, y: int) -> int:
    """Return ``x + y``; ``@delayed`` makes each call a lazy task."""
    summed = x + y
    return summed


# Only the task graph is assembled here; nothing is computed in this cell.
with Client(cluster):
    data = [1, 2, 3, 4, 5]

    # One add(inc(x), double(x)) task per input. A comprehension is used so
    # no loop variables are bound at notebook scope; in particular the dask
    # array `a` created in an earlier cell is no longer overwritten.
    output = [add(inc(x), double(x)) for x in data]

    total = delayed(sum)(output)

# Graphviz accepts only "TB", "BT", "LR" or "RL" for `rankdir`; "UD" is not a
# valid direction, so request the top-to-bottom layout explicitly.
total.visualize(rankdir="TB")

---
### Read data from S3

In [None]:
import dask.dataframe as dd
import dask.bag as db
import pandas as pd
import json
import os

BUCKET = "minikube-jupyterhub-data"

# Options handed to the s3:// readers and writers in the cells below.
# Credentials are read from the environment; a missing variable raises
# KeyError immediately.
storage_options = dict(
    key=os.environ["AWS_ACCESS_KEY_ID"],
    secret=os.environ["AWS_SECRET_ACCESS_KEY"],
    client_kwargs=dict(region_name="us-east-1"),
)

In [None]:
# Run os.getcwd on the cluster's workers via the client and print what comes
# back, to see which directory the workers operate in.
cwd = client.run(os.getcwd)
print(cwd)

---
#### CSV

In [None]:
with Client(cluster):
    # Lazily read every top-level CSV object in the bucket, 64 MiB per block.
    df = dd.read_csv(
        f"s3://{BUCKET}/*.csv",
        blocksize="64MiB",
        storage_options=storage_options,
    )

    # len() forces the row count to be computed.
    rows = len(df)

f"{rows=:_}"

---
#### Parquet

In [None]:
with Client(cluster):
    # Lazily read every top-level Parquet object in the bucket.
    df = dd.read_parquet(
        f"s3://{BUCKET}/*.parquet",
        split_row_groups="adaptive",
        blocksize="128MiB",
        storage_options=storage_options,
    )

    # Mean passenger count per vendor, materialised as an in-memory result.
    result = (
        df.groupby("VendorID")["passenger_count"]
        .mean()
        .compute()
    )

result

---
#### NDJSON

In [None]:
with Client(cluster):
    # Load the .ndjson.gz objects in the bucket as a dask DataFrame.
    df = dd.read_json(
        f"s3://{BUCKET}/*.ndjson.gz",
        storage_options=storage_options,
    )

df.head()

In [None]:
with Client(cluster):
    # Read the same objects as raw text lines and parse each line as JSON.
    dbag = db.read_text(
        f"s3://{BUCKET}/*.ndjson.gz", storage_options=storage_options
    ).map(json.loads)

with Client(cluster):
    # Pull metadata.namespace out of every record; records without a
    # "metadata" key yield None.
    namespace = dbag.map(
        lambda d: d.get("metadata", {}).get("namespace")
    ).compute()

namespace[:5]

In [None]:
with Client(cluster):
    # Drop records whose "metadata" entry is missing or an empty dict.
    filtered = dbag.filter(lambda d: d.get("metadata", {}) != {})

    # One output file is written per partition, with "*" replaced by the
    # partition number. Without a "*" the path is treated as a directory and
    # files are named "<n>.part"; compression is inferred from the file
    # suffix, so those files would NOT be gzipped. Keeping ".ndjson.gz" as
    # the suffix of the glob makes every partition gzip-compressed.
    filtered.map(json.dumps).to_textfiles(
        f"s3://{BUCKET}/dask/outputs/manifests-*.ndjson.gz",
        storage_options=storage_options,
    )

filtered

In [None]:
with Client(cluster):
    # Convert the bag of parsed JSON records into a dask DataFrame.
    df = dbag.to_dataframe()

# NOTE(review): the frame built from a bag presumably has unknown divisions,
# so this label slice may return rows labelled 0-3 from every partition rather
# than just the first four rows overall - confirm whether df.head() was meant.
df.loc[:3].compute()

In [None]:
with Client(cluster):
    # Count the records in the bag and print the total.
    print(dbag.count().compute())

---
#### Close All Clusters

In [None]:
from dask_gateway import Gateway

# Release the notebook's long-lived client before its cluster goes away, then
# close the cluster this notebook created.
client.close()
cluster.close()

gateway = Gateway(
    "http://traefik-dask-gateway.dask-gateway.svc.cluster.local",
    auth="jupyterhub",
)

# Explicitly stop every cluster the gateway still reports. Connecting with
# shutdown_on_close=True and discarding the handle never calls close(), so it
# would depend on object finalizers to shut the clusters down.
for c in gateway.list_clusters():
    gateway.stop_cluster(c.name)

# Show what is left; clusters may still be listed while they are stopping.
gateway.list_clusters()