<a href="https://colab.research.google.com/github/Annie00000/Project/blob/main/1_10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import os
from PIL import Image
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import random
import time
from tensorflow.keras.preprocessing.image import load_img, img_to_array

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import Sequence



In [None]:
# cpu
import os

cpu_cores = os.cpu_count()
print("CPU cores:", cpu_cores)


# GPU
import tensorflow as tf

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        print(gpu)
else:
    print("No GPU found")

## Data augmentation (自定義數據增強)

#### 不使用augmentation_dict,固定做隨機旋轉10度

In [None]:
from scipy.ndimage import rotate

class CustomDataGenerator(Sequence):
    def __init__(self, image_paths, labels, batch_size, target_size, label_to_index, num_classes, shuffle=True):
        self.image_paths = np.array(image_paths)
        self.labels = np.array(labels)
        self.batch_size = batch_size
        self.target_size = target_size
        #self.augmentation_dict = augmentation_dict
        self.label_to_index = label_to_index
        self.num_classes = num_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.image_paths) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_images = []
        batch_labels = []

        for i in batch_indices:
            img_path = self.image_paths[i]
            label = self.labels[i]

            # 加载和预处理图像 (直接在 TensorFlow 中載入和解碼映像。這意味著從一開始圖像就是以 Tensor 的形式存在的)
            img = tf.io.read_file(img_path)
            img = tf.image.decode_image(img, channels=3)
            img = tf.image.resize(img, self.target_size)

            # 应用数据增强
            img = self.apply_augmentation(img, label)/ 255 # 規一化
            batch_images.append(img)
            batch_labels.append(self.label_to_index[label])

        return tf.convert_to_tensor(batch_images), to_categorical(batch_labels, num_classes=self.num_classes)

    def apply_augmentation(self, image):
        # 随机旋转（正负 10 度）
        rotation_degree = random.uniform(-10, 10) #np.random.uniform(-10, 10)
        image = rotate(image, rotation_degree, reshape=False, mode='nearest')
          # reshape=False 保证旋转后的图像大小不变，但这可能导致图像的一部分被裁剪
          # mode 决定了在旋转过程中如何处理图像边界之外的像素

        # 对比度增强
        contrast_factor = 1.5  # 可以根据需要调整这个值
        image = self.adjust_contrast(image, contrast_factor)

        return image

    def adjust_contrast(self, image, contrast_factor):
        """调整图像的对比度"""
        mean = np.mean(image, axis=(0, 1), keepdims=True)
        adjusted = (image - mean) * contrast_factor + mean
        return np.clip(adjusted, 0, 255)


    def on_epoch_end(self):
        self.indices = np.arange(len(self.image_paths))
        if self.shuffle:
            np.random.shuffle(self.indices)



# 创建数据生成器实例
train_generator = CustomDataGenerator(
    train_paths, train_labels, batch_size=32, target_size=(224, 224),
     label_to_index=label_to_index,
    num_classes=len(unique_labels), shuffle=True
)
val_generator = CustomDataGenerator(
    val_paths, val_labels, batch_size=32, target_size=(224, 224),
    label_to_index=label_to_index,
    num_classes=len(unique_labels), shuffle=False
)

## Logger

* 補充: sys.path.insert(1, '../../path') 是Python中一种修改模块搜索路径的用法

