<a href="https://colab.research.google.com/github/Arhin-Eben/Machine-learning-with-python/blob/master/OSVFuseNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [55]:
import zipfile
from google.colab import files
import os
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models, Input, callbacks
from tensorflow.keras.layers import DepthwiseConv1D, Conv1D, BatchNormalization, ReLU, MaxPooling1D, GlobalAveragePooling1D, Dense, Dropout, Flatten, concatenate, Reshape
import re # Import the re module used in load_signatures_and_labels_from_folder


In [56]:
# Specify the path to the zip file
zip_file_path = '/content/SVC-2004_Task1.zip'

# Extract the zip file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall('/content/') # Extract to the root /content/

# Verify the extraction
# Now the files should be in /content/SVC-2004_Task1
extracted_path = '/content/SVC-2004_Task1'
print("Contents of extracted directory:")
print(os.listdir(extracted_path)) # Print directory contents to verify

Contents of extracted directory:
['Task1']


In [57]:
# 1. HANDCRAFTED FEATURE EXTRACTION (Modified to use 4 columns)
def extract_handcrafted_features(sig):
    # Assume data is [time, x, y, button]
    t, x, y, p = [sig[:, i] for i in range(4)] # Changed from range(6) to range(4)
    dt = np.diff(t) + 1e-6
    dx, dy = np.diff(x), np.diff(y)
    velocity = np.sqrt(dx**2 + dy**2) / dt
    acceleration = np.diff(velocity) / dt[1:] if len(velocity) > 1 else np.zeros(1)
    jerk = np.diff(acceleration) / dt[2:] if len(acceleration) > 1 else np.zeros(1)
    # Curvature calculation still uses dx, dy which are derived from x, y
    curvature = (
        np.abs(dx[1:] * dy[:-1] - dy[1:] * dx[:-1]) /
        (dx[:-1]**2 + dy[:-1]**2 + 1e-6)**1.5 if len(dx) > 1 else np.zeros(1)
    )
    features = [
        t[-1] - t[0],
        np.max(velocity) if len(velocity) else 0,
        np.mean(velocity) if len(velocity) else 0,
        np.std(velocity) if len(velocity) else 0,
        np.max(acceleration) if len(acceleration) else 0,
        np.mean(acceleration) if len(acceleration) else 0,
        np.std(acceleration) if len(acceleration) else 0,
        np.max(jerk) if len(jerk) else 0,
        np.mean(jerk) if len(jerk) else 0,
        np.std(jerk) if len(jerk) else 0,
        np.max(p), np.mean(p), np.std(p),
        # Removed az and alt features as they are not in the data
        np.mean(curvature) if len(curvature) else 0,
        np.std(curvature) if len(curvature) else 0
    ]
    # Number of handcrafted features will be less (19 instead of 21)
    # (t_duration, v_max, v_mean, v_std, a_max, a_mean, a_std, j_max, j_mean, j_std, p_max, p_mean, p_std, curv_mean, curv_std) = 15 features
    # Let's recalculate based on the list: 1 + 3*3 + 3 + 2 = 1+9+3+2 = 15 features.
    # The original calculation of 21 included 3*3 for az and alt which are now removed.
    return np.array(features, dtype=np.float32)

In [60]:
# 2. DATA LOADING FROM 'Task1' FOLDER (Modified to skip first line and expect 4 columns)
def load_signatures_and_labels_from_folder(folder_path):
    """
    Loads all .TXT files in folder_path as signatures.
    Assumes filenames are of the form U<userID>S<sampleID>.TXT.
    Samples 1–20: genuine (label 0), samples 21–40: forgery (label 1).
    Data format assumed to be: time, x, y, button (4 columns).
    Skips the first line of each file.
    """
    # import re # Already imported at the top
    signatures = []
    labels = []
    print(f"Attempting to load from: {folder_path}") # Add print for debugging
    if not os.path.isdir(folder_path):
        print(f"Error: Folder not found at {folder_path}")
        return signatures, labels # Return empty lists if folder doesn't exist

    for fname in os.listdir(folder_path):
        if not fname.lower().endswith('.txt'):
            continue
        fpath = os.path.join(folder_path, fname)
        # Extract sample number (S##) from filename
        match = re.search(r'S(\d+)', fname, re.IGNORECASE)
        if not match:
            print(f"Skipping {fname}: cannot extract sample number (should match 'S##')")
            continue
        sample_num = int(match.group(1))
        # Label: 0 = genuine (1–20), 1 = forgery (21–40)
        if 1 <= sample_num <= 20:
            label = 0
        elif 21 <= sample_num <= 40:
            label = 1
        else:
            print(f"Skipping {fname}: sample number out of expected range (1-40)")
            continue
        # Load data
        data = []
        with open(fpath, 'r') as f:
            next(f) # Skip the first line (header)
            for line in f:
                parts = line.strip().split()
                # Expecting 4 columns now
                if len(parts) >= 4: # Changed from 6 to 4
                    try:
                        # Only take the first 4 columns
                        data.append([float(x) for x in parts[:4]]) # Changed from 6 to 4
                    except ValueError: # Catch specific ValueError for safer parsing
                        print(f"Skipping line in {fname} due to parsing error: {line.strip()}")
                        continue
        if data:
            data = np.array(data)
            if data.shape[1] == 4: # Changed from 6 to 4
                signatures.append(data)
                labels.append(label)
            else:
                # This case should ideally not happen if len(parts) >= 4
                print(f"Skipping {fname}: processed data does not have 4 columns (shape {data.shape}).")
        else:
             print(f"Skipping {fname}: file is empty or contains no valid data lines after skipping header.")
    return signatures, labels

