# Main Code

In [9]:
# Libraries
from kvikio.nvcomp_codec import NvCompBatchCodec
from kvikio import defaults, CuFile
import uproot
import numpy as np
import cupy as cp
import awkward as ak
from uproot.models.RNTuple import _recursive_find

def GPU_read_col_col_cluster_pages(in_ntuple, ncol, cluster_i, filehandle):
    linklist = in_ntuple.page_list_envelopes.pagelinklist[cluster_i]
    pagelist = linklist[ncol].pages if ncol < len(linklist) else []
    dtype_byte = in_ntuple.column_records[ncol].type
    dtype_str = uproot.const.rntuple_col_num_to_dtype_dict[dtype_byte]
    
    if dtype_str == "switch":
        dtype = np.dtype([("index", "int64"), ("tag", "int32")])
    elif dtype_str == "bit":
        dtype = np.dtype("bool")
    else:
        dtype = np.dtype(dtype_str)
    split = dtype_byte in uproot.const.rntuple_split_types


    n_pages = len(pagelist)
    output_buffers = []
    compressed_buffers = []
    futures = []

    for page_desc in pagelist:
        n_elements = page_desc.num_elements
        loc = page_desc.locator
        n_bytes = loc.num_bytes
        isbit = dtype_str == "bit"
        len_divider = 8 if isbit else 1
        num_elements = n_elements
        n_elements_toread = num_elements // dtype.itemsize
        if isbit:
            num_elements_toread = int(numpy.ceil(num_elements / 8))
        elif dtype_str in ("real32trunc", "real32quant"):
            num_elements_toread = int(numpy.ceil((num_elements * 4 * nbits) / 32))
            dtype = numpy.dtype("uint8")
        else:
            num_elements_toread = num_elements

        uncomp_size = num_elements_toread * dtype.itemsize

        
        out_buff = cp.empty(n_elements_toread, dtype = dtype)


        # Use locator to read page
        
        comp_buff = cp.empty(n_bytes - 9, dtype = "b")
        fut = filehandle.pread(comp_buff,
                              size = int(n_bytes - 9),
                              file_offset = int(loc.offset + 9))


        output_buffers.append(out_buff)
        compressed_buffers.append(comp_buff)
        futures.append(fut)
        
    for future in futures:
        future.get()
            
    return (compressed_buffers, output_buffers, futures)
    
        
    
def GPU_read_col_clusters(in_ntuple, ncol, cluster_range, dtype_byte):
    filepath = in_ntuple.file.source.file_path
    compressed_buffers = []
    output_buffers = []
    futures = []
    print("Reading column {}".format(ncol))
    with CuFile(filepath, "rb") as filehandle:
        for i, cluster_i in enumerate(cluster_range):
            (cluster_compressed_buffers,
             cluster_output_buffers,
             cluster_futures) = GPU_read_col_col_cluster_pages(in_ntuple,
                                                                ncol,
                                                                cluster_i,
                                                                filehandle)
            # Aggregate results
            compressed_buffers.extend(cluster_compressed_buffers)
            output_buffers.extend(cluster_output_buffers)
            futures.extend(cluster_futures)

    # Does not work.... futures maybe cannot be passed arbitrarily deep
    # Wait for all reads to complete
        # for fut in futures:
        #     if not fut.done():
        #         print(f"Future not completed: {fut}")
        #         fut.get()  

    return compressed_buffers, output_buffers
    

def GPU_read_cols(in_ntuple, columns, start_cluster_idx, stop_cluster_idx):
    compressed_buffers = []
    output_buffers = []
    for key in columns:
        if "column" in key and "union" not in key:
            key_nr = int(key.split("-")[1])
            dtype_byte = in_ntuple.ntuple.column_records[key_nr].type
            cluster_range = range(start_cluster_idx, stop_cluster_idx)
            
            compressed_buffers_col, output_buffers_col = GPU_read_col_clusters(in_ntuple,
                                                                       key_nr,
                                                                       cluster_range,
                                                                       dtype_byte)
            compressed_buffers.extend(compressed_buffers_col)
            output_buffers.extend(output_buffers_col)
            
    #print(len(compressed_buffers), len(output_buffers))
    #print(compressed_buffers, output_buffers)
    return(compressed_buffers, output_buffers)
            

