In [None]:
# !conda install openradar scipy torch scikit-learn tensorflow-gpu -y

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
%cd /content/drive/My Drive/Colab Notebooks/

/content/drive/My Drive/Colab Notebooks


In [None]:
%ls

2plus1.ipynb      _data.zip       [0m[01;34mlogs[0m/                  splitsets.keras
best_model.h5     data.zip        radar_cnn_model.h5     Untitled
best_model.keras  heatmaps.ipynb  radar_cnn_model.keras


In [None]:
%mkdir /content/data/

In [None]:
%cp data.zip /content/

In [None]:
!unzip /content/data.zip -d /content/

Archive:  /content/data.zip
   creating: /content/data/fall/
   creating: /content/data/fall/00/
  inflating: /content/data/fall/00/adc_data_LogFile.txt  
  inflating: /content/data/fall/00/adc_data_Raw_0.bin  
  inflating: /content/data/fall/00/adc_data_Raw_LogFile.csv  
   creating: /content/data/fall/01/
  inflating: /content/data/fall/01/adc_data_LogFile.txt  
  inflating: /content/data/fall/01/adc_data_Raw_0.bin  
  inflating: /content/data/fall/01/adc_data_Raw_LogFile.csv  
   creating: /content/data/fall/02/
  inflating: /content/data/fall/02/adc_data_LogFile.txt  
  inflating: /content/data/fall/02/adc_data_Raw_0.bin  
  inflating: /content/data/fall/02/adc_data_Raw_LogFile.csv  
   creating: /content/data/fall/03/
  inflating: /content/data/fall/03/adc_data_LogFile.txt  
  inflating: /content/data/fall/03/adc_data_Raw_0.bin  
  inflating: /content/data/fall/03/adc_data_Raw_LogFile.csv  
   creating: /content/data/fall/04/
  inflating: /content/data/fall/04/adc_data_LogFile.txt

In [None]:
import numpy as np
# from mmwave.dataloader import DCA1000
import matplotlib.pyplot as plt
from scipy.signal import get_window
import os

params = {
    'c': 3e8,
    'k': 5e13,
    'fs': 1e7,
    'fft_range': 256,
    'fft_doppler': 128,
    'num_chirps': 128,
    'num_samples': 256,
    'num_rx': 4,
    'num_tx': 3
}

In [None]:
def plot_heatmap(heatmap, range_bins, doppler_bins, frame_idx=0):
    """Plot range-doppler heatmap using matplotlib"""
    plt.figure(figsize=(12, 6))
    plt.imshow(20 * np.log10(heatmap[frame_idx]),
               aspect='auto',
               extent=[range_bins[0], range_bins[-1],
                       doppler_bins[0], doppler_bins[-1]],
               cmap='jet')
    plt.xlabel('Range (m)')
    plt.ylabel('Velocity (m/s)')
    plt.colorbar(label='dB')
    plt.title(f'Range-Doppler Heatmap (Frame {frame_idx})')
    plt.show()

Process raw radar data to generate range-doppler heatmaps

Args:
    raw_bytes: Raw binary data from file (bytes object)\
    params: Dictionary containing radar parameters:

        {
            'c': speed of light,
            'k': chirp slope (Hz/s),
            'fs': sampling frequency,
            'fft_range': range FFT size,
            'fft_doppler': doppler FFT size,
            'num_chirps': chirps per frame,
            'num_samples': samples per chirp,
            'num_rx': number of receivers,
            'num_tx': number of transmitters
        }

Returns:
    Tuple of (range_doppler_maps, range_bins, doppler_bins)

In [None]:
def process_radar_data(data, params):
    # Convert bytes to complex samples
    # data = np.frombuffer(raw_bytes, dtype=np.int16)
    iq = data[::2] + 1j * data[1::2]

    # Reshape into frames/chirps/samples
    num_channels = params['num_tx'] * params['num_rx']
    samples_per_frame = params['num_chirps'] * params['num_samples'] * num_channels
    num_frames = len(iq) // samples_per_frame
    iq = iq[:num_frames * samples_per_frame]

    # Reshape to (frames, chirps, samples, channels)
    frame_data = iq.reshape(num_frames, params['num_chirps'],
                            params['num_samples'], num_channels)

    # Initialize output
    range_doppler_maps = []

    # Create windows
    range_win = get_window('hamming', params['num_samples'])
    doppler_win = get_window('hamming', params['num_chirps'])

    for frame in frame_data:
        # Process each channel (simple sum across channels)
        channel_sum = np.sum(frame, axis=-1)

        # Range FFT processing
        range_win_mat = np.tile(range_win, (params['num_chirps'], 1))
        range_fft = np.fft.fft(channel_sum * range_win_mat,
                               n=params['fft_range'], axis=1)
        range_fft = np.fft.fftshift(range_fft, axes=1)

        # Doppler FFT processing
        doppler_win_mat = np.tile(doppler_win, (params['fft_range'], 1)).T
        doppler_fft = np.fft.fft(range_fft * doppler_win_mat,
                                 n=params['fft_doppler'], axis=0)
        doppler_fft = np.fft.fftshift(doppler_fft, axes=0)

        range_doppler_maps.append(np.abs(doppler_fft))

    # Calculate physical units
    range_res = params['c'] / (2 * params['k'] * params['fft_range'])
    doppler_res = params['c'] / (2 * params['num_chirps'] * params['k'] * 1 / params['fs'])

    range_bins = np.arange(-params['fft_range'] // 2, params['fft_range'] // 2) * range_res
    doppler_bins = np.arange(-params['fft_doppler'] // 2, params['fft_doppler'] // 2) * doppler_res

    return np.array(range_doppler_maps), range_bins, doppler_bins

In [None]:
# # Read binary data
# adc_data = np.fromfile('./content/data/fall/79/adc_data_Raw_0.bin', dtype=np.uint16)

# # Process data
# heatmaps, range_bins, doppler_bins = process_radar_data(adc_data, params)

# # Plot first frame
# plot_heatmap(heatmaps, range_bins, doppler_bins, frame_idx=0)

In [None]:
# according to the process above, create a function for data preprocessing,
# read folder structure in the data folder
def preprocess_data(fall_folder_path, walk_folder_path):
    adc_data_list = np.array([], dtype=complex)
    label_list = []
    for folder_path, label in [(fall_folder_path, 1), (walk_folder_path, 0)]:
        count = 0
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                if file.endswith('.bin'):
                    count += 1
                    if count >= 80:
                        break
                    print(root)
                    bin_path = os.path.join(root, file)
                    raw_data = np.fromfile(bin_path, dtype=np.uint16)
                    adc_data, range_bins, doppler_bins = process_radar_data(raw_data, params)
                    if adc_data_list.size == 0:
                        adc_data_list = adc_data[np.newaxis, ...]
                    else:
                        adc_data_list = np.concatenate((adc_data_list, adc_data[np.newaxis, ...]), axis=0)
                    label_list.append(label)
    return adc_data_list, label_list


fall_folder_path = '/content/data/fall/'
walk_folder_path = '/content/data/nonfall/'
adc_data_list, label_list = preprocess_data(fall_folder_path, walk_folder_path)

/content/data/fall/73
/content/data/fall/00
/content/data/fall/19
/content/data/fall/77
/content/data/fall/41
/content/data/fall/16
/content/data/fall/06
/content/data/fall/60
/content/data/fall/24
/content/data/fall/74
/content/data/fall/10
/content/data/fall/79
/content/data/fall/54
/content/data/fall/04
/content/data/fall/39
/content/data/fall/57
/content/data/fall/46
/content/data/fall/11
/content/data/fall/62
/content/data/fall/37
/content/data/fall/14
/content/data/fall/80
/content/data/fall/03
/content/data/fall/69
/content/data/fall/18
/content/data/fall/34
/content/data/fall/13
/content/data/fall/52
/content/data/fall/21
/content/data/fall/20
/content/data/fall/66
/content/data/fall/32
/content/data/fall/25
/content/data/fall/42
/content/data/fall/44
/content/data/fall/56
/content/data/fall/33
/content/data/fall/51
/content/data/fall/23
/content/data/fall/43
/content/data/fall/48
/content/data/fall/36
/content/data/fall/02
/content/data/fall/08
/content/data/fall/31
/content/d

In [None]:
adc_data_list.shape

(158, 100, 128, 256)

In [None]:
# Import necessary libraries
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score

# Assuming adc_data_list and label_list are already defined
# adc_data_list: List of radar data arrays with shape (100, 384, 4, 256)
# label_list: List of corresponding labels (0 or 1)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    # Restrict TensorFlow to only use the first GPU
    try:
        tf.config.set_visible_devices(gpus[0], 'GPU')
    except:
        print('er')

In [None]:
augmented_list = np.flip(adc_data_list, 2)
final_data_list = np.concatenate((adc_data_list, augmented_list), axis=0)
final_label_list = label_list + label_list

In [None]:

# Convert the list to a numpy array
X = np.abs(np.array(final_data_list, dtype=complex)[..., np.newaxis])  # Shape: (num_samples, 100, 128, 256, 1)
y = np.array(final_label_list, dtype=np.int8)[..., np.newaxis]  # Shape: (num_samples, 1)

In [None]:
X.shape, y.shape

((316, 100, 128, 256, 1), (316, 1))

In [None]:

# Split the dataset into training, testing, and validation sets
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4,)# random_state=42)
X_test, X_val, y_test, y_val = train_test_split(X_temp, y_temp, test_size=0.5,)# random_state=42)


In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
from sklearn.metrics import classification_report
import datetime
from keras import saving
# Configuration
FRAMES = 100
HEIGHT = 256
WIDTH = 128
CHANNELS = 1  # Grayscale channel dimension
BATCH_SIZE = 8
EPOCHS = 100
CLASS_NAMES = ['fall', 'nonfall']


@saving.register_keras_serializable(package="CustomLayers", name="Conv2Plus1D")
class Conv2Plus1D(layers.Layer):
    def __init__(self, filters, kernel_size, **kwargs):
        super().__init__()
        self.spatial_conv = layers.Conv3D(
            filters, (1, kernel_size[1], kernel_size[2]),
            padding='same', activation=None
        )
        self.temporal_conv = layers.Conv3D(
            filters, (kernel_size[0], 1, 1),
            padding='same', activation=None
        )
        self.norm = layers.BatchNormalization()
        self.activation = layers.ReLU()

    def call(self, inputs):
        x = self.spatial_conv(inputs)
        x = self.temporal_conv(x)
        x = self.norm(x)
        return self.activation(x)

@saving.register_keras_serializable(package="CustomLayers", name="ResidualBlock")
class ResidualBlock(layers.Layer):
    def __init__(self, filters, kernel_size, **kwargs):
        super().__init__()
        self.conv1 = Conv2Plus1D(filters, kernel_size)
        self.conv2 = Conv2Plus1D(filters, kernel_size)
        self.projection = layers.Conv3D(filters, 1, strides=1)

    def call(self, inputs):
        residual = inputs
        x = self.conv1(inputs)
        x = self.conv2(x)

        if residual.shape[-1] != x.shape[-1]:
            residual = self.projection(residual)

        return layers.add([residual, x])


def build_model(input_shape):
    inputs = layers.Input(shape=input_shape)

    # Stem
    x = Conv2Plus1D(16, (7, 3, 3))(inputs)
    x = layers.MaxPool3D(pool_size=(2, 4, 4))(x)  # Temporal & spatial downsampling

    # Residual Blocks
    x = ResidualBlock(32, (3, 3, 3))(x)
    x = layers.MaxPool3D((2, 2, 2))(x)
    x = ResidualBlock(64, (3, 3, 3))(x)
    # x = layers.MaxPool3D((2, 2, 2))(x)
    # x = ResidualBlock(128, (3, 3, 3))(x)

    # Head
    x = layers.GlobalAveragePooling3D()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(2, activation='softmax')(x)

    return tf.keras.Model(inputs, outputs)

start_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
def get_callbacks():
    return [
        tf.keras.callbacks.ModelCheckpoint(
            f'{start_time}'+'_epoch_{epoch:03d}.weights.h5',
            save_best_only=True,
            save_weights_only=True,
            save_freq=5,
        ),
        tf.keras.callbacks.TensorBoard(
            log_dir='logs',
            histogram_freq=1,
            profile_batch=0
        ),
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        )
    ]


model = build_model((FRAMES, WIDTH, HEIGHT, CHANNELS))
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='sparse_categorical_crossentropy',
    metrics=[
        'accuracy',
    ]
)


In [None]:
model.summary()

In [None]:
Start training
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=get_callbacks(),
    shuffle=True
)
# model = tf.keras.models.load_model('/content/best_model.keras',custom_objects={'Conv2Plus1D': Conv2Plus1D, 'ResidualBlock':ResidualBlock})




In [None]:
model.evaluate(X_test, y_test)

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 10s/step - accuracy: 0.8104 - loss: 0.6627


[0.6864119172096252, 0.7936508059501648]

In [None]:
model.save('model.keras')

In [None]:
def generate_reports(model, test_data, test_labels):
    # Quantitative metrics
    test_loss, test_acc = model.evaluate(test_data, test_labels)
    print(f"\nPerformance Metrics:")
    print(f"Accuracy: {test_acc:.2%}")

    # Classification report
    y_pred = model.predict(test_data)
    print("\nClassification Report:")
    print(classification_report(test_labels, np.argmax(y_pred, axis=1),
                                target_names=CLASS_NAMES))


'\nRecommended ranges for tuning:\n- Learning rate: 1e-5 to 1e-3 (exponential decay preferred)\n- Batch size: 4-16 (depends on GPU memory)\n- Filters: Start with 16-32, double after each pooling\n- Dropout rate: 0.3-0.6\n- Kernel sizes: (3,5,5) to (5,7,7)\n- Pooling strategy: Balance between 2x2x2 and 4x4x4\n- Data augmentation: Random crops, flips, temporal shifts\n\nTraining strategy:\n1. Start with small spatial dimensions (192x128)\n2. Gradually increase to full resolution with transfer learning\n3. Use mixup augmentation for better generalization\n4. Apply gradient clipping (norm=1.0) for stability\n'


Recommended ranges for tuning:
- Learning rate: 1e-5 to 1e-3 (exponential decay preferred)
- Batch size: 4-16 (depends on GPU memory)
- Filters: Start with 16-32, double after each pooling
- Dropout rate: 0.3-0.6
- Kernel sizes: (3,n,n) to (7,n,n) and  (m,3,3) to (m,7,7)
- Data augmentation: Random crops, flips, temporal shifts

Training strategy:
1. Start with small spatial dimensions (192x128)
2. Gradually increase to full resolution with transfer learning
3. Use mixup augmentation for better generalization
4. Apply gradient clipping (norm=1.0) for stability



In [None]:
model = tf.keras.models.load_model('/content/best_model.keras',custom_objects={'Conv2Plus1D': Conv2Plus1D, 'ResidualBlock':ResidualBlock})

y_pred = model.predict(X_test)
classification_report(y_test, np.argmax(y_pred, axis=1), target_names=CLASS_NAMES)

  saveable.load_own_variables(weights_store.get(inner_path))


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 786ms/step


'              precision    recall  f1-score   support\n\n        fall       1.00      0.53      0.70        30\n     nonfall       0.70      1.00      0.82        33\n\n    accuracy                           0.78        63\n   macro avg       0.85      0.77      0.76        63\nweighted avg       0.84      0.78      0.76        63\n'

In [None]:

X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4,)# random_state=42)
X_test, X_val, y_test, y_val = train_test_split(X_temp, y_temp, test_size=0.5,)# random_state=42)
# Generate all reports
generate_reports(model, X_test, y_test)

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 202ms/step - accuracy: 0.8315 - loss: 0.6865

