In [31]:
import pandas as pd
import numpy as np
import tabulate as tb
from typing import Dict
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import accuracy_score, f1_score
from tensorflow.keras.applications import MobileNetV2

SEED = 42
np.random.seed(SEED)

In [32]:
# file_path = './DeepFake Annotations/A-FF++.csv'
file_path = './data/HAM10000_metadata.csv'
df = pd.read_csv(file_path, sep=',')
df['file_path'] = 'data/HAM10000_images/' + df['image_id'] + '.jpg'

df = df.drop(columns=['image_id', 'lesion_id', 'dx_type', 'age', 'localization'])

print("classes amt: ", df['dx'].value_counts().to_dict())
df = df[df['dx'].isin(['nv', 'mel', 'bkl'])]


def get_balanced_subset(df, class_col, feature_col, feature_value, samples_per_class, randomize=True, reset_index=False):
    tmp = df[df[feature_col] == feature_value]
    
    unique_classes = tmp[class_col].unique()
    for cl in unique_classes:
        amt = len(tmp[tmp[class_col] == cl])
        if amt < samples_per_class:
            raise ValueError(f"Not enough samples for class '{cl}' in feature '{feature_value}'. "
                             f"Required: {samples_per_class}, Available: {len(tmp[tmp[class_col] == cl])}")
   
    tmp = tmp.groupby(class_col).head(samples_per_class)
    if randomize:
        tmp = tmp.sample(len(tmp), random_state=SEED)
    if reset_index:
        tmp = tmp.reset_index(drop=True)
    
    return tmp    

tmp = get_balanced_subset(df=df, class_col='dx', feature_col='sex', feature_value='male', samples_per_class=2)
print(tb.tabulate(tmp, headers='keys', tablefmt='psql'))

classes amt:  {'nv': 6705, 'mel': 1113, 'bkl': 1099, 'bcc': 514, 'akiec': 327, 'vasc': 142, 'df': 115}
+------+------+-------+---------------------------------------+
|      | dx   | sex   | file_path                             |
|------+------+-------+---------------------------------------|
|    0 | bkl  | male  | data/HAM10000_images/ISIC_0027419.jpg |
|    1 | bkl  | male  | data/HAM10000_images/ISIC_0025030.jpg |
| 2982 | nv   | male  | data/HAM10000_images/ISIC_0031325.jpg |
|   64 | nv   | male  | data/HAM10000_images/ISIC_0024698.jpg |
| 1214 | mel  | male  | data/HAM10000_images/ISIC_0031023.jpg |
| 1213 | mel  | male  | data/HAM10000_images/ISIC_0027190.jpg |
+------+------+-------+---------------------------------------+


In [33]:
def get_exp_data(df, class_col, feature_col, ratio : Dict, size, randomize=True, exclude_column=None, exclude_df=None):
    if randomize:
        df_rnd = df.sample(frac=1, random_state=SEED).reset_index(drop=True)
    else:
        df_rnd = df.copy()
        
    if exclude_column is not None and exclude_df is not None:
        if exclude_column not in df_rnd.columns:
            raise ValueError(f"Column '{exclude_column}' not found in DataFrame.")
        if exclude_column not in exclude_df.columns:
            raise ValueError(f"Column '{exclude_column}' not found in exclude DataFrame.")
        df_rnd = df_rnd[~df_rnd[exclude_column].isin(exclude_df[exclude_column])]
         

        
    uniq_classes = df_rnd[class_col].unique()
    uniq_features = df_rnd[feature_col].unique()
    
    def get_exp_data_inner(tmp_df, size):
        df_tmp = None
        for uf in uniq_features:
            if ratio.get(uf) is None:
                print(f"Feature '{uf}' not found in ratios. Skipping.")
                continue            
            c_amt = int(size * ratio[uf] / len(uniq_classes))
            # if c_amt <= 0:
            #     raise ValueError(f"Calculated samples per class ({c_amt}) is less than or equal to zero for feature '{uf}' with ratio {ratio}.")
            tmp = get_balanced_subset(df=tmp_df, class_col=class_col, feature_col=feature_col, feature_value=uf, 
                                        samples_per_class=c_amt, randomize=False)
            if df_tmp is None:
                df_tmp = tmp
            else:
                df_tmp = pd.concat([df_tmp, tmp])
        return df_tmp
            
    df_res = get_exp_data_inner(df_rnd, size)
    
    if len(df_res) < size:
        print(f"Samples for ({len(df_res)}) are less than requested ({size}).")
    
    #PRINT RATIOS
    ratios_fet = df_res[feature_col].value_counts(normalize=False).to_dict()
    ratios_cls = df_res[class_col].value_counts(normalize=False).to_dict()
    print(f"[] Ratios for {feature_col}: {ratios_fet}")
    print(f"[] Ratios for {class_col}: {ratios_cls}")
    print()
    
    df_res = df_res.reset_index(drop=True)
    
    return df_res      

