In [6]:
"""CNN model training
"""
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
import numpy as np
from tensorflow.keras.callbacks import EarlyStopping

# Directory that holds the preprocessed training arrays
data_dir = '../data/processed/'

# Load the training images and labels (this cell trains; it does not touch test data)
X_train = np.load(data_dir + 'X_train_cs.npy')
# NOTE(review): allow_pickle=True unpickles arbitrary objects on load; only use it
# with label files produced by this project's own preprocessing step.
y_train = np.load(data_dir + 'y_train_cs.npy', allow_pickle=True)

# Build CNN model: three Conv/MaxPool stages followed by a dense classifier head
model_cnn = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(300, 300, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(4, activation='softmax')  # Four outputs for four-class problem
])

# categorical_crossentropy is the multi-class loss for one-hot targets
# (presumably y_train is one-hot encoded with 4 columns -- TODO confirm).
model_cnn.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

# Early stopping on validation loss. The patience must be smaller than the
# number of epochs, otherwise the callback can never trigger; the previous
# value (patience=5 with epochs=5) made it a no-op. restore_best_weights
# makes the saved model the best epoch rather than the last one.
early_stopping = EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)

# Train CNN model
history = model_cnn.fit(
    X_train, y_train,
    epochs=5,  # Maximum number of passes over the training data
    batch_size=32,
    validation_split=0.2,  # Use 20% of the data for validation
    callbacks=[early_stopping]
)

# Save trained model
model_cnn.save('../models/cnn_model.h5')

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [26]:
"""cnn_types train """
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, Flatten, concatenate, Dropout
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam

# Folder containing the preprocessed arrays
data_dir = '../data/processed/'

# Training split: images, per-sample type codes, labels
X_train_images = np.load(f'{data_dir}X_train_cs_type_pc.npy')
types_train = np.load(f'{data_dir}types_train_cs_type_pc.npy')
y_train = np.load(f'{data_dir}y_train_cs_type_pc.npy')

# Test split, loaded the same way (used below as validation data)
X_test_images = np.load(f'{data_dir}X_test_cs_type_pc.npy')
types_test = np.load(f'{data_dir}types_test_cs_type_pc.npy')
y_test = np.load(f'{data_dir}y_test_cs_type_pc.npy')


# Count of type codes, assuming they are integer encoded and 0-indexed.
# NOTE(review): this value is computed but not used anywhere in this cell.
num_types = types_train.max() + 1

# Image branch: three Conv/MaxPool stages with 32, 64 and 128 filters, then flatten
image_input = Input(shape=(300, 300, 3), name='image_input')
x = image_input
for n_filters in (32, 64, 128):
    x = Conv2D(n_filters, (3, 3), activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)

# Type branch.
# NOTE(review): the type code enters as a single numeric feature, so the Dense
# layer treats it as an ordinal value rather than a category.
type_input = Input(shape=(1,), name='type_input')
type_embedding = Dense(8, activation='relu')(type_input)

# Merge both branches and classify into four classes
combined_features = concatenate([x, type_embedding])
z = Dense(512, activation='relu')(combined_features)
z = Dropout(0.5)(z)
output = Dense(4, activation='softmax')(z)

# Assemble and compile the two-input model
model = Model(inputs=[image_input, type_input], outputs=output)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Give the type codes an explicit feature axis to match type_input's shape (1,)
types_train_reshaped = np.reshape(types_train, (-1, 1))
types_test_reshaped = np.reshape(types_test, (-1, 1))

print(types_test_reshaped.shape)
# Fit the model; the test split doubles as the validation set here
history = model.fit(
    x=[X_train_images, types_train_reshaped],
    y=y_train,
    epochs=10,
    batch_size=32,
    validation_data=([X_test_images, types_test_reshaped], y_test)
)

# Persist the trained model
model.save('../models/cnn_multi_input_pc_model.h5')

(756, 1)
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
"""SVM model training """

from sklearn import svm
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report
from pathlib import Path
import numpy as np

# Directory with the split data; the path is relative to the 'notebooks' directory
data_dir = Path('../data/processed/')

# Fail fast if the notebook is being run from an unexpected working directory
assert data_dir.exists(), f"Path does not exist: {data_dir}"