Performance Metrics:
Accuracy: 82.54%
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 190ms/step

Classification Report:
              precision    recall  f1-score   support

        fall       1.00      0.66      0.79        32
     nonfall       0.74      1.00      0.85        31

    accuracy                           0.83        63
   macro avg       0.87      0.83      0.82        63
weighted avg       0.87      0.83      0.82        63



In [None]:


print(X_test.shape)

# Make predictions on the testing dataset
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)  # Convert probabilities to class labels

# Calculate confusion matrix
conf_matrix = confusion_matrix(y_test, y_pred_classes)
tn, fp, fn, tp = conf_matrix.ravel()

# Calculate precision, recall, and F1 score
precision = precision_score(y_test, y_pred_classes)
recall = recall_score(y_test, y_pred_classes)
f1 = f1_score(y_test, y_pred_classes)
accuracy = accuracy_score(y_test, y_pred_classes)
# Print results

print(f'No. Predictions: {y_pred.shape[0]}')
print(f"True Positives: {tp}")
print(f"True Negatives: {tn}")
print(f"False Positives: {fp}")
print(f"False Negatives: {fn}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f'Accuracy: {accuracy:.4f}')

(63, 100, 128, 256, 1)
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 188ms/step
No. Predictions: 63
True Positives: 33
True Negatives: 16
False Positives: 14
False Negatives: 0
Precision: 0.7021
Recall: 1.0000
F1 Score: 0.8250
Accuracy: 0.7778