In [34]:
def load_image(file_path, target_size=(224, 224)):
    image = tf.io.read_file(file_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, target_size)
    image = image / 255.0  # Normalize to [0, 1]
    return image

def get_data_for_model(df, class_col, files_col):
    image_paths = df[files_col].values
    labels = df[class_col].values
    labels = df[class_col].astype('category').cat.codes.values #classes strs to ints
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    dataset = dataset.map(lambda path, label: (load_image(path), label))
    dataset = dataset.batch(32)
    return dataset
    
def create_simple_model(num_classes, input_shape=(224, 224, 3)):
    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Flatten(),
        Dense(128, activation='relu'),
        Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer=Adam(learning_rate=0.001), 
                  loss='sparse_categorical_crossentropy', 
                  metrics=['accuracy'])
    return model, "CNV"


def create_mobile_net2(num_classes, input_shape=(224, 224, 3)):
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=input_shape)
    base_model.trainable = False
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(128, activation='relu')(x)
    predictions = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    model.compile(optimizer=Adam(learning_rate=0.001), 
                  loss='sparse_categorical_crossentropy', 
                  metrics=['accuracy'])
    return model, "mobile net 2"

In [35]:
# train_50_50 = get_exp_data(df, class_col='dx', feature_col='sex', ratio={'male':0.5, 'female': 0.5}, size=500)
# # print(tb.tabulate(train_50_50, headers='keys', tablefmt='psql'))

# test_50_50 = get_exp_data(df, class_col='dx', feature_col='sex', ratio={'male':0.5, 'female': 0.5}, size=100, exclude_column='file_path', exclude_df=train_50_50)
# # test_40_60 = get_exp_data(df, class_col='dx', feature_col='sex', ratio={'male':0.4, 'female': 0.6}, size=100, exclude_column='file_path', exclude_df=train_50_50)
# # test_30_70 = get_exp_data(df, class_col='dx', feature_col='sex', ratio={'male':0.3, 'female': 0.7}, size=100, exclude_column='file_path', exclude_df=train_50_50)

# num_classes = len(df['dx'].unique())
# model = create_simple_model(num_classes=num_classes)

# # Train the model
# train_dataset = get_data_for_model(train_50_50, class_col='dx', files_col='file_path')
# model.fit(train_dataset, epochs=20)

# # Make test predictions
# test_dataset = get_data_for_model(test_50_50, class_col='dx', files_col='file_path')
# predictions = model.predict(test_dataset)

# # calcu accuracy, F1
# y_true = test_50_50['dx'].astype('category').cat.codes.values
# y_pred = np.argmax(predictions, axis=1)
# accuracy = accuracy_score(y_true, y_pred)
# f1 = f1_score(y_true, y_pred, average='weighted')
# print(f"Accuracy: {accuracy:.4f}")

In [36]:
def perform_tests(df, train_meta, test_metas, reps, class_col, feature_split_col, exclude_column, files_col, get_model, epochs_num):
    res = []
    for r in range(reps):
        train = get_exp_data(df, class_col=class_col, feature_col=feature_split_col, ratio=train_meta['ratio'], size=train_meta['size'])
        tests = [
            get_exp_data(df, class_col=class_col, feature_col=feature_split_col, ratio=tm['ratio'], size=tm['size'], exclude_column=exclude_column, exclude_df=train) for tm in test_metas
        ]

        train_dataset = get_data_for_model(train, class_col=class_col, files_col=files_col)
        test_datasets = [
            get_data_for_model(test, class_col=class_col, files_col=files_col) for test in tests
        ]

        #traind df ratios of sex column:
        # print(f"Train ratios: {train[feature_split_col].value_counts(normalize=True).to_dict()}")
        #traind df ratios of dx column:
        # print(f"Train class ratios: {train[class_col].value_counts(normalize=True).to_dict()}")


        model, model_name = get_model(num_classes=len(df[class_col].unique()))
        model.fit(train_dataset, epochs=epochs_num)
        
        for test_dataset, test_meta, test_df in zip(test_datasets, test_metas, tests):
            predictions = model.predict(test_dataset)            
            y_true = test_df[class_col].astype('category').cat.codes.values
            y_pred = np.argmax(predictions, axis=1)
            acc = accuracy_score(y_true, y_pred)
            f1 = f1_score(y_true, y_pred, average='weighted')
            
            test_ratio = '/'.join([f"{k}:{v}" for k, v in test_meta['ratio'].items()])
            
            res.append([
                r,
                model_name,
                feature_split_col,
                train_meta['size'], 
                '/'.join([f"{k}:{v}" for k, v in train_meta['ratio'].items()]),
                test_meta['size'],
                test_ratio,
                acc,
                f1
            ])    

            print(f"Rep: {r:2} | Model: {model_name} | Feature Split: {feature_split_col} | Ratio: {test_ratio} | Acc: {acc:.2f}")
            
            res_df = pd.DataFrame(res, columns=[
                'rep', 'model_name', 'feature_split_col', 
                'train_size', 'train_ratio', 'test_size', 'test_ratio',
                'accuracy', 'f1_score'
            ])
            
            res_df.to_csv(f'res/res_{model_name}.csv', index=False)         
        
        
