In [104]:
import numpy as np
import os

file_list = os.listdir('/content/SR250Breath')
windows = []
labels = []

for file_name in file_list:
    file_path = os.path.join('/content/SR250Breath', file_name)
    try:
        data = np.load(file_path)

        if file_path.endswith('.window.npy'):
            windows.append(data)
        elif file_path.endswith('.label.npy'):
            labels.append(data)
        else:
            print(f"Unknown file type: {file_path}")
    except Exception as e:
        print(f"Could not load {file_name}: {e}")

# You can access the arrays using the file names as keys, for example:
# print(data_arrays['Millenials_E_breath_sitting_desk_20250909-115853.window.npy'])

In [105]:
windows = np.concat(windows)
labels = np.concat(labels)

print(windows.shape)
print(labels.shape)

(36833, 11, 3, 120)
(36833,)


In [106]:
# in maniera tale che l'ultimo asse corrisponde alle antenne
windows = windows.transpose(0, 1, 3, 2)
print(windows.shape)

(36833, 11, 120, 3)


In [107]:
abs_windows = np.abs(windows)
print(abs_windows.shape)

(36833, 11, 120, 3)


In [108]:
phase_windows = np.angle(windows)

In [109]:
all = np.concatenate((abs_windows, phase_windows), axis=-1)
# all = phase_windows
print(all.shape)

(36833, 11, 120, 6)


In [110]:
import tensorflow as tf

# Assuming 'labels' is your numpy array of labels
# You might need to adjust the 'depth' parameter based on the number of unique classes in your labels
one_hot_labels = tf.one_hot(labels, depth=2) # Assuming 2 classes for demonstration

print(one_hot_labels.shape)

(36833, 2)


In [111]:
from sklearn.model_selection import train_test_split
import numpy as np

# Convert TensorFlow tensor to NumPy array
one_hot_labels_np = one_hot_labels.numpy()

# Split into training and temporary sets (for test and validation)
X_train, X_temp, y_train, y_temp = train_test_split(phase_windows, one_hot_labels_np, test_size=0.3, random_state=42)

# Split the temporary set into test and validation sets
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

print("Training set shapes:")
print("X_train:", X_train.shape)
print("y_train:", y_train.shape)
print("\nValidation set shapes:")
print("X_val:", X_val.shape)
print("y_val:", y_val.shape)
print("\nTest set shapes:")
print("X_test:", X_test.shape)
print("y_test:", y_test.shape)

Training set shapes:
X_train: (25783, 11, 120, 3)
y_train: (25783, 2)

Validation set shapes:
X_val: (5525, 11, 120, 3)
y_val: (5525, 2)

Test set shapes:
X_test: (5525, 11, 120, 3)
y_test: (5525, 2)


# Task
Produce a CNN model for the prediction of the label and train it using the training, test, and validation data split from the `abs_windows` and `labels` variables.

## Define the cnn model architecture

### Subtask:
Define the layers of the CNN model, including convolutional layers, pooling layers, and dense layers, suitable for your data shape.


**Reasoning**:
Define the CNN model architecture using convolutional, pooling, and dense layers.



**Reasoning**:
The error indicates that the input shape is too small for the convolutional and pooling layers. Adjust the kernel size and pooling size to be smaller to avoid negative dimensions.



In [112]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

model = Sequential()

# Add convolutional layers with adjusted kernel and pooling sizes
model.add(Conv2D(32, (1, 1), activation='relu', input_shape=X_train.shape[1:]))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(64, (2, 2), activation='relu'))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(128, (1, 1), activation='relu'))
model.add(MaxPooling2D((2, 4)))

# Flatten the output
model.add(Flatten())

# Add dense layers
# model.add(Dense(128, activation='relu'))
model.add(Dense(y_train.shape[1], activation='sigmoid'))

model.summary()

model_small = model

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [113]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

model = Sequential()

# Add convolutional layers with adjusted kernel and pooling sizes
model.add(Conv2D(32, (2, 2), activation='relu', input_shape=X_train.shape[1:]))
model.add(MaxPooling2D((1, 2))) # Pool only along the second dimension
model.add(Conv2D(64, (2, 2), activation='relu'))
model.add(MaxPooling2D((1, 2)))
model.add(Conv2D(128, (2, 2), activation='relu'))
model.add(MaxPooling2D((1, 2)))


