In [1]:
from dask.distributed import Client
client = Client("localhost:8786")

In [2]:
client

0,1
Connection method: Direct,
Dashboard: http://localhost:8787/status,

0,1
Comm: tcp://10.67.22.157:8786,Workers: 3
Dashboard: http://10.67.22.157:8787/status,Total threads: 12
Started: 2 hours ago,Total memory: 23.28 GiB

0,1
Comm: tcp://10.67.22.157:46669,Total threads: 4
Dashboard: http://10.67.22.157:40683/status,Memory: 7.75 GiB
Nanny: tcp://10.67.22.157:37109,
Local directory: /tmp/dask-scratch-space/worker-d8y11cg4,Local directory: /tmp/dask-scratch-space/worker-d8y11cg4
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 396.25 MiB,Spilled bytes: 0 B
Read bytes: 39.64 kiB,Write bytes: 38.69 kiB

0,1
Comm: tcp://10.67.22.183:35143,Total threads: 4
Dashboard: http://10.67.22.183:45315/status,Memory: 7.76 GiB
Nanny: tcp://10.67.22.183:38875,
Local directory: /tmp/dask-scratch-space/worker-loth0veu,Local directory: /tmp/dask-scratch-space/worker-loth0veu
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 129.28 MiB,Spilled bytes: 0 B
Read bytes: 286.16060028163315 B,Write bytes: 1.45 kiB

0,1
Comm: tcp://10.67.22.210:33687,Total threads: 4
Dashboard: http://10.67.22.210:38199/status,Memory: 7.76 GiB
Nanny: tcp://10.67.22.210:41341,
Local directory: /tmp/dask-scratch-space/worker-ar6mv8iq,Local directory: /tmp/dask-scratch-space/worker-ar6mv8iq
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 313.67 MiB,Spilled bytes: 0 B
Read bytes: 286.30026040099284 B,Write bytes: 1.45 kiB


### Load data

In [3]:
import json
import dask.bag as db
import dask.array as da
import dask.dataframe as dframe
import dask.bytes as dbytes
import dask.delayed
import numpy as np
import pandas as pd
import itertools
from scipy import stats
import time

In [4]:
start = time.time()

In [5]:
with open("credentials.json") as f:
    cred = json.load(f)

In [6]:
# Configure S3 endpoint URL and credentials
bucket_name = 'mapd-minidt-batch'
credentials = {"key":cred['access_key'], 
           "secret":cred['secret_key'], 
           "endpoint_url":'https://cloud-areapd.pd.infn.it:5210'}

urlpaths=[f's3://{bucket_name}/data_{i:06}.dat' for i in range(0, 81)]

sample, blocks = dbytes.read_bytes([url for url in urlpaths], **credentials)
bag_blocks = db.from_delayed([blocks[i][0] for i in range(len(blocks))])

