In [None]:
#Libraries used
import dask
import dask.array as da
from dask.distributed import Client
from dask_ml.preprocessing import StandardScaler
from dask_ml.cluster import KMeans
import numpy as np
import math
import time
import pandas as pd
from dask_ml.metrics import pairwise_distances
from dask_ml.metrics import pairwise_distances_argmin_min
from time import time
from timeit import default_timer as timer
from dask_ml.datasets import make_blobs
import pandas as pd
from dask.distributed import SSHCluster
import matplotlib.pyplot as plt
import pickle
import numba
from dask.distributed import SSHCluster, Client
from dask_ml.decomposition import PCA
from kmeans_parallel_dask import kmeans_parallel #our library for implementing kmeans|| in Dask
from matplotlib import colormaps
from matplotlib.colors import ListedColormap
from sklearn.cluster import KMeans as SklearnKMeans
from dask_ml.preprocessing import StandardScaler
import scipy.sparse as sp

In [None]:
import numpy as np
import dask.array as da
from sklearn.cluster import KMeans as SklearnKMeans  # <-- We used this only for weighted recluster


# Optional SciPy support: `_HAVE_SP` records whether `scipy.sparse` could be
# imported, and `sp` is bound to None when it could not. The helper functions
# defined after this guard check both names before calling `sp.issparse`.
try:
    import scipy.sparse as sp
    _HAVE_SP = True
except Exception:
    # Any import-time failure is treated as "SciPy not available".
    sp = None
    _HAVE_SP = False

def _ensure_dask_array(X):
    """Return X as a dask.array, wrapping it only when it is not one already.

    - dask arrays are returned unchanged;
    - SciPy sparse inputs are wrapped with ``asarray=False`` so that blocks stay
      sparse, chunked along rows (at most 10000 rows per chunk, all columns);
    - anything else is handed to ``da.from_array`` with automatic chunking.
    """
    if not isinstance(X, da.Array):
        sparse_input = _HAVE_SP and sp is not None and sp.issparse(X)
        if sparse_input:
            # Row-wise chunks only: every block keeps the full feature width.
            rows_per_chunk = min(10000, X.shape[0])
            return da.from_array(X, asarray=False, chunks=(rows_per_chunk, X.shape[1]))
        # Dense (or array-like) fallback
        return da.from_array(X, chunks="auto")
    return X

def _row_norms_squared_block(block):
    """Squared Euclidean norm of every row of `block`, returned as a 1-D np.ndarray.

    Accepts either a dense 2-D array or a SciPy sparse block.
    """
    is_sparse = _HAVE_SP and sp is not None and sp.issparse(block)
    if not is_sparse:
        # Dense path: sum of element-wise squares along each row.
        return np.einsum("ij,ij->i", block, block)
    # Sparse path: element-wise square stays sparse, the row sum is flattened to 1-D.
    squared = block.multiply(block)
    return np.asarray(squared.sum(axis=1)).ravel()

def _block_pairwise_sq_dists(block, C, C_norm2):
    """
    Pairwise squared Euclidean distances between the rows of one block and the centroids.

    Uses the expansion ||x - c||^2 = ||x||^2 - 2 x.c + ||c||^2.

    Parameters
    ----------
    block : dense 2-D array or SciPy sparse block, shape (m, d)
    C : np.ndarray, shape (k, d)
        Dense centroids.
    C_norm2 : np.ndarray, shape (k,)
        Precomputed squared norms of the rows of C.

    Returns
    -------
    np.ndarray of float64, shape (m, k)
        Squared distances, clamped to be non-negative.
    """
    if _HAVE_SP and sp is not None and sp.issparse(block):
        cross = block.dot(C.T)       # (m,k), dense result
    else:
        cross = block @ C.T          # (m,k)

    # Work in float64 regardless of the input dtype: callers declare dtype=float
    # for the resulting dask array, and the expansion is cancellation-prone.
    d2 = np.asarray(cross, dtype=float) * (-2.0)
    d2 += _row_norms_squared_block(block)[:, None]   # Add ||x||^2
    d2 += C_norm2[None, :]                           # Add ||c||^2

    # Round-off can leave tiny negative values where x is (almost) equal to c;
    # a squared distance must never be negative.
    np.maximum(d2, 0.0, out=d2)
    return d2

