In [None]:
!pip install evaluate requests ultralytics supervision

In [None]:
import os
import shutil

import torch
import numpy as np
import pandas as pd
import random
/
from transformers import (
    AutoImageProcessor, AutoModelForImageClassification,
    AutoModelForObjectDetection, ImageClassificationPipeline,
    TrainingArguments, Trainer, pipeline
)

from transformers.image_utils import load_image
import evaluate
from datasets import load_dataset

from matplotlib import pyplot as plt
import matplotlib.patches as patches
from PIL import Image

from requests import get
from huggingface_hub import hf_hub_download

from ultralytics import YOLO
from supervision import Detections

from datasets import Dataset, DatasetDict, load_metric
from sklearn.model_selection import train_test_split

# skimage
from skimage.io import imshow, imread, imsave
from skimage.transform import rotate, AffineTransform, warp,rescale, resize, downscale_local_mean
from skimage import color,data
from skimage.exposure import adjust_gamma
from skimage.util import random_noise

# Data Sorting

In [None]:
# Чтение данных из файла default.txt
file_path = "/kaggle/input/aminfins/default.txt"
with open(file_path, "r") as file:
    lines = file.readlines()

In [None]:
sorted_data = sorted(lines, key=lambda x: int(x.split()[-1]))

In [None]:
# Write the sorted paths to a text file
with open('sorted_paths.txt', 'w') as f:
    for path in sorted_data:
        f.write(path.replace('\n', ''))  # Remove the newline character
        f.write('\n')  # Add a newline after each path

In [None]:
# Define the output directory relative to the notebook's working directory
output_dir = '/kaggle/working/'

# Create directories 0, 1, 2 if they don't exist
for i in range(3):
    directory = os.path.join(output_dir, str(i))
    if not os.path.exists(directory):
        os.makedirs(directory)

# Move photos to respective folders
for item in sorted_data:
    photo_path, digit = item.split()
    filename = os.path.basename(photo_path.strip())
    digit = digit.strip()
    destination_folder = os.path.join(output_dir, digit)
    destination_path = os.path.join(destination_folder, filename)
    # Construct full source path assuming the photos are in "/kaggle/input/aminfins/All images"
    source_path = os.path.join("/kaggle/input/aminfins/All images", filename)
    shutil.copy(source_path, destination_path)

In [None]:
shutil.make_archive('/kaggle/working/0', 'zip', '/kaggle/working/0')
shutil.make_archive('/kaggle/working/1', 'zip', '/kaggle/working/1')
shutil.make_archive('/kaggle/working/2', 'zip', '/kaggle/working/2')

# Data Reading

In [None]:
# Функция загрузки изображения в формате Pillow
def load_image_(image_path, label):
    return Image.open('/kaggle/input/aminfins/gaz_sorted_images/' + label + image_path)

# Чтение данных из файла sorted_paths.txt
file_path = "/kaggle/input/aminfins/sorted_paths.txt"
with open(file_path, "r") as file:
    lines = file.readlines()

# Создание списков для хранения данных
image_paths = []
labels = []

# Разделение строк на пути к изображениям и метки
for line in lines:
    parts = line.split()
    image_paths.append(parts[0])
    labels.append(int(parts[1]))

In [None]:
# Загрузка изображений в формате Pillow для разных меток
neutral_images = [load_image_(path, label_mapping[label] + '/') for path, label in zip(image_paths, labels) if label == 0]
microsleep_images = [load_image_(path, label_mapping[label] + '/') for path, label in zip(image_paths, labels) if label == 1]
yawning_images = [load_image_(path, label_mapping[label] + '/') for path, label in zip(image_paths, labels) if label == 2]

# Data Augmentation

In [None]:
neutral_images_arr = [np.array(image) for image in neutral_images]
microsleep_images_arr = [np.array(image) for image in microsleep_images]
yawning_images_arr = [np.array(image) for image in yawning_images]

