This notebook contains a class for processing an FMCW RADAR data capture. 

In [1]:
import h5py
import numpy as np
import os
import imageio
from datetime import datetime
from scipy.signal import find_peaks, correlate
from scipy.ndimage import convolve
import matplotlib.pyplot as plt


In [2]:
class FMCWRADARDataCapture:
    """
    Class for handling the capture, processing, and saving of FMCW RADAR data.

    This class is designed to load Frequency-Modulated Continuous-Wave (FMCW) RADAR data from a specified HDF5 file,
    process the data into a usable format, and save it as a NumPy file (either .npy or .npz format).

    Attributes:
        file_path (str): Path to the HDF5 file containing the RADAR data.
    """

    def __init__(self, file_path):
        """
        Initializes the FMCWRADARDataCapture class with the specified file path.

        Args:
            file_path (str): Path to the HDF5 file to be loaded and processed.
        """
        
        if not os.path.isfile(file_path):
            raise FileNotFoundError(f"The file '{file_path}' does not exist.")

        self.file_path = file_path
        self.output_path = self.file_path.replace("_Data", "_Data_NP")

    def load_and_save(self, output_path=None, format='npy', save_npy = False):
        if output_path is None:
            output_path = self.output_path 
            output_path = os.path.splitext(output_path)[0]
            
        # Ensure the directory of the output path exists
        output_dir = os.path.dirname(output_path)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        dataCubes = np.zeros((4, 1000, 128, 256))
        

        # Open the HDF5 file for reading
        with h5py.File(self.file_path, 'r') as file:
            # Extract parameters from the file
            FreqStrt = file['/BrdCfg/FreqStrt'][()]
            FreqStop = file['/BrdCfg/FreqStop'][()]
            N = 256  # Number of samples per chirp, assuming it's given or extracted correctly
            Np = 128  # Number of chirps per frame, assuming it's given or extracted correctly

            # Calculate the effective bandwidth
            B = (FreqStop - FreqStrt) / 284 * 256
            
            # Read the If signal
            If = file['/If'][:]
            NrFrms, Nx = If.shape
            
            # Initialize the dataCubes array to hold the processed data for each channel
            dataCubes = np.zeros((4, NrFrms, N, Np))
            
            # Process each frame
            for frame_idx in range(NrFrms-1):
                print(f'Processing RD Frame: {frame_idx + 1}')
                for channel_idx in range(4):
                    start_idx = channel_idx * N * Np
                    end_idx = (channel_idx + 1) * N * Np
                    # Correctly reshape and assign the data to the corresponding channel and frame
                    reshaped_data = If[frame_idx, start_idx:end_idx].reshape(Np, N)
                    dataCubes[channel_idx, frame_idx, :, :] = reshaped_data.transpose()  # Transpose operation

            # Verify by plotting the first channel of the first frame
            plt.imshow(dataCubes[0, 0, :, :])
            plt.colorbar()
            plt.show()
        
            if save_npy:
                # Save data in the specified format
                if format == 'npy':
                    np.save(output_path, dataCubes)
                elif format == 'npz':
                    np.savez(output_path, dataCubes)
                else:
                    raise ValueError("Unsupported format. Use 'npy' or 'npz'.")
            
        return dataCubes

    @staticmethod
    def rawDataToDataCube(rawData, numFrames, numChirpsPerFrame, numSamplesPerChirp, numAntennas):
        # Reshape and rearrange the rawData
        matrixData = rawData.T.reshape(numChirpsPerFrame * numSamplesPerChirp, numFrames * numAntennas)
        dataCubes = np.zeros((numFrames, numChirpsPerFrame, numSamplesPerChirp, numAntennas))

        for frame in range(numFrames):
            for antenna in range(numAntennas):
                chirps = matrixData[:, frame * numAntennas + antenna]
                chirpsMatrix = chirps.reshape(numSamplesPerChirp, numChirpsPerFrame)
                dataCubes[frame, :, :, antenna] = chirpsMatrix.T
                

        return dataCubes.transpose((3,0,1,2))
    
    def range_doppler_processing(self, dataCubes):
        """
        Processes each frame in the dataCube for each channel to generate Range-Doppler Maps.

        Args:
            dataCube (np.ndarray): The raw data cubes to be processed.

        Returns:
            np.ndarray: Array of processed Range-Doppler Maps for each channel.
        """
        n_channels, n_frames, n_bins, n_doppler = dataCubes.shape
        rdm_all_channels = []

        # Define a window function for the range and Doppler dimensions
        range_window = np.hanning(n_bins)
        doppler_window = np.hanning(n_doppler)

        for channel_idx in range(n_channels):
            rdm_list = []
            for frame_idx in range(n_frames):
                # Extract current data for the frame and channel
                current_data = dataCubes[channel_idx, frame_idx, :, :]

                # Apply the Hanning window function
                windowed_data = np.outer(range_window, doppler_window) * current_data

                # Apply 2D FFT and shift
                rdm = np.fft.fft2(windowed_data)
                rdm = np.fft.fftshift(rdm, axes=1)  # Shift along the Doppler axis (second axis in Python)

                # Take the absolute value
                rdm = np.abs(rdm)
                
                # Normalize the data before logarithmic scaling
                rdm_max = np.max(rdm)
                rdm_min = np.min(rdm)
                rdm = (rdm - rdm_min) / (rdm_max - rdm_min + 1e-3)  # Avoid division by zero

                # Slice the RDM to remove the mirrored part (keep only one half)
                # Assuming the mirrored part is along the Range axis (axis 0)
                half_index = rdm.shape[0] // 2
                rdm = rdm[:half_index, :]

                # Append to the list for the current channel
                rdm_list.append(rdm)

            # Append the result for the current channel
            rdm_all_channels.append(rdm_list)

        return np.array(rdm_all_channels)
    
    
    def angle_of_arrival_processing(self, dataCube):
        """
        Processes each frame in the dataCube for each channel to generate Angle of Arrival (AoA) heatmaps.

        Args:
            dataCube (np.ndarray): The raw data cubes to be processed.

        Returns:
            np.ndarray: Array of processed AoA heatmaps for each channel.
        """
        n_channels, n_frames, n_bins, n_elements = dataCube.shape
        aoa_all_channels = []

        # Define a window function for the spatial dimension (assuming ULA)
        spatial_window = np.hanning(n_elements)

        for channel_idx in range(n_channels):
            aoa_list = []
            for frame_idx in range(n_frames):
                # Extract current data for the frame and channel
                current_data = dataCube[channel_idx, frame_idx, :, :]

                # Apply the window function to the antenna elements
                windowed_data = current_data * spatial_window

                # Apply 1D FFT along the spatial dimension (assuming antenna elements are along the last axis)
                aoa_spectrum = np.fft.fft(windowed_data, axis=1)
                aoa_spectrum = np.fft.fftshift(aoa_spectrum, axes=1)  # Shift to center the zero-frequency component

                # Take the absolute value and normalize
                aoa_spectrum = np.abs(aoa_spectrum)
                aoa_spectrum_max = np.max(aoa_spectrum)
                aoa_spectrum_min = np.min(aoa_spectrum)
                aoa_normalized = (aoa_spectrum - aoa_spectrum_min) / (aoa_spectrum_max - aoa_spectrum_min + 1e-3)

                # Append to the list for the current channel
                aoa_list.append(aoa_normalized)

            # Append the result for the current channel
            aoa_all_channels.append(aoa_list)

        return np.array(aoa_all_channels)
    
    
    def generate_actuator_filter(self, dataCubes):
        """
        Processes each frame in the dataCube for each channel to generate Range-Doppler Maps.

        Args:
            dataCube (np.ndarray): The raw data cubes to be processed.

        Returns:
            np.ndarray: Array of processed Range-Doppler Maps for each channel.
        """
        n_channels, n_frames, n_bins, n_doppler = dataCubes.shape
        rdm_all_channels = []

        # Define a window function for the range and Doppler dimensions
        range_window = np.hanning(n_bins)
        doppler_window = np.hanning(n_doppler)

        for channel_idx in range(n_channels):
            rdm_list = []
            for frame_idx in range(n_frames):
                # Extract current data for the frame and channel
                current_data = dataCubes[channel_idx, frame_idx, :, :]

                # Apply the Hanning window function
                windowed_data = np.outer(range_window, doppler_window) * current_data

                # Apply 2D FFT and shift
                rdm = np.fft.fft2(windowed_data)
                rdm = np.fft.fftshift(rdm, axes=1)  # Shift along the Doppler axis (second axis in Python)

                # Take the absolute value
                rdm = np.abs(rdm)
                
                # Normalize the data before logarithmic scaling
                rdm_max = np.max(rdm)
                rdm_min = np.min(rdm)
                rdm = (rdm - rdm_min) / (rdm_max - rdm_min + 1e-3)  # Avoid division by zero

                # Slice the RDM to remove the mirrored part (keep only one half)
                # Assuming the mirrored part is along the Range axis (axis 0)
                half_index = rdm.shape[0] // 2
                rdm = rdm[:half_index, :]

                # Append to the list for the current channel
                rdm_list.append(rdm)

            # Append the result for the current channel
            rdm_all_channels.append(rdm_list)

        return np.array(rdm_all_channels)[0,50:105,:,:]
    
    def manual_correlation(self, pattern, data, start_frame, end_frame):
        pattern_normalized = (pattern - np.mean(pattern)) / np.std(pattern)
        data_segment = data[start_frame:end_frame + 1]  # Plus 1 because the upper bound is exclusive
        data_segment_normalized = (data_segment - np.mean(data_segment)) / np.std(data_segment)

        correlation_score = np.sum(pattern_normalized * data_segment_normalized)

        return correlation_score


    def slide_pattern_over_data(self, pattern, data):
        if data.ndim == 4:
            data = data[0,:,:,:]
        
        # Normalize the pattern and data (z-scoring)
        pattern_mean = np.mean(pattern)
        pattern_std = np.std(pattern)
        # Check if standard deviation is zero and handle it
        pattern_normalized = (pattern - pattern_mean) / pattern_std if pattern_std else pattern - pattern_mean
        
        data_mean = np.mean(data, axis=(1, 2), keepdims=True)
        data_std = np.std(data, axis=(1, 2), keepdims=True)
        # Check if standard deviation is zero and handle it
        data_normalized = (data - data_mean) / (data_std + 1e-8)  # Added a small constant to avoid division by zero

        # Check for NaN or inf values
        if np.isnan(data_normalized).any() or np.isinf(data_normalized).any():
            raise ValueError("Data contains NaN or infinite values after normalization.")

        # Perform 3D correlation
        match_scores = correlate(data_normalized, pattern_normalized, mode='valid', method='auto')
        
        # Sum over the spatial dimensions to get a single match score per frame position
        match_scores_summed = match_scores.sum(axis=(1, 2))

                
        # Find peaks in the match scores
        peaks, properties = find_peaks(match_scores_summed)

        # Ensure 'peak_heights' is in properties, otherwise get the height of all peaks
        if 'peak_heights' not in properties:
            properties['peak_heights'] = match_scores_summed[peaks]

        # Sort the peaks by their height
        sorted_peaks = np.argsort(properties['peak_heights'])[::-1]

        # Filter out peaks that are within 100 frames of each other
        filtered_peaks = []
        for peak_index in sorted_peaks:
            peak = peaks[peak_index]
            if not any(abs(peak - p) <= 100 for p in filtered_peaks):
                filtered_peaks.append(peak)
            if len(filtered_peaks) == 2:  # Stop when the top two valid peaks are found
                break

                
        # Return the indices of the peaks as well as the match scores for plotting or further analysis
        return filtered_peaks, match_scores_summed



    def plot_match_scores(self, match_scores):
        """
        Plots the match scores to help identify where the best matches are.

        Args:
            match_scores (np.ndarray): Array of match scores.
        """
        plt.figure(figsize=(10, 6))
        plt.plot(match_scores, marker='o', linestyle='-')
        plt.title('Pattern Match Score Across Frames')
        plt.xlabel('Frame Position')
        plt.ylabel('Match Score')
        plt.grid(True)
        plt.show()


    def create_gif(self, data, gif_filename, duration=0.038596):
        """
        Creates a GIF from the provided data and saves it to the 'data/gifs' directory within the current working directory.

        Args:
            data (np.ndarray): 3D or 4D array containing the image data.
            gif_filename (str): Filename for the GIF, without path.
            duration (float): Duration of each frame in the GIF.
        """
        # Define the directory to save GIFs
        gif_dir = os.path.join(os.getcwd(), 'data/gifs')
        # Create the directory if it doesn't exist
        os.makedirs(gif_dir, exist_ok=True)
        # Full path for the GIF
        gif_path = os.path.join(gif_dir, gif_filename)
        
        if data.ndim == 4:
            data = data[0,:,:,:]

        with imageio.get_writer(gif_path, mode='I', duration=duration) as writer:
            if data.ndim == 3:  # If data is 3D, treat it as a sequence of 2D frames
                for i in range(data.shape[0]):
                    # Convert the data to an image (you might need to scale/normalize)
                    frame = data[i, :, :].T
                    plt.imshow(frame, cmap='gray')  # Use appropriate colormap
                    plt.axis('off')  # Turn off axis
                    plt.savefig('temp_frame.png', bbox_inches='tight', pad_inches=0)
                    plt.close()
                    writer.append_data(imageio.imread('temp_frame.png'))
            # elif data.ndim == 4:  # If data is 4D, process each channel separately
            #     for channel in range(data.shape[0]):
            #         for i in range(data.shape[1]):
            #             frame = data[channel, i, :, :].T
            #             plt.imshow(frame, cmap='gray')  # Use appropriate colormap
            #             plt.axis('off')  # Turn off axis
            #             plt.savefig('temp_frame.png', bbox_inches='tight', pad_inches=0)
            #             plt.close()
            #             writer.append_data(imageio.imread('temp_frame.png'))

        # Remove temporary frame image file
        os.remove('temp_frame.png')

        return gif_path  # Return the path where the GIF was saved



