## simclr

In [1]:
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.transforms import InterpolationMode
import torchvision.models as models
from torchvision.models import ResNet18_Weights
import pytorch_lightning as pl
from lightning.pytorch.loggers import WandbLogger
import lightly 
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import normalize
from PIL import Image
import numpy as np
from lightly import data
import glob
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import BatchSampler

In [2]:
NUM_CLASSES = 6379
TEST_NUM_CLASSES = 24
num_workers = 0#int(os.cpu_count()//2)
seed = 1
max_epochs = 10
#input_size = 128
#num_ftrs = 32

In [3]:
pl.seed_everything(seed)

Seed set to 1


1

In [4]:
path_to_data_train = '/home/abababam1/HandwrittenTextAlign/PRMU/simclr/data/train'
path_to_data_test = '/home/abababam1/HandwrittenTextAlign/PRMU/simclr/data/test'
path_to_data_test1 = '/home/abababam1/HandwrittenTextAlign/PRMU/simclr/data/test2'

In [5]:
transform_train = transforms.Compose([
    transforms.Resize((64, 63),antialias=True),  # 画像のサイズ変更
    transforms.Grayscale(num_output_channels=1), #single-channel
    transforms.ElasticTransform(alpha=200.0, sigma=10.0, interpolation=InterpolationMode.BILINEAR, fill=255),
    transforms.RandomAffine(degrees=(-20, 20), scale=(0.8, 1.2), fill = 255),
    transforms.ToTensor(),           # テンソルに変換
    transforms.Normalize((0.5,), (0.5,)) #single-channel normalization
])
transform_test = transforms.Compose([
    transforms.Resize((64, 63),antialias=True),  # 画像のサイズ変更
    transforms.Grayscale(num_output_channels=1), #single-channel
    transforms.ToTensor(),           # テンソルに変換
    transforms.Normalize((0.5,), (0.5,)) #single-channel normalization
])

transform_simclr = transforms.Compose([
    transforms.Resize((64, 63),antialias=True),  # 画像のサイズ変更
    transforms.Grayscale(num_output_channels=1), #single-channel
    transforms.ElasticTransform(alpha=200.0, sigma=10.0, interpolation=InterpolationMode.BILINEAR, fill=255),
    transforms.RandomAffine(degrees=(-20, 20), scale=(0.8, 1.2), fill = 255),
    #transforms.ToTensor(),           # テンソルに変換
    transforms.Normalize((0.5,), (0.5,)) #single-channel normalization
])

#### データローダー

・バッチサイズ（字ごとに異なる）
・画足りない情報: ラベルに入れる
・バッチサイズの指定を変える

In [6]:
def convert_rgba_to_rgb_with_background(image, background=(255, 255, 255)):
    """RGBA画像を指定した背景色でRGB画像に変換"""
    if image.mode == 'RGBA':
        # 背景を指定して新しい画像を作成
        background_image = Image.new('RGB', image.size, background)
        # アルファチャンネルを使用してマスクを適用
        background_image.paste(image, mask=image.split()[3])  # アルファチャンネルでマスク
        return background_image
    return image  # すでにRGBの場合はそのまま返す

def label_data_dict(path_to_data):
    d = dict() # 画像に対しラベル
    class_indices = dict() # ラベルに対し画像が何個あるか
    for idx, path in enumerate(glob.glob(f'{path_to_data}/*/*.png')):
        char = path.split('/')[-2]
        d[path] = char
        
        if char not in class_indices:
            class_indices[char] = [idx]
        else:
            class_indices[char] += [idx]
    return d, class_indices

class CustomDataset(Dataset):
    def __init__(self, path_to_data, transform=None):
        self.image_paths = []
        self.labels = []
        #self.classes = classes
        
        data, _ = label_data_dict(path_to_data)
        self.image_paths.extend(list(data.keys()))
        self.labels.extend(list(data.values()))

        self.classes = sorted(set(self.labels))

        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        image_path = self.image_paths[index]
        label = self.labels[index]

        label_index = self.classes.index(label)
        
        image = Image.open(image_path)
        image = convert_rgba_to_rgb_with_background(image, background=(255, 255, 255))  # 白背景
        #image = Image.open(image_path).convert('RGB')

        if self.transform:
            image = self.transform(image)


        return image, label_index