# Load the data
# NOTE(review): allow_pickle=True unpickles arbitrary objects; only load label
# files produced by this project's own preprocessing.
X_train = np.load(data_dir / 'X_train.npy')
X_test = np.load(data_dir / 'X_test.npy')
y_train = np.load(data_dir / 'y_train.npy', allow_pickle=True)
y_test = np.load(data_dir / 'y_test.npy', allow_pickle=True)

# Flatten each image to a 1-D feature vector, as required by the SVM pipeline
X_train_flat = X_train.reshape((X_train.shape[0], -1))

# RBF-kernel SVM with balanced class weights. probability=True fits an internal
# cross-validated calibration that shuffles the data, so random_state is fixed
# (matching the PCA seed) to make repeated runs reproducible.
svc = svm.SVC(kernel='rbf', class_weight='balanced', probability=True, random_state=42)

# PCA dimensionality reduction to cut the computational cost of the SVM
pca = PCA(n_components=150, whiten=True, random_state=42)

# Standardize features before PCA
scaler = StandardScaler()

# Pipeline: standardization -> PCA -> SVM
pipeline_svm = Pipeline([
    ('scaler', scaler),
    ('pca', pca),
    ('svc', svc)
])

# Convert one-hot encoded labels to integer class labels
y_train_labels = np.argmax(y_train, axis=1)

# Train the model
pipeline_svm.fit(X_train_flat, y_train_labels)

# The model can now be used for prediction or further evaluation

# Save the fitted pipeline (scaler, PCA and SVM together)
import joblib
joblib.dump(pipeline_svm, '../models/svm_model.joblib')

In [None]:
""" Normal Transformer model train """

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Flatten, Dropout, GlobalAveragePooling2D
from tensorflow.keras.applications import EfficientNetB0  # Using EfficientNet as the feature extractor
from tensorflow.keras.layers import LayerNormalization, MultiHeadAttention, Add, Embedding
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, GlobalAveragePooling2D, Reshape
from tensorflow.keras.layers import LayerNormalization, MultiHeadAttention, Dropout, Add
from tensorflow.keras.layers import Flatten, GlobalAveragePooling1D
import numpy as np

# Folder containing the preprocessed arrays
data_dir = '../data/processed/'