# Flatten the output
model.add(Flatten())

# Add dense layers
model.add(Dense(128, activation='relu'))
model.add(Dense(y_train.shape[1], activation='sigmoid'))

model.summary()

model_medium = model

In [114]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

model = Sequential()

# Add convolutional layers with further adjusted kernel and pooling sizes
model.add(Conv2D(32, (1, 1), activation='relu', input_shape=X_train.shape[1:]))
model.add(MaxPooling2D((1, 1))) # Use (1,1) pooling to avoid reducing dimension too quickly
model.add(Conv2D(64, (1, 1), activation='relu'))
model.add(MaxPooling2D((1, 1)))
model.add(Conv2D(128, (1, 1), activation='relu'))
model.add(MaxPooling2D((1, 1)))


# Flatten the output
model.add(Flatten())

# Add dense layers
model.add(Dense(128, activation='relu'))
model.add(Dense(y_train.shape[1], activation='sigmoid'))

model.summary()

model_large = model

**Reasoning**:
The model summary shows that the output shape after the last MaxPooling2D layer has a dimension of 0, which is incorrect. The input shape is likely still too large for the kernel and pooling sizes. I need to further adjust the kernel and pooling sizes to ensure the output dimensions remain positive.



In [115]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

model = Sequential()

# Add convolutional layers with adjusted kernel and pooling sizes
model.add(Conv2D(32, (1, 1), activation='relu', input_shape=X_train.shape[1:]))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(64, (2, 2), activation='relu'))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(128, (1, 1), activation='relu'))
model.add(MaxPooling2D((2, 4)))


# Flatten the output
model.add(Flatten())

# Add dense layers
model.add(Dense(128, activation='relu'))
model.add(Dense(y_train.shape[1], activation='sigmoid'))

model.summary()

model_small_test1 = model

**Reasoning**:
The model summary now shows positive dimensions after the pooling layers, but using (1,1) kernel and pooling sizes essentially bypasses the spatial feature extraction capabilities of CNNs. I need to find a balance between kernel/pooling sizes and the input shape to effectively use convolutional layers without causing negative dimensions. I will try increasing the kernel size slightly while keeping the pooling size small.



In [116]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

model = Sequential()

# Add convolutional layers with adjusted kernel and pooling sizes
model.add(Conv2D(32, (2, 2), activation='relu', input_shape=X_train.shape[1:]))
model.add(MaxPooling2D((1, 2))) # Pool only along the second dimension
model.add(Conv2D(64, (2, 2), activation='relu'))
model.add(MaxPooling2D((1, 2)))
model.add(Conv2D(128, (2, 2), activation='relu'))
model.add(MaxPooling2D((1, 2)))


# Flatten the output
model.add(Flatten())

# Add dense layers
# model.add(Dense(128, activation='relu'))
model.add(Dense(y_train.shape[1], activation='sigmoid'))

model.summary()

model_small_test2 = model

## Compile the model

### Subtask:
Compile the CNN model by specifying the optimizer, loss function, and metrics.


**Reasoning**:
Compile the CNN model with the Adam optimizer, binary crossentropy loss, and accuracy metric.



In [132]:
from tensorflow.keras.optimizers import Adam

model = model_small

model.compile(optimizer=Adam(),
              loss='binary_crossentropy',
              metrics=['accuracy'])

## Train the model

### Subtask:
Train the compiled model using the training data (`X_train`, `y_train`) and validate it using the validation data (`X_val`, `y_val`).


**Reasoning**:
Train the compiled model using the training and validation data.



In [133]:
history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_val, y_val))