def Process_decompressed_content(in_ntuple, columns,
                                 start_cluster_idx, stop_cluster_idx,
                                 all_decompressed_content):
    for key in columns:
        n_pages = 0

        if "column" in key and "union" not in key:
            key_nr = int(key.split("-")[1])
            dtype_byte = in_ntuple.ntuple.column_records[key_nr].type
            cluster_range = range(start_cluster_idx, stop_cluster_idx)
            page_i = -1
            
            for i in cluster_range:
                arrays = []
                linklist = self.page_list_envelopes.pagelinklist[cluster_i]
                pagelist = linklist[ncol].pages if ncol < len(linklist) else []
                n_pages += len(pagelist)
                dtype_byte = self.column_records[ncol].type
                dtype_str = uproot.const.rntuple_col_num_to_dtype_dict[dtype_byte]
                total_len = numpy.sum([desc.num_elements for desc in pagelist], dtype=int)
                if dtype_str == "switch":
                    dtype = numpy.dtype([("index", "int64"), ("tag", "int32")])
                elif dtype_str == "bit":
                    dtype = numpy.dtype("bool")
                else:
                    dtype = numpy.dtype(dtype_str)
                res = cp.empty(total_len, dtype)
                split = dtype_byte in uproot.const.rntuple_split_types
                zigzag = dtype_byte in uproot.const.rntuple_zigzag_types
                delta = dtype_byte in uproot.const.rntuple_delta_types
                index = dtype_byte in uproot.const.rntuple_index_types
                nbits = uproot.const.rntuple_col_num_to_size_dict[dtype_byte]
                tracker = 0
                cumsum = 0
                for page_desc in pagelist:
                    page_i += 1
                    n_elements = page_desc.num_elements
                    tracker_end = tracker + n_elements
                    content = all_decompressed_content[i]

                    loc = page_desc.locator
                    context = {}
                    # bool in RNTuple is always stored as bits
                    isbit = dtype_str == "bit"
                    len_divider = 8 if isbit else 1

                    if split:
                        content = content.view(cp.uint8)
            
                        if nbits == 16:
                            # AAAAABBBBB needs to become
                            # ABABABABAB
                            res = cp.empty(len(content), cp.uint8)
                            res[0::2] = content[len(res) * 0 // 2 : len(res) * 1 // 2]
                            res[1::2] = content[len(res) * 1 // 2 : len(res) * 2 // 2]
            
                        elif nbits == 32:
                            # AAAAABBBBBCCCCCDDDDD needs to become
                            # ABCDABCDABCDABCDABCD
                            res = cp.empty(len(content), cp.uint8)
                            res[0::4] = content[len(res) * 0 // 4 : len(res) * 1 // 4]
                            res[1::4] = content[len(res) * 1 // 4 : len(res) * 2 // 4]
                            res[2::4] = content[len(res) * 2 // 4 : len(res) * 3 // 4]
                            res[3::4] = content[len(res) * 3 // 4 : len(res) * 4 // 4]
            
                        elif nbits == 64:
                            # AAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHH needs to become
                            # ABCDEFGHABCDEFGHABCDEFGHABCDEFGHABCDEFGH
                            res = cp.empty(len(content), cp.uint8)
                            res[0::8] = content[len(res) * 0 // 8 : len(res) * 1 // 8]
                            res[1::8] = content[len(res) * 1 // 8 : len(res) * 2 // 8]
                            res[2::8] = content[len(res) * 2 // 8 : len(res) * 3 // 8]
                            res[3::8] = content[len(res) * 3 // 8 : len(res) * 4 // 8]
                            res[4::8] = content[len(res) * 4 // 8 : len(res) * 5 // 8]
                            res[5::8] = content[len(res) * 5 // 8 : len(res) * 6 // 8]
                            res[6::8] = content[len(res) * 6 // 8 : len(res) * 7 // 8]
                            res[7::8] = content[len(res) * 7 // 8 : len(res) * 8 // 8]
            
                        content[:] = res.view(dtype)
            
                    if isbit:
                        content[:] = (
                            cp.unpackbits(content.view(dtype=numpy.uint8))
                            .reshape(-1, 8)[:, ::-1]
                            .reshape(-1)
                        )
                    content = content[:n_elements]
                    res[tracker:tracker_end] = content

                    if delta:
                        res[tracker] -= cumsum
                        cumsum += cp.sum(res[tracker:tracker_end])
                    tracker = tracker_end

                    if index:
                        res = cp.insert(res, 0, 0)  # for offsets
                    if zigzag:
                        res = _from_zigzag(res)
                    elif delta:
                        res = cp.cumsum(res)

                    arrays.append(res)
        
        # Check if column stores offset values for jagged arrays (splitindex64) (applies to cardinality cols too):
        if dtype_byte in uproot.const.rntuple_delta_types:
            # Extract the last offset values:
            last_elements = [
                arr[-1] for arr in arrays[:-1]
            ]  # First value always zero, therefore skip first arr.
            # Compute cumulative sum using itertools.accumulate:
            last_offsets = list(accumulate(last_elements))
            # Add the offsets to each array
            for i in range(1, len(arrays)):
                arrays[i] += last_offsets[i - 1]
            # Remove the first element from every sub-array except for the first one:
            arrays = [arrays[0]] + [arr[1:] for arr in arrays[1:]]

        res = cp.concatenate(arrays, axis=0)

        if pad_missing_element:
            first_element_index = self.column_records[ncol].first_element_index
            res = numpy.pad(res, (first_element_index, 0))
                    

            




            
def kvikuproot_open_RNTuple(in_ntuple_path, columns, classname, entry_start = 0, entry_stop = None):
    in_ntuple = uproot.open(in_ntuple_path)[classname]
    entry_stop = entry_stop or in_ntuple.ntuple.num_entries
    
    # Find clusters to read that contain data from entry_start to entry_stop
    clusters = in_ntuple.ntuple.cluster_summaries
    cluster_starts = np.array([c.num_first_entry for c in clusters])

    start_cluster_idx = (
        np.searchsorted(cluster_starts, entry_start, side="right") - 1
    )
    stop_cluster_idx = np.searchsorted(cluster_starts, entry_stop, side="right")
    cluster_num_entries = np.sum(
        [c.num_entries for c in clusters[start_cluster_idx:stop_cluster_idx]]
    )

    # Get form for requested columns
    form = in_ntuple.to_akform().select_columns(
        columns, prune_unions_and_records=False
    )

    # Only read columns mentioned in the awkward form
    target_cols = []
    container_dict = {}
    _recursive_find(form, target_cols)

    # Read all columns 'compressed' data
    all_compressed_buffers, all_output_buffers = GPU_read_cols(in_ntuple,
                                                             target_cols,
                                                             start_cluster_idx,
                                                             stop_cluster_idx)

    # Decompression GPU
    print("GPU decompression")
    codec = NvCompBatchCodec("zstd")
    all_decompressed_content = codec.decode_batch(all_compressed_buffers,
                                                 all_output_buffers)

    # Process decompressed data
    # arrays = Process_decompressed_content(in_ntuple,
                                          # target_cols,
                                          # start_cluster_idx,
                                          # stop_cluster_idx,
                                          # all_decompressed_content)
    

    # Process decompressed data
    # for key in target_cols:
    #     if "column" in key and "union" not in key:
    #         key_nr = int(key.split("-")[1])
    #         dtype_byte = in_ntuple.ntuple.column_records[key_nr].type
    #         cluster_range = range(start_cluster_idx, stop_cluster_idx)
    #         arrays = []
    #         for i, cluster_i in enumerate(cluster_range):
    #                 linklist = self.page_list_envelopes.pagelinklist[cluster_i]
    #                 pagelist = linklist[ncol].pages if ncol < len(linklist) else []
    #                 dtype_byte = self.column_records[ncol].type
    #                 dtype_str = uproot.const.rntuple_col_num_to_dtype_dict[dtype_byte]
    #                 total_len = numpy.sum([desc.num_elements for desc in pagelist], dtype=int)
    #                 if dtype_str == "switch":
    #                     dtype = numpy.dtype([("index", "int64"), ("tag", "int32")])
    #                 elif dtype_str == "bit":
    #                     dtype = numpy.dtype("bool")
    #                 else:
    #                     dtype = numpy.dtype(dtype_str)
    #                 split = dtype_byte in uproot.const.rntuple_split_types
    #                 zigzag = dtype_byte in uproot.const.rntuple_zigzag_types
    #                 delta = dtype_byte in uproot.const.rntuple_delta_types
    #                 index = dtype_byte in uproot.const.rntuple_index_types
    #                 nbits = uproot.const.rntuple_col_num_to_size_dict[dtype_byte]
    #                 tracker = 0
    #                 cumsum = 0
    #                 for page_desc in pagelist:
    #                     n_elements = page_desc.num_elements
    #                     tracker_end = tracker + n_elements
    #                     self.read_pagedesc(
    #                         res[tracker:tracker_end], page_desc, dtype_str, dtype, nbits, split
    #                     )
    #                     if delta:
    #                         res[tracker] -= cumsum
    #                         cumsum += cp.sum(res[tracker:tracker_end])
    #                     tracker = tracker_end
            
    #                 if index:
    #                     res = cp.insert(res, 0, 0)  # for offsets
    #                 if zigzag:
    #                     res = _from_zigzag(res)
    #                 elif delta:
    #                     res = cp.cumsum(res)

                    
                
        
        
        
    return 0

path_rntuple = "/home/fstrug/uscmshome/nobackup/GPU/kvikio_playground/TTToSemiLeptonic_UL18JMERNTuple-zstd.root"
cols = ["Jet_pt"]
classname = "Events"
kvikuproot_open_RNTuple(path_rntuple, cols, classname)

Reading column 1408
Reading column 1442
GPU decompression


ValueError: could not broadcast input array from shape (7818,) into shape (977,)

# Fluff

In [2]:
array = cp.array([1], dtype = cp.int64)

In [4]:
help(array)

Help on ndarray in module cupy object:

class ndarray(cupy._core.core._ndarray_base)
 |  ndarray(*args, _obj=None, _no_init=False, **kwargs)
 |  
 |  __init__(self, shape, dtype=float, memptr=None, strides=None, order='C')
 |  
 |  Multi-dimensional array on a CUDA device.
 |  
 |  This class implements a subset of methods of :class:`numpy.ndarray`.
 |  The difference is that this class allocates the array content on the
 |  current GPU device.
 |  
 |  Args:
 |      shape (tuple of ints): Length of axes.
 |      dtype: Data type. It must be an argument of :class:`numpy.dtype`.
 |      memptr (cupy.cuda.MemoryPointer): Pointer to the array content head.
 |      strides (tuple of ints or None): Strides of data in memory.
 |      order ({'C', 'F'}): Row-major (C-style) or column-major
 |          (Fortran-style) order.
 |  
 |  Attributes:
 |      base (None or cupy.ndarray): Base array from which this array is
 |          created as a view.
 |      data (cupy.cuda.MemoryPointer): Pointe