In [None]:
import pandas as pd
import numpy as np
import os

import matplotlib.pyplot as plt
import seaborn as sns

from PIL import Image

import albumentations as A
from albumentations.pytorch import ToTensorV2

from torch.utils.data import Dataset, DataLoader, Sampler
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision.models as models
from sklearn.model_selection import train_test_split

import cv2

from tqdm.auto import tqdm

In [None]:
# fix seeds
DEFAULT_RANDOM_SEED = 42
import random

def set_all_seeds(seed=DEFAULT_RANDOM_SEED):

    # python's seeds
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)

    # torch's seeds
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_all_seeds(seed=DEFAULT_RANDOM_SEED)

In [None]:
directory_path = '/kaggle/input/kylberg-texture-dataset'

# Получаем список всех папок в директории
folder_names = [name for name in os.listdir(directory_path) if os.path.isdir(os.path.join(directory_path, name))]
# создание классов
label2id = {} 
for ind, name in enumerate(folder_names):
    label2id[name] = ind
id2label = {v:k for k, v in label2id.items()} 

In [None]:
from pathlib import Path

root_path = Path(directory_path)

data = {'class_name': [], 'image_name': []}

for class_folder in tqdm(root_path.iterdir()):
    if class_folder.is_dir():  # Проверяем, что это директория
        for image_file in class_folder.iterdir():
            if image_file.is_file():  # Проверяем, что это файл
                data['class_name'].append(class_folder.name)
                data['image_name'].append(image_file.name)

# Создаем датафрейм названий и классов изображений
df = pd.DataFrame(data)
df['class'] = df['class_name'].map(label2id)

# делим на train - test выборки в равном соотношении
train, test = train_test_split(df, random_state=42, shuffle=True, stratify=df['class'])
train = train.reset_index(drop=True)
test = test.reset_index(drop=True)
y_test = test['class'].values

In [None]:
os.path.join(train.iloc[0]['class_name'], train.iloc[0]['image_name'])

In [None]:
class KylbergDataset(Dataset):
    def __init__(self, df: pd.DataFrame, folder_path: str, transform=None):
        self.df = df
        self.transform = transform
        self.folder_path = folder_path
        
    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        
        path_to_image = os.path.join(self.folder_path, self.df.iloc[idx]['class_name'], self.df.iloc[idx]['image_name'])
        image = Image.open(path_to_image).convert('L')
        image_np = np.array(image)
        
#         fragments = cut_fragments(image=image_np, mode=self.mode, n=self.n, size=self.size)
#         fragment = fragments[fragment_idx]
        
        # нормализуем фрагмент
#         max_value = np.max(image_np)
        image_np = image_np / 255.0
        
        if self.transform:
            image_np = self.transform(image=image_np)['image']
        
        image_t = torch.tensor(image_np, dtype=torch.float32)
        label = self.df.iloc[idx]['class']

        return image_t, label

In [None]:
class SaltAndPepper(A.ImageOnlyTransform):
    def __init__(self, p=1., salt_ratio=0.5, amount=0.0008, always_apply=True):
        super().__init__(always_apply, p)
        self.salt_ratio = salt_ratio
        self.amount = amount

    def apply(self, image, **params):
        image_copy = np.copy(image)  # создание копии изображения

        num_salt = np.ceil(self.amount * image.size * self.salt_ratio)
        coords_salt = [np.random.randint(0, i - 1, int(num_salt)) for i in image_copy.shape]
        image_copy[coords_salt[0], coords_salt[1]] = 1

        num_pepper = np.ceil(self.amount * image.size * (1.0 - self.salt_ratio))
        coords_pepper = [np.random.randint(0, i - 1, int(num_pepper)) for i in image_copy.shape]
        image_copy[coords_pepper[0], coords_pepper[1]] = 0

        return image_copy

