In [1]:
import sys
import os
import time
sys.path.insert(0, '/mnt/cbis/home/amandakau/pyReCoDe')
from pyrecode.recode_reader import ReCoDeReader

import multiprocessing
import queue
import numpy as np
from scipy.sparse import coo_matrix
import matplotlib.pyplot as plt

When reading ReCoDe part files, first fetch the data in batches using __ReaderNode__. Downstream applications can then consume the batches provided by ReaderNode instead of reading from ReCoDeReader directly. One such application, which aggregates all available part files, is implemented in the __ReCoDeViewer__ and __ReCoDeViewer_MP__ classes. The main difference between the two classes is that the former does not support multiprocessing, while the latter does.

In [2]:
class ReaderNode:
    """Reads the frames of a single ReCoDe part file in batches.

    Wraps a ReCoDeReader opened on `file_name` and tracks whether the file
    has been fully consumed (`_is_read`) and the reader's current frame
    index (`_curr_position`).

    Parameters:
    file_name: Path of the part file to open
    file_id: Identifier of the part file; returned alongside every batch
    """

    def __init__(self, file_name, file_id):
        self._file_name = file_name
        self._file_id = file_id
        self._is_read = False

        self._reader = ReCoDeReader(file_name, is_intermediate=True)
        self._reader.open(print_header=False)
        header = self._reader.get_header().as_dict()

        # Frame dimensions as stored in the header, in (nx, ny) order
        self._shape = (header['nx'], header['ny'])
        self._curr_position = self._reader._current_frame_index

    def get_next_batch(self, batch_size=60):
        """Returns (file_id, frames) holding up to `batch_size` frames.

        The list of frames is shorter than `batch_size` (possibly empty) once
        the reader runs out of frames; at that point the node is marked as
        fully read and later calls return an empty list.

        Parameters:
        batch_size: Maximum number of frames to fetch in this call
        """
        batch_data = []

        while len(batch_data) < batch_size and not self._is_read:
            frame_data = self._reader.get_next_frame()
            if frame_data is None:
                # No more frames. This must set `_is_read` (with the leading
                # underscore): it is the flag checked by this loop, by
                # get_status() and by the viewers to detect completion.
                self._is_read = True
                break

            # Only the first entry of the returned mapping is used
            frame_id = next(iter(frame_data))
            batch_data.append(frame_data[frame_id]['data'])

        self._curr_position = self._reader._current_frame_index
        return (self._file_id, batch_data)

    def get_shape(self):
        """Returns the (nx, ny) frame dimensions read from the file header."""
        return self._shape

    def get_status(self):
        """Prints whether the part file is fully read and, if not, the current position."""
        read_status = 'read' if self._is_read else 'still being read'
        pos_status = '' if self._is_read else f' (Current position at {self._curr_position})'
        print(f'{self._file_name} corresponding to part file #{self._file_id} is {read_status}{pos_status}')

---