In [63]:
# 4. DATA AUGMENTATION (unchanged logic, relies on preprocess_signature handling shape)
def augment_signature(sig, max_len):
    # Ensure sig has content before augmenting
    if sig.shape[0] == 0:
        return preprocess_signature(sig, max_len=max_len) # Return padded zeros if input is empty

    sig_aug = sig + np.random.normal(0, 0.01, sig.shape)
    factor = np.random.uniform(0.9, 1.1)
    num_samples_aug = int(sig.shape[0] * factor)
    if num_samples_aug == 0:
        num_samples_aug = 1
    idxs = np.linspace(0, sig.shape[0]-1, num_samples_aug).astype(int)
    idxs = np.clip(idxs, 0, sig.shape[0]-1)
    sig_aug = sig_aug[idxs]
    return preprocess_signature(sig_aug, max_len=max_len)


In [51]:
# 3. DATA PREPROCESSING (unchanged)
def preprocess_signature(sig, max_len=200):
    T = sig.shape[0]
    if T < max_len:
        pad = np.zeros((max_len-T, sig.shape[1]))
        sig = np.vstack([sig, pad])
    elif T > max_len:
        sig = sig[:max_len]
    scaler = MinMaxScaler()
    sig = scaler.fit_transform(sig)
    return sig

def preprocess_dataset(signatures, max_len=200):
    X, X_hand = [], []
    for sig in signatures:
        X.append(preprocess_signature(sig, max_len))
        X_hand.append(extract_handcrafted_features(sig))
    return np.array(X, dtype=np.float32), np.array(X_hand, dtype=np.float32)


In [81]:
# 5. MODEL ARCHITECTURE (Modified input shapes)
def build_cae_encoder(input_shape=(200,4)): # Changed input shape to 4 features
    inputs = Input(shape=input_shape)
    x = Conv1D(32, 5, activation='relu', padding='same')(inputs)
    x = MaxPooling1D(2)(x)
    x = Conv1D(64, 3, activation='relu', padding='same')(x)
    encoded = MaxPooling1D(2)(x)
    encoder = models.Model(inputs, encoded)
    return encoder