# class_indices: {'label':[data], ...}
class ClassBatchSampler:
    def __init__(self, class_indices):
        self.class_indices = class_indices
        self.classes = list(class_indices.keys())
        #self.current_class = 0

    def __iter__(self):
        # 各クラスのインデックスを順に返す
        for class_label in self.classes:
            indices = self.class_indices[class_label]
            #print(f"Sampling indices for class {class_label}: {indices}", flush=True)  # デバッグ出力
            yield indices

    def __len__(self):
        return len(self.classes)
        

#----------------------------------------------------------------------

# データセットを作成
#dataset = CustomDataset(path_to_data, transform=transform)

#_, class_indices = label_data_dict(path_to_data)

# サンプラーを使ってデータローダーを作成
#sampler = ClassBatchSampler(class_indices)
#dataloader = DataLoader(dataset, batch_sampler=sampler)

# データローダーでクラスごとにデータを取得
#for batch_idx, (data, labels) in enumerate(dataloader):
#    print(f"Batch {batch_idx}:")
#    print(f"Data: {data}")
#    print(f"Labels: {labels}")
#    print(f"Batch size: {len(data)}")

In [7]:
# データセットを作成
dataset_train = CustomDataset(path_to_data_train, transform=transform_train)
dataset_test = CustomDataset(path_to_data_test, transform=transform_test)

_, class_indices_train = label_data_dict(path_to_data_train)
_, class_indices_test = label_data_dict(path_to_data_test)

# サンプラーを使って訓練データローダーを作成
sampler = ClassBatchSampler(class_indices_train)
dataloader_train = DataLoader(dataset_train, batch_sampler=sampler, num_workers=num_workers)

# サンプラーを使ってテストデータローダーを作成
sampler = ClassBatchSampler(class_indices_test)
dataloader_test = DataLoader(dataset_test, batch_sampler=sampler)

#### モデル

In [8]:
# Initialization of Conv2D parameters according to He et al. (2015)
def he_init(conv2d_layer):
    # kernel size: k1 x k2
    k1, k2 = conv2d_layer.kernel_size

    # input channel
    c = conv2d_layer.in_channels

    # number of summands
    n = k1 * k2 * c
    
    # good standard deviation : sqrt(2/n)
    std = (2 / n) ** 0.5

    # init kernel params ~ Normal(0, std^2)
    nn.init.normal_(conv2d_layer.weight, mean=0.0, std=std)

    # init bias = 0
    nn.init.zeros_(conv2d_layer.bias)
    