In [3]:
class ReCoDeViewer:
    """Aggregates the frames of several ReCoDe part files into one summed image.

    One ReaderNode is created per part file. Batches are fetched from every
    part file in turn and summed in the calling process (no multiprocessing).

    Parameters:
    folder_path: Folder containing the part files
    file_name: Base file name; part files are named '<file_name>_partNNN'
    num_parts: Number of part files (indices 0 .. num_parts-1)
    batch_size: Number of frames requested from each part file per iteration
    """

    def __init__(self, folder_path, file_name, num_parts, batch_size=60):
        self._folder_path = folder_path
        self._file_name = file_name
        self._num_parts = num_parts
        self._batch_size = batch_size

        #Create ReaderNodes for each part file to fetch batch data
        self._part_files = {}
        for index in range(self._num_parts):
            part_file_name = os.path.join(self._folder_path, self._file_name + '_part' + '{0:03d}'.format(index))
            self._part_files[index] = ReaderNode(part_file_name, index)

    def get_info(self):
        """Prints the read status of every part file."""
        print(f'Reading from {self._num_parts} part files:')
        for node in self._part_files.values():
            node.get_status()

    def sum_frames(self, part_file_id, batch_data, final_result):
        """Sums one batch of frames and accumulates it into `final_result`.

        Parameters:
        part_file_id: Key under which the running sum is stored in `final_result`
        batch_data: List of frames to add up; it is emptied before returning
        final_result: Mapping of part file id to running sum, updated in place

        Returns the sum of this batch only.
        """
        # Size the accumulator from the part file the batch belongs to, so a
        # part with different dimensions is summed correctly and then reported
        # by combine_part_files(). Unknown ids fall back to part file 0.
        node = self._part_files.get(part_file_id, self._part_files[0])
        summed_frame = coo_matrix(node._shape)
        for frame in batch_data:
            # NOTE(review): the sparse format after addition may not stay COO;
            # downstream code only relies on .shape and .toarray().
            summed_frame += frame

        #Save result
        if part_file_id not in final_result:
            final_result[part_file_id] = summed_frame
        else:
            final_result[part_file_id] = final_result[part_file_id] + summed_frame

        batch_data.clear()
        return summed_frame

    def combine_part_files(self, final_result):
        """Returns a dense array holding the sum of all per-part running sums.

        Part files with no entry in `final_result`, or whose sum does not have
        the dimensions of part file 0, are skipped; the remaining part files
        are still aggregated.

        Parameters:
        final_result: Mapping of part file id to running sum (see sum_frames)
        """
        shape = self._part_files[0]._shape
        summed_frame = np.zeros(shape)

        for index in range(self._num_parts):
            if index not in final_result:
                continue
            part_sum = final_result[index]

            #If the image in the part file has different dimensions, skip this part file
            if np.shape(part_sum) != shape:
                print(f'Part file #{index} does not have the same dimensions - cannot be aggregated.')
                continue

            summed_frame = np.add(summed_frame, part_sum.toarray())

        return summed_frame

    def plot_frame(self, frame, part_file_id, num_frames):
        """Plots `frame` as an image with a colorbar.

        Parameters:
        frame: 2-D array to display
        part_file_id: Part file the frame comes from; -1 (with num_frames 0) selects the aggregate title
        num_frames: Number of frames summed into `frame`; 0 (with part_file_id -1) selects the aggregate title
        """
        fig, ax = plt.subplots(1, 1, figsize=(20,10))
        im = ax.imshow(frame, vmax=np.amax(frame))
        if (part_file_id == -1) and (num_frames == 0):
            ax.set(title = 'Showing aggregate image sum of all part files combined')
        else:
            ax.set(title = f'Showing sum of {num_frames} frames from part file #{part_file_id}')
        fig.colorbar(im)
        plt.show()

    def start(self, plot_result=True, plot_intermediate=False, interval=50):
        """Returns array containing the aggregate sum of all part files.

        Each iteration fetches one batch of `batch_size` frames (as given to
        the constructor) from every part file that is not fully read yet and
        adds it to that part file's running sum.

        Parameters:
        plot_result: True if want the final aggregate sum of all part files to be plotted
        plot_intermediate: True if want data to be plotted every n-th iteration, where n is specified in 'interval'
        interval: n, where data is aggregated and plotted every n-th iteration
        """
        start = time.time()
        is_completed = False
        iteration = 0

        job_queue = queue.Queue()
        final_result = {}

        while not is_completed:
            pending = [self._part_files[index] for index in range(self._num_parts)
                       if not self._part_files[index]._is_read]
            if not pending:
                # Every part file has been read to the end
                is_completed = True

            for node in pending:
                # Honour the batch size configured on the viewer
                job_queue.put(node.get_next_batch(self._batch_size))

            #Do jobs in queue
            while not job_queue.empty():
                self.sum_frames(*job_queue.get(), final_result)

            iteration += 1
            if (iteration % interval == 0):
                print(f"Time elapsed (Iteration #{iteration}): {time.time()-start} seconds")
                if plot_intermediate:
                    agg = self.combine_part_files(final_result)
                    self.plot_frame(agg, -1, 0)

        #Compute and plot aggregate image sum of all part files
        print("Getting aggregate...")
        agg_frame = self.combine_part_files(final_result)
        if plot_result:
            self.plot_frame(agg_frame, -1, 0)
        print(f"Total time elapsed: {time.time()-start} seconds")

        return agg_frame

ReCoDeViewer can be initialised and then run by calling its `start()` method, as shown below. Only the final result, the aggregate sum of all part files, is returned, in the form of an array.

In [None]:
# Location and naming of the capture to aggregate
_data_folder = '/scratch/loh/abhik/2Sep2020/captures/'
_tag = 'streampix_2k_60fps_run_4'
_num_part_files = 10