def _pairwise_sq_dists(X, C):
    """
    Pairwise squared distances between X (dask.array or compatible) and C (dense np.ndarray).

    Parameters
    ----------
    X : dask.array, NumPy array or SciPy sparse matrix, shape (n_samples, n_features)
    C : array-like, shape (k, n_features)

    Returns
    -------
    dask.array, shape (n_samples, k)
        Lazily evaluated dense squared distances, one block per row-chunk of X.

    Raises
    ------
    ValueError
        If C is not 2-D or its number of columns differs from X's.
    """
    X = _ensure_dask_array(X)
    C = np.asarray(C)
    if C.ndim != 2 or C.shape[1] != X.shape[1]:
        raise ValueError(
            f"C must have shape (k, {X.shape[1]}), got {C.shape}"
        )

    # The per-block kernel multiplies a block by the full C.T, so every block
    # must contain complete rows. Dense inputs wrapped with chunks="auto" may be
    # split along columns; merge them. Arrays that already have a single column
    # chunk (e.g. the row-chunked sparse ones) are left untouched.
    if len(X.chunks[1]) > 1:
        X = X.rechunk({1: -1})

    C_norm2 = np.einsum("ij,ij->i", C, C)  # (k,)
    return X.map_blocks(
        _block_pairwise_sq_dists,
        C, C_norm2,
        dtype=float,
        chunks=(X.chunks[0], (C.shape[0],))
    )

def _to_dense_row(x):
    """Flatten a single row (SciPy sparse, np.matrix, or array of shape (1, d)/(d,)) to a dense 1-D np.ndarray."""
    sparse_row = _HAVE_SP and sp is not None and sp.issparse(x)
    # Sparse rows are densified first; everything else goes straight to asarray.
    dense = x.toarray() if sparse_row else x
    return np.asarray(dense).ravel()



def phi(X,C):
    """
    k-means cost of a set of centroids.

    Input:
    - dataset X
    - set of centroids C
    Output:
    - Lazy scalar: sum over all points of the squared distance to the
      closest centroid
    """
    # Sparse-compatible replacement for pairwise_distances(...).min(1).sum()
    sq_dists = _pairwise_sq_dists(X, C)
    closest = sq_dists.min(1)
    return closest.sum()

def assign_label(X,C):
    """
    Label every point of X with the index of its closest centroid.

    Input:
    - dataset X
    - set of centroids C
    Output:
    - Lazy 1-D array of labels (row index into C for each point)
    """
    # Sparse-compatible replacement for pairwise_distances_argmin_min(...):
    # argmin over the squared distances gives the same assignment.
    sq_dists = _pairwise_sq_dists(X, C)
    return da.argmin(sq_dists, axis=1)