# dws_conv_block remains the same as it operates on the processed features
def dws_conv_block(x, filters, kernel_size, strides=1):
    x = DepthwiseConv1D(kernel_size, strides=strides, padding='same')(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = Conv1D(filters, 1, padding='same')(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    return x

def build_osvfusenet(input_shape=(200,4), num_handcrafted_features=15): # Changed input shape and num_handcrafted_features
    sig_input = Input(shape=input_shape, name='signature_input')
    hand_input = Input(shape=(num_handcrafted_features,), name='handcrafted_input')
    encoder = build_cae_encoder(input_shape)
    deep_features = encoder(sig_input)
    deep_features = Flatten()(deep_features)
    fusion = concatenate([deep_features, hand_input])

    total_features = fusion.shape[-1]
    x = Reshape((total_features, 1))(fusion)

    x = dws_conv_block(x, 64, 3)
    x = MaxPooling1D(2)(x)
    x = dws_conv_block(x, 128, 3)
    x = GlobalAveragePooling1D()(x)
    x = Dense(64, activation='relu')(x)
    x = Dropout(0.5)(x)
    output = Dense(1, activation='sigmoid')(x)
    return models.Model([sig_input, hand_input], output)

def compile_model(model, lr=0.001):
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model


if __name__ == "__main__":
    # Load signatures and labels
    dataset_dir = "/content/SVC-2004_Task1/Task1" # Changed path here

    signatures, labels = load_signatures_and_labels_from_folder(dataset_dir)
    print(f'Loaded {len(signatures)} signatures from {dataset_dir}.')

    # --- Add check for loaded data ---
    if not signatures:
        print("Error: No signatures were loaded. Please check the dataset path and file format.")
    else:
        # Preprocess
        MAX_LEN = 100 # Define max_len here
        # Update expected number of features
        expected_sig_features = 4
        expected_hand_features = 15

        X, X_hand = preprocess_dataset(signatures, max_len=MAX_LEN)
        y = np.array(labels)

        # (Optional) Data augmentation
        # Check if X is not empty before augmenting
        if X.shape[0] > 0:
            aug_X, aug_hand, aug_y = [], [], []
            for xi, hi, yi in zip(X, X_hand, y):
                # Add original data
                aug_X.append(xi)
                aug_hand.append(hi)
                aug_y.append(yi)

                # Add augmented data
                xi_aug = augment_signature(xi, max_len=MAX_LEN) # Pass max_len here
                # Ensure xi_aug is not empty before extracting features
                if xi_aug.shape[0] > 0:
                    hi_aug = extract_handcrafted_features(xi_aug)
                    aug_X.append(xi_aug)
                    aug_hand.append(hi_aug)
                    aug_y.append(yi)
                else:
                     print(f"Warning: Augmentation resulted in an empty signature, skipping.")

            # Update X, X_hand, y with augmented data
            X, X_hand, y = np.array(aug_X), np.array(aug_hand), np.array(aug_y)
        else:
            print("Warning: No data initially loaded, skipping augmentation.")


        # Ensure there are still samples after augmentation (or if augmentation was skipped)
        if len(X) == 0:
             print("Error: Data augmentation resulted in an empty dataset or no data was initially loaded.")
        else:
            # Train/test split
            # Ensure stratify is used with a non-empty y array
            if len(y) > 1 and len(np.unique(y)) > 1: # Check if stratify is possible
                 X_train, X_test, Xh_train, Xh_test, y_train, y_test = train_test_split(
                    X, X_hand, y, test_size=0.2, random_state=42, stratify=y
                )
            elif len(y) > 0: # If only one class or only one sample
                 X_train, X_test, Xh_train, Xh_test, y_train, y_test = train_test_split(
                    X, X_hand, y, test_size=0.2 if len(y) > 5 else (0.5 if len(y) > 1 else 0), random_state=42 # Adjust test_size for very small datasets
                )
                 if len(y_test) == 0 and len(y_train) > 0: # Ensure test set is not empty if possible
                     print("Warning: Could not create a test set with stratify due to small dataset. Test set is empty.")
            else: # No data to split
                print("Error: No data available for train/test split.")
                X_train, X_test, Xh_train, Xh_test, y_train, y_test = np.empty((0, MAX_LEN, expected_sig_features)), np.empty((0, MAX_LEN, expected_sig_features)), np.empty((0, expected_hand_features)), np.empty((0, expected_hand_features)), np.empty((0,)), np.empty((0,))


            # Only proceed if training data exists
            if X_train.shape[0] > 0:
                # Build model
                # Check if shapes match expectations before building the model
                # Use the updated expected_sig_features and expected_hand_features
                expected_sig_shape = (MAX_LEN, expected_sig_features)

                if X_train.shape[1:] != expected_sig_shape:
                     print(f"Error: Signature data shape mismatch during split. Expected {expected_sig_shape}, got {X_train.shape[1:]}")
                elif Xh_train.shape[1] != expected_hand_features:
                     print(f"Error: Handcrafted features shape mismatch during split. Expected {expected_hand_features}, got {Xh_train.shape[1]}")
                else:
                    model = build_osvfusenet(input_shape=(MAX_LEN, expected_sig_features), num_handcrafted_features=expected_hand_features) # Use updated values
                    model = compile_model(model)
                    model.summary()

                    early_stop = callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

                    # Train
                    history = model.fit(
                        [X_train, Xh_train], y_train,
                        validation_data=([X_test, Xh_test], y_test),
                        epochs=20,
                        batch_size=16,
                        callbacks=[early_stop],
                        verbose=2
                    )

                    # Evaluate - This evaluation block is now correctly inside the if X_train.shape[0] > 0: block
                # Check if test data exists before evaluating
                if X_test.shape[0] > 0:
                    loss, acc = model.evaluate([X_test, Xh_test], y_test, verbose=0)
                    print(f"Test accuracy: {acc:.4f}")
                else:
                    print("No test data available for evaluation.")

            else: # This else is now correctly aligned with the if X_train.shape[0] > 0: block
                print("No training data available after split.")

Attempting to load from: /content/SVC-2004_Task1/Task1
Loaded 1600 signatures from /content/SVC-2004_Task1/Task1.


Epoch 1/20
160/160 - 28s - 175ms/step - accuracy: 0.5000 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6932
Epoch 2/20
160/160 - 41s - 258ms/step - accuracy: 0.4930 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 3/20
160/160 - 38s - 240ms/step - accuracy: 0.5000 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 4/20
160/160 - 23s - 141ms/step - accuracy: 0.4906 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 5/20
160/160 - 18s - 116ms/step - accuracy: 0.4977 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 6/20
160/160 - 18s - 114ms/step - accuracy: 0.4688 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 7/20
160/160 - 23s - 145ms/step - accuracy: 0.4922 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 8/20
160/160 - 18s - 114ms/step - accuracy: 0.4930 - loss: 0.6932 - val_accuracy: 0.5000 - val_loss: 0.6931
Epoch 9/20
160/160 - 19s - 116ms/step - accuracy: 0.5000 - loss: 0.6932 - val_accuracy: 