# Generate mock data

In [None]:
import random
import json
import os
import numpy as np
import tensorflow as tf
import cv2

# Part 1: Generate Mock Data
# Writes one random 5x5 RGB image under many file names into `data_dir`, plus a
# JSON dictionary mapping class index -> [image ID, class label] at `dict_dir`.
data_dir = "data"
dict_dir = "mock_classes.json"
# Generating mock image names and a JSON file
num_images = 1000  # NOTE(review): not referenced below; the loop writes images_per_class files per class
images_per_class = 500
class_labels = ["cat", "dog", "bird", "fish"]
mock_image = np.random.randint(0, 256, [5, 5, 3], dtype=np.uint8)
mock_json = {
    1: ["18274", "cat"],
    2: ["15938", "dog"],
    3: ["70382", "bird"],
    4: ["28673", "fish"],
}
os.makedirs(data_dir, exist_ok=True)
for i in range(images_per_class):
    for class_idx in mock_json:
        # File names follow the pattern <imageID>_<4-digit counter>.JPEG.
        image_path = os.path.join(data_dir, f"{mock_json[class_idx][0]}_{i:04d}.JPEG")
        # cv2.imwrite reports failure through its return value instead of raising.
        if not cv2.imwrite(image_path, mock_image):
            raise OSError(f"Could not write mock image: {image_path}")
# Use a context manager so the JSON file is flushed and closed deterministically.
with open(dict_dir, "w") as json_file:
    json.dump(mock_json, json_file)

# Step 1
Given an image file named xxxxx_xxxx.JPEG (imageID_xxxx.JPEG), return its class index and class label.
Input:
.json dictionary. key: class index, value: [imageID, class label]
image folder
Output:
[class index, class label]


In [None]:
# Load the class dictionary (class index -> [image ID, class label]) and invert it
# into image ID -> [class index, class label] for lookup by file name.
# The context manager closes the file handle instead of leaving it to the GC.
with open(dict_dir, "r") as dict_file:
    check_dict = json.load(dict_file)
imgid_class = {}
for class_idx in check_dict:
    # JSON object keys are strings, so the class index is converted back to int.
    imgid_class[check_dict[class_idx][0]] = [int(class_idx), check_dict[class_idx][1]]

def get_idx_lbl(img_name):
    """Look up the class of an image from its file name.

    Args:
        img_name: File name whose part before the first underscore is the
            image ID (e.g. "18274_0001.JPEG").

    Returns:
        The `imgid_class` entry for that image ID: [class index, class label].

    Raises:
        KeyError: If the image ID is not a key of `imgid_class`.
    """
    image_id = img_name.split("_")[0]
    return imgid_class[image_id]

# for item in os.listdir(data_dir):
#     print(get_idx_lbl(item))

# Step 2: Prediction
Randomly sample 500 images in the directory.

Create a 4-d tensorflow tensor X.

Get the corresponding labels using the function defined in Step 1.

Shuffle the images. Predict with two models.

In [None]:
import random
# Number of images to draw for the random sample.
batch_size = 500
# os.listdir returns entries in arbitrary order and also lists sub-directories
# (e.g. notebook checkpoint folders); keep regular files only and sort them so
# the listing is deterministic.
all_images = sorted(
    entry for entry in os.listdir(data_dir)
    if os.path.isfile(os.path.join(data_dir, entry))
)
# Sample without replacement; cap the sample size so a small directory does not
# make random.sample raise ValueError.
samples = random.sample(all_images, min(batch_size, len(all_images)))

def load_and_preprocess_image(image_path):
    """Load an image with OpenCV and return it as a scaled, channels-first array.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A float32 numpy array holding the pixel values divided by 255, with the
        axes reordered from (height, width, channel) to (channel, height, width).

    Raises:
        ValueError: If OpenCV cannot read or decode the file.
    """
    # Load the image
    image = cv2.imread(image_path)
    # cv2.imread signals failure by returning None rather than raising, which
    # would otherwise surface as an opaque AttributeError on the next line.
    if image is None:
        raise ValueError(f"Could not read image: {image_path}")
    # Resize the image (assuming 224x224 for this example)
    # image = cv2.resize(image, (224, 224))
    # Convert the image to a float32 numpy array and normalize it
    image = image.astype('float32') / 255.0
    # Move the channel axis first to match the (3, 5, 5) model input shape.
    image = np.transpose(image, (2, 0, 1))
    return image