# Single-process viewer over all part files of the capture
viewer = ReCoDeViewer(
    folder_path=_data_folder,
    file_name=_tag + '.rc1',
    num_parts=_num_part_files,
)

# Report (and plot) the aggregate after every iteration
final_result = viewer.start(plot_intermediate=True, interval=1)

---

If a large amount of data needs to be processed, it is recommended to use multiprocessing for faster results. __ReCoDeViewer_MP__ is based on __ReCoDeViewer__ but adapted to support multiprocessing. It can be initialised and started in the same way as ReCoDeViewer.

In [4]:
class ReCoDeViewer_MP(ReCoDeViewer):
    """ReCoDeViewer variant that sums the batches in worker processes.

    Batches are still fetched from the part files in the calling process;
    each fetched batch is then summed by ReCoDeViewer.sum_frames running in
    its own process, with the running sums kept in a manager dictionary.

    Parameters:
    folder_path: Folder containing the part files
    file_name: Base file name; part files are named '<file_name>_partNNN'
    num_parts: Number of part files (indices 0 .. num_parts-1)
    batch_size: Number of frames requested from each part file per iteration
    """

    def __init__(self, folder_path, file_name, num_parts, batch_size=60):
        super().__init__(folder_path, file_name, num_parts, batch_size)

    def start(self, plot_result=True, num_threads=5, plot_intermediate=False, interval=50):
        """Returns array containing the aggregate sum of all part files.

        Parameters:
        plot_result: True if want the final aggregate sum of all part files to be plotted
        num_threads: Number of processes available for multiprocessing
        plot_intermediate: True if want data to be plotted every n-th iteration, where n is specified in 'interval'
        interval: n, where data is aggregated and plotted every n-th iteration
        """

        start = time.time()
        is_completed = False
        iteration = 0

        # At least one worker per group, otherwise no job would ever be run
        group_size = max(1, num_threads)

        # The context manager shuts the manager's server process down once
        # the aggregate has been computed.
        with multiprocessing.Manager() as manager:
            final_result = manager.dict()

            while not is_completed:
                pending = [self._part_files[index] for index in range(self._num_parts)
                           if not self._part_files[index]._is_read]
                if not pending:
                    # Every part file has been read to the end
                    is_completed = True

                # Jobs are produced and consumed by this process only, so a
                # plain list is used instead of a multiprocessing.Queue (whose
                # empty() is unreliable right after put() and may raise on a
                # closed queue). Each part file contributes at most one job
                # per iteration, so no two workers update the same key.
                jobs = [node.get_next_batch(self._batch_size) for node in pending]

                # Run the jobs in groups of at most `group_size` processes
                for first in range(0, len(jobs), group_size):
                    processes = []
                    for job in jobs[first:first + group_size]:
                        p = multiprocessing.Process(target=ReCoDeViewer.sum_frames, args=(self, *job, final_result))
                        processes.append(p)
                        p.start()

                    for p, job in zip(processes, jobs[first:first + group_size]):
                        p.join()
                        if p.exitcode != 0:
                            # A failed worker means its batch is missing from the sum
                            print(f'Warning: worker for part file #{job[0]} exited with code {p.exitcode}')

                iteration += 1
                if (iteration % interval == 0):
                    print(f"Time elapsed (Iteration #{iteration}): {time.time()-start} seconds")
                    if plot_intermediate:
                        agg = self.combine_part_files(final_result)
                        self.plot_frame(agg, -1, 0)

            #Compute aggregate image sum of all part files while the manager is alive
            print("Getting aggregate...")
            agg_frame = self.combine_part_files(final_result)

        if plot_result:
            self.plot_frame(agg_frame, -1, 0)
        print(f"Total time elapsed: {time.time()-start} seconds")

        return agg_frame

In [None]:
# Location and naming of the capture to aggregate
_data_folder = '/scratch/loh/abhik/2Sep2020/captures/'
_tag = 'streampix_2k_60fps_run_4'
_num_part_files = 10

# Multiprocessing viewer over all part files of the capture
viewer_MP = ReCoDeViewer_MP(
    folder_path=_data_folder,
    file_name=_tag + '.rc1',
    num_parts=_num_part_files,
)

# Report (and plot) the aggregate after every iteration
final_result_MP = viewer_MP.start(plot_intermediate=True, interval=1)