In [7]:
@dask.delayed
def proc(bytes):
    data = np.frombuffer(bytes, dtype=np.ulonglong, count=len(bytes) // 8)
    tdcs = data & 0b11111
    bxs = (data >> 5) & 0b1111_11111111
    orbits = (data >> 17) & 0b11111111_11111111_11111111_11111111
    chans = (data >> 49) & 0b1_11111111
    fpgas = (data >> 58) & 0b111
    heads = data >> 61
    # return pd.DataFrame({'TDC': tdcs, "BX": bxs, "ORBIT":orbits, "CHAN": chans, "FPGA": fpgas, "HEAD": heads})
    mat = np.column_stack((tdcs, bxs, orbits, chans, fpgas, heads))
    mat = mat[(mat[:, 5] == 2)]

    return pd.DataFrame(data=mat, columns=["TDC", "BX", "ORBIT", "CHANNEL", "FPGA", "HEAD"], index=mat[:, 2])

data = dframe.from_delayed([proc(p[0]) for p in blocks])
# len(data)

In [8]:
def get_layers(cells):
    # Here we assign the correct LAYER only to the cell belonging to layer 0 and 3
    # because the modulo operator is good only for the layer 0 and 3, not the 1 and 2
    # because the 4*k cell is in the first row, the 4*k+3 cell is in the fourth row, but
    # the 4*k+1 is in the third and the 4*k+2 in the second, so these last two are flipped
    layers = cells % 4
    # Here we swap layer 1 with layer 2 (the incorrect ones)
    return np.where((layers == 1) | (layers == 2), layers % 2 + 1, layers)


# Questa funzione ha lo scopo di prendere il dataframe grezzo e produrne uno con soltanto gli eventi
# che ci interessano e aggiungengo tutta una serie di colonne utili
def manipulate_dataframe(df):    
    TIME_OFFSETS = np.array(
        [95.0 - 1.1,  # Ch 0
        95.0 + 6.4,  # Ch 1
        95.0 + 0.5,  # Ch 2
        95.0 - 2.6]  # Ch 3
    )

    SPACE_OFFSETS = np.array([219.8, 977.3, 1035.6, 1819.8])
    CELL_WIDTH = 42
    CELL_HEIGHT = 13


    # Numero minimo di layer diversi colpiti per camera
    MIN_UNIQUE_LAYERS_HIT = 2
    # Numero massimo di hits in una camera
    MAX_HITS_PER_CHAMBER = 12
    # Numero minimo di hits per camera (in questo caso uguale al numero minimo di layer colpiti)
    MIN_HITS_PER_CHAMBER = MIN_UNIQUE_LAYERS_HIT
    # Numero massimo di hits per layer (di una camera)
    MAX_HITS_PER_LAYER = 3
    # Numero minimo di 'buone camere' per considerare l'intero evento, la definizione di
    # buona camera è specificata più sotto
    MIN_GOOD_CHAMBERS = 2

    MIN_PR_HIT = 3

    # we eliminate all hits on chamber 1
    df = df[~((df.FPGA == 0) & (df.CHANNEL >= 64) & (df.CHANNEL <= 127))]

    # we keep only the first hit in the same cell (to do that we sort by 'time' and keep the first)
    df = df.sort_values(by=["ORBIT", "BX", "TDC"])
    df = df.drop_duplicates(["ORBIT", "CHANNEL", "FPGA"])  # by default it keeps the first

    # we create the column for the time (we don't include orbit because we don't want to compare
    # hits in different event, but only hits from the same events so adding the orbit it's just
    # like adding an offset equal for every hit
    df["TIME"] = 25 * df.BX + df.TDC * 25 / 30

    # We keep only the orbits with a t0 (todo: check if there is only one t0)
    mask_t0 = (df.FPGA == 1) & (df.CHANNEL == 128)
    orbits_with_t0 = df.loc[mask_t0, ('ORBIT', "TIME")]  # list of orbits which contain a t0 row
    df = df[df.ORBIT.isin(orbits_with_t0.ORBIT.unique())]  # we keep only the orbit in that list

    # we rename the TIME columns for the t0s to T0 and then merge with the original df on ORBIT
    # in this way we obtain a new column called T0 for every hit which contains the t0 of the event
    # associated with that orbit
    orbits_with_t0.rename(columns={'TIME': 'T0'}, inplace=True)
    df = pd.merge(df, orbits_with_t0, on='ORBIT', how='inner')

    # We remove the column with t0 and the infamous channel 138 (we don't know what is that)
    df = df[(df.CHANNEL != 128) & (df.CHANNEL != 138)]

    # Here we create some column
    df["CHAMBER"] = np.round(df.FPGA * 2 + df.CHANNEL // 64)
    # Cell is a number from 0-63 for every chamber (like shown in the pdf)
    df["CELL"] = (df.CHANNEL - (df.CHAMBER % 2) * 64)
    df["LAYER"] = get_layers(df.CELL)

    # Here we assign to every hit the position of the central cable of that cell (basically the center of the cell)
    df["CELL_X"] = (df.CELL // 4) * CELL_WIDTH + CELL_WIDTH * 0.5 + CELL_WIDTH * 0.5 * (df.LAYER % 2)
    df["CELL_Y"] = SPACE_OFFSETS[df.CHAMBER] + (4 - df.LAYER) * CELL_HEIGHT - CELL_HEIGHT * 0.5

    # time correction (there is a different t0 for each chamber)
    df["T0"] = df.T0 - TIME_OFFSETS[df.CHAMBER]
    # We calculate the distance using the drift time, and then convert it from um to mm
    df["REL_TIME"] = df.TIME - df.T0
    df["DISTANCE"] = df.REL_TIME * 53.8 / 1000

    hits_before = len(df)
    # We keep only the distances in the range [0, 21] because are the only distances possible (half cell width)
    df = df[(df.DISTANCE >= 0) & (df.DISTANCE <= 21)]
    # print(f"Sono stati rimossi {hits_before - len(df)} eventi perchè avevano distanze sbagliate")

    # In this function we check if there are at least 'MIN_GOOD_CHAMBERS' chambers
    # What is a good chamber? A good chamber is when
    # the chamber has at least 'MIN_HITS_PER_CHAMBER' hits in total
    # the chamber has at least 'MIN_UNIQUE_LAYERS_HIT' layers hit
    #
    # But if we find a chamber with more than 'MAX_HITS_PER_CHAMBER' hits, then we discard the whole event
    def filter_chambers(x):
        # x is dataframe only containing the hits from the same orbit
        good_ch = 0
        # we now subset that dataframe grouping by chamber
        for ch, df_ch in x.groupby("CHAMBER"):
            # if the numbers of hits in this chamber is greater than the maximum allowed we discard
            # the whole event (if we return False, pandas know to not consider the orbit in input to
            # this function)
            if len(df_ch) > MAX_HITS_PER_CHAMBER:
                return False

            # If in this chamber there is the minimum number of hits
            if len(df_ch) >= MIN_HITS_PER_CHAMBER:
                # and if there is the minimum number of layer hit
                if len(df_ch.LAYER.value_counts()) >= MIN_UNIQUE_LAYERS_HIT:
                    # and the number if hits in the same layer is allowed
                    # then this chamber is a good chamber
                    if np.all(df_ch.LAYER.value_counts() <= MAX_HITS_PER_LAYER):
                        # we increment the good chamber count
                        good_ch += 1

        # if this event (same orbit) has at least the minimum number of good chamber then we keep them
        # otherwise we return false and filter it out
        return good_ch >= MIN_GOOD_CHAMBERS

    # We groupby orbit and then each subgroup is sent to the function to check if it's good (return True)
    # or we have to filter it out (return False)
    df = df.groupby(["ORBIT"]).filter(filter_chambers)

    # We finally sort the hits
    df = df.sort_values(by=["ORBIT", "CHAMBER", "LAYER", "CELL"])
    # We add 2 more columns: we will need them in further computation
    # This step is highly suggested, to avoid futher problems with
    # dask meta and columns present in the dataframe
    df['HIT_X'] = np.nan
    df['HIT_X2'] = np.nan
    return df

In [9]:
print(f"The number of partitions is {data.npartitions}")

The number of partitions is 81


In [10]:
filtered_data = data.map_partitions(manipulate_dataframe, enforce_metadata=False)

# Find local tracks

In [11]:
def group_hits(df):
    n = len(df)
    matx = df.CELL_X.values.reshape(-1, 1) - df.CELL_X.values.reshape(1, -1)
    maty = df.CELL_Y.values.reshape(-1, 1) - df.CELL_Y.values.reshape(1, -1)
    # 45 approx sqrt(21^2 + (3*13)^2) = 44.29
    mat = np.sqrt(matx ** 2 + maty ** 2) < 45  # todo do not sqrt

    to_visit_global = set(range(n))
    groups = []
    while to_visit_global:
        i = to_visit_global.pop()
        group = set(np.where(mat[i, :])[0])
        to_visit_local = group - {i}
        while to_visit_local:
            j = to_visit_local.pop()
            j_nodes = np.where(mat[j, :])[0]
            group.update(j_nodes)

            if len(j_nodes) == n:
                to_visit_global.clear()
                break

            to_visit_global.remove(j)
            to_visit_local.update(to_visit_global.intersection(j_nodes))

        groups.append(df.iloc[list(group)])
    return groups


# Sceglie il gruppo 'migliore', codice brutto
def get_index_good_group(groups):
    lenghts = np.array([np.arange(len(groups)),
                        [len(a) for a in groups]]).T
    if 4 in lenghts:
        return lenghts[lenghts[:, 1] == 4][0][0]

    lenghts = lenghts[(lenghts[:, 1] <= 5) & (lenghts[:, 1] >= 3)]
    if len(lenghts) > 0:
        lenghts = lenghts[lenghts[:, 1].argsort()[::-1]]
        return lenghts[0, 0]


In [12]:
### from fitter
# import fitter
def get_yresiduals_squared(x, y, slope, intercept):
    return (x * slope + intercept - y) ** 2


def get_xresiduals_squared(x, y, slope, intercept):
    return ((y - intercept) / slope - x) ** 2


def get_residuals_eucl_squared(x, y, slope, intercept):
    # distance from the line y_ = ax_ + b from point (x, y)
    return ((slope * x - y + intercept) / np.sqrt(slope ** 2 + 1)) ** 2

def fit_by_bruteforce(x1, x2, x, y, hint=None, res_method="xy", debug=False, block_mode=False):
    combs = np.array(list(itertools.product([0, 1], repeat=len(x1))))
    stack = np.column_stack((x1, x2))

    if hint is not None:
        # help = np.array(help, dtype=np.int)
        good_combs_i = np.all(np.isnan(hint) | (np.array(combs) == hint), axis=1)
        combs = combs[good_combs_i, :]

    res_list = []
    for i, comb in enumerate(combs):
        x_data = stack[np.arange(len(x1)), comb]
        y_data = y
        res = stats.linregress(x_data, y_data)

        if res_method == "xy":
            residuals = get_residuals_eucl_squared(x_data, y_data, res.slope, res.intercept)
        elif res_method == "y":
            residuals = get_yresiduals_squared(x_data, y_data, res.slope, res.intercept)
        elif res_method == "x":
            residuals = get_xresiduals_squared(x_data, y_data, res.slope, res.intercept)
        else:
            raise ValueError(f"Invalid residual method: {res_method}, allowed: x, y, xy")

        residual_sum = np.sum(residuals)
        res_list.append((res, comb, residual_sum, residuals))

    best_res = min(res_list, key=lambda x: x[2])

    debug_data = None
    if debug:
        debug_data = [res_list, None, best_res[3]]
    return best_res[0], best_res[1], debug_data

In [13]:
# Here we want to construct the track for a chamber only (the local track)
def calculate_local_track(df):	
    orbit_groupby = df.groupby("ORBIT")

    # The tracks parameters are saved in a matrix with 6 columns (orbit, chamber, slope1, intercept1,
    # slope2, intercept2) what those are, is specified below
    tracks = np.zeros(shape=(len(orbit_groupby) * 3, 6))
    counter = 0
    for orbit, df_orbit in orbit_groupby:
        # for every orbit we group by chamber
        for ch, df_ch in df_orbit.groupby("CHAMBER"):
            # if orbit != 666316 or ch != 0:
            #     continue

            if len(df_ch) >= 8:
                continue

            groups = group_hits(df_ch)
            df_ch_index = get_index_good_group(groups)

            if df_ch_index is None:
                continue

            df_ch = groups[df_ch_index]

            # x1 are the distances on the left side, x2 on the right side
            x1 = (df_ch.CELL_X - df_ch.DISTANCE).values
            x2 = (df_ch.CELL_X + df_ch.DISTANCE).values
            # x_cell are the x-positions of the center of the cell
            x_cell = df_ch.CELL_X.values
            # y_cell are the y-positions of the center of the cell
            y_cell = df_ch.CELL_Y.values

            # We calculate the linear regression of every possibile combination of right and left
            # res_bf is the best regression result
            res_bf, comb_bf, debug_bf = fit_by_bruteforce(x1, x2, x_cell, y_cell, debug=True)

            # we sort the result by the lowest residual square, where the residual is the euclidean
            # distance from the points to the line
            bf_lr_sorted = sorted(debug_bf[0], key=lambda x: x[2])

            # here we get the parameters of the second element of that array, so the parameters
            # of the second-best line (silver medal)
            slope_silver = bf_lr_sorted[1][0].slope
            inter_silver = bf_lr_sorted[1][0].intercept
            # comb is an array of 0 or 1, that tells us if we used the left (0) or the right (1) distance
            comb_silver = bf_lr_sorted[1][1]

            # we save the result in the tracks-matrix of the best line
            tracks[counter, 0:4] = [orbit, ch, res_bf.slope, res_bf.intercept]
            # and we add in the hits-dataframe a column with the real distance of the hits we found
            # that means that if we found right-right-left-left we get the center distance of the cell
            # and we add-add-sub-sub the distance found from the drift time
            # hit_y is not needed because is equal to the y-center of the cell
            df.loc[df_ch.index, "HIT_X"] = np.where(comb_bf == 0, x1, x2)

            # if we have the second best line (it should be always the case) we add its info
            if slope_silver is not None:
                tracks[counter, 4:] = [slope_silver, inter_silver]
                df.loc[df_ch.index, "HIT_X2"] = np.where(comb_silver == 0, x1, x2)

            counter += 1
    
    # we convert the numpy tracks-matrix to a tracks-dataframe
    tracks = tracks[~np.all(tracks == 0, axis=1)]
    tracks = pd.DataFrame(data=tracks[tracks[:, 0] != 0],
                          columns=["ORBITt", "CHAMBERt", "SLOPEt", "INTERCEPTt", "SLOPE2t", "INTERCEPT2t"])
    return pd.concat([df, tracks], axis=1)

In [14]:
# Now that we have added the new columns, we can perform the next lazy computation:
test_and_track = filtered_data.map_partitions(calculate_local_track, enforce_metadata=False)

In [15]:
# Split the DataFrame based on columns
# Select specific columns for both datasets
analized_data = test_and_track.loc[:, ['TDC', 'BX', 'CHANNEL', 'FPGA', 'HEAD', 'TIME', 'ORBIT', 'T0',
       'CHAMBER', 'CELL', 'LAYER', 'CELL_X', 'CELL_Y', 'REL_TIME', 'DISTANCE', 'HIT_X', 'HIT_X2']]  
tracks = test_and_track.loc[:, ["ORBITt", "CHAMBERt", "SLOPEt", "INTERCEPTt", "SLOPE2t", "INTERCEPT2t"]]

################# 
# Renaming columns as before, to avoid problems with futher analysis
new_columns = ["ORBIT", "CHAMBER", "SLOPE", "INTERCEPT", "SLOPE2", "INTERCEPT2"]
tracks = tracks.rename(columns=dict(zip(tracks.columns, new_columns)))

# Remove rows with NaN
analized_data = analized_data.dropna(how="all")
tracks = tracks.dropna()

In [16]:
# We can see that both analized_data and tracks are dask core dataframes objects
type(analized_data), type(tracks)

(dask.dataframe.core.DataFrame, dask.dataframe.core.DataFrame)

In [17]:
####################
# We now persist our results to avoid redundant computation by caching the intermediate results.
# This allows us to reuse the computed data without re-computing it each time.

# Persist the DataFrames to disk

analized_data = analized_data.persist()
tracks = tracks.persist()

In [18]:
# And still...
type(analized_data), type(tracks)

(dask.dataframe.core.DataFrame, dask.dataframe.core.DataFrame)

# Find global tracks

In [19]:
### from fitter
# from fitter import fit_chambers_by_bruteforce
def fit_chambers_by_bruteforce(xmat, ymat):
    combs = np.array(list(itertools.product(np.arange(xmat.shape[1]), repeat=xmat.shape[0])))

    res_list = []
    y_data = ymat[~np.isnan(ymat)].reshape(-1)
    for i, comb in enumerate(combs):
        # x_data = np.concatenate((x1s[comb[0]], x2s[comb[1]]))
        x_data = xmat[np.arange(xmat.shape[0]), comb, :].reshape(-1)
        x_data = x_data[~np.isnan(x_data)]
        res = stats.linregress(x_data, y_data)
        residuals = get_residuals_eucl_squared(x_data, y_data, res.slope, res.intercept)
        res_list.append((res, comb, np.sum(residuals), residuals))

    res_list.sort(key=lambda x: x[2])
    return res_list

In [20]:
### from analyzer
# from analyzer import calculate_global_track
# Here we try to combine the local track to form a global track

def calculate_global_track(df, tracks):
    # array that will contain the difference in degrees of the global track vs the local track
    # of each chamber
    columns_names = ['G_SLOPE', 'G_INTERCEPT', 'CHAMBER0', 'CHAMBER1', 'CHAMBER2', 'CHAMBER3']
    diff_angles = np.full(shape=(tracks.shape[0], len(columns_names)), fill_value=np.nan)
    # So, for each orbit
    for index, (orbit, df_orbit) in enumerate(df.groupby("ORBIT")):

        # we get the tracks associated with this orbit from the tracks-dataframe
        df_track = tracks[tracks.ORBIT == orbit]
        # we require that there are at least two local track, can be lowered to 1
        if len(df_track) < 2:
            continue

        # chambers is just a list of the chambers with hits in this event
        chambers = df_track.CHAMBER.unique().astype(int)

        # forse toglierli ancora prima, all'inizio o nel local track
        df_orbit = df_orbit[~np.isnan(df_orbit.HIT_X)]
        ch_counts = df_orbit.CHAMBER.value_counts()
        xmat = np.full(shape=(len(ch_counts), 2, max(ch_counts)), fill_value=np.nan)
        ymat = np.full(shape=(len(ch_counts), max(ch_counts)), fill_value=np.nan)
        for i, (ch, n) in enumerate(ch_counts.sort_index().items()):
            df_hits = df_orbit[df_orbit.CHAMBER == ch]
            xmat[i, 0, :n] = df_hits.HIT_X
            xmat[i, 1, :n] = df_hits.HIT_X2
            ymat[i, :n] = df_hits.CELL_Y

        # We have two lines for each chamber (the best and the 2-nd best), we try to find the best possible line
        # using a set of points for each chamber (so we try all the combinations of best and 2nd-best to
        # get the global track)
        result_list = fit_chambers_by_bruteforce(xmat, ymat)

        slopes = np.zeros(4)
        intercepts = np.zeros(4)

        # here we get put into a list the paramters of the local tracks
        slopes[chambers] = df_track.SLOPE.values
        intercepts[chambers] = df_track.INTERCEPT.values

        # we convert the local slopes into angles (degree)
        angles = np.arctan(slopes) * 180 / np.pi
        # we get the result of the best global line
        slope, intercept = result_list[0][0].slope, result_list[0][0].intercept
        # we convert the global slope in angle
        angle = np.arctan(slope) * 180 / np.pi
        # and for each chamber we save the difference
        diff_angles[index, 0:2] = slope, intercept

        for ch in chambers:
            diff_angles[index, ch+2] = angle - angles[ch]

    return pd.DataFrame(data=diff_angles, columns=columns_names).dropna(how="all").drop('CHAMBER1', axis=1)

In [21]:
meta = pd.DataFrame([], dtype=np.ulonglong, columns=['G_SLOPE', 'G_INTERCEPT', 'CHAMBER0', 'CHAMBER2', 'CHAMBER3'])
results = dframe.map_partitions(calculate_global_track, analized_data, tracks, meta=meta)

In [22]:
results.compute()

Unnamed: 0,G_SLOPE,G_INTERCEPT,CHAMBER0,CHAMBER2,CHAMBER3
1,-3.804938,2938.997779,,-0.590259,0.513742
2,12.922701,-5701.190492,3.670327,1.460818,
3,-17.365750,8440.696446,,-1.998658,-0.716284
4,11.070797,-3940.833542,,1.469188,2.895434
5,4.720422,-1252.927269,,112.709154,140.778708
...,...,...,...,...,...
463,-11.752821,6157.708499,2.811242,-1.176699,-0.062390
464,7.987432,-2568.374241,-1.594026,0.618705,25.746092
466,5.561562,-1583.920262,-0.877544,-0.075801,
467,-4.151788,2959.465317,-2.497380,0.747371,-2.015913


In [None]:
end = time.time()

In [None]:
total_time = (end-start)/60
print(f"The total time is {total_time} mins") # 17 minutes

In [24]:
# da.histogram(results)