In [95]:
import numpy as np



In [96]:
class Buffer:
    """
    Accumulates streamed samples and hands them back in fixed-size chunks.

    Each incoming batch consists of data rows (``nfeatures`` values per row)
    and one time stamp per row. Batches are appended to an internal store;
    whenever at least ``input_buffer`` rows are stored, ``flush`` returns the
    oldest ``input_buffer`` rows and keeps the remainder for later calls.
    """
    def __init__(self, input_buffer, nfeatures, dtype):
        """
        Creates an empty buffer.

        Args:
          input_buffer: Number of rows returned by each successful flush
            (must be at least 1).
          nfeatures: Number of columns in every data row.
          dtype: NumPy dtype of the initial empty data and time arrays, and of
            the empty arrays returned when nothing can be flushed.

        Raises:
          ValueError: If ``input_buffer`` is smaller than 1.
        """
        # A chunk size of 0 would never drain the store, and a negative one
        # would slice from the wrong end, so reject both up front.
        if input_buffer < 1:
            raise ValueError(f"input_buffer must be >= 1, got {input_buffer}")

        self.buffer = {'data': np.empty((0, nfeatures), dtype=dtype),  # Stored rows, initially none
                       'times': np.empty((0, ), dtype=dtype)}  # One time stamp per stored row
        self.input_buffer = input_buffer
        self.nfeatures = nfeatures
        self.dtype = dtype

    def add_batch(self, data, times):
        """
        Appends a batch to the buffer and returns one chunk if enough rows are stored.

        Args:
          data: Array-like of shape (batch_size, n_features).
          times: Array-like of shape (batch_size,), one time stamp per data row.

        Returns:
          A ``(data, times)`` tuple as produced by ``flush``: the oldest
          ``input_buffer`` rows with their time stamps, or two empty arrays
          when fewer than ``input_buffer`` rows are stored. Rows beyond the
          returned chunk stay in the buffer; call ``flush`` repeatedly to
          drain them.

        Raises:
          ValueError: If ``data`` and ``times`` describe a different number of rows.
        """
        # np.vstack / np.hstack apply these same promotions internally; doing
        # it here lets the row counts be compared before anything is stored.
        rows = np.atleast_2d(data)
        stamps = np.atleast_1d(times)
        if rows.shape[0] != stamps.shape[0]:
            raise ValueError(
                f"data has {rows.shape[0]} rows but times has {stamps.shape[0]} entries"
            )

        # Add data to the buffer
        self.buffer['data'] = np.vstack((self.buffer['data'], rows))
        self.buffer['times'] = np.hstack((self.buffer['times'], stamps))

        # Hand back a chunk if enough rows have accumulated
        return self.flush()

    def flush(self):
        """
        Removes and returns the oldest ``input_buffer`` rows, if that many are stored.

        Returns:
          data: A NumPy array with the flushed rows, or an empty array of
            shape (0, nfeatures) when fewer than ``input_buffer`` rows are stored.
          times: A NumPy array with the corresponding time stamps, or an
            empty array of shape (0,).
        """
        chunk = self.input_buffer
        if self.buffer['data'].shape[0] >= chunk:
            data = self.buffer['data'][:chunk]  # Oldest rows, returned to the caller
            times = self.buffer['times'][:chunk]  # Their time stamps
            self.buffer['data'] = self.buffer['data'][chunk:]  # Keep only the rows not returned
            self.buffer['times'] = self.buffer['times'][chunk:]  # Keep only their time stamps
            return data, times

        # Not enough rows yet: return empty arrays and leave the store untouched
        return np.empty((0, self.nfeatures), dtype=self.dtype), np.empty((0, ), dtype=self.dtype)




In [97]:
# Exercise Buffer with 2-feature rows: three batches of 3 rows, chunk size 4
input_buffer = 4  # Rows handed back per flush
buffer = Buffer(input_buffer, nfeatures=2, dtype=np.float64)

# Three batches of three rows each; every batch reuses the time stamps 1, 2, 3
data_batch_1 = np.array([[1, 2], [3, 4], [5, 6]])
data_batch_2 = np.array([[7, 8], [9, 10], [11, 12]])
data_batch_3 = np.array([[13, 14], [15, 16], [17, 18]])
times_batch_1 = np.arange(1, 1 + 3)
times_batch_2 = np.arange(1, 1 + 3)
times_batch_3 = np.arange(1, 1 + 3)

batches = (
    (data_batch_1, times_batch_1),
    (data_batch_2, times_batch_2),
    (data_batch_3, times_batch_3),
)

for batch_data, batch_times in batches:
    data, times = buffer.add_batch(batch_data, batch_times)
    # Print every full chunk; stop once the buffer returns an empty chunk
    while len(data) != 0:
        print("data:", data)
        print("times:", times)
        data, times = buffer.flush()
    

data: [[1. 2.]
 [3. 4.]
 [5. 6.]
 [7. 8.]]
times: [1. 2. 3. 1.]
data: [[ 9. 10.]
 [11. 12.]
 [13. 14.]
 [15. 16.]]
times: [2. 3. 1. 2.]


In [98]:
# Exercise Buffer with single-feature rows: three batches of 3 rows, chunk size 4
input_buffer = 4  # Rows handed back per flush
buffer = Buffer(input_buffer, nfeatures=1, dtype=np.float64)

# Three batches of three rows each; every batch reuses the time stamps 1, 2, 3
data_batch_1 = np.array([[1], [3], [5]])
data_batch_2 = np.array([[1], [2], [5]])
data_batch_3 = np.array([[1], [4], [5]])
times_batch_1 = np.arange(1, 1 + 3)
times_batch_2 = np.arange(1, 1 + 3)
times_batch_3 = np.arange(1, 1 + 3)

# Each entry: (data, times, number of blank separator lines printed afterwards)
batches = (
    (data_batch_1, times_batch_1, 1),
    (data_batch_2, times_batch_2, 2),
    (data_batch_3, times_batch_3, 0),
)

for batch_data, batch_times, blank_lines in batches:
    data, times = buffer.add_batch(batch_data, batch_times)
    # Print every full chunk; stop once the buffer returns an empty chunk
    while len(data) != 0:
        print("data:", data)
        print("times:", times)
        data, times = buffer.flush()
    for _ in range(blank_lines):
        print("")


data: [[1.]
 [3.]
 [5.]
 [1.]]
times: [1. 2. 3. 1.]


data: [[2.]
 [5.]
 [1.]
 [4.]]
times: [2. 3. 1. 2.]


In [99]:
# Size of the second axis of data_batch_1 (values per row)
np.shape(data_batch_1)[1]

1