In [None]:
def daily_logger():
  local_time = datetime.datetime.now()
  pid = os.getpid()
  with open('../../../../../sever_ip','r') as file:
    server_IP = file.readline().strip()
  #server_IP = socket.gethostbyname(socket.gethostname()) # 獲取當前主機的IP地址
  date = local_time.strftime('%Y-%m-%d')
  if 'log' not in os.listdir('./'):
    os.mkdir('./log')
  logger = logging.getLogger(f'{date}__log') # 创建或获取一个名为 '{date}__log' 的日志记录器（logger）对象，并将其赋值给变量 logger
  if not logger.handlers:
    handler = logging.FileHandler(f'./log/{date}.txt')
    handler.setLevel(logging.INFO)
    formatter = logging.Formater(f'[%(asctime)s__{server_IP}__{pid}]:%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    handler.setFormatter(formatter) # 将上面创建的格式化器应用到文件处理器(handler)上。 !!!!! 增加這行!!!!!
    logger.addHandler(handler)
  return logger

In [None]:
def daily_logger():
  local_time = datetime.datetime.now()
  pid = os.getpid() #取得目前進程的進程ID（PID） (Process Identifier)
  with open('../../../../../sever_ip','r') as file:
    server_IP = file.readline().strip() # strip:去除行尾的空白字符
  #server_IP = socket.gethostbyname(socket.gethostname()) # 獲取當前主機的IP地址
  user = 'system'
  date = local_time.strftime('%Y-%m-%d') # 將變數 local_time 格式化為 'YYYY-MM-DD' 格式的字串
  if 'log' not in os.listdir('./'):  # 检查当前目录（.）下是否有名为 'log' 的目录。如果没有，创建这个目录。
    os.mkdir('./log')
  logger = logging.getLogger(f'{date}__log') # 创建或获取一个名为 '{date}__log' 的日志记录器（logger）对象，并将其赋值给变量 logger

  if not logger.handlers: # 检查 logger 是否没有任何处理器（handlers）
    handler = logging.FileHandler(f'./log/{date}.txt')  # 建立一個新的檔案處理器（FileHandler），用於將日誌寫入到路徑為 './log/{date}.txt' 的文件
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter(f'[%(asctime)s__{server_IP}__{pid}]:{server_IP}-{user}, %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    # 创建一个格式化器（Formatter），用于定义日志消息的格式。这里的格式包括时间戳、服务器IP、进程ID、用户和日志消息。
    handler.setFormatter(formatter) # 将上面创建的格式化器应用到文件处理器(handler)上。
    logger.addHandler(handler) # 将文件处理器(handler) 添加到日志记录器 logger 上。
  return logger

## sys.path.insert(1, '...')

1. sys.path 列表：

  sys.path 是一个字符串列表，用于指定解释器搜索模块的目录。默认情况下，它包含当前脚本的目录和Python的安装目录。
2. sys.path.insert(index, path) 方法：

  * 此方法用于在 sys.path 列表的指定索引位置插入一个新的路径。在这种情况下，'../../path' 是您希望添加的新路径。
  * index 参数 1 意味着新路径被插入在列表的开始位置，紧随原始的第一个条目之后。这确保了在默认目录之前搜索您指定的目录。

3. 路径 '../../path'：

  * '../../path' 是一个相对路径。这个路径是相对于当前脚本运行目录的上上级目录中的 path 目录。
  * 例如，如果您的脚本位于 /home/user/projects/myproject/scripts 目录下，../../path 将解析为 /home/user/path。
4. 用途：

  * 这种做法通常用于临时添加项目的特定目录到模块搜索路径中，特别是当您希望导入不在标准模块搜索路径中的模块时。
  * 它允许您在不修改环境变量的情况下，临时扩展解释器的模块搜索范围。
5. 注意事项：

  * 修改 sys.path 可能会对模块的导入顺序产生影响，有时可能导致意外的行为，特别是如果存在同名模块的情况。
  * 在更大的项目中，更稳妥的方法是使用虚拟环境和适当的包结构来管理模块。

## 紀錄log以及分段執行时间

* 情況一

In [None]:
import logging
import time

# 设置日志记录器
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__) # __name__ 变量包含了模块的名字，正在创建一个与当前模块名相关联的日志记录器。使得日志消息更容易跟踪到特定的模块。

def your_function():
    # 记录函数开始时间
    start_time = time.time()

    # 这里是您的代码逻辑
    time.sleep(2)  # 举例，模拟执行时间

    # 记录函数结束时间
    end_time = time.time()

    # 计算并记录执行时间
    elapsed_time = end_time - start_time
    logger.info(f'your_function 执行耗时: {elapsed_time} 秒')

def another_function():
    # 类似地，为其他函数执行计时
    start_time = time.time()

    # 模拟一些操作
    time.sleep(1)

    end_time = time.time()
    elapsed_time = end_time - start_time
    logger.info(f'another_function 执行耗时: {elapsed_time} 秒')

# 调用函数
your_function()
another_function()

* 情況二

In [None]:
### 設置log ###
import logging
import os

# 创建一个logger
logger = logging.getLogger('my_automation_system')
logger.setLevel(logging.INFO)

# 创建一个handler，用于写入日志文件
log_file = './my_automation_system.log'
if not os.path.exists(os.path.dirname(log_file)):
    os.makedirs(os.path.dirname(log_file))
file_handler = logging.FileHandler(log_file)

# 设置日志记录格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)

# 将handler添加到logger
logger.addHandler(file_handler)

In [None]:
### 紀錄時間  ###
import time

def log_execution_time(func):
    """一个装饰器，用于测量函数执行时间并记录到日志"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        logger.info(f"执行 {func.__name__} 耗时 {end_time - start_time} 秒")
        return result
    return wrapper

# 示例：使用装饰器来测量函数的执行时间
@log_execution_time
def my_task():
    # 模拟一些工作
    time.sleep(2)
    print("任务完成")

my_task()

我们定义了一个装饰器 log_execution_time，它可以被应用于任何函数上。当被装饰的函数执行时，该装饰器会计算并记录该函数的执行时间。

这种方法非常适合于自动化系统，其中您可能需要跟踪多个任务的执行时间，同时保持日志记录的一致性和准确性。

请根据您的实际需求调整日志记录的细节，比如日志级别、日志格式或日志文件的位置。

## subprogram

* class name

In [None]:
# 从训练数据中提取
data_dir = '/path/to/training/data'
class_labels = [folder_name for folder_name in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, folder_name))]


### 1. train model

In [None]:
def retrained_model():


In [None]:
import os
import datetime
import tensorflow as tf

def retrain_model(train_data, test_data, model, model_name, model_save_path, log_file_path):
    """
    重新训练模型并保存结果。

    :param train_data: 训练数据。
    :param test_data: 测试数据。
    :param model: 要训练的模型。
    :param model_name: 模型名称。
    :param model_save_path: 模型保存路径。
    :param log_file_path: 日志文件保存路径。
    """
    # 训练模型
    model.fit(train_data, validation_data=val_data, epochs=10, callbacks=callbacks)


    # 评估模型
    test_loss, test_accuracy = model.evaluate(test_data)

    # 保存模型 (不需要的話可以改動)
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    model_filename = f"{model_name}_{timestamp}.h5"
    model.save(os.path.join(model_save_path, model_filename))

    # 记录到日志文件 ('a'為追加，'w'為覆蓋原始寫的)(詢問一下要覆蓋還是追加)
    with open(log_file_path, 'a') as log_file:
        log_file.write(f"{model_filename}: Test Accuracy = {test_accuracy}\n")

    return test_accuracy, model_filename


In [None]:
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split
from datetime import datetime

def retrain_model(data_dir, model_path, model_name, log_file_path, batch_size, target_size, augmentation_dict, label_to_index, num_classes, test_size=0.2):
    # 读取所有影像路径和标签
    image_paths = []
    labels = []

    for class_folder in os.listdir(data_dir):
        class_folder_path = os.path.join(data_dir, class_folder)
        for file in os.listdir(class_folder_path):
            fpath = os.path.join(class_folder_path, file)
            image_paths.append(fpath)
            labels.append(class_folder)

    # 切分成训练集和验证集
    train_paths, val_paths, train_labels, val_labels = train_test_split(image_paths, labels, test_size=test_size, stratify=labels, shuffle=True, random_state=42)

    # 创建数据生成器实例
    train_generator = CustomDataGenerator(
        train_paths, train_labels, batch_size=batch_size, target_size=target_size,
        augmentation_dict=augmentation_dict, label_to_index=label_to_index,
        num_classes=num_classes, shuffle=True, apply_clahe=True
    )
    val_generator = CustomDataGenerator(
        val_paths, val_labels, batch_size=batch_size, target_size=target_size,
        augmentation_dict={}, label_to_index=label_to_index,
        num_classes=num_classes, shuffle=False, apply_clahe=False
    )

    # 加载模型
    model = load_model(model_path)

    # 重新训练模型
    model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=10,
        callbacks=[early_stopping, model_checkpoint, reduce_lr]
    )

    # 评估模型
    test_loss, test_accuracy = model.evaluate(val_generator)

    # 保存模型
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    saved_model_name = f"{model_name}_{timestamp}.h5"
    model.save(saved_model_name)

    # 记录到日志文件
    with open(log_file_path, 'a') as log_file:
        log_file.write(f"{timestamp}: Test Accuracy = {test_accuracy}\n")

    return test_accuracy, saved_model_name

In [None]:

def load_data(data_dir):
    """
    从指定的文件夹加载图像路径和标签。

    :param data_dir: 包含图像的文件夹的路径。
    :return: 图像路径列表和相应的标签列表。
    """
    image_paths = []
    labels = []

    # 遍历文件夹中的所有子文件夹
    for class_folder_name in os.listdir(data_dir):
        class_folder_path = os.path.join(data_dir, class_folder_name)

        # 确保它是一个文件夹
        if os.path.isdir(class_folder_path):
            # 遍历文件夹中的所有图像文件
            for image_file in os.listdir(class_folder_path):
                image_path = os.path.join(class_folder_path, image_file)

                # 只处理图像文件
                if image_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                    image_paths.append(image_path)
                    labels.append(class_folder_name)

    return image_paths, labels

* 步骤 1：创建模块文件

In [None]:
# image_classification_trainer.py

import os
import datetime
from sklearn.model_selection import train_test_split
import tensorflow as tf

class ImageClassificationTrainer:
    def __init__(self, data_dir, model, model_save_path, log_file_path):
        self.data_dir = data_dir
        self.model = model
        self.model_save_path = model_save_path
        self.log_file_path = log_file_path

    def load_data(self):

        pass

    def retrained_model(self, test_data_generator):
        # 读取数据并拆分训练/验证集
        image_paths, labels = self.load_data()
        train_paths, val_paths, train_labels, val_labels = train_test_split(image_paths, labels, test_size=0.2, stratify=labels, shuffle=True, random_state=42)

        # 创建训练和验证数据生成器
        train_generator = CustomDataGenerator(train_paths, train_labels, ...)
        val_generator = CustomDataGenerator(val_paths, val_labels, ...)

        # 训练模型
        self.model.fit(train_generator, validation_data=val_generator, epochs=10, ...)

        # 评估模型
        test_loss, test_accuracy = self.model.evaluate(test_data_generator)

        # 保存模型和记录信息
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        model_filename = f"model_{timestamp}.h5"
        self.model.save(os.path.join(self.model_save_path, model_filename))
        self.record_info(model_filename, test_accuracy)

        return f"{model_filename}", test_accuracy

    def record_info(self, model_filename, test_accuracy):
        with open(self.log_file_path, 'a') as log_file:
            log_file.write(f"Model: {model_filename}, Test Accuracy: {test_accuracy}, Date: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

# 使用示例
model = # 加载您的模型
trainer = ImageClassificationTrainer('/path/to/data', model, '/path/to/save/model', '/path/to/log.txt')
test_data_generator = # 创建您的测试数据生成器
model_name, test_accuracy = trainer.retrained_model(test_data_generator)

* 步骤 2：在其他程序中导入

In [None]:
from image_classification_trainer import ImageClassificationTrainer
import tensorflow as tf

# 假设您的模型定义如下
model = tf.keras.models.load_model('/path/to/your/model.h5')

# 创建训练器实例
trainer = ImageClassificationTrainer('/path/to/data', model, '/path/to/save/model', '/path/to/log.txt')

# 创建测试数据生成器
test_data_generator = # 创建测试数据生成器

# 使用训练器进行重新训练
model_name, test_accuracy = trainer.retrained_model(test_data_generator)

### 2. predict

* 判断给定路径的文件夹内是否包含其他子文件夹。

In [None]:
import os

def predict_images_based_on_folder_structure(folder_path, model, target_size, class_labels):
    """
    根据文件夹结构预测图片。

    :param folder_path: 文件夹路径。
    :param model: 预训练的模型。
    :param target_size: 图像目标尺寸。
    :param class_labels: 类别标签列表。
    :return: 预测结果。
    """
    # 检查是否存在子文件夹
    contains_subfolders = any(os.path.isdir(os.path.join(folder_path, item)) for item in os.listdir(folder_path))

    if contains_subfolders:
        # 如果存在子文件夹，使用多文件夹预测函数
        return predict_multiple_folders(folder_path, model, target_size, class_labels)
    else:
        # 否则，使用单文件夹预测函数
        return predict_single_folder(folder_path, model, target_size, class_labels)


* 單個文件夾

In [None]:
def predict_single_folder(folder_path, model_path):
    model = load_model(model_path)

    predictions = {}
    for filename in os.listdir(folder_path):
        if filename.lower().endswith('.png'):
            img_path = os.path.join(folder_path, filename)
            img = load_and_preprocess_image(img_path)
            pred = model.predict(np.expand_dims(img, axis=0))
            predicted_class = class_labels[np.argmax(pred)]
            predictions[filename] = {
                'class': predicted_class,
                'probabilities': pred[0].tolist()
            }
    return predictions

* 一個文件夾下有多個文件夾

In [None]:
def predict_multiple_folders(parent_folder_path, model_path):
    model = load_model(model_path)
    all_predictions = {}
    for folder_name in os.listdir(parent_folder_path):
        folder_path = os.path.join(parent_folder_path, folder_name)
        if os.path.isdir(folder_path):
            predictions = predict_single_folder(folder_path, model_path)
            all_predictions[folder_name] = predictions
    return all_predictions

* 預測

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array, load_img

def predict_images(model_version, image_folder_path, target_size, class_labels):
    """
    使用特定版本的模型预测图片文件夹中的所有图片。

    :param model_version: 模型版本（模型文件名）。
    :param image_folder_path: 图片文件夹路径。
    :param target_size: 图像目标尺寸。
    :param class_labels: 类别标签列表。
    :return: 预测结果字典。
    """
    # 加载模型
    model = tf.keras.models.load_model(os.path.join('/path/to/saved/models', model_version))

    # 预测结果字典
    predictions = {}

    # 遍历文件夹中的图片
    for filename in os.listdir(image_folder_path):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            img_path = os.path.join(image_folder_path, filename)
            img = load_img(img_path, target_size=target_size)
            img_array = img_to_array(img) / 255.0
            img_array = np.expand_dims(img_array, axis=0)

            # 进行预测
            pred = model.predict(img_array)
            class_index = np.argmax(pred, axis=1)[0]
            class_name = class_labels[class_index]
            probabilities = pred[0].tolist()

            # 保存预测结果
            predictions[filename] = {'class': class_name, 'probabilities': probabilities}

    return predictions

# 使用示例
model_version = 'model_20210908_1530.h5'  # 模型版本
image_folder_path = '/path/to/image/folder'  # 图片文件夹路径
target_size = (224, 224)  # 目标尺寸
class_labels = ['class1', 'class2', 'class3', ...]  # 类别标签

# 进行预测
predictions = predict_images(model_version, image_folder_path, target_size, class_labels)

# 打印或处理预测结果
for filename, pred in predictions.items():
    print(f"{filename}: Class = {pred['class']}, Probabilities = {pred['probabilities']}")