# preprocessed_images = [load_and_preprocess_image(os.path.join(data_dir, img_name)) for img_name in samples]
# preprocessed_labels = [get_idx_lbl(img_name)[0] for img_name in samples]
# images_tensor = tf.convert_to_tensor(preprocessed_images)
# labels_tensor = tf.convert_to_tensor(preprocessed_labels)

# Build the full dataset of (image, label, file name) triples.
images = [load_and_preprocess_image(os.path.join(data_dir, img_name)) for img_name in all_images]

# The class indices stored in the JSON dictionary start at 1, but the models'
# argmax predictions and sparse_categorical_crossentropy both use labels in
# 0..num_classes-1. Map each class index to its zero-based position so that
# predictions and labels are directly comparable.
class_indices = sorted(class_idx for class_idx, _ in imgid_class.values())
class_idx_to_label = {class_idx: position for position, class_idx in enumerate(class_indices)}
labels = [class_idx_to_label[get_idx_lbl(img_name)[0]] for img_name in all_images]

images_tensor = tf.convert_to_tensor(images)
labels_tensor = tf.convert_to_tensor(labels)
file_names_tensor = tf.convert_to_tensor(all_images)

# Slicing along the first axis yields one (image, label, file name) element per file.
dataset = tf.data.Dataset.from_tensor_slices((images_tensor, labels_tensor, file_names_tensor))

# # If the classes are imbalanced
# from sklearn.utils.class_weight import compute_class_weight
# # Compute the class weights
# class_weights = compute_class_weight('balanced', classes=np.unique(labels_tensor.numpy()), y=labels_tensor.numpy())
# # Create a weight map (class_weights_map) so that every label has a corresponding weight
# class_weights_map = {i: weight for i, weight in enumerate(class_weights)}
# # Create a weighted dataset
# weighted_dataset = tf.data.Dataset.from_tensor_slices((images_tensor, labels_tensor, file_names_tensor, sample_weights))
# # Perform weighted random sampling
# # Here steps_per_epoch is the expected number of steps per epoch, usually the total number of samples divided by the batch size
# shuffled_dataset = weighted_dataset.shuffle(buffer_size=len(weighted_dataset)).map(lambda x, y, z, _: (x, y, z))


# A shuffle buffer smaller than the dataset only mixes elements locally, so use
# the full dataset size (at least 1, since shuffle rejects a zero-sized buffer).
buffer_size = max(len(all_images), 1)
# By default tf.data reshuffles on every iteration. The take()/skip() split
# below and any later pass over the test set would then each see a different
# order, letting train and test overlap and breaking the alignment between
# predictions and file names. Fix the order after the first shuffle.
shuffled_dataset = dataset.shuffle(buffer_size, reshuffle_each_iteration=False)


from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten

def _build_mlp(hidden_units):
    """Build and compile a one-hidden-layer softmax classifier.

    Args:
        hidden_units: Number of units in the ReLU hidden layer.

    Returns:
        A compiled Sequential model that flattens (3, 5, 5) inputs and outputs
        one softmax probability per entry of `class_labels`.
    """
    net = Sequential([
        Flatten(input_shape=(3, 5, 5)),                  # Flatten the input
        Dense(hidden_units, activation='relu'),          # Hidden dense layer
        Dense(len(class_labels), activation='softmax'),  # Output layer
    ])
    net.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return net


# Two simple models that differ only in the width of the hidden layer.
model1 = _build_mlp(128)
model2 = _build_mlp(256)

