In [1]:
import pandas, numpy, dask, bokeh, numba, scipy
from prefect import flow, task
import dask.dataframe as dd
from dask.distributed import Client

import os, struct

server,port = "192.168.1.90","8786"

client = Client(n_workers=7,processes=True,threads_per_worker=2) if os.system(f"telnet {server} {port}") else Client(f"tcp://{server}:{port}") 

telnet: Unable to connect to remote host: No route to host


Trying 192.168.1.90...


2023-04-23 23:15:53,295 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-mb2wrvpn', purging


In [2]:
file = "../../data/Multi_iTED_characterization/test_D.2023_03_31_T.16_26_55_C.itedABCD_lab_custom_2023.02.22_4.0v_887_5s.singles.ldat"

In [3]:
deconvolution_dict = {
    0:{
        "iTED": 1,
        "Crystal": 0,
        "Offset": 0
    },
    1:{
        "iTED": 1,
        "Crystal": 1,
        "Offset": 128
    },
    2:{
        "iTED": 1,
        "Crystal": 2,
        "Offset": 256
    },
    3:{
        "iTED": 1,
        "Crystal": 3,
        "Offset": 384
    },
    4:{
        "iTED": 1,
        "Crystal": 4,
        "Offset": 512
    },
    5:{
        "iTED": 2,
        "Crystal": 0,
        "Offset": 640
    },
    6:{
        "iTED": 2,
        "Crystal": 1,
        "Offset": 768
    },
    7:{
        "iTED": 2,
        "Crystal": 2,
        "Offset": 896
    },
    8:{
        "iTED": 3,
        "Crystal": 0,
        "Offset": 131072
    },
    10:{
        "iTED": 3,
        "Crystal": 1,
        "Offset": 131328
    },
    11:{
        "iTED": 3,
        "Crystal": 2,
        "Offset": 131456
    },
    12:{
        "iTED": 3,
        "Crystal": 3,
        "Offset": 131584
    },
    13:{
        "iTED": 3,
        "Crystal": 4,
        "Offset": 131712
    },
    14:{
        "iTED": 2,
        "Crystal": 3,
        "Offset": 131840
    },
    15:{
        "iTED": 2,
        "Crystal": 4,
        "Offset": 131968
    },
    16:{
        "iTED": 4,
        "Crystal": 0,
        "Offset": 262144
    },
    18:{
        "iTED": 4,
        "Crystal": 1,
        "Offset": 262400
    },
    19:{
        "iTED": 4,
        "Crystal": 2,
        "Offset": 262528
    },
    20:{
        "iTED": 4,
        "Crystal": 3,
        "Offset": 262656
    },
    21:{
        "iTED": 4,
        "Crystal": 4,
        "Offset": 262784
    }
}

deconvolution_df = pandas.DataFrame(deconvolution_dict).T

In [4]:
@dask.delayed
def get_entry_from_binary(file_name, chunk_size=8+4+4):
    with open(file_name, "rb") as f: 
        while (entry := f.read(chunk_size)):
            yield dict(zip(("time","energy","id"), struct.unpack("qfi", entry)))

def identification(record):
    deconvolute = lambda x: deconvolution_dict[(x % (131072 * 2) + 2048) // 128 if x >= 131072 * 2 else (x % 131072 + 1024) // 128 if x >= 131072 else x // 128]
    
    ited, crystal, offset = deconvolute(record['id']).values()
    
    return {
        'time': record['time'], # in ps
        'energy': record['energy'],
        'iTED': ited,
        'crystal': crystal,
        'pixel': record['id']-offset,
    }

thresholds = lambda current_group, threshold_energy, threshold_events: current_group.energy.sum() >= threshold_energy and len(current_group) >= threshold_events

@dask.delayed
def get_groups(df, interval, thresholds,threshold_energy,threshold_events):
        
    start_time = df['time'].min()
    end_time = start_time + interval
    
    current_group = pandas.DataFrame(
        columns=["time","energy","iTED","crystal","pixel"]
    ).astype(dtype={"time": "int64", "energy": "float32", "iTED": "int8", "crystal": "int8", "pixel": "int8"})
    
    for _, row in df.iterrows():
        
        if row['time'] > end_time:
            if thresholds(current_group,threshold_energy,threshold_events):
                yield current_group
                
            current_group = pandas.DataFrame(
                columns=["time","energy","iTED","crystal","pixel"]
            ).astype(dtype={"time": "int64", "energy": "float32", "iTED": "int8", "crystal": "int8", "pixel": "int8"})
            
            start_time = row['time']
            end_time = start_time + interval
        
        current_group.loc[len(current_group)] = row
    
    if thresholds(current_group,threshold_energy,threshold_events):
        yield current_group

def group_by_crystal(df):
    return df.groupby(["iTED","crystal"])

def branches(event):
    
    return {
        "time_first_event": event.iloc[0].time.round(0), #int64
        "timestamp": event.iloc[:9].time.mean().round(0), #int64
        "energy": event.energy.sum(), #float32
        "iTED": event.iloc[0].iTED, #int8
        "crystal": event.iloc[0].crystal #int8
    }

In [6]:
def get_interactions(file_name):
    
    entries_bag = dask.bag.from_delayed(get_entry_from_binary(file_name))

    entires_df  = entries_bag.map(identification).to_dataframe(
        meta={"time": "int64", "energy": "float32", "iTED": "int8", "crystal": "int8", "pixel": "int8"},
        optimize_graph=True
    )

    events_series = group_by_crystal(entires_df)

    events_bag = dask.bag.from_delayed(
        events_series.apply(lambda x : get_groups(x, 100_000, thresholds,10,1), meta=('x', 'object'))
    )
        
    events_df = events_bag.map(branches).to_dataframe(
        meta={"time_first_event": "int64", "timestamp": "int64", "iTED": "int8", "crystal": "int8", "energy": "float32"},
        optimize_graph=True
    )
    
    return events_df.compute()

In [7]:
get_interactions(file)

2023-04-23 23:40:44,941 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f7a22138f40>>, <Task finished name='Task-717801' coro=<Hardware.__init__.<locals>.f() done, defined at /usr/lib/python3.10/site-packages/distributed/dashboard/components/scheduler.py:848> exception=CommClosedError('in <TCP (closed) ConnectionPool.benchmark_network local=tcp://127.0.0.1:38444 remote=tcp://127.0.0.1:44349>: Stream is closed')>)
Traceback (most recent call last):
  File "/usr/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.10/site-packages/tornado/ioloop.py", line 740, in _run_callback
    ret = callback()
  File "/usr/lib/