def update_centroids(X,lab,k):
    """
    Recompute the centroids as the mean of the points assigned to each cluster.

    Inputs:
    - dataset X (dask.array, NumPy array or SciPy sparse matrix)
    - labels of the clusters lab (one integer per row of X)
    - number of clusters k
    Output
    - New centroids C as a single-chunk dask.array of shape (k, n_features).
      Rows of clusters that received no point are NaN; the caller is expected
      to handle them.

    Note: the per-block partial sums are computed eagerly (``dask.compute``)
    inside this function; only the final wrapping is lazy.
    """
    # Robust reduce-based version for sparse: no boolean indexing on X.
    import dask.array as da
    from dask import delayed, compute

    # Ensure X is a dask.array (also handles SciPy sparse)
    X = _ensure_dask_array(X)

    # Blocks are paired one-to-one with label blocks below, which is only valid
    # when each block of X spans all columns. Dense inputs chunked with "auto"
    # can be split along columns, so merge column chunks when needed.
    if len(X.chunks[1]) > 1:
        X = X.rechunk({1: -1})

    # Align the label chunks to the rows of X
    lab = da.asarray(lab).rechunk((X.chunks[0],))

    # Delayed blocks aligned for rows
    x_blocks = X.to_delayed().ravel()
    l_blocks = lab.to_delayed().ravel()

    @delayed
    def _reduce_block(bx, bl):
        """Per-block partial result: (k, d) sums and (k,) counts."""
        bl = np.asarray(bl)
        d = bx.shape[1]

        # COO/CSC -> CSR to support bx[m] (boolean row indexing)
        if _HAVE_SP and sp is not None and sp.issparse(bx) and not sp.isspmatrix_csr(bx):
            bx = bx.tocsr()

        sums = np.zeros((k, d), dtype=float)
        counts = np.zeros((k,), dtype=np.int64)
        for i in range(k):
            m = (bl == i)
            if m.any():
                Xi = bx[m]  # Local to the block: works even if it's SciPy sparse CSR
                s = np.asarray(Xi.sum(axis=0)).ravel()  # Works for dense and sparse
                sums[i] += s
                counts[i] += int(m.sum())
        return sums, counts

    parts = [_reduce_block(bx, bl) for bx, bl in zip(x_blocks, l_blocks)]
    results = compute(*parts)

    total_sums = sum(s for s, _ in results)
    total_counts = sum(c for _, c in results)

    # Means; empty clusters will remain NaN (handled in kmeans_parallel)
    C_new = np.divide(
        total_sums,
        total_counts[:, None],
        out=np.full_like(total_sums, np.nan, dtype=float),
        where=total_counts[:, None] != 0,
    )
    # Return as dask.array (compatible with the rest of the code)
    return da.from_array(C_new, chunks=(k, X.shape[1]))


def kmeans__init(X: da.Array, k: int, l: int, random_state=42):
    """
    k-means|| initialization (Dask)

    Parameters
    ----------
    X : dask.array, shape (n_samples, n_features)
        NumPy / SciPy sparse inputs are wrapped automatically.
    k : int
        Requested number of centroids.
    l : int, oversampling factor
    random_state : int
        Seed used for the first center, the Bernoulli sampling and the final
        scikit-learn recluster.

    Returns
    -------
    C : np.ndarray, shape (k', n_features)
        Initial centroids. k' == k unless fewer than k candidate points were
        sampled, in which case k' is the number of candidates.
    """
    rng = np.random.default_rng(random_state)

    X = _ensure_dask_array(X)  # (Minimal addition to support SciPy sparse inputs)

    n = X.shape[0]
    # 1) Pick first center
    i0 = rng.integers(n)
    c0 = X[i0].compute()
    C = _to_dense_row(c0)[None, :]

    # 2) Initial cost
    psi = phi(X, C).compute()
    if psi <= 0:
        # Every point coincides with the first center: replicate it k times.
        return C if len(C) == k else np.repeat(C, k, axis=0)[:k]

    # RandomState for reproducibility
    rs = da.random.RandomState(random_state)

    # 3) O(log psi) rounds
    t = max(1, int(np.ceil(np.log(psi))))
    for _ in range(t):
        d2min = _pairwise_sq_dists(X, C).min(1)
        p = da.clip(l * d2min / psi, 0.0, 1.0)
        # Bernoulli sampling with RandomState
        samples = rs.random_sample(size=p.shape, chunks=p.chunks) < p
        # Indices of True
        mask = da.where(samples)[0].compute()
        mask = sorted(mask)   #https://github.com/dask/dask-ml/issues/39

        # Test for "any index sampled", not for truthiness of the index values:
        # np.any([0]) is False, which would silently drop a round in which
        # only row 0 was selected.
        if len(mask) > 0:
            new_pts = X[mask].compute()
            if _HAVE_SP and sp is not None and sp.issparse(new_pts):
                new_pts = new_pts.toarray()
            C = np.vstack([C, np.asarray(new_pts)])
        psi = phi(X, C).compute()
        if psi <= 0:
            break

    # Weights for k-means|| step 7 (from the article): number of points whose
    # closest candidate is each row of C.
    dist2 = _pairwise_sq_dists(X, C)   # Dask-friendly even for sparse
    labels = da.argmin(dist2, axis=1)
    weights = da.bincount(labels, minlength=len(C)).compute().astype(float)

    # 8) Recluster down to k  (Always use scikit-learn here)
    k = min(k, C.shape[0])  # cannot ask for more clusters than candidates
    km = SklearnKMeans(n_clusters=k, n_init=10, random_state=random_state)
    try:
        km.fit(C, sample_weight=weights)   # Recent scikit-learn
    except TypeError:
        km.fit(C)                          # Fallback for older versions
    return km.cluster_centers_