# Training inputs and labels for the transformer model.
# The label file is read with allow_pickle=True.
X_train_transformer = np.load(f'{data_dir}X_train_transformer2.npy')
y_train_transformer = np.load(f'{data_dir}y_train_transformer2.npy', allow_pickle=True)

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one Transformer encoder block to ``inputs``.

    The block is a multi-head self-attention sub-layer followed by a
    position-wise feed-forward sub-layer, each wrapped in its own residual
    connection, with a final layer normalization.

    Args:
        inputs: Keras tensor to encode (assumed to be (batch, seq, features)
            -- TODO confirm against the caller).
        head_size: ``key_dim`` passed to ``MultiHeadAttention``.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden feed-forward layer.
        dropout: Dropout rate used inside attention and after each sub-layer.

    Returns:
        A Keras tensor with the same last dimension as ``inputs``.
    """
    # Multi-head self-attention (MHA) sub-layer with its residual connection
    x = MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs)
    x = Dropout(dropout)(x)
    attention_out = Add()([x, inputs])

    # Feed forward network (FFN); the second Dense projects back to the input
    # width so the residual addition is shape-compatible.
    x = Dense(ff_dim, activation="relu")(attention_out)
    x = Dropout(dropout)(x)
    x = Dense(inputs.shape[-1])(x)
    # The residual wraps the FFN sub-layer, so it adds the FFN's own input
    # (the post-attention tensor) rather than the block's raw inputs.
    x = Add()([x, attention_out])

    # Layer normalization
    x = LayerNormalization(epsilon=1e-6)(x)
    return x

def build_transformer_model(input_shape, head_size, num_heads, ff_dim, num_transformer_blocks, mlp_units, dropout=0, mlp_dropout=0, num_classes=4):
    """Build an EfficientNetB0 + Transformer-encoder image classifier.

    Args:
        input_shape: Shape of one input image, e.g. ``(300, 300, 3)``.
        head_size: ``head_size`` forwarded to ``transformer_encoder``.
        num_heads: Number of attention heads per encoder block.
        ff_dim: Feed-forward width per encoder block.
        num_transformer_blocks: How many encoder blocks to stack.
        mlp_units: Iterable of hidden widths for the MLP head.
        dropout: Dropout rate inside the encoder blocks.
        mlp_dropout: Dropout rate after each MLP head layer.
        num_classes: Number of softmax outputs (defaults to 4, the previously
            hard-coded value).

    Returns:
        An uncompiled ``Model`` mapping images to class probabilities.
    """
    inputs = Input(shape=input_shape)
    # ImageNet-pretrained backbone, frozen so it acts as a fixed feature extractor
    efficientnet = EfficientNetB0(include_top=False, input_tensor=inputs, weights="imagenet")
    efficientnet.trainable = False

    # Use GlobalAveragePooling2D to reduce the feature map to one vector per image
    x = GlobalAveragePooling2D()(efficientnet.output)
    # Add a sequence axis: (batch, channels) -> (batch, 1, channels).
    # NOTE(review): the sequence therefore has a single token, so self-attention
    # has nothing to attend across; reshaping the un-pooled feature map into
    # tokens may have been intended -- confirm.
    x = Reshape((-1, x.shape[-1]))(x)

    # Transformer encoder
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    # Collapse the sequence axis back to one feature vector per image
    x = GlobalAveragePooling1D()(x)

    # MLP head
    for dim in mlp_units:
        x = Dense(dim, activation="relu")(x)
        x = Dropout(mlp_dropout)(x)

    # Output layer
    outputs = Dense(num_classes, activation="softmax")(x)
    model = Model(inputs, outputs)
    return model


# Hyper-parameters for the transformer model
input_shape = (300, 300, 3)  # Adjust to match the actual image size and channel count
head_size = 256
num_heads = 4
ff_dim = 4
num_transformer_blocks = 4
mlp_units = [128]
dropout = 0.1
mlp_dropout = 0.1


# Build the model
model = build_transformer_model(
    input_shape, head_size, num_heads, ff_dim, num_transformer_blocks, mlp_units, dropout, mlp_dropout
)

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Model summary
# model.summary()

# Train the Transformer model on the arrays loaded at the top of this cell.
# (Previously this call used X_train / y_train, which this cell never defines,
# so it silently depended on whatever another cell had left in the kernel.)
history = model.fit(
    X_train_transformer, y_train_transformer,
    batch_size=32,
    epochs=10,  # Adjust the number of epochs as needed
    validation_split=0.25,  # Hold out 25% of the data as the validation set
    shuffle=True  # Shuffle the training data
)


# Save the trained model
model.save('../models/transformer_model.h5')

In [2]:
"""image_types processed transformer train """
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Input, Dense, GlobalAveragePooling2D, Concatenate, Embedding, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LayerNormalization, MultiHeadAttention, Add, Dropout
from tensorflow.keras.utils import plot_model
from tensorflow.keras.layers import Input, Dense, GlobalAveragePooling2D, Reshape

# Data paths
data_dir = '../data/processed/'
labels_file = '../data/raw/labels6.csv'

# Load the image/label arrays and the whitespace-separated labels table
X_train = np.load(data_dir + 'X_train_transformer_types_aug.npy')
y_train = np.load(data_dir + 'y_train_transformer_types_aug.npy')
# Raw string: '\s' is a regex token, not a valid Python string escape
labels_df = pd.read_csv(labels_file, names=['image_id', 'probability', 'type'], sep=r'\s+')
types = labels_df['type'].values

# The type column is fed to the model alongside X_train, so both must have one
# entry per sample. NOTE(review): equal length does not prove the rows are in
# the same order as the augmented image array -- verify against preprocessing.
if len(types) != len(X_train):
    raise ValueError(
        f"Number of type labels ({len(types)}) does not match number of "
        f"training images ({len(X_train)})"
    )

# Adjust y_train shape if incorrect
print("y_train shape before squeeze:", y_train.shape)

# Drop a singleton second axis if present; the ndim guard keeps 1-D label
# arrays from raising IndexError on shape[1].
if y_train.ndim > 1 and y_train.shape[1] == 1:
    y_train = np.squeeze(y_train, axis=1)

print("y_train shape after squeeze:", y_train.shape)

# Encode type labels as consecutive integers
label_encoder = LabelEncoder()
types_encoded = label_encoder.fit_transform(types)
num_types = len(np.unique(types_encoded))  # Get the number of unique types

# One-hot version of the type labels (not consumed later in this cell; the
# model below takes the integer codes)
types_train_one_hot = to_categorical(types_encoded)

# Definition of Transformer encoder layer
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Run ``inputs`` through a single Transformer encoder block.

    The input is layer-normalized and passed through multi-head
    self-attention, and the attention output is added to the original
    ``inputs``. That sum is normalized again and fed to a two-layer
    feed-forward network, whose output is added to the normalized tensor.

    Args:
        inputs: Keras tensor to encode.
        head_size: ``key_dim`` for ``MultiHeadAttention``.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate for attention and the feed-forward network.

    Returns:
        A Keras tensor whose last dimension equals ``inputs.shape[-1]``.
    """
    # Self-attention sub-layer on the normalized input
    normed_inputs = LayerNormalization(epsilon=1e-6)(inputs)
    self_attention = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)
    attended = self_attention(normed_inputs, normed_inputs)
    residual = Add()([attended, inputs])

    # Feed-forward sub-layer; the last Dense restores the input width
    hidden = LayerNormalization(epsilon=1e-6)(residual)
    ffn = Dense(ff_dim, activation="relu")(hidden)
    ffn = Dropout(dropout)(ffn)
    ffn = Dense(inputs.shape[-1])(ffn)
    return Add()([ffn, hidden])

