# Creation of Doppler spectrograms from raw data

In [0]:
# Uncomment to set matplotlib backend (much more efficient as no longer showing plot)
# import matplotlib
# matplotlib.use('Agg')

In [0]:
# Needed for notebook version
%matplotlib inline

In [0]:
import os
if os.getcwd() == '/content':
    from google.colab import drive
    drive.mount('/content/gdrive')
    BASE_PATH = '/content/gdrive/My Drive/Level-4-Project/'
    os.chdir('gdrive/My Drive/Level-4-Project/')
    
elif os.getcwd() == 'D:\\Google Drive\\Level-4-Project\\notebooks':
    BASE_PATH = "D:/Google Drive/Level-4-Project/"
    
else:
    BASE_PATH = "/export/home/2192793m/Level-4-Project/"
    
INTERIM_PATH = BASE_PATH + 'data/interim/'
PROCESSED_PATH = BASE_PATH + 'data/interim/doppler_spectrograms/'
if not os.path.exists(PROCESSED_PATH):
    os.makedirs(PROCESSED_PATH)

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import mlab
from matplotlib import colors
from scipy.signal import butter, lfilter

In [0]:
WINDOW_LENGTH = 3  # 3 second window

### Function to aid processing Labels.csv

In [0]:
def find_label(movement):
    """
    Convert movement description to one word label
    :param movement: movement description from experiment notes
    :type movement: str
    :return: one word label
    :rtype: str
    """
    if movement == "Walking":
        return "walking"
    if movement == "Moving arm faster towards radar, slower away":
        return "pushing"
    if movement == "Sitting and standing":
        return "sitting"
    if movement == "Moving arm slower towards radar, faster away":
        return "pulling"
    if movement == "Circling arm forwards":
        return "circling"
    if movement == "Clapping":
        return "clapping"
    if movement == "Bending to pick up and back up":
        return "bending"


def identify_angle(angle):
    """
    Strips " deg" from input
    For example:
    "0 deg" would return "0"
    :param angle: angle in format "0 deg"
    :type angle: str
    :return: angle
    :rtype: str
    """
    return angle.split()[0]


def is_on_place(angle):
    """
    Identifies if measurement has "on place" flag for it's aspect angle
    :param angle: angle in format "0 deg"
    :type angle: str
    :return: if angle measurement is "on place"
    :rtype: bool
    """
    if len(angle.split()) > 2:
        return True
    return False


def assign_user_label(name):
    """
    Takes in subjects name and returns a letter to represent that subject
    :param name: 
    :type name: str
    :return: Letter to represent subject
    :rtype: str 
    """
    if name == "Aleksandar":
        return "A"
    if name == "Francesco":
        return "B"
    if name == "Nadezhda":
        return "C"
    if name == "Leila":
        return "D"
    if name == "Hadi":
        return "E"
    if name == "Ivelina":
        return "F"

### Function to make a directory for the spectrograms to go in

In [0]:
def make_directory(interim_path, window_size, user_label, angle_label, action_label):
    """
    Make a directory path for the spectrograms to go in and return that path
    :param interim_path: Path to interim directory
    :type interim_path: str
    :param window_size: Size of window used
    :type window_size: int
    :param user_label: Subject letter (A-F)
    :type user_label: str
    :param angle_label: Aspect Angle (0, 30, 45 or 60)
    :type angle_label: str
    :param action_label: Action type
    :type action_label: str
    :return: directory path to put spectrogram in
    :rtype: str
    """
    #  interim/window_size/user_label/angle_label/action_label
    window_directory = interim_path + str(window_size)
    if not os.path.exists(window_directory):
        os.makedirs(window_directory)

    user_directory = window_directory + "/" + user_label
    if not os.path.exists(user_directory):
        os.makedirs(user_directory)

    angle_directory = user_directory + "/" + angle_label
    if not os.path.exists(angle_directory):
        os.makedirs(angle_directory)

    action_directory = angle_directory + "/" + action_label
    if not os.path.exists(action_directory):
        os.makedirs(action_directory)

    return action_directory

### Function to compute spectrograms from the raw data