def kmeans_parallel(X, k, max_it=100, tol=1e-8, l=2):
    """
    Core function of our implementation: Lloyd's k-means on Dask with
    k-means|| initialization.

    Parameters
    ----------
    X : dataset (dask.array, NumPy array or SciPy sparse matrix)
    k : requested number of clusters
    max_it : maximum number of Lloyd iterations
    tol : absolute tolerance of the centroid-movement convergence test
    l : oversampling factor forwarded to kmeans__init

    Returns
    -------
    lab : lazy array of labels, always computed against the returned centroids
    C : np.ndarray of centroids
    """
    C = np.asarray(kmeans__init(X, k, l), dtype=float)  # Initial centroids with k_means||

    # The initialization may return fewer than k centers when too few candidate
    # points were sampled; use the actual count so shapes stay consistent below.
    k = C.shape[0]

    # Labels are lazy, so building them here is cheap; it also guarantees `lab`
    # exists when max_it == 0.
    lab = assign_label(X, C)

    for _ in range(max_it):  # Iterative phase
        C_new = update_centroids(X, lab, k).compute()

        # Handle empty clusters: replace NaN rows with the previous center
        empty_rows = np.isnan(C_new).any(axis=1)
        if empty_rows.any():
            C_new[empty_rows] = C[empty_rows]

        # Both operands are in-memory NumPy arrays: compare locally instead of
        # building and computing a dask graph.
        if np.allclose(C, C_new, atol=tol):
            break

        C = C_new
        # Keep the labels in sync with the centroids that will be returned.
        lab = assign_label(X, C)

    return lab, C

In [None]:
# Dask configuration
# Sets the distributed connect/TCP communication timeouts to 60s and the
# lost-worker timeout to 30s. This runs at cell execution time, before the
# cluster is created further down.
dask.config.set({
    'distributed.comm.timeouts.connect': '60s',
    'distributed.comm.timeouts.tcp': '60s',
    'distributed.deploy.lost-worker-timeout': '30s',
})

def setup_cluster():
    """Start the SSH-based Dask cluster on the project VMs and connect a client.

    The first entry of the host list is repeated on purpose: SSHCluster uses
    the first host for the scheduler and the remaining entries for workers, so
    the first VM hosts both the scheduler and one worker.

    Returns
    -------
    (client, cluster) : the connected ``Client`` and the ``SSHCluster``.

    Raises
    ------
    Whatever ``Client`` / ``wait_for_workers`` raise (e.g. a timeout). In that
    case the partially started client and cluster are closed before the
    exception propagates, so no remote processes are left behind.
    """
    VM_IPS = ['10.67.22.253', '10.67.22.253', '10.67.22.93', '10.67.22.68', '10.67.22.80']
    # One worker process per worker host (see worker_options below), and every
    # host after the first is a worker host.
    expected_workers = len(VM_IPS) - 1

    print(f"Setting up SSH cluster with {len(VM_IPS) -1 } VMs...")

    cluster = SSHCluster(
        hosts=VM_IPS,
        connect_options={
            "username": "ubuntu",
            "client_keys": ["/home/ubuntu/MAPDB-13.pem"],
            # NOTE(review): known_hosts=None disables SSH host-key verification.
            "known_hosts": None,
            "connect_timeout": 30,
        },
        scheduler_options={
            "port": 8786,
            "dashboard_address": ":8787",
        },
        worker_options={
            "n_workers":1,
            "nthreads": 4,
            "memory_limit": "7.8GB",
        },
    )

    client = None
    try:
        client = Client(cluster)
        print("Waiting for workers...")
        # Derived from the host list instead of a separate hard-coded constant.
        client.wait_for_workers(n_workers=expected_workers, timeout=30)
    except Exception:
        # Do not leak the scheduler/worker processes when startup fails.
        if client is not None:
            client.close()
        cluster.close()
        raise

    print(f"✅ Cluster ready with {len(client.scheduler_info()['workers'])} workers")
    print(f"Dashboard: {client.dashboard_link}")

    return client, cluster