class Net(nn.Module):
    def __init__(self,num_classes=NUM_CLASSES):
        super(Net, self).__init__()
        # mtzk: p=0.005 is too small
        #self.embedding_dropout = nn.Dropout(p = 0.005)
        self.embedding_dropout = nn.Dropout(p=0.5)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(2, stride=2)

        #self.conv1 = nn.Conv2d(3,64,3)
        self.conv1 = nn.Conv2d(1,64,3) #single-channel
        # nn.init.normal_(self.conv1.weight, mean=0.0, std=0.1)
        he_init(self.conv1)
        self.bn1 = nn.BatchNorm2d(num_features=64)

        self.conv2 = nn.Conv2d(64,128,3)
        # nn.init.normal_(self.conv2.weight, mean=0.0, std=0.1)
        he_init(self.conv2)
        self.bn2 = nn.BatchNorm2d(num_features=128)

        self.conv3 = nn.Conv2d(128,512,3)
        # nn.init.normal_(self.conv3.weight, mean=0.0, std=0.1)
        he_init(self.conv3)
        self.bn3 = nn.BatchNorm2d(num_features=512)

        self.conv4 = nn.Conv2d(512,512,3)
        # nn.init.normal_(self.conv4.weight, mean=0.0, std=0.1)
        he_init(self.conv4)

        # mtzk: BatchNorm2d は学習パラメータがあるので層ごとに違うのを使うべき
        self.bn4 = nn.BatchNorm2d(num_features=512)

        self.fc1 = nn.Linear(512 * 5 * 5, 4096)
        # mtzk: - std changed according to He et al. (2015)
        #       - init bias = zero
        # nn.init.normal_(self.fc1.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.fc1.weight, mean=0.0, std=0.01)
        nn.init.zeros_(self.fc1.bias)

        # mtzk: 2023-12-28: BN also in fc
        self.bn_fc1 = nn.BatchNorm1d(num_features=4096)

        self.fc2 = nn.Linear(4096, 4096)
        # nn.init.normal_(self.fc2.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.fc2.weight, mean=0.0, std=0.01)
        nn.init.zeros_(self.fc2.bias)

        # mtzk: 2023-12-28: BN also in fc
        self.bn_fc2 = nn.BatchNorm1d(num_features=4096)

        self.fc3 = nn.Linear(4096, num_classes)
        # nn.init.normal_(self.fc3.weight, mean=0.0, std=0.1)
        nn.init.normal_(self.fc3.weight, mean=0.0, std=0.001)
        nn.init.zeros_(self.fc3.bias)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.pool(x)
        x = self.embedding_dropout(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.pool(x)
        x = self.embedding_dropout(x)
        
        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu(x)
        x = self.embedding_dropout(x)
        x = self.conv4(x)
        # mtzk: BatchNorm2d は学習パラメータがあるので層ごとに違うのを使うべき
        # x = self.bn3(x)
        x = self.bn4(x)
        x = self.relu(x)
        x = self.pool(x)
        x = self.embedding_dropout(x)
        
        x = x.view(x.size()[0], -1)
        x = self.fc1(x)
        # mtzk: 2023-12-28: BN also in fc
        x = self.bn_fc1(x)
        x = self.relu(x)
        x = self.embedding_dropout(x)
        x = self.fc2(x)
        # mtzk: 2023-12-28: BN also in fc
        x = self.bn_fc2(x)
        x = self.relu(x)
        x = self.embedding_dropout(x)
        x = self.fc3(x)
        return x

In [9]:
from lightly.models.modules.heads import SimCLRProjectionHead
from lightly.loss import NTXentLoss


class SimCLRModel(pl.LightningModule):
    def __init__(self, batch_size=10, transform=None):
        super().__init__()
        
        self.batch_size = batch_size 

        # create a ResNet backbone and remove the classification head
        net = Net()
        self.backbone = Net(num_classes=NUM_CLASSES)
        
        hidden_dim = NUM_CLASSES#net.fc1.in_features
        self.projection_head = SimCLRProjectionHead(
            input_dim=hidden_dim,
            hidden_dim=2048,
            output_dim=128,
            num_layers=2,
            batch_norm=True
        )
        
        self.transform = transform

        self.criterion = NTXentLoss()

    def forward(self, x):
        h = self.backbone(x).flatten(start_dim=1)
        #print(f"Flattened output size: {h.size()}")
        z = self.projection_head(h)
        return z

#    def training_step(self, batch, batch_idx):
        #print(f"Batch content: {batch}")  # バッチの内容を出力して確認
#        (x0, x1), *_ = batch
#        z0 = self.forward(x0)
#        z1 = self.forward(x1)
#        loss = self.criterion(z0, z1)
#        self.log("train_loss_ssl", loss)
#        return loss
    
    def training_step(self, batch, batch_idx):
        images, labels = batch  # 画像とラベルを分けて取得

        # 複数個の異なるビューを作成
        views = [self.transform(images) for _ in range(5)]

        # 各ビューについて順伝播を行い、特徴量を計算
        embeddings = [self.forward(view) for view in views]

        # 損失を計算（例: 各ペアの特徴量間で損失を計算し、その平均をとる）
        total_loss = 0
        num_pairs = 0

        # すべてのペアの組み合わせで損失を計算
        for i in range(len(embeddings)):
            for j in range(i + 1, len(embeddings)):
                total_loss += self.criterion(embeddings[i], embeddings[j])
                num_pairs += 1

        # ペア間の平均損失を計算
        loss = total_loss / num_pairs

        # ログに損失を記録
        self.log("train_loss_ssl", loss)
        return loss

    def configure_optimizers(self):# lr=0.075*(self.batch_size)**(1/2)
        #optim = torch.optim.SGD(
        #    self.parameters(), lr=6e-2, momentum=0.9, weight_decay=5e-4
        #)
        optim = torch.optim.Adam(
            self.parameters(), lr=1e-3, weight_decay=5e-4
        )
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            optim, max_epochs
        )
        return [optim], [scheduler]

In [10]:
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR
from lightly.models.modules.heads import SimCLRProjectionHead