AttributeError: 'DataFrame' object has no attribute 'visualize'

In [25]:
a.apply(lambda x : type(x))

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  a.apply(lambda x : type(x))


Dask Series Structure:
npartitions=1
    object
       ...
dtype: object
Dask Name: lambda, 7 graph layers

In [16]:
%%prun -l 10

a=get_interactions(file)

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  events_series.apply(lambda x : get_groups(x, 100_000, thresholds,10,1))


KeyboardInterrupt: 

In [7]:
%%prun -l 10

a.head()


KeyboardInterrupt



Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
2023-04-23 19:24:36,253 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
2023-04-23 19:24:36,253 - distributed.nanny - ERROR - Worker process died unexpectedly
Traceback (most recent call last):
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
    self._run_once()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 1871, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.10/selectors.py", line 469, in sel



    self._handle_write()




  File "/usr/lib/python3.10/site-packages/tornado/iostream.py", line 973, in _handle_write








    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "/usr/lib/python3.10/site-packages/tornado/iostream.py", line 1146, in write_to_fd
    return self.socket.send(data)  # type: ignore
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.10/site-packages/distributed/process.py", line 188, in _run
    target(*args, **kwargs)
  File "/usr/lib/python3.10/site-packages/distributed/nanny.py", line 955, in _run
    with contextlib.ExitStack() as stack:
  File "/usr/lib/python3.10/contextlib.py", line 576, in __exit__
    raise exc_details[1]
  File "/usr/lib/python3.10/contextlib.py", line 561, in __exit__
    if cb(*exc_details):
  File "/usr/lib/python3.10/contextli

    self._wait_for_tstate_lock(timeout=max(timeout, 0))
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock


    if lock.acquire(block, timeout):
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/lib/python3.10/multiprocessing/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
  File "/usr/lib/python3.10/multiprocessing/process.py", line 330, in _bootstrap
    traceback.print_exc()
  File "/usr/lib/python3.10/traceback.py", line 179, in print_exc
    print_exception(*sys.exc_info(), limit=limit, file=file, chain=chain)
  File "/usr/lib/python3.10/traceback.py", line 119, in print_exception
    te = TracebackException(type(value), value, tb, limit=limit, compact=True)
  File "/usr/lib/python3.10/traceback.py", line 502, in __init__
    self.stack = StackSummary.extract(
  File "/usr/lib/python3.10/traceback.py", line 383, in extr



In [None]:
a.energy.compute()

In [35]:
a.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, time_first_event to energy
dtypes: float32(1), int64(2), int8(2)

In [7]:
get_interactions(file).head().visualize(filename='test.svg', optimize_graph=True)

KeyboardInterrupt: 

2023-04-23 19:28:29,829 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Traceback (most recent call last):
2023-04-23 19:28:29,830 - distributed.nanny - ERROR - Worker process died unexpectedly
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.10/site-packages/distributed/process.py", line 188, in _run
    target(*args, **kwargs)
  File "/usr/lib/python3.10/site-packages/distributed/nanny.py", line 981, in _run
    asyncio.run(run())
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
    self._run_once()
 





In [37]:
a.dask

0,1
"layer_type  MaterializedLayer  is_materialized  True  number of outputs  38  npartitions  19  columns  ['time_first_event', 'timestamp', 'iTED', 'crystal', 'energy']  type  dask.dataframe.core.DataFrame  dataframe_type  pandas.core.frame.DataFrame  series_dtypes  {'time_first_event': dtype('int64'), 'timestamp': dtype('int64'), 'iTED': dtype('int8'), 'crystal': dtype('int8'), 'energy': dtype('float32')}",

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,38
npartitions,19
columns,"['time_first_event', 'timestamp', 'iTED', 'crystal', 'energy']"
type,dask.dataframe.core.DataFrame
dataframe_type,pandas.core.frame.DataFrame
series_dtypes,"{'time_first_event': dtype('int64'), 'timestamp': dtype('int64'), 'iTED': dtype('int8'), 'crystal': dtype('int8'), 'energy': dtype('float32')}"