In [None]:
train_transform = A.Compose([
    A.HorizontalFlip(p=.3),
#     A.RandomBrightnessContrast(p=1, contrast_limit=(.2), brightness_by_max=True, brightness_limit=(.2)),
    A.Rotate(limit=30, p=.3),
#     A.GaussianBlur(p=1, blur_limit=(1,3)),
#     A.CoarseDropout(max_holes=6, p=1., fill_value=200, max_height=3, max_width=3),
#     A.CoarseDropout(max_holes=6, p=1., fill_value=0, max_height=3, max_width=3),
    A.GaussNoise(var_limit=(10.0), p=1), #белый шум
#     SaltAndPepper(salt_ratio=0.4),
#     A.ElasticTransform(alpha=2, sigma=20, alpha_affine=10, p=.4),
])

# test_transform = A.Compose([
    
    
# ])

In [None]:
train_dataset = KylbergDataset(train, directory_path, train_transform)
test_dataset = KylbergDataset(test, directory_path)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2, drop_last=True)

In [None]:
NUM_CLASSES = len(label2id)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

mobilenet_v2_model = models.mobilenet_v2(pretrained=True)
mobilenet_v2_model.features[0][0] = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
mobilenet_v2_model.classifier[1] = nn.Linear(mobilenet_v2_model.last_channel, NUM_CLASSES)
mobilenet_v2_model = mobilenet_v2_model.to(device)

if torch.cuda.device_count() > 1:
    mobilenet_v2_model = torch.nn.DataParallel(mobilenet_v2_model)
    
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(mobilenet_v2_model.parameters())

In [None]:
# criterion = nn.CrossEntropyLoss()
# criterion = FocalLoss(alpha=.8)

# optimizer = torch.optim.AdamW(mobilenet_v2_model.parameters())
# exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.6)

# milestones = [12, 15, 26]
# gamma = 0.3
# exp_lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones, gamma=gamma)

In [None]:
train_loss = []
train_acc = []

test_loss = []
test_acc = []