# Start the cluster and keep both handles as notebook globals; later cells use
# `cluster` to reconnect and both names to shut everything down.
client, cluster=setup_cluster()

Setting up SSH cluster with 4 VMs...


2025-09-14 14:59:57,433 - distributed.deploy.ssh - INFO - 2025-09-14 14:59:57,432 - distributed.scheduler - INFO - State start
2025-09-14 14:59:57,435 - distributed.deploy.ssh - INFO - 2025-09-14 14:59:57,434 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-mszt6tvw', purging
2025-09-14 14:59:57,445 - distributed.deploy.ssh - INFO - 2025-09-14 14:59:57,444 - distributed.scheduler - INFO -   Scheduler at:   tcp://10.67.22.253:8786
2025-09-14 14:59:58,223 - distributed.deploy.ssh - INFO - 2025-09-14 14:59:58,223 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.67.22.93:34409'
2025-09-14 14:59:58,224 - distributed.deploy.ssh - INFO - 2025-09-14 14:59:58,224 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.67.22.80:40685'
2025-09-14 14:59:58,230 - distributed.deploy.ssh - INFO - 2025-09-14 14:59:58,230 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.67.22.68:42983'
2025-09-14 14:59:58,245 - di

Waiting for workers...
✅ Cluster ready with 4 workers
Dashboard: http://10.67.22.253:8787/status


In [4]:
# Connect the Dask client to the existing cluster using the scheduler's address
#client = Client("tcp://localhost:8786")
# NOTE(review): `client` was already bound by the `setup_cluster()` call in the
# cell that starts the cluster; this line creates a second Client on the same
# cluster and rebinds the name. Confirm the extra connection is intended.
client = Client(cluster)

# Print the cluster configuration
scheduler_info = client.scheduler_info()
workers_info = scheduler_info['workers'] 
# Get the information for the first worker in the cluster
# (the thread count printed below is read from this one worker only)
worker = list(workers_info.values())[0]
n_workers = len(workers_info)
n_threads = worker['nthreads']

print(f'client dashboard link: {client.dashboard_link}')
print(f"Cluster setup with {n_workers} workers, each with {n_threads} threads")
# Last expression of the cell: displays the client's rich repr
client

client dashboard link: http://10.67.22.253:8787/status
Cluster setup with 4 workers, each with 4 threads


0,1
Connection method: Cluster object,Cluster type: distributed.SpecCluster
Dashboard: http://10.67.22.253:8787/status,

0,1
Dashboard: http://10.67.22.253:8787/status,Workers: 4
Total threads: 16,Total memory: 29.06 GiB

0,1
Comm: tcp://10.67.22.253:8786,Workers: 4
Dashboard: http://10.67.22.253:8787/status,Total threads: 16
Started: Just now,Total memory: 29.06 GiB

