In [1]:
import numpy as np
import pandas as pd
import awkward as ak
import pyarrow as pa
import numba
import tpx3awkward as tpx

## Loading data in and clustering with old way

In [2]:
df = pd.read_parquet("/nsls2/data/chx/shared/tpx/shared_data/example_raw_events.parquet")
df = tpx.drop_zero_tot(df)
np_labels, np_events = tpx.cluster_df_optimized(df)
df['cluster_id'] = np_labels
print(len(df))
print(max(np_labels))
cluster_array = tpx.group_indices(np_labels)
data = tpx.centroid_clusters(cluster_array, np_events)
cdf = pd.DataFrame(tpx.ingest_cent_data(data)).sort_values("t").reset_index(drop=True)
cdf
print(len(cdf))

25502702
20144304
20144305


## Clustering data with awkward

In [3]:
@numba.jit(nopython=True)
def cluster_ak(events, radius = tpx.DEFAULT_CLUSTER_RADIUS, tw = tpx.DEFAULT_CLUSTER_TW):
    n = len(events)
    labels = np.full(n, -1, dtype=np.int64)
    cluster_id = 0

    max_time = radius * tw  # maximum time difference allowed for clustering
    radius_sq = radius ** 2

    for i in range(n):
        if labels[i] == -1:  # if event is unclustered
            labels[i] = cluster_id
            for j in range(i + 1, n):  # scan forward only
                if events[j].t - events[i].t > max_time:  # early exit based on time
                    break
                # Compute squared Euclidean distance
                dx = events[i].x - events[j].x
                dy = events[i].y - events[j].y
                dt = (events[i].t // tw) - (events[j].t // tw)
                distance_sq = dx**2 + dy**2 + dt**2

                if distance_sq <= radius_sq:
                    labels[j] = cluster_id
            cluster_id += 1

    return labels

events = ak.from_arrow(pa.Table.from_pandas(df))
print(f"OK, there are {len(events)} raw events....")
labels = cluster_ak(events)
print(f"And we find {max(labels)+1} clusters.")
print(f"Just to be pedantic, we check that length of cluster IDs match num raw events ({len(labels)} - it does)")
print(f"and all events are assigned to clusters ({np.sum(labels == -1)} - as it should be)")
ak_clusters = ak.unflatten(events, ak.run_lengths(labels))
print(f"So: shouldn't the length of clusters be the the max value (+1 because we start at 0)? But it's not: {len(ak_clusters)}")

# FIX: sorting the indices!
sorted_indices = np.argsort(labels)
ak_clusters = ak.unflatten(events[sorted_indices], ak.run_lengths(labels[sorted_indices]))
print(len(ak_clusters))

OK, there are 25502702 raw events....
And we find 20144305 clusters.
Just to be pedantic, we check that length of cluster IDs match num raw events (25502702 - it does)
and all events are assigned to clusters (0 - as it should be)
So: shouldn't the length of clusters be the the max value (+1 because we start at 0)? But it's not: 21568788
20144305


### Numba/Numpy Centroiding

In [4]:
def centroid_numpy(clusters, events):
    cluster_array = tpx.group_indices(clusters)
    return tpx.centroid_clusters(cluster_array, events)
    
centroid_numpy(np_labels, np_events)

(array([  8903922,   8903922,   8903924, ..., 642368832, 642371337,
        642372471], dtype=uint64),
 array([258.,  67.,  34., ..., 489., 262., 353.], dtype=float32),
 array([ 52.,  34., 153., ..., 303., 380., 261.], dtype=float32),
 array([275, 325, 100, ..., 325, 350, 325], dtype=uint32),
 array([275, 325, 100, ..., 325, 350, 325], dtype=uint32),
 array([1, 1, 1, ..., 1, 1, 1], dtype=uint8))

### Pandas Groupby Centroiding

In [5]:
sub_df = df[["x", "y", "t", "ToT", "cluster_id"]]

def centroid_pandas_groupby(df):
    df['x_weighted'] = df['x'] * df['ToT']
    df['y_weighted'] = df['y'] * df['ToT']
    
    grouped = df.groupby('cluster_id', sort=False)
    
    ToT_sum = grouped['ToT'].sum()
    ToT_max = grouped['ToT'].max()
    n = grouped.size()
    xc = grouped['x_weighted'].sum() / ToT_sum
    yc = grouped['y_weighted'].sum() / ToT_sum
    
    idxmax = grouped['ToT'].idxmax()
    t = df.loc[idxmax.values, 't'].values
    cluster_ids = idxmax.index.values
    
    return pd.DataFrame({
        't': t,
        'xc': xc.values,
        'yc': yc.values,
        'ToT_max': ToT_max.values,
        'ToT_sum': ToT_sum.values,
        'n': n.values
    }, index=cluster_ids)

centroid_pandas_groupby(sub_df)

Unnamed: 0,t,xc,yc,ToT_max,ToT_sum,n
0,8903922,258.0,52.0,275,275,1
1,8903922,67.0,34.0,325,325,1
2,8903924,34.0,153.0,100,100,1
3,8903927,77.0,24.0,275,275,1
4,8903934,25.8,42.0,200,250,2
...,...,...,...,...,...,...
20144300,642368357,328.0,266.1,225,250,2
20144301,642368713,459.0,335.0,375,375,1
20144302,642368832,489.0,303.0,325,325,1
20144303,642371337,262.0,380.0,350,350,1


### Awkward Centroiding

In [6]:
def centroid_awkward_without_events(a):
    n = ak.num(a, axis=1)
    ToT_max = ak.max(a['ToT'], axis=1)
    ToT_sum = ak.sum(a['ToT'], axis=1)
    xc = ak.sum(a['x'] * a['ToT'], axis=1) / ToT_sum
    yc = ak.sum(a['y'] * a['ToT'], axis=1) / ToT_sum

    t = ak.flatten(a['t'][ak.argmax(a['ToT'], axis=1, keepdims=True)])

    return ak.zip({
        "t": t,
        "xc": xc,
        "yc": yc,
        "ToT_max": ToT_max,
        "ToT_sum": ToT_sum,
        "n": n,
    }, depth_limit = 1)


def centroid_awkward_with_events(a):
    n = ak.num(a, axis=1)
    ToT_max = ak.max(a['ToT'], axis=1)
    ToT_sum = ak.sum(a['ToT'], axis=1)
    xc = ak.sum(a['x'] * a['ToT'], axis=1) / ToT_sum
    yc = ak.sum(a['y'] * a['ToT'], axis=1) / ToT_sum
    t = ak.flatten(a['t'][ak.argmax(a['ToT'], axis=1, keepdims=True)])


    return ak.zip({
        "t": t,
        "xc": xc,
        "yc": yc,
        "ToT_max": ToT_max,
        "ToT_sum": ToT_sum,
        "n": n,
        "events": a
    }, depth_limit = 1)


centroid_awkward_with_events(ak_clusters)
#centroid_ak_wo_events = centroid_awkward_without_events(ak_clusters)

## Benchmark

In [None]:
import timeit
import statistics

def run_numpy():
    centroid_numpy(np_labels, np_events)

def run_pandas():
    centroid_pandas_groupby(sub_df)

def run_awkward_with_events():
    centroid_awkward_with_events(ak_clusters)

def run_awkward_without_events():
    centroid_awkward_without_events(ak_clusters)

# Benchmarking function
def benchmark(func, name, number=5):
    times = timeit.repeat(func, repeat=number, number=1)
    avg_time = statistics.mean(times)
    stddev = statistics.stdev(times)
    print(f"{name:30s} | Avg: {avg_time:.6f} s | Stddev: {stddev:.6f} s")

# Run benchmarks
benchmark(run_numpy, "NumPy + Numba")
benchmark(run_pandas, "Pandas groupby")
benchmark(run_awkward_without_events, "Awkward without events")
benchmark(run_awkward_with_events, "Awkward with events")

NumPy + Numba                  | Avg: 0.672638 s | Stddev: 0.008076 s
