In [None]:
!pip install einops 

In [None]:
import imageio
import glob
import os
import time
import cv2
import tensorflow as tf
from tensorflow.keras import layers
from IPython import display
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
from tensorflow import keras
import pydicom as dicom
from skimage.util import view_as_windows
from skimage import exposure, io, filters
import scipy
from scipy.linalg import svd
import matplotlib.patches as patches
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Layer, GlobalAveragePooling2D, Conv2D, ReLU, Multiply
from tensorflow.keras.models import Model
import pandas as pd
from tensorflow.keras.callbacks import EarlyStopping
from keras.metrics import Recall, Precision
from keras.regularizers import l2,l1
import numpy as np
import tensorflow_addons as tfa
from keras.applications.vgg16 import VGG16




In [None]:
def read_cvs_file(path):
    """Parse a comma-separated file into a NumPy array, skipping its first line.

    Args:
        path: Location of the CSV file, passed straight to ``np.genfromtxt``.

    Returns:
        The array produced by ``np.genfromtxt`` for every line after the header.
    """
    return np.genfromtxt(path, delimiter=',', skip_header=1)

In [None]:
# Placeholder locations: replace every "your path" string with a real path before running.
label_path = "your path to BCS-DBT boxes-train-v2.csv"
# Label table; assign_labels looks folder names up in its 'PatientID' column.
labels_csv = pd.read_csv(label_path)
# Root folders of the three splits, each one walked recursively by assign_labels.
training = "your path"
test = "your path"
validation = "your path"
# Only files whose name ends with this suffix are loaded by assign_labels.
extension = '.png'


In [None]:
#labels per mass no mass classification
def assign_labels(directory):
    """Load every image under ``directory`` and pair it with a binary label.

    The tree is walked recursively with ``os.walk``. Each file whose name ends
    with the module-level ``extension`` is opened with PIL, converted to a
    float32 tensor with an extra trailing axis of size 1, and divided by 255.
    The label is 1 when the name of the folder containing the file appears in
    the ``PatientID`` column of the module-level ``labels_csv`` table, and 0
    otherwise. One line per loaded file is printed.

    Args:
        directory: Root folder to scan.

    Returns:
        A list of ``(image_tensor, label)`` tuples in ``os.walk`` order.
    """
    # Build the lookup once instead of filtering the DataFrame for every file.
    labelled_patients = set(labels_csv['PatientID'])

    dataset_tuples = []
    for subdir, _dirs, files in os.walk(directory):
        # All files of a folder share the folder name, hence the same label.
        patient_id = os.path.basename(subdir)
        label = 1 if patient_id in labelled_patients else 0

        for file in files:
            if not file.endswith(extension):
                continue
            filepath = os.path.join(subdir, file)
            # Image.open is lazy and keeps the file open; the context manager
            # releases the handle once the pixels have been copied to NumPy.
            with Image.open(filepath) as img:
                pixels = np.array(img).astype(np.float32)
            # expand_dims already returns a float32 tensor, no further conversion needed.
            data = tf.expand_dims(pixels, axis=-1)
            # NOTE(review): dividing by 255 assumes 8-bit pixel values — confirm for the PNGs used.
            data_complete = (data / 255., label)
            dataset_tuples.append(data_complete)
            print(f'{subdir} e {data_complete[1]}')
    return dataset_tuples

                
                

In [None]:
# Load the training split as (image, label) tuples; assign_labels prints one line per file.
training_tuples = assign_labels(training)

In [None]:
# Load the validation split as (image, label) tuples; assign_labels prints one line per file.
validation_tuples = assign_labels(validation)

In [None]:
# Load the test split as (image, label) tuples; assign_labels prints one line per file.
test_tuples = assign_labels(test)

In [None]:
# For each split: unzip the (image, label) pairs into two parallel lists, then
# wrap them in a TensorFlow dataset that yields one (image, label) element per sample.
train_images = [image for image, _ in training_tuples]
train_labels = [target for _, target in training_tuples]
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))

validation_images = [image for image, _ in validation_tuples]
validation_labels = [target for _, target in validation_tuples]
validation_dataset = tf.data.Dataset.from_tensor_slices((validation_images, validation_labels))

test_images = [image for image, _ in test_tuples]
test_labels = [target for _, target in test_tuples]
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))



In [None]:
# Number of samples per batch, used for all three splits.
batch_size = 64
# Size of the buffer the training shuffle draws from.
buffer_size = 1000

# Only the training split is shuffled; validation and test are batched in loading order.
# NOTE(review): each name is rebound to its batched version, so running this cell a
# second time batches already-batched data — re-create the datasets first.
train_dataset = train_dataset.shuffle(buffer_size=buffer_size).batch(batch_size)
validation_dataset = validation_dataset.batch(batch_size)
test_dataset = test_dataset.batch(batch_size)




In [None]:
# Tally how many training samples carry each label.
class_counts = {}
for label in train_labels:
    class_counts[label] = class_counts.get(label, 0) + 1

# Print the number of samples found for every class.
for label, count in class_counts.items():
    print(f"Classe {label}: {count} elementi")