0,1
Comm: tcp://10.67.22.253:42207,Total threads: 4
Dashboard: http://10.67.22.253:46359/status,Memory: 7.26 GiB
Nanny: tcp://10.67.22.253:42323,
Local directory: /tmp/dask-scratch-space/worker-6khp8_a0,Local directory: /tmp/dask-scratch-space/worker-6khp8_a0
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.81 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.67.22.68:36105,Total threads: 4
Dashboard: http://10.67.22.68:42831/status,Memory: 7.26 GiB
Nanny: tcp://10.67.22.68:42983,
Local directory: /tmp/dask-scratch-space/worker-298wr67j,Local directory: /tmp/dask-scratch-space/worker-298wr67j
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.48 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.67.22.80:41311,Total threads: 4
Dashboard: http://10.67.22.80:43765/status,Memory: 7.26 GiB
Nanny: tcp://10.67.22.80:40685,
Local directory: /tmp/dask-scratch-space/worker-96hfcca_,Local directory: /tmp/dask-scratch-space/worker-96hfcca_
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.37 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://10.67.22.93:43405,Total threads: 4
Dashboard: http://10.67.22.93:34675/status,Memory: 7.26 GiB
Nanny: tcp://10.67.22.93:34409,
Local directory: /tmp/dask-scratch-space/worker-_3lbf9jq,Local directory: /tmp/dask-scratch-space/worker-_3lbf9jq
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.84 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B


## Test on rcv1 dataset

In [None]:
# Optional SciPy support for the normalization helper defined in this cell:
# `_HAVE_SP` records whether `scipy.sparse` could be imported and `sp` is None
# when it could not.
# NOTE(review): an identical guard already exists in the cell that defines the
# k-means helpers; this copy rebinds the same two names.
try:
    import scipy.sparse as sp
    _HAVE_SP = True
except Exception:
    sp = None
    _HAVE_SP = False

def _l2_normalize_block(block, eps=1e-12):
    """
    Normalizza L2 per riga un blocco (denso o SciPy sparse).
    - Per sparse: usa diags(1/||x||) @ X per mantenere la sparsità.
    - Per denso: divide riga per riga.
    """
    if _HAVE_SP and sp is not None and sp.issparse(block):
        # ||x||_2 for each row
        norms = np.sqrt(block.multiply(block).sum(axis=1)).A1
        inv = 1.0 / np.maximum(norms, eps)
        D = sp.diags(inv)
        return D.dot(block)  # keeps CSR/CSC
    else:
        # dense
        block = np.asarray(block)
        norms = np.linalg.norm(block, axis=1)
        inv = 1.0 / np.maximum(norms, eps)
        return (block.T * inv).T

def l2_normalize_dask(X, eps=1e-12):
    """
    Apply `_l2_normalize_block` to every block of X (chunking is preserved).

    Parameters
    ----------
    X : dask.array whose blocks each contain complete rows
    eps : float
        Forwarded to `_l2_normalize_block` as the lower bound on row norms.

    Returns
    -------
    dask.array with the same chunks as X and L2-normalized rows (lazy).
    """
    # `eps` is passed through as a keyword so a caller-supplied value is honored.
    return X.map_blocks(_l2_normalize_block, eps=eps, dtype=float)

In [None]:
import s3fs
import logging

logger = logging.getLogger(__name__)

def _is_sparse_npz(fs, path):
    """Returns True if `path` is an npz file saved with scipy.sparse.save_npz."""
    try:
        # If load_npz can load it, it's a valid sparse npz
        with fs.open(path, mode="rb") as f:
            _ = sp.load_npz(f)
        return True
    except Exception:
        return False

def _load_feature_shard(fs, path):
    import numpy as np
    import scipy.sparse as sp

    if path.endswith(".npz"):
        # Try as sparse-npz (save_npz)
        try:
            with fs.open(path, "rb") as f:
                mat = sp.load_npz(f)
            # Accept only 2D
            if mat.ndim == 2:
                return mat
        except Exception:
            pass
        # Fallback: generic npz with dense arrays
        with fs.open(path, "rb") as f:
            data = np.load(f, allow_pickle=True)
            # Try common keys
            for key in ("X", "features", "data"):
                if key in data and data[key].ndim == 2:
                    return data[key]
            # Otherwise, take the first 2D array
            for v in data.values():
                if isinstance(v, np.ndarray) and v.ndim == 2:
                    return v
        return None

    elif path.endswith(".npy"):
        with fs.open(path, "rb") as f:
            arr = np.load(f, allow_pickle=True)
        # Discard 1D (labels)
        return arr if getattr(arr, "ndim", 1) == 2 else None

    else:
        raise ValueError(f"Unsupported shard format: {path}")


