In [37]:
from typing import Any, List
import h5py as hp
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def test_monotonic_increasing(sequence):
    array = np.array([0 for _ in range(len(sequence))])
    sequence.read_direct(array)
    diff = np.diff(array)

    print(f"   |num non-monotonic elements = {len(diff[diff < 0])}")

def test_uniqueness(sequence):
    array = np.array([0 for _ in range(len(sequence))])
    sequence.read_direct(array)
    unique_values = np.unique(array)

    print(f"   |num duplicate elements found = {len(array) - len(unique_values)}")

def test_outliers(sequence, expected_diff):
    array = np.array([0 for _ in range(len(sequence))])
    sequence.read_direct(array)
    num_outliers = np.count_nonzero(array > 2*len(array)*expected_diff)
    print(f"   |num outliers (i.e. greater than 2*{len(array)*expected_diff}) = {num_outliers}")
    #if len(outliers) > 0:
    #    print(f"   |   outlier values = {len(outliers)}")

def test_distribution(sequence, num_channels):
    array = np.array([0 for _ in range(len(sequence))])
    sequence.read_direct(array)
    distr = np.array([np.count_nonzero(array == c) for c in range(num_channels)])
    if np.any(distr == 0):
        print(f"   |num channels with zero events = {np.count_nonzero(distr == 0)}")


    

In [51]:

def test_duplicate_events(time_zero, indices, events):
    time_zero_array = np.array([0 for _ in range(len(time_zero))])
    time_zero.read_direct(time_zero_array)
    events_lists = chop_events(indices, events)
    if len(time_zero_array) != len(events_lists):
        print("Events lists fuckt")

    unique_values, inverse, counts = np.unique(time_zero_array, return_counts = True, return_inverse = True)
    for idx, uv in enumerate(unique_values):
        if counts[idx] > 1:
            indices = [i for i,_ in enumerate(inverse == uv)]
            lists = [events_lists[i] for i in indices]
            if np.all([lists[0] == l for l in lists[1:]]):
                pass #print("   |Duplicates, are really duplicates")
            else:
                print("   |Duplicates, are not really duplicates")


def chop_events(indices, events) -> List[Any]:
    index_array = np.array([0 for _ in range(len(indices) + 1)])
    indices.read_direct(index_array[:-1])
    events_array = np.array([0 for _ in range(len(events))])
    events.read_direct(events_array)
    index_array[-1] = len(events_array)
    return [events_array[i1:i2] for (i1,i2) in sliding_window_view(index_array, window_shape = 2)]


def assess_file(file: hp.File):
    start_time = file.get("/raw_data_1/start_time")
    end_time = file.get("/raw_data_1/end_time")
    event_id = file.get("/raw_data_1/detector_1/event_id")
    event_time_zero = file.get("/raw_data_1/detector_1/event_time_zero")
    event_index = file.get("/raw_data_1/detector_1/event_index")

    print(f"frame {f}:")

    print(f"  start = {start_time.values()}")
    val = end_time.get()
    print(f"  start = {val}")
    print(f"  end = {event_time_zero.len()}")
    print(f"  num frames = {event_time_zero.len()}")
    if event_time_zero.len() != 0:
        test_monotonic_increasing(event_time_zero)
        test_uniqueness(event_time_zero)
        test_outliers(event_time_zero, 20000000)
    
    print(f"  num events = {len(event_id)}")
    #test_duplicate_events(event_time_zero,event_index,event_id)
    test_distribution(event_id, 64)


In [52]:
f1,f2 = 4911,5536
path = "../archive/incoming/hifi/HIFI0000"

for f in range(f1,f2):
    try:
        file = hp.File(f"{path}{f}.nxs")
        assess_file(file)
    except ZeroDivisionError as e:
        print(f"{e}")
    except FileNotFoundError as e:
        print(f"{e}")

frame 4911:


AttributeError: 'Dataset' object has no attribute 'values'