In [None]:
def apply_transformations_and_convert(arrays_list):
    transformed_pillow_images = []
    
    for array in arrays_list:
        # Convert array to Pillow image
        original_pillow_image = Image.fromarray(array.astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(original_pillow_image)
        
        # Horizontally flipped
        hflipped_array = np.fliplr(array)
        hflipped_pillow_image = Image.fromarray(hflipped_array.astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(hflipped_pillow_image)
        
        # Vertically flipped
        vflipped_array = np.flipud(array)
        vflipped_pillow_image = Image.fromarray(vflipped_array.astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(vflipped_pillow_image)
        
        # Clockwise rotation by 30 degrees
        rot_30_clockwise_array = rotate(array, angle=30)
        rot_30_clockwise_pillow_image = Image.fromarray((rot_30_clockwise_array * 255).astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(rot_30_clockwise_pillow_image)
        
        # Anticlockwise rotation by 30 degrees
        rot_30_anticlockwise_array = rotate(array, angle=-30)
        rot_30_anticlockwise_pillow_image = Image.fromarray((rot_30_anticlockwise_array * 255).astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(rot_30_anticlockwise_pillow_image)
        
        # Clockwise rotation by 45 degrees
        rot_45_clockwise_array = rotate(array, angle=45)
        rot_45_clockwise_pillow_image = Image.fromarray((rot_45_clockwise_array * 255).astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(rot_45_clockwise_pillow_image)
        
        # Anticlockwise rotation by 45 degrees
        rot_45_anticlockwise_array = rotate(array, angle=-45)
        rot_45_anticlockwise_pillow_image = Image.fromarray((rot_45_anticlockwise_array * 255).astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(rot_45_anticlockwise_pillow_image)
        
        # Clockwise rotation by 60 degrees
        rot_60_clockwise_array = rotate(array, angle=60)
        rot_60_clockwise_pillow_image = Image.fromarray((rot_60_clockwise_array * 255).astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(rot_60_clockwise_pillow_image)
        
        # Anticlockwise rotation by 60 degrees
        rot_60_anticlockwise_array = rotate(array, angle=-60)
        rot_60_anticlockwise_pillow_image = Image.fromarray((rot_60_anticlockwise_array * 255).astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(rot_60_anticlockwise_pillow_image)
        
        # Brightness adjusted array (gamma=0.5)
        array_bright = adjust_gamma(array, gamma=0.5, gain=1)
        bright_pillow_image = Image.fromarray(array_bright.astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(bright_pillow_image)
        
        # Darkened array (gamma=2)
        array_dark = adjust_gamma(array, gamma=2, gain=1)
        dark_pillow_image = Image.fromarray(array_dark.astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(dark_pillow_image)
        
        # Noisy array
        noisy_array = random_noise(array)
        noisy_pillow_image = Image.fromarray((noisy_array * 255).astype('uint8'))  # Convert to uint8
        transformed_pillow_images.append(noisy_pillow_image)
        
    return transformed_pillow_images

In [None]:
neutral_aug = apply_transformations_and_convert(neutral_images_arr)

In [None]:
# Define the output directory
output_directory = "/kaggle/working/neutral_aug"

# Create the output directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

for i, image in enumerate(neutral_aug):
    image_path = os.path.join(output_directory, f"image_{i}.jpg")
    image.save(image_path)

In [None]:
# # Define the directory containing the images
# directory = "/kaggle/working/neutral_aug"

# # Iterate over each file in the directory
# for filename in os.listdir(directory):
#     # Construct the full file path
#     file_path = os.path.join(directory, filename)
    
#     # Check if the file is a regular file (not a directory)
#     if os.path.isfile(file_path):
#         # Remove the file
#         os.remove(file_path)

In [None]:
microsleep_aug = apply_transformations_and_convert(microsleep_images_arr)

In [None]:
# Define the output directory
output_directory = "/kaggle/working/microsleep_aug"

# Create the output directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

for i, image in enumerate(microsleep_aug):
    image_path = os.path.join(output_directory, f"image_{i}.jpg")
    image.save(image_path)

In [None]:
yawning_aug = apply_transformations_and_convert(yawning_images_arr)

In [None]:
# Define the output directory
output_directory = "/kaggle/working/yawning_aug"

# Create the output directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

for i, image in enumerate(yawning_aug):
    image_path = os.path.join(output_directory, f"image_{i}.jpg")
    image.save(image_path)

In [None]:
# Define the directories to be zipped
directories = ["/kaggle/working/neutral_aug", "/kaggle/working/microsleep_aug", "/kaggle/working/yawning_aug"]

# Zip each directory
for directory in directories:
    shutil.make_archive(directory, 'zip', directory)

# Aug Data Reading

In [None]:
# Словарь для преобразования числовых меток в строковые
label_mapping = {
    0: "neutral",
    1: "microsleep",
    2: "yawning",
}

In [None]:
# Directory containing the images
directory = '/kaggle/input/aminfins/gaz_aug_zip/neutral_aug/'

# Create a list to store PIL images
neutral_images_new = []

# Iterate over all files in the directory
for filename in os.listdir(directory):

        # Construct the full path to the image file
        file_path = os.path.join(directory, filename)
        # Open the image file as a PIL image and append it to the list
        pil_image = Image.open(file_path)
        neutral_images_new.append(pil_image)

In [None]:
# Directory containing the images
directory = '/kaggle/input/aminfins/gaz_aug_zip/microsleep_aug/'

# Create a list to store PIL images
microsleep_images_new = []

# Iterate over all files in the directory
for filename in os.listdir(directory):

        # Construct the full path to the image file
        file_path = os.path.join(directory, filename)
        # Open the image file as a PIL image and append it to the list
        pil_image = Image.open(file_path)
        microsleep_images_new.append(pil_image)

In [None]:
# Directory containing the images
directory = '/kaggle/input/aminfins/gaz_aug_zip/yawning_aug/'

# Create a list to store PIL images
yawning_images_new = []

# Iterate over all files in the directory
for filename in os.listdir(directory):

        # Construct the full path to the image file
        file_path = os.path.join(directory, filename)
        # Open the image file as a PIL image and append it to the list
        pil_image = Image.open(file_path)
        yawning_images_new.append(pil_image)

In [None]:
neutral_labels = [0] * len(neutral_images_new)
microsleep_labels = [1] * len(microsleep_images_new)
yawning_labels = [2] * len(yawning_images_new)

In [None]:
all_images = neutral_images_new + microsleep_images_new + yawning_images_new
all_labels = neutral_labels + microsleep_labels + yawning_labels

In [None]:
# Разделение данных на тренировочную и тестовую выборку
train_images, test_images, train_labels, test_labels = train_test_split(
    all_images, all_labels, test_size=0.15, random_state=0)

# Создание объектов Dataset для тренировочной и тестовой выборок
train_dataset = Dataset.from_dict({"image": train_images, "label": train_labels})
test_dataset = Dataset.from_dict({"image": test_images, "label": test_labels})

# Создание объекта DatasetDict
dataset = DatasetDict({"train": train_dataset, "test": test_dataset})

# Вывод информации о DatasetDict
print(dataset)

In [None]:
metric = load_metric("accuracy")

In [None]:
example = dataset["train"][122]
example['image']

In [None]:
id2label = label_mapping
label2id = {v: k for k, v in id2label.items()}
id2label, label2id

In [None]:
model_checkpoint = "microsoft/resnet-50"
batch_size = 8
image_processor  = AutoImageProcessor.from_pretrained(model_checkpoint)

In [None]:
from torchvision.transforms import (
    CenterCrop,
    Compose,
    Normalize,
    RandomHorizontalFlip,
    RandomResizedCrop,
    Resize,
    ToTensor,
)

normalize = Normalize(mean=image_processor.image_mean, std=image_processor.image_std)
if "height" in image_processor.size:
    size = (image_processor.size["height"], image_processor.size["width"])
    crop_size = size
    max_size = None
elif "shortest_edge" in image_processor.size:
    size = image_processor.size["shortest_edge"]
    crop_size = (size, size)
    max_size = image_processor.size.get("longest_edge")

train_transforms = Compose(
        [
            RandomResizedCrop(crop_size),
            RandomHorizontalFlip(),
            ToTensor(),
            normalize,
        ]
    )

val_transforms = Compose(
        [
            Resize(size),
            CenterCrop(crop_size),
            ToTensor(),
            normalize,
        ]
    )

def preprocess_train(example_batch):
    """Apply train_transforms across a batch."""
    example_batch["pixel_values"] = [
        train_transforms(image.convert("RGB")) for image in example_batch["image"]
    ]
    return example_batch

def preprocess_val(example_batch):
    """Apply val_transforms across a batch."""
    example_batch["pixel_values"] = [val_transforms(image.convert("RGB")) for image in example_batch["image"]]
    return example_batch

In [None]:
train_ds = dataset['train']
val_ds = dataset['test']

train_ds.set_transform(preprocess_train)
val_ds.set_transform(preprocess_val)

In [None]:
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint, 
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes = True,
)

In [None]:
# the compute_metrics function takes a Named Tuple as input:
# predictions, which are the logits of the model as Numpy arrays,
# and label_ids, which are the ground-truth labels as Numpy arrays.
def compute_metrics(eval_pred):
    """Computes accuracy on a batch of predictions"""
    predictions = np.argmax(eval_pred.predictions, axis=1)
    return metric.compute(predictions=predictions, references=eval_pred.label_ids)

def collate_fn(examples):
    pixel_values = torch.stack([example["pixel_values"] for example in examples])
    labels = torch.tensor([example["label"] for example in examples])
    return {"pixel_values": pixel_values, "labels": labels}

In [None]:
args = TrainingArguments(
    "train_image_kabyte",
    remove_unused_columns=False,
    evaluation_strategy = "epoch",
    save_strategy = "epoch",
    num_train_epochs=5,
    save_steps=200
)

In [None]:
trainer = Trainer(
    model,
    args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    tokenizer=image_processor,
    compute_metrics=compute_metrics,
    data_collator=collate_fn,
)

In [None]:
train_results = trainer.train()
# rest is optional but nice to have
trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()

In [None]:
j = random.randint(0,400)
image = dataset['test'][j]['image']
print(id2label[dataset['test'][j]['label']])
image

In [None]:
classifier = pipeline("image-classification", model='/kaggle/working/train_image_kabyte/checkpoint-1555')
print(classifier(image)[0])