def load_dataset_from_cloudveneto(dataset_name, fs=None, bucket=None, row_chunk=20000):
    """
    Load all feature shards of a dataset and return them as one row-sharded dask.array.

    Parameters
    ----------
    dataset_name : str
        Shards are listed under ``{bucket}/datasets/{dataset_name}``.
    fs : filesystem object providing ``ls`` and ``open``; created with
        ``create_s3_filesystem()`` when None.
    bucket : str; defaults to ``CLOUDVENETO_CONFIG["bucket"]``.
    row_chunk : int
        Maximum number of rows per chunk for sparse shards.

    Returns
    -------
    dask.array of shape (n_rows_total, n_cols). Sparse shards keep sparse blocks.

    Raises
    ------
    FileNotFoundError
        No file under the prefix looks like a feature shard.
    ValueError
        Shards disagree on the number of columns, or none holds a 2-D array.

    Notes
    -----
    Every shard is read into the calling process and embedded in the graph.
    Shards are concatenated in lexicographic path order (so "shard_10" sorts
    before "shard_2").
    """
    import re

    if fs is None:
        fs = create_s3_filesystem()
    if bucket is None:
        bucket = CLOUDVENETO_CONFIG["bucket"]

    prefix = f"{bucket}/datasets/{dataset_name}"
    files = fs.ls(prefix)

    def _is_feature_shard(path):
        """True for shard_*.npy/.npz files that are not label shards."""
        if "shard_" not in path or not path.endswith((".npy", ".npz")):
            return False
        # Label detection looks at the file name only and treats "y" as a
        # separate token (shard_0_y.npy, y_shard_0.npy, ...). A plain substring
        # test on the whole path would reject every file of any bucket or
        # dataset whose name merely contains the letter "y".
        name = path.rsplit("/", 1)[-1].lower()
        if "labels" in name:
            return False
        tokens = re.split(r"[^a-z0-9]+", name)
        return "y" not in tokens

    shard_files = sorted(f for f in files if _is_feature_shard(f))

    if not shard_files:
        raise FileNotFoundError(f"No feature shards found for dataset {dataset_name}")

    parts = []
    n_cols = None

    for path in shard_files:
        shard = _load_feature_shard(fs, path)
        if shard is None:
            # No 2-D array inside (e.g. a 1-D label vector): skip it.
            continue

        # Check for consistent dimensions
        if n_cols is None:
            n_cols = shard.shape[1]
        elif shard.shape[1] != n_cols:
            raise ValueError(f"Shard {path} has {shard.shape[1]} cols, expected {n_cols}")

        # Wrap in dask preserving sparsity
        if sp.issparse(shard):
            # Chunks are taken by slicing rows; CSR is the sparse format that
            # supports that efficiently (COO does not support slicing at all).
            if shard.format != "csr":
                shard = shard.tocsr()
            chunks = (min(row_chunk, shard.shape[0]), shard.shape[1])
            Xp = da.from_array(shard, asarray=False, chunks=chunks)
        else:
            Xp = da.from_array(shard, chunks="auto")

        parts.append(Xp)

    if not parts:
        raise ValueError("No valid 2D feature shards were loaded.")

    X = da.concatenate(parts, axis=0)
    logger.info(f"Loaded dataset '{dataset_name}' with shape={X.shape}, sparse_blocks={any(sp.issparse(p._meta) if hasattr(p,'_meta') else False for p in parts)}")
    return X