def train_and_validate(epoch, model):
                                                ### train
    print(f'EPOCH: {epoch + 1}')
    
    running_loss = 0.0
    running_acc = 0.0
    model.train()
    for batch_idx, (data, target) in tqdm(enumerate(train_dataloader)):
        target = target.type(torch.LongTensor).to(device)
        data = data.unsqueeze(1)
        data, target = data.to(device).float(), target.to(device)
        optimizer.zero_grad()
        outputs = model(data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        preds = outputs.argmax(dim=1)
        running_acc += (preds == target).float().mean().item()
        
#         if batch_idx % 200 == 0:
#             random_img = data.cpu().numpy()[np.random.randint(data.size(0))][0]
#             plt.imshow(random_img, cmap='gray')
#             plt.title("Random Image from Batch")
#             plt.axis('off')
#             plt.show()
    
    train_loss.append(running_loss / len(train_dataloader))
    train_acc.append(running_acc / len(train_dataloader))
    
    print(f"Epoch {epoch+1}, Train Loss: {train_loss[-1]:.3f}, Train Acc: {train_acc[-1]:.3f}")
#     exp_lr_scheduler.step()
    
                                                ### validate

    model.eval()
    all_preds = [] 

    with torch.no_grad():
        eval_acc = 0.0
        eval_loss = 0.0
        for batch_idx, (data, target) in enumerate(test_dataloader):
            target = target.type(torch.LongTensor).to(device)
            data = data.unsqueeze(1)
            data, target = data.to(device).float(), target.to(device)

            outputs = model(data)
            loss = criterion(outputs, target)
            eval_loss += loss.item()
            preds = outputs.argmax(dim=1)
            eval_acc += (preds == target).float().mean().item()

            all_preds.extend(preds.cpu().numpy())
            
    
    test_loss.append(eval_loss / len(test_dataloader))
    test_acc.append(eval_acc / len(test_dataloader))
    
    print(f"Epoch {epoch+1}, Test Loss: {test_loss[-1]:.3f}, Test Acc: {test_acc[-1]:.3f}")
    print()
#     cm = confusion_matrix(all_targets, all_preds)
#     disp = ConfusionMatrixDisplay(confusion_matrix=cm)
#     disp.plot()
#     plt.show()
    
#     wandb.log({
#         "train_acc": train_acc[-1],
#         "train_loss": train_loss[-1],
#         "train_full": train_full[-1] / 100,
#         "valid_acc": test_acc[-1],
#         "valid_loss": test_loss[-1],
#         "valid_full": test_full[-1] / 100
#     })
    
    return train_loss, train_acc, test_loss, test_acc

In [None]:
best_loss = float('inf')
epochs_without_improvement = 0
early_stopping_threshold = 5

for epoch in range(10):
    train_loss, train_acc, test_loss, test_acc = train_and_validate(epoch, mobilenet_v2_model)
    
    if test_loss[-1] < best_loss:
        best_loss = test_loss[-1]
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1
        
    if epochs_without_improvement >= early_stopping_threshold:
        print("Early stopping triggered after {} epochs without improvement.".format(epochs_without_improvement))
        break

# save and load model with huggingface repo

In [None]:
torch.save(mobilenet_v2_model.state_dict(), 'mobilenet-v2-textures.pth')

In [None]:
from huggingface_hub import upload_file, HfFolder, notebook_login, hf_hub_download

notebook_login()

# Assuming you've stored your Hugging Face token as a Kaggle secret
# from kaggle_secrets import UserSecretsClient
# user_secrets = UserSecretsClient()
# hf_token = user_secrets.get_secret("huggingface_token")

repo_name = "danzzzll/mobilenet-v2-textures"  # Replace with your Hugging Face Hub repo
model_file_path = '/kaggle/working/mobilenet-v2-textures.pth'

# Upload the model file
upload_file(
    path_or_fileobj=model_file_path,
    path_in_repo="mobilenet-v2-textures.pth",  # Path where the file will be stored in your repo
    repo_id=repo_name,
#     token=hf_token,
    repo_type="model"
)

print(f"Model successfully uploaded to: https://huggingface.co/{repo_name}")

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

from huggingface_hub import upload_file, HfFolder, notebook_login, hf_hub_download
from collections import OrderedDict


NUM_CLASSES = len(class_dict)

repo_id = 'danzzzll/mobilenet-v2-textures'
filename = 'mobilenet-v2-textures.pth'

def loading_weights(repo_id, filename):
    weights_path = hf_hub_download(repo_id=repo_id, filename=filename)
    
    model = models.mobilenet_v2(weights=None)
    model.features[0][0] = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    model.classifier[1] = nn.Linear(model.last_channel, 28)

    state_dict = torch.load(weights_path, map_location='cpu')
    
    new_state_dict = OrderedDict()
    for k, v in state_dict.items():
        name = k[7:] if k.startswith('module.') else k  # remove `module.` prefix if exists
        new_state_dict[name] = v

    model.load_state_dict(new_state_dict)
    model.classifier[1] = nn.Linear(model.last_channel, NUM_CLASSES)

    if torch.cuda.device_count() > 1:
        model = torch.nn.DataParallel(model)
    
    model = model.to(device)
    return model

model = loading_weights(repo_id, filename)

In [None]:
def evaluation(model):
    model.eval()
    with torch.no_grad():
        running_acc = 0.0
        running_loss = 0.0
        for batch_idx, (data, target) in enumerate(test_dataloader):
            target = target.type(torch.LongTensor).to(device)
            data = data.unsqueeze(1)
            data, target = data.to(device).float(), target.to(device)

            outputs = model(data)
            loss = criterion(outputs, target)
            running_loss += loss.item()
            preds = outputs.argmax(dim=1)
            running_acc += (preds == target).float().mean().item()

    return running_acc / len(test_dataloader)

evaluation(new_model)

In [None]:
def inference(model, image, id2label):
    model.eval()
    image = image.unsqueeze(0).unsqueeze(0)
    print(image.shape)
    outputs = model(image)
    preds = outputs.argmax()
    clas = preds.item()
    
    return id2label.get(clas)