In [None]:
class SEBlock(tf.keras.layers.Layer):
    """Squeeze-and-excitation style channel gating layer.

    The input is globally average-pooled, passed through a ReLU bottleneck
    dense layer and a sigmoid dense layer, and the resulting per-channel
    weights are multiplied back onto the input.

    Args:
        channels: Number of channels of the input; also the width of the
            sigmoid gate.
        reduction: Divisor applied to ``channels`` to size the bottleneck.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, channels, reduction=16, **kwargs):
        super().__init__(**kwargs)
        self.channels = channels
        self.reduction = reduction
        self.pooling = tf.keras.layers.GlobalAveragePooling2D()
        # Integer division yields 0 when channels < reduction; keep at least
        # one unit so the bottleneck stays a valid Dense layer.
        bottleneck_units = max(1, self.channels // self.reduction)
        self.fc1 = tf.keras.layers.Dense(
            units=bottleneck_units,
            activation='relu',
            kernel_regularizer=l1(0.1),
            kernel_initializer=keras.initializers.HeNormal(),
        )
        self.fc2 = tf.keras.layers.Dense(
            units=self.channels,
            activation='sigmoid',
            kernel_regularizer=l2(0.1),
            kernel_initializer=keras.initializers.GlorotNormal(),
        )

    def call(self, inputs):
        """Return ``inputs`` scaled by the learned per-channel gate."""
        x = self.pooling(inputs)
        x = self.fc1(x)
        x = self.fc2(x)
        # Reshape so the gate broadcasts over the two middle axes of `inputs`;
        # assumes a channels-last 4-D input — TODO confirm for other layouts.
        x = tf.reshape(x, shape=(-1, 1, 1, self.channels))
        return inputs * x

    def get_config(self):
        """Return the constructor arguments so Keras can re-create the layer when loading a saved model."""
        config = super().get_config()
        config.update({"channels": self.channels, "reduction": self.reduction})
        return config

def _conv_se_block(x, filters, kernel_size, pool_size):
    """Apply one Conv2D -> Dropout -> SEBlock -> BatchNormalization -> MaxPooling2D stage."""
    x = layers.Conv2D(
        filters,
        kernel_size,
        activation='relu',
        kernel_regularizer=l2(0.01),
        kernel_initializer=keras.initializers.HeNormal(),
    )(x)
    x = layers.Dropout(0.2)(x)
    x = SEBlock(channels=filters)(x)
    x = layers.BatchNormalization()(x)
    return layers.MaxPooling2D(pool_size)(x)


def build_cnn_with_se(input_shape, num_classes):
    """Build the convolutional classifier with squeeze-and-excitation blocks.

    The network stacks four convolutional stages (16, 32, 64 and 128 filters),
    flattens the result, applies two small dense layers (8 and 4 units) with
    dropout, and ends with a sigmoid dense layer. The model summary is printed
    before returning.

    Args:
        input_shape: Shape of one input sample, without the batch axis.
        num_classes: Number of units in the sigmoid output layer.

    Returns:
        The uncompiled ``Model``.
    """
    inputs = layers.Input(shape=input_shape)

    # (filters, kernel size, pooling window) of each convolutional stage.
    stages = (
        (16, (7, 7), (2, 2)),
        (32, (7, 7), (2, 2)),
        (64, (5, 5), (2, 2)),
        (128, (3, 3), (3, 3)),
    )
    x = inputs
    for filters, kernel_size, pool_size in stages:
        x = _conv_se_block(x, filters, kernel_size, pool_size)

    # A single Flatten is enough: the dense layers below keep the tensor 2-D.
    x = layers.Flatten()(x)
    for units in (8, 4):
        x = layers.Dense(
            units,
            activation='relu',
            kernel_initializer=keras.initializers.RandomNormal(),
            kernel_regularizer=l2(0.001),
        )(x)
        x = layers.Dropout(0.2)(x)

    outputs = layers.Dense(num_classes, activation='sigmoid')(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.summary()
    return model


In [None]:
# Shape of one input sample (no batch axis); the last axis has size 1.
input_shape = (100,200,1)
# A single sigmoid output unit, paired with binary cross-entropy below.
num_classes = 1
model = build_cnn_with_se(input_shape,num_classes)
# The metrics order fixes the order of the values returned by model.evaluate:
# loss, accuracy, precision, recall.
model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001), 
              loss='binary_crossentropy', 
              metrics=['accuracy', Precision(), Recall() ])


In [None]:
# Upper bound on the number of epochs; early stopping may end training sooner.
num_epochs = 50
# Watch the validation loss with a patience of 5 epochs and restore the best weights on stop.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model with early stopping
history = model.fit(train_dataset, epochs=num_epochs, validation_data=validation_dataset, callbacks=[early_stopping])


In [None]:
# 'val_accuracy' is measured on the validation_data given to model.fit, not on
# the test split, so the curve is labelled as validation accuracy.
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Training vs. validation accuracy')
plt.legend()
plt.show()

In [None]:
# evaluate returns the loss followed by the compiled metrics, in compile order.
loss, accuracy, precision, recall = model.evaluate(test_dataset)
print("Test Loss:", loss)
# The metrics are fractions in [0, 1]; scale by 100 so the "%" suffix is correct.
print("Test Accuracy {:.2f} %".format(accuracy * 100))
print("Test Precision {:.2f} %".format(precision * 100))
print("Test Recall {:.2f} %".format(recall * 100))

In [None]:
# The trained network is bound to `model`; `modello` is never defined and raised NameError.
# NOTE(review): with the compiled metrics, evaluate returns [loss, accuracy, precision, recall],
# so `accuracy` holds the whole list here, not a single number.
accuracy = model.evaluate(test_dataset)