# Build a multi-input Transformer model
def build_transformer_with_type_model(num_types, input_shape=(300, 300, 3), num_classes=4):
    """Build and compile a two-input (image + type) Transformer classifier.

    Args:
        num_types: Vocabulary size for the type embedding.
        input_shape: Shape of one input image.
        num_classes: Number of softmax outputs.

    Returns:
        A ``Model`` taking ``[image_input, type_input]``, compiled with Adam,
        categorical cross-entropy and an accuracy metric.
    """
    # ----- Image branch: frozen ImageNet EfficientNetB0 + global pooling -----
    image_input = Input(shape=input_shape, name='image_input')
    backbone = EfficientNetB0(include_top=False, input_tensor=image_input, weights="imagenet")
    backbone.trainable = False
    pooled = GlobalAveragePooling2D()(backbone.output)
    image_vec = Flatten()(pooled)

    # ----- Type branch: integer type id -> 8-dimensional embedding -----
    type_input = Input(shape=(1,), name='type_input')
    embedded_type = Embedding(input_dim=num_types, output_dim=8)(type_input)
    type_vec = Flatten()(embedded_type)

    # Join both feature vectors and add a length-1 sequence axis, because
    # MultiHeadAttention works on three-dimensional tensors
    fused = Concatenate()([image_vec, type_vec])
    token = Reshape((1, -1))(fused)

    # One Transformer encoder block over that single-token sequence
    encoded = transformer_encoder(token, head_size=256, num_heads=4, ff_dim=1024, dropout=0.1)

    # Drop the sequence axis again. The original author's note gives the shapes
    # as (None, 1, 1288) -> (None, 1288), presumably 1280 backbone channels
    # plus the 8 embedding dimensions.
    flat = Flatten()(encoded)

    # Softmax classification head
    class_probs = Dense(num_classes, activation='softmax')(flat)

    model = Model(inputs=[image_input, type_input], outputs=class_probs)
    model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Settings for the model
input_shape = (300, 300, 3)  # Image height, width and channels; adjust to the data
num_classes = 4  # Size of the softmax output

# Instantiate the compiled two-input model
model = build_transformer_with_type_model(
    num_types,
    input_shape=input_shape,
    num_classes=num_classes,
)

# Fit on the images plus the encoded types; the types get an explicit
# feature axis so they match the (1,)-shaped type input
history = model.fit(
    x=[X_train, types_encoded.reshape(-1, 1)],
    y=y_train,
    epochs=8,
    batch_size=32,
    validation_split=0.2,
)

# Persist the trained model
model.save('../models/transformer_with_type_aug_model.h5')

# Render the architecture diagram to an image file
plot_model(model, show_layer_names=True, show_shapes=True, to_file='model_plot.png')


y_train shape before squeeze: (2267, 4)
y_train shape after squeeze: (2267, 4)
Epoch 1/8
Epoch 2/8
Epoch 3/8
Epoch 4/8
Epoch 5/8
Epoch 6/8
Epoch 7/8
Epoch 8/8
('Failed to import pydot. You must `pip install pydot` and install graphviz (https://graphviz.gitlab.io/download/), ', 'for `pydotprint` to work.')