# Number of leading elements of the shuffled dataset that form the training split.
train_size = 1800
# NOTE(review): test_size is not referenced by the split below; the test split
# is simply everything after the first `train_size` elements.
test_size = 100

# Batch size used for the test split
batch_size = 200

# Split the dataset into a training set and a test set, batching the test data
train_dataset = shuffled_dataset.take(train_size)
test_dataset = shuffled_dataset.skip(train_size).batch(batch_size)

# Run batched prediction with both models, collecting the predicted class of
# each test element alongside its true label.
y_pred1, y_pred2, y_true = [], [], []

for img_batch, label_batch, _ in test_dataset:
    # The predicted class is the index of the largest softmax output.
    for clf, collected in ((model1, y_pred1), (model2, y_pred2)):
        probabilities = clf.predict(img_batch)
        collected.extend(tf.argmax(probabilities, axis=1).numpy())
    y_true.extend(label_batch.numpy())

# Convert the accumulated lists to numpy arrays for the metric computations.
y_pred1, y_pred2, y_true = (np.array(values) for values in (y_pred1, y_pred2, y_true))



# Step 3: Calculate accuracy


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


# Accuracy  = correct predictions / total predictions
# Precision = TP / (TP + FP)
# Recall    = TP / (TP + FN)
# F1        = 2 * Precision * Recall / (Precision + Recall)
#
# Precision, recall and F1 are computed per class and averaged, weighted by the
# number of true samples of each class. zero_division=0 is passed to all three
# so that classes with an undefined score count as 0 consistently, without
# emitting undefined-metric warnings.
metric_kwargs = {"average": "weighted", "zero_division": 0}

accuracy1 = accuracy_score(y_true, y_pred1)
accuracy2 = accuracy_score(y_true, y_pred2)
precision1 = precision_score(y_true, y_pred1, **metric_kwargs)
precision2 = precision_score(y_true, y_pred2, **metric_kwargs)
recall1 = recall_score(y_true, y_pred1, **metric_kwargs)
recall2 = recall_score(y_true, y_pred2, **metric_kwargs)
f1_model1 = f1_score(y_true, y_pred1, **metric_kwargs)
f1_model2 = f1_score(y_true, y_pred2, **metric_kwargs)

print("Accuracy:", accuracy1, accuracy2)
print("Precision of Model 1:", precision1, "Precision of Model 2:", precision2)
print("Recall of Model 1:", recall1, "Recall of Model 2:", recall2)
print("F1 Score of Model 1:", f1_model1, "F1 Score of Model 2:", f1_model2)

Accuracy: 0.245 0.275
Precision of Model 1: 0.060024999999999995 Precision of Model 2: 0.07562500000000001
Recall of Model 1: 0.245 Recall of Model 2: 0.275
F1 Score of Model 1: 0.09642570281124498 F1 Score of Model 2: 0.11862745098039218


# Step 4: Get the correct predictions

In [None]:
import os
import shutil

# Copy every test image that model1 classifies correctly into a separate folder.
correct_predictions_dir = "model_correct"
os.makedirs(correct_predictions_dir, exist_ok=True)

# Element-wise mask over the predictions collected earlier (aligned with y_true).
correct_predictions = y_pred1 == y_true

# A shuffled tf.data pipeline may yield its elements in a different order each
# time it is iterated, so indexing a mask from an earlier pass while reading
# file names from a new pass can pair names with the wrong predictions.
# Predict and read the file names within the same pass to keep them aligned.
correct_files = []
for img_batch, label_batch, file_name_batch in test_dataset:
    batch_pred = tf.argmax(model1.predict(img_batch, verbose=0), axis=1).numpy()
    batch_correct = batch_pred == label_batch.numpy()
    # File names come back as bytes; decode those selected by the boolean mask.
    correct_files.extend(
        file_name.decode() for file_name in file_name_batch.numpy()[batch_correct]
    )

for file_name in correct_files:
    source_path = os.path.join(data_dir, file_name)
    destination_path = os.path.join(correct_predictions_dir, file_name)
    shutil.copy(source_path, destination_path)