In [0]:
def make_spectrograms(df, window_length):
    """
    Create an array of spectrograms from a 60 second radar recording 
    (based off of the code in "03_data_processing_demonstration.ipynb")
    :param df: Data frame containing the radar measurements
    :type df: DataFrame
    :param window_length: Length to make the spectrograms
    :type window_length: int
    :return: array of spectrograms
    :rtype: array of spectrograms
    """
    # Grab RADAR settings from top of file
    center_frequency = float(df.iloc[1])  # 5800000000Hz (5.6 GHz)
    sweep_time = float(df.iloc[2]) / 1000  # convert to seconds (0.001 seconds)
    number_of_time_samples = float(df.iloc[3])  # 128
    bandwidth = float(df.iloc[4])  # 400000000Hz (400 MHz)
    sampling_frequency = number_of_time_samples / sweep_time
    '''
    record length = 60s
              = 60000 chirps with sweep time of 1ms
              = (7680000 measurements / 128 time samples) with sweep time of 1ms
    '''
    record_length = (len(df.iloc[5:])/number_of_time_samples) * sweep_time

    number_of_chirps = record_length / sweep_time  # 60000

    # Put data values into an array
    data = df.iloc[5:].apply(complex).values

    # Reshape into chirps over time
    data_time = np.reshape(data, (int(number_of_chirps), int(number_of_time_samples)))
    data_time = np.rot90(data_time)


    win = np.ones((int(number_of_time_samples), data_time.shape[1]))
    # Apply fast fourier transform to give Range FFT
    fft_applied = np.fft.fftshift(np.fft.fft((data_time * win), axis=0), 0)

    # take relevant half (other half appears to contain only noise)
    data_range = fft_applied[1:int(number_of_time_samples / 2), :]

    '''
    Moving Target Indicator (MTI) Filter:
        * Suppress echos from clutter
        * Clutter is stationary or close to stationary
        * The MTI filter is a high pass filter that filters
          out the low Doppler frequencies
    Information taken from
    http://www.diva-portal.se/smash/get/diva2:1143293/FULLTEXT01.pdf section 5.1
    '''
    x = data_range.shape[1]
    # set ns to nearest even number to x
    if x % 2 == 0:
        ns = x
    else:
        ns = x - 1
    data_range_MTI = np.zeros((data_range.shape[0], ns), dtype=np.complex128)
    # create filter
    (b, a) = butter(4, 0.01, btype="high")
    # apply filter
    for i in range(data_range.shape[0]):
        data_range_MTI[i, :ns] = lfilter(b, a, data_range[i, :ns], axis=0)
   
    
    # Spectrogram processing for 2nd FFT to get Doppler FFT
    bin_indl = 5
    bin_indu = 25
    time_window_length = 200
    overlap_factor = 0.95
    overlap_length = np.round(time_window_length * overlap_factor)
    pad_factor = 4
    fft_points = pad_factor * time_window_length

    data_spec_MTI2 = 0
    for rbin in range(bin_indl - 1, bin_indu):
        s, f, t = mlab.specgram(data_range_MTI[rbin, :],
                                Fs=1,
                                window=np.hamming(time_window_length),
                                noverlap=overlap_length,
                                NFFT=time_window_length,
                                mode='complex',
                                pad_to=fft_points)

        data_spec_MTI2 = data_spec_MTI2 + abs(s)

    window_size = int(window_length * 100)
    iterations = data_spec_MTI2.shape[1] - window_size
    step_size = 10  # 0.1 seconds
    spectrograms = []
    for i in range(0, iterations, step_size):
        center = int(data_spec_MTI2.shape[0]/2)
        data_spec_small = data_spec_MTI2[(center-150):(center+150), i:(i + window_size)]
        spectrograms.append(data_spec_small)

    return spectrograms

## Create spectrograms from the raw data files

In [None]:
df_labels = pd.read_csv(INTERIM_PATH + 'Labels.csv')
df_labels.rename(columns={'dataset ID': 'dataset_id'}, inplace=True)
df_labels["label"] = df_labels.movement.apply(find_label)
df_labels["user_label"] = df_labels.person.apply(assign_user_label)
df_labels["aspect_angle"] = df_labels.angle.apply(identify_angle)
df_labels["on_place"] = df_labels.angle.apply(is_on_place)

In [0]:
image_width = 150
image_height = 150
minimum_value = 35
norm = colors.Normalize(vmin=minimum_value, vmax=None, clip=True)

In [0]:
number_of_rows = df_labels.shape[0]
current_row = 1
for row in df_labels.itertuples():
    print("Processing row", current_row, "of", number_of_rows)
    file_name = INTERIM_PATH + "Dataset_" + str(row.dataset_id) + ".dat"
    file_path = make_directory(
        PROCESSED_PATH, WINDOW_LENGTH, row.user_label, row.aspect_angle, row.label)
    
    radar_df = pd.read_csv(file_name, header=None)[1]
    
    spectrograms = make_spectrograms(radar_df, WINDOW_LENGTH)
#     current_row += 1
    count = 1
    for spectrogram in spectrograms:
        fig = plt.figure(frameon=False)
        fig.set_size_inches(image_width, image_height)
        ax = plt.Axes(fig, [0., 0., 1., 1.])
        ax.set_axis_off()
        fig.add_axes(ax)
        ax.imshow(20 * np.log10(abs(spectrogram)), cmap='jet', norm=norm)
        fig.savefig(file_path + "/" + str(current_row) + "_" + str(count)+".png", dpi=1)
        plt.close(fig)
        count += 1

    current_row += 1

Processing row 1 of 123


KeyboardInterrupt: 