perform_tests(df=df, 
              train_meta={'ratio': {'male':0.0, 'female':1.0}, 'size': 500},
              test_metas=[
                  {'ratio': {'male':1.0, 'female':0.0}, 'size': 100},
                  {'ratio': {'male':0.8, 'female':0.2}, 'size': 100},
                  {'ratio': {'male':0.6, 'female':0.4}, 'size': 100},
                  {'ratio': {'male':0.4, 'female':0.6}, 'size': 100},
                  {'ratio': {'male':0.2, 'female':0.8}, 'size': 100},
                  {'ratio': {'male':0.0, 'female':1.0}, 'size': 100},
              ],
              reps=2,
              class_col='dx',
              feature_split_col='sex',
              exclude_column='file_path',
              files_col='file_path',
              get_model=create_mobile_net2,
              epochs_num=10
              )

Feature 'unknown' not found in ratios. Skipping.
Samples for (498) are less than requested (500).
[] Ratios for sex: {'female': 498}
[] Ratios for dx: {'nv': 166, 'mel': 166, 'bkl': 166}

Feature 'unknown' not found in ratios. Skipping.
Samples for (99) are less than requested (100).
[] Ratios for sex: {'male': 99}
[] Ratios for dx: {'nv': 33, 'bkl': 33, 'mel': 33}

Feature 'unknown' not found in ratios. Skipping.
Samples for (96) are less than requested (100).
[] Ratios for sex: {'male': 78, 'female': 18}
[] Ratios for dx: {'nv': 32, 'bkl': 32, 'mel': 32}

Feature 'unknown' not found in ratios. Skipping.
Samples for (99) are less than requested (100).
[] Ratios for sex: {'male': 60, 'female': 39}
[] Ratios for dx: {'nv': 33, 'bkl': 33, 'mel': 33}

Feature 'unknown' not found in ratios. Skipping.
Samples for (99) are less than requested (100).
[] Ratios for sex: {'female': 60, 'male': 39}
[] Ratios for dx: {'nv': 33, 'bkl': 33, 'mel': 33}

Feature 'unknown' not found in ratios. Skippin

In [37]:
res = pd.read_csv('res/res_mobile net 2.csv')

gr = res.groupby(['test_ratio']).agg(
    Model=('model_name', 'first'),
    TrainRatio=('train_ratio', 'first'),
    Accuracy= ('accuracy', 'mean'),
    AccuracySTD= ('accuracy', 'std'),
    F1=('f1_score', 'mean'),
    F1STD=('f1_score', 'std')
)
gr = gr.round(2).sort_values(by='test_ratio', ascending=False).reset_index()
              
print(tb.tabulate(gr, headers='keys', tablefmt='psql'))

+----+---------------------+--------------+---------------------+------------+---------------+------+---------+
|    | test_ratio          | Model        | TrainRatio          |   Accuracy |   AccuracySTD |   F1 |   F1STD |
|----+---------------------+--------------+---------------------+------------+---------------+------+---------|
|  0 | male:1.0/female:0.0 | mobile net 2 | male:0.0/female:1.0 |       0.65 |          0    | 0.65 |    0    |
|  1 | male:0.8/female:0.2 | mobile net 2 | male:0.0/female:1.0 |       0.65 |          0.01 | 0.65 |    0.01 |
|  2 | male:0.6/female:0.4 | mobile net 2 | male:0.0/female:1.0 |       0.69 |          0.01 | 0.69 |    0.01 |
|  3 | male:0.4/female:0.6 | mobile net 2 | male:0.0/female:1.0 |       0.7  |          0.02 | 0.7  |    0.02 |
|  4 | male:0.2/female:0.8 | mobile net 2 | male:0.0/female:1.0 |       0.7  |          0.01 | 0.7  |    0.02 |
|  5 | male:0.0/female:1.0 | mobile net 2 | male:0.0/female:1.0 |       0.72 |          0.03 | 0.72 |   