def run_distributed_kmeans(
    dataset_name: str,
    k: int = 10,
    l: int = 2,
    *,
    fs=None,
    bucket: str | None = None,
    row_chunk: int = 20000,
    persist_data: bool = True,
    normalize: str = "l2",   # "l2" (custom) or None
    verbose: bool = True
):
    """
    Loads dataset (including sparse) from CloudVeneto, applies L2 normalization per row in a sparse-safe way,
    then runs k-means|| with Dask.
    """
   
    # 1) Filesystem and loading (uses your fixed loader)
    if fs is None:
        fs = create_s3_filesystem()
    X = load_dataset_from_cloudveneto(
        dataset_name, fs=fs, bucket=bucket, row_chunk=row_chunk
    )

    if verbose:
        try:
            is_sparse_meta = hasattr(X, "_meta") and sp.issparse(X._meta)
        except Exception:
            is_sparse_meta = False
        print(f"[load] X.shape={X.shape}, chunks={X.chunks}, sparse_blocks={is_sparse_meta}")

    # 2) L2 Normalization (custom, sparse-safe)
    if normalize == "l2":
        Xn = l2_normalize_dask(X)
        if persist_data:
            Xn = Xn.persist()
        if verbose:
            print("[norm] L2 normalization applied (custom, sparse-safe).")
    elif normalize is None:
        Xn = X.persist() if persist_data else X
        if verbose:
            print("[norm] Skipped (using raw X).")
    else:
        raise ValueError("normalize must be 'l2' or None.")

    # 3) k-means|| (your already sparse-friendly implementation)
    labels, centroids = kmeans_parallel(Xn, k=k, l=l)

    if verbose:
        print(f"[kmeans] centroids.shape={np.asarray(centroids).shape}")

    return labels, centroids

In [17]:
#test
# Runs the full pipeline on the "rcv1" dataset with the default settings
# (k=10, l=2, L2 normalization). The returned (labels, centroids) tuple is not
# bound to a name, so the notebook displays its repr as the cell output.
run_distributed_kmeans("rcv1")

2025-09-14 15:12:13,654 - INFO - •Tentativo con configurazione semplificata...
2025-09-14 15:12:13,656 - INFO - ✅ S3 FileSystem creato (configurazione semplificata)
2025-09-14 15:12:14,239 - INFO - Loaded dataset 'rcv1' with shape=(23149, 47236), sparse_blocks=True
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


[load] X.shape=(23149, 47236), chunks=((3858, 3858, 3858, 3858, 3858, 3859), (47236,)), sparse_blocks=True
[norm] L2 normalization applied (custom, sparse-safe).
[kmeans] centroids.shape=(10, 47236)


(dask.array<arg_agg-aggregate, shape=(23149,), dtype=int64, chunksize=(3859,), chunktype=numpy.ndarray>,
 array([[0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        ...,
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        [2.15006515e-05, 6.44773513e-05, 2.70198649e-05, ...,
         1.91493858e-05, 3.42949202e-05, 3.06258917e-05],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00]],
       shape=(10, 47236)))

In [None]:
#here we will close the cluster
# Close the client first, while the scheduler it talks to is still alive, so the
# disconnect is a clean handshake rather than a dropped connection.
client.close()

# Then shut down the Dask cluster, terminating the scheduler and all associated
# workers and releasing their resources.
cluster.close()

2025-09-14 15:15:45,137 - INFO - [conn=1, chan=1] Sending KILL signal
2025-09-14 15:15:45,139 - INFO - [conn=1] Closing connection
2025-09-14 15:15:45,140 - INFO - [conn=1, chan=1] Closing channel
2025-09-14 15:15:45,141 - INFO - [conn=1] Sending disconnect: Disconnected by application (11)
2025-09-14 15:15:45,142 - INFO - [conn=4, chan=1] Sending KILL signal
2025-09-14 15:15:45,144 - INFO - [conn=4] Closing connection
2025-09-14 15:15:45,147 - INFO - [conn=4, chan=1] Closing channel
2025-09-14 15:15:45,149 - INFO - [conn=4] Sending disconnect: Disconnected by application (11)
2025-09-14 15:15:45,149 - INFO - [conn=2, chan=1] Sending KILL signal
2025-09-14 15:15:45,150 - INFO - [conn=2] Closing connection
2025-09-14 15:15:45,151 - INFO - [conn=2, chan=1] Closing channel
2025-09-14 15:15:45,152 - INFO - [conn=2] Sending disconnect: Disconnected by application (11)
2025-09-14 15:15:45,153 - INFO - [conn=3, chan=1] Sending KILL signal
2025-09-14 15:15:45,153 - INFO - [conn=3] Closing conn