class SimCLRClassificationModel(pl.LightningModule):
    def __init__(self, batch_size=10, transform=None):
        super().__init__()
        
        # SimCLR のバックボーンを定義
        net = Net()
        self.backbone = Net(num_classes=NUM_CLASSES)
        
        # Projection Head の定義
        hidden_dim = NUM_CLASSES#net.fc1.in_features
        self.projection_head = SimCLRProjectionHead(
            input_dim=hidden_dim,
            hidden_dim=2048,
            output_dim=128,
            num_layers=2,
            batch_norm=True
        )
        
        self.transform = transform
        
        # 分類層の定義
        self.classifier = torch.nn.Linear(128, TEST_NUM_CLASSES)
        
        # 損失関数を定義
        self.criterion = CrossEntropyLoss()

    def forward(self, x):
        h = self.backbone(x).flatten(start_dim=1)
        #print(f"Flattened output size: {h.size()}")
        projections = self.projection_head(h)
        
        # 分類層でクラスごとのスコアを出力
        logits = self.classifier(projections)  # classifier も必ず使用
        return logits

    def training_step(self, batch, batch_idx):
        images, labels = batch  # 画像とラベルを分けて取得

        # 複数個の異なるビューを作成
        views = [self.transform(images) for _ in range(5)]

        # 各ビューについて順伝播を行い、特徴量を計算
        embeddings = [self.forward(view) for view in views]

        # 損失を計算（例: 各ペアの特徴量間で損失を計算し、その平均をとる）
        total_loss = 0
        num_pairs = 0

        # すべてのペアの組み合わせで損失を計算
        for i in range(len(embeddings)):
            for j in range(i + 1, len(embeddings)):
                total_loss += self.criterion(embeddings[i], embeddings[j])
                num_pairs += 1

        # ペア間の平均損失を計算
        loss = total_loss / num_pairs

        # ログに損失を記録
        self.log("train_loss_ssl", loss)
        return loss

#    def validation_step(self, batch, batch_idx):
#        images, labels = batch
        
        # 順伝播で出力を計算
#        logits = self.forward(images)
        
        # 損失と精度を計算
#        loss = self.criterion(logits, labels)
#        acc = (logits.argmax(1) == labels).float().mean()
        
        # ログに記録
#        self.log("val_loss", loss, prog_bar=True)
#        self.log("val_acc", acc, prog_bar=True)

    def configure_optimizers(self):# lr=0.075*(self.batch_size)**(1/2)
        #optim = torch.optim.SGD(
        #    self.parameters(), lr=6e-2, momentum=0.9, weight_decay=5e-4
        #)
        optim = torch.optim.Adam(
            self.parameters(), lr=1e-3, weight_decay=5e-4
        )
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            optim, max_epochs
        )
        return [optim], [scheduler]

#### 訓練

In [11]:
import torch
print(torch.cuda.is_available())  # GPUが使用可能かどうかを確認
#print(torch.cuda.device_count())  # 使用可能なGPUの数
#print(torch.cuda.get_device_name(0))  # GPUの名前
accelerator='gpu' if torch.cuda.is_available() else 'cpu'
accelerator
devices=2 if torch.cuda.is_available() else 1
devices

True


2

In [12]:
import wandb
wandb.login()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33mabababamb1[0m ([33mabababamb1-tokyo-university-of-science[0m). Use [1m`wandb login --relogin`[0m to force relogin


True

In [13]:
from pytorch_lightning.callbacks import ModelCheckpoint

checkpoint_dir = "/data2/abababam1/HandwrittenTextAlign/simclr/checkpoints"
# チェックポイント保存用のコールバック設定
checkpoint_callback = ModelCheckpoint(
    dirpath=checkpoint_dir,  # 保存先ディレクトリ
    filename="epoch{epoch}-step{step}",  # ファイル名のフォーマット
    save_top_k=1,  # 最新のみ保存
    every_n_epochs=2  # 毎エポック保存
)

In [None]:
%%time
#gpus = [1] if torch.cuda.is_available() else 0

wandb_logger = WandbLogger(log_model="all")

model = SimCLRModel(batch_size=10, transform=transform_simclr)
trainer = pl.Trainer(
    max_epochs=max_epochs, 
    accelerator='gpu' if torch.cuda.is_available() else 'cpu',
    devices=[0,1,2] if torch.cuda.is_available() else 1,  # GPUが使える場合は[2]、使えない場合は1（CPUコア数）
    strategy="ddp_notebook",  # データ並列 (DataParallel)
    enable_progress_bar=True, # 進捗バーを有効化
    log_every_n_steps=100,  # ログの更新間隔を設定
    logger=wandb_logger,  # ログ機能を無効化
    use_distributed_sampler=False,  # 分散サンプラーを無効化
    accumulate_grad_batches=30,
    callbacks=[checkpoint_callback]
)
trainer.fit(model, dataloader_train)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/3
Initializing distributed: GLOBAL_RANK: 2, MEMBER: 3/3
Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/3
----------------------------------------------------------------------------------------------------
distributed_backend=nccl
All distributed processes registered. Starting with 3 processes
----------------------------------------------------------------------------------------------------

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.


/data/abababam1/.conda/envs/ImageRecognition/lib/python3.11/site-packages/pytorch_lightning/callbacks/model_checkpoint.py:654: Checkpoint directory /data2/abababam1/HandwrittenTextAlign/simclr/checkpoints exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2]
LOCAL_RANK: 2 - CUDA_VISIBLE_DEVICES: [0,1,2]
LOCAL_RANK: 1 - CUDA_VISIBLE_DEVICES: [0,1,2]

  | Name            | Type                 | Params | Mode 
-----------------------------------------------------------------
0 | backbone        | Net                  | 98.4 M | train
1 | projection_head | SimCLRProjectionHead | 13.3 M | train
2 | criterion       | NTXentLoss           | 0      | train
-----------------------------------------------------------------
111 M     Trainable params
0         Non-trainable params
111 M     Total params
446.892   Total estimated model params size (MB)
26        Modules in train mode
0         Modules in eval mode
/data/abababam1/.conda/envs/ImageRecognition/lib/python3.11/site-p

Epoch 1:  97%|█████████▋| 6158/6379 [43:57<01:34,  2.34it/s, v_num=3od3]

In [24]:
# モデルの状態を保存
torch.save(model.state_dict(), './params/1206-myNet-aug5-10ep.pth')

#### テストデータの埋め込み作成

In [18]:
def generate_embeddings(model, dataloader):
    """Generates representations for all images in the dataloader with
    the given model
    """

    embeddings = []
    filenames = []
    with torch.no_grad():
        #for img, label, fnames in dataloader:
        for img, label in dataloader:
            img = img.to(model.device)
            emb = model.backbone(img).flatten(start_dim=1)
            embeddings.append(emb)
            #filenames.extend(fnames)

    if embeddings:  # embeddingsが空でないことを確認
        embeddings = torch.cat(embeddings, 0)
        embeddings = normalize(embeddings)
        return embeddings, filenames
    else:
        raise RuntimeError("No embeddings generated. Please check your model and dataloader.")

model.eval()
embeddings, filenames = generate_embeddings(model, dataloader_test)
embeddings

array([[ 0.0012885 ,  0.00160888, -0.00455913, ...,  0.00072197,
        -0.00516354, -0.01171149],
       [ 0.00126025,  0.00179003, -0.00462212, ...,  0.00090455,
        -0.00522257, -0.01198926],
       [ 0.00126025,  0.00179003, -0.00462212, ...,  0.00090455,
        -0.00522257, -0.01198926],
       ...,
       [ 0.00126025,  0.00179003, -0.00462212, ...,  0.00090455,
        -0.00522257, -0.01198926],
       [ 0.00126025,  0.00179003, -0.00462212, ...,  0.00090455,
        -0.00522257, -0.01198926],
       [ 0.00126025,  0.00179003, -0.00462212, ...,  0.00090455,
        -0.00522257, -0.01198926]])

In [28]:
import pytorch_lightning as pl
import torch
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader

class SimCLR_Test(pl.LightningModule):
    def __init__(self, model, num_classes=TEST_NUM_CLASSES):
        super().__init__()
        self.model = model
        self.mlp = torch.nn.Linear(128, num_classes)  # SimCLRの出力次元に応じて調整
        self.loss_fn = CrossEntropyLoss()
        
        # テスト結果を保持するリストを初期化
        self.test_losses = []
        self.test_accuracies = []

    def forward(self, x):
        # 特徴抽出してからMLP層でクラス分類
        with torch.no_grad():
            features = self.model(x)
        logits = self.mlp(features)
        return logits

    def test_step(self, batch, batch_idx):
        x, y = batch
        z = self.forward(x)
        loss = self.loss_fn(z, y)

        predicted = z.argmax(1)
        acc = (predicted == y).float().mean()

        self.log('test_loss', loss, on_epoch=True, prog_bar=True)
        self.log('test_acc', acc, on_epoch=True, prog_bar=True)
        
        # 損失と精度をリストに保存
        self.test_losses.append(loss)
        self.test_accuracies.append(acc)
        
        return {"test_loss": loss, "test_acc": acc}     

    def on_test_epoch_end(self):
        # テストデータ全体の平均損失と精度を集計
        if self.test_losses and self.test_accuracies:
            avg_loss = torch.stack(self.test_losses).mean()
            avg_acc = torch.stack(self.test_accuracies).mean()
            self.log('avg_test_loss', avg_loss, prog_bar=True)
            self.log('avg_test_acc', avg_acc, prog_bar=True)
            print(f"Test Accuracy: {avg_acc:.4f}, Test Loss: {avg_loss:.4f}")
        else:
            print("No test data was processed")

        
# .pthファイルから事前学習済みモデルの重みをロード
def load_simclr_model(model_path):
    model = SimCLRModel()
    model.load_state_dict(torch.load(model_path))
    model.eval()  # 評価モードに設定
    return model

#simclr_model = load_simclr_model('./params/1112-myNet-transform-30ep-100.pth')
simclr_model = load_simclr_model('./params/1119-myNet-aug5-10ep.pth')

# テスト用の評価クラスを初期化
test_model = SimCLR_Test(model=simclr_model, num_classes=TEST_NUM_CLASSES)

# テストデータローダーの用意
#test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# PyTorch LightningのTrainerを使用してテストを実行
trainer = pl.Trainer(accelerator='cpu', devices=1)  # CPUのみを使用
trainer.test(test_model, dataloaders=dataloader_test)

  model.load_state_dict(torch.load(model_path))
GPU available: True (cuda), used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


Testing DataLoader 0: 100%|██████████| 24/24 [00:01<00:00, 18.14it/s]Test Accuracy: 0.0417, Test Loss: 3.7116
Testing DataLoader 0: 100%|██████████| 24/24 [00:01<00:00, 18.08it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
      avg_test_acc          0.0416666679084301
      avg_test_loss          3.711585283279419
        test_acc            0.04780876636505127
        test_loss           3.6389455795288086
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 3.6389455795288086,
  'test_acc': 0.04780876636505127,
  'avg_test_loss': 3.711585283279419,
  'avg_test_acc': 0.0416666679084301}]

In [21]:
import pytorch_lightning as pl
import torch
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader

class SimCLR_Test(pl.LightningModule):
    def __init__(self, model, num_classes=NUM_CLASSES):
        super().__init__()
        self.model = model
        self.mlp = torch.nn.Linear(128, num_classes)  # SimCLRの出力次元に応じて調整
        self.loss_fn = CrossEntropyLoss()
        
        # テスト結果を保持するリストを初期化
        self.test_losses = []
        self.test_accuracies = []

    def forward(self, x):
        # 特徴抽出してからMLP層でクラス分類
        with torch.no_grad():
            features = self.model(x)
        logits = self.mlp(features)
        return logits

    def test_step(self, batch, batch_idx):
        x, y = batch
        z = self.forward(x)
        loss = self.loss_fn(z, y)

        predicted = z.argmax(1)
        acc = (predicted == y).float().mean()

        self.log('test_loss', loss, on_epoch=True, prog_bar=True)
        self.log('test_acc', acc, on_epoch=True, prog_bar=True)
        
        # 損失と精度をリストに保存
        self.test_losses.append(loss)
        self.test_accuracies.append(acc)
        
        return {"test_loss": loss, "test_acc": acc}     

    def on_test_epoch_end(self):
        # テストデータ全体の平均損失と精度を集計
        if self.test_losses and self.test_accuracies:
            avg_loss = torch.stack(self.test_losses).mean()
            avg_acc = torch.stack(self.test_accuracies).mean()
            self.log('avg_test_loss', avg_loss, prog_bar=True)
            self.log('avg_test_acc', avg_acc, prog_bar=True)
            print(f"Test Accuracy: {avg_acc:.4f}, Test Loss: {avg_loss:.4f}")
        else:
            print("No test data was processed")

        
# .pthファイルから事前学習済みモデルの重みをロード
def load_simclr_model(model_path):
    model = SimCLRModel()
    model.load_state_dict(torch.load(model_path))
    model.eval()  # 評価モードに設定
    return model

#simclr_model = load_simclr_model('./params/1112-myNet-transform-30ep-100.pth')
simclr_model = load_simclr_model('./params/1112-myNet-aug5-10ep.pth')

# テスト用の評価クラスを初期化
test_model = SimCLR_Test(model=simclr_model, num_classes=NUM_CLASSES)

# PyTorch LightningのTrainerを使用してテストを実行
trainer = pl.Trainer(accelerator='cpu', devices=1)  # CPUのみを使用
trainer.test(test_model, dataloaders=dataloader_train)

  model.load_state_dict(torch.load(model_path))
GPU available: True (cuda), used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


Testing DataLoader 0: 100%|██████████| 6379/6379 [04:38<00:00, 22.88it/s]Test Accuracy: 0.0002, Test Loss: 9.7780
Testing DataLoader 0: 100%|██████████| 6379/6379 [04:38<00:00, 22.88it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
      avg_test_acc        0.00015676437760703266
      avg_test_loss          9.778013229370117
        test_acc          0.00010572066094027832
        test_loss            9.775991439819336
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 9.775991439819336,
  'test_acc': 0.00010572066094027832,
  'avg_test_loss': 9.778013229370117,
  'avg_test_acc': 0.00015676437760703266}]

In [24]:
import os
import glob

def delete_non_matching_files(base_dir, pattern='*-0.png'):
    """
    指定したディレクトリ以下のすべてのサブディレクトリ内で、
    名前に指定されたパターンが含まれないファイルを削除する。
    
    Parameters:
    - base_dir (str): 探索する基準となるディレクトリ
    - pattern (str): 残したいファイル名のパターン（デフォルトは '*-0.png'）
    """
    # 再帰的にすべてのファイルを探索
    all_files = glob.glob(os.path.join(base_dir, '**', '*.png'), recursive=True)
    
    for file_path in all_files:
        # ファイル名に '-0.png' が含まれていなければ削除
        if not file_path.endswith('-0.png'):
            try:
                os.remove(file_path)
                print(f"削除しました: {file_path}")
            except Exception as e:
                print(f"削除に失敗しました: {file_path}, エラー: {e}")

# 実行
base_dir = './HandwrittenTextAlign/PRMU/simclr/data/test1'
delete_non_matching_files(base_dir)

In [None]:
import os
import shutil

def process_files(src_dir, dest_dir):
    """
    src_dir: 元のディレクトリ
    dest_dir: 新しいディレクトリ
    """
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)  # 新しいディレクトリを作成

    # 再帰的に src_dir を探索
    for root, dirs, files in os.walk(src_dir):
        for file in files:
            file_path = os.path.join(root, file)

            # .png ファイルを処理
            if file.endswith(".png"):
                # 元ディレクトリの相対パスを保持
                relative_path = os.path.relpath(root, src_dir)
                
                # 新しいサブディレクトリを作成
                subdir = os.path.join(dest_dir, relative_path)
                if not os.path.exists(subdir):
                    os.makedirs(subdir)
                
                # ファイル名から語尾（-0, -3 など）を取得
                suffix = file.split('-')[-1].split('.')[0]
                subsubdir = os.path.join(subdir, f'{subdir}-{suffix}')  # 新しい subsubdir を作成
                
                if not os.path.exists(subsubdir):
                    os.makedirs(subsubdir)
                
                # 新しいファイルパスを生成
                new_file_path = os.path.join(subsubdir, file)
                
                # ファイルをコピー
                shutil.copy(file_path, new_file_path)
                print(f"コピーしました: {file_path} -> {new_file_path}")

# 実行
src_dir = "./HandwrittenTextAlign/PRMU/simclr/data/test"  # 元のディレクトリ
dest_dir = "./HandwrittenTextAlign/PRMU/simclr/data/test2"  # 新しいディレクトリ
process_files(src_dir, dest_dir)