Epoch 1/10
[1m806/806[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 6ms/step - accuracy: 0.8338 - loss: 0.4511 - val_accuracy: 0.8291 - val_loss: 0.4575
Epoch 2/10
[1m806/806[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4ms/step - accuracy: 0.8343 - loss: 0.4417 - val_accuracy: 0.8291 - val_loss: 0.4680
Epoch 3/10
[1m806/806[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.8417 - loss: 0.4102 - val_accuracy: 0.8275 - val_loss: 0.4499
Epoch 4/10
[1m806/806[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.8438 - loss: 0.3848 - val_accuracy: 0.8306 - val_loss: 0.4577
Epoch 5/10
[1m806/806[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.8494 - loss: 0.3635 - val_accuracy: 0.8266 - val_loss: 0.4157
Epoch 6/10
[1m806/806[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4ms/step - accuracy: 0.8611 - loss: 0.3319 - val_accuracy: 0.8380 - val_loss: 0.3972
Epoch 7/10
[1m806/806[0m 

**Reasoning**:
The first step is to load the data from the CSV file into a pandas DataFrame and display the first few rows to understand its structure.



## Evaluate the model

### Subtask:
Evaluate the trained model on the test data (`X_test`, `y_test`) to assess its performance.

**Reasoning**:
Evaluate the trained model on the test data to assess its performance.

In [134]:
loss, accuracy = model.evaluate(X_test, y_test)

print(f"Test Loss: {loss}")
print(f"Test Accuracy: {accuracy}")

[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - accuracy: 0.8520 - loss: 0.4095
Test Loss: 0.39029690623283386
Test Accuracy: 0.8568325638771057


In [135]:
import tensorflow as tf

# Quantize the model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_tflite_model = converter.convert()

# Save the quantized model (optional)
# with open('quantized_model.tflite', 'wb') as f:
#     f.write(quantized_tflite_model)

# Load the quantized model
interpreter = tf.lite.Interpreter(model_content=quantized_tflite_model)
interpreter.allocate_tensors()

# Get input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Prepare test data for inference
# TFLite models expect float32 inputs
X_test_float32 = X_test.astype(np.float32)

# Run inference on the test set
predictions = []
for i in range(X_test_float32.shape[0]):
    interpreter.set_tensor(input_details[0]['index'], np.expand_dims(X_test_float32[i], axis=0))
    interpreter.invoke()
    output_data = interpreter.get_tensor(output_details[0]['index'])
    predictions.append(output_data[0])

predictions = np.array(predictions)

# Evaluate the quantized model (assuming y_test is already one-hot encoded)
# For binary classification with sigmoid output, you can use binary_accuracy
from tensorflow.keras.metrics import BinaryAccuracy

binary_accuracy = BinaryAccuracy()
binary_accuracy.update_state(y_test, predictions)

print(f"Quantized Model Test Accuracy: {binary_accuracy.result().numpy()}")

# If you want to calculate loss, you'd need to implement it manually or convert predictions to match y_test format
# For example, if y_test is one-hot, and predictions are probabilities for each class:
# from tensorflow.keras.losses import BinaryCrossentropy
# bce = BinaryCrossentropy()
# loss = bce(y_test, predictions).numpy()
# print(f"Quantized Model Test Loss: {loss}")

Saved artifact at '/tmp/tmpio1nu_a4'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 11, 120, 3), dtype=tf.float32, name='keras_tensor_590')
Output Type:
  TensorSpec(shape=(None, 2), dtype=tf.float32, name=None)
Captures:
  132555547778832: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132562230917648: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132562230917264: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132562230918224: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132562230914576: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132562230917840: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132562230918608: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132562230915920: TensorSpec(shape=(), dtype=tf.resource, name=None)


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


Quantized Model Test Accuracy: 0.8566516041755676


In [136]:
import os

# Save the quantized model temporarily to check its size
quantized_model_path = 'quantized_model.tflite'
with open(quantized_model_path, 'wb') as f:
    f.write(quantized_tflite_model)

# Get the file size in bytes and convert to kilobytes
model_size_bytes = os.path.getsize(quantized_model_path)
model_size_kb = model_size_bytes / 1024

print(f"Dimensioni del modello quantizzato: {model_size_bytes} bytes ({model_size_kb:.2f} KB)")

# Clean up the temporary file
os.remove(quantized_model_path)

Dimensioni del modello quantizzato: 25464 bytes (24.87 KB)
