In [1]:
import numpy as np
import os
import sys
import h5py

In [2]:
### Reading the HDF5 file ###

READPATH = "/fs/project/PAS1405/General/Kimmel_Chris/061120_8079m6A_IVTcarrierRNA_ligation_polyA.tombo.per_read_stats"

with h5py.File(READPATH, 'r') as hdf5file:
    block_stats = hdf5file['Statistic_Blocks']['Block_0']['block_stats'] 
    read_ids = hdf5file['Statistic_Blocks']['Block_0']['read_ids']
    
    bs_struct_array = np.asarray(block_stats)
    riv_array = np.asarray(read_ids)

bs_rec_array = np.rec.array(bs_struct_array)

In [3]:
len(bs_rec_array)

28107436

In [4]:
### Calculate some basic data and initialize the table ###

pos_array = bs_rec_array['pos'] # make array from the nucleotide-position column
rin_array = bs_rec_array['read_id'] # make array from the read-id-number column

pos_set = set(pos_array)
num_poss = len(pos_set)
pos_tuple = sorted(tuple(pos_set))

rin_set = set(rin_array)
num_reads = len(rin_set)
rin_tuple = sorted(tuple(rin_set))

table = np.full((num_reads, num_poss), np.nan, np.dtype('f8'), order='C') # Interestingly, order='F' doesn't seem slower
''' There is one row in table for every rin, and one column for every position.
The rin corresponding to row i is rin_tuple(i). The position corresponding to position j is pos_tuple(j).
We need to fill the table very quickly, so we need a fast way to look up the *row* corresponding
to a given *rin*, and the *column* corresponding to a given *position*.
That's what pos_to_col_index and rin_to_row_index are for.
'''
pos_to_col_index = {pos: index for index, pos in enumerate(pos_tuple)}
rin_to_row_index = {read_id_number: index for index, read_id_number in enumerate(rin_tuple)}
# Hopefully we won't need pos_to_col_index after we implement a faster way to fill in the table. NOPE we'll still need it.

new_table = np.full((num_reads, num_poss), np.nan, np.dtype('f8'), order='C') # Interestingly, order='F' doesn't seem slower

In [5]:
# Sort bs_rec_array. Maybe try sorting primarily by 'pos'. That might help us fill the table faster below.
# This step took me 20 seconds on a bs_rec_array with 28 million entries (Owens cluster, 1 node, 28 cores)
bs_rec_array_sorted = bs_rec_array.copy()
bs_rec_array_sorted.sort(order=['read_id','pos'], kind='mergesort')

In [6]:
### OLD METHOD ###

# On a 28-core CPU in the Owens cluster this loop took a little over 3 minutes on a rec_array with 28 million entries:
num_checkpoints = 20
checkpoint_size = len(bs_rec_array_sorted)//num_checkpoints
for i, (pos, stat, read_id_number) in enumerate(bs_rec_array_sorted):
    # TODO: Assert that this table position is already empty?
    row_index = rin_to_row_index[read_id_number]
    col_index = pos_to_col_index[pos]
    table[row_index, col_index] = stat
    if i % checkpoint_size == 0:
        print('Progress: {}/{}'.format(i//checkpoint_size, num_checkpoints))

Progress: 0/20
Progress: 1/20
Progress: 2/20
Progress: 3/20
Progress: 4/20
Progress: 5/20
Progress: 6/20
Progress: 7/20
Progress: 8/20
Progress: 9/20
Progress: 10/20
Progress: 11/20
Progress: 12/20
Progress: 13/20
Progress: 14/20
Progress: 15/20
Progress: 16/20
Progress: 17/20
Progress: 18/20
Progress: 19/20
Progress: 20/20


In [19]:
### Pull out slices of bs_rec_array_sorted, each one corresponding to a read ###

number_of_records = bs_rec_array_sorted.shape[0]

indices_preceeding_discontinuities = np.where((np.diff(bs_rec_array_sorted['read_id']) != 0)
                                      | (np.diff(bs_rec_array_sorted['pos']) != 1)
                                     )[0]
indices_following_discontinuities = indices_preceeding_discontinuities + 1
slice_boundaries = np.concatenate(([0], indices_following_discontinuities, [number_of_records]))

for i, (st, sp) in enumerate(zip(slice_boundaries, slice_boundaries[1:])):
    rin = bs_rec_array_sorted['read_id'][st]
    row_index = rin_to_row_index[rin]
    
    # The business of looking up which positions we need to start and stop the entries at deserves closer scrutiny
    # The code currently assumes that given slices of bs_rec_array_sorted have positions that increment one at a time.
    # Combine ( np.diff(bs_rec_array_sorted['pos']) != 1 ) with np.diff(bs_rec_array_sorted['read_id']).nonzero() using
    # ... some kind of entry-wise AND.
    assert st < sp
    low_pos = bs_rec_array_sorted['pos'][st]
    high_pos = bs_rec_array_sorted['pos'][sp-1] # -1 because we're accessing a value...
    assert low_pos < high_pos
    low_col_index = pos_to_col_index[low_pos]
    high_col_index = pos_to_col_index[high_pos]
    assert low_col_index < high_col_index
    
    new_table[row_index,low_col_index:(high_col_index+1)] = bs_rec_array_sorted['stat'][st:sp]

Progress: 0/5
Progress: 1/5
Progress: 2/5
Progress: 3/5
Progress: 4/5
Progress: 5/5


In [9]:
np.all(np.isclose(new_table,table,equal_nan=True))

True

In [10]:
np.where(~np.isclose(new_table, table, equal_nan=True))

(array([], dtype=int64), array([], dtype=int64))

In [11]:
set(np.where(~np.isclose(new_table, table, equal_nan=True))[0])

set()

In [12]:
rin[14]

IndexError: invalid index to scalar variable.

In [17]:
set(indices_preceeding_discontinuities) >= set(indices_preceeding_rid_changes)

True