## Библиотеки

In [None]:
import numpy as np
import pandas as pd
import os
import io
import requests
from PIL import Image
from tqdm import tqdm
import asyncio
import aiohttp
from aiohttp import ClientSession
from tqdm.asyncio import tqdm as tqdm_async
import os
from typing import List, Optional
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn


In [None]:
# Compute device for torch: the GPU when CUDA is available, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

## Функция визуализации

In [None]:
def show_image_grid(
    image_names: List[str],
    dir_list: List[str],
    row_labels: List[str],
    figsize_scale: float = 3.0,
    top_title: Optional[str] = None,
    col_labels: Optional[List[str]] = None,
):
    """
    Draw a grid of images: one row per directory, one column per file name.

    The cell at row ``i`` and column ``j`` shows the file
    ``os.path.join(dir_list[i], image_names[j])``. A missing file is rendered
    as a "Not found" placeholder and a file that cannot be opened or decoded
    as an "Error" placeholder, so one bad image does not abort the figure.

    Args:
        image_names: file names, one per column.
        dir_list: directories, one per row.
        row_labels: row captions; must have the same length as ``dir_list``.
        figsize_scale: size of a single grid cell, in inches.
        top_title: optional title for the whole figure.
        col_labels: optional column captions; must have the same length as
            ``image_names``. When omitted, the file names are used.

    Returns:
        The ``(fig, axes)`` pair; ``axes`` is always a 2-D array.

    Raises:
        ValueError: if the label lists do not match the lengths described
            above, or if ``dir_list`` / ``image_names`` is empty.
    """
    # Explicit exceptions instead of `assert`: asserts are removed under `-O`.
    if len(dir_list) != len(row_labels):
        raise ValueError("dir_list и row_labels должны быть одинаковой длины")
    if col_labels is not None and len(col_labels) != len(image_names):
        raise ValueError("col_labels и image_names должны быть одинаковой длины")
    if not dir_list or not image_names:
        raise ValueError("dir_list и image_names не должны быть пустыми")

    n_rows = len(dir_list)
    n_cols = len(image_names)

    figsize = (figsize_scale * n_cols, figsize_scale * n_rows)
    # squeeze=False keeps `axes` 2-D even for a single row or column.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)

    for i, (directory, row_label) in enumerate(zip(dir_list, row_labels)):
        for j, img_name in enumerate(image_names):
            ax = axes[i, j]
            img_path = os.path.join(directory, img_name)

            if os.path.exists(img_path):
                try:
                    # The context manager closes the file even if decoding fails.
                    with Image.open(img_path) as src:
                        img = src.convert("RGB")
                    ax.imshow(img)
                except Exception as e:
                    ax.text(
                        0.5, 0.5,
                        f"Error\n{type(e).__name__}",
                        ha="center", va="center", fontsize=8,
                    )
            else:
                ax.text(
                    0.5, 0.5,
                    "Not found",
                    ha="center", va="center", fontsize=8,
                )

            # Hide ticks and frame by hand. `ax.axis("off")` would also hide
            # the axis labels, so the row caption set below would never show.
            ax.set_xticks([])
            ax.set_yticks([])
            for spine in ax.spines.values():
                spine.set_visible(False)

            # Column caption, drawn only above the first row.
            if i == 0:
                title = col_labels[j] if col_labels is not None else img_name
                ax.set_title(title, fontsize=9)

        # Row caption to the left of the first cell in the row.
        axes[i, 0].set_ylabel(
            row_label,
            rotation=0,
            labelpad=40,
            va="center",
            fontsize=10,
        )

    if top_title is not None:
        fig.suptitle(top_title, fontsize=14)
        # Leave a strip at the top so the figure title does not overlap the grid.
        plt.tight_layout(rect=[0, 0, 1, 0.96])
    else:
        plt.tight_layout()

    plt.show()
    return fig, axes


## Установка датасета

In [None]:
def get_filename_from_url(url: str) -> str:
    """
    Return the last path segment of ``url``.

    Trailing slashes are ignored. Nothing else is stripped: a query string or
    fragment, if present, stays part of the returned name.
    """
    trimmed = url.rstrip("/")
    # Everything after the last "/" (the whole string when there is none).
    return trimmed.rsplit("/", 1)[-1]


async def download_one_async(
    url: str,
    download_dir: str,
    session: ClientSession,
    overwrite: bool = False,
    timeout: int = 10,
):
    """
    Download one URL into ``download_dir`` and report the outcome.

    The target file name is derived from the URL with
    ``get_filename_from_url``. Failures are reported through the returned
    status instead of being raised, so one bad URL does not stop a batch.

    Args:
        url: address to download.
        download_dir: existing directory to save the file into.
        session: open aiohttp session used for the request.
        overwrite: when False, an already existing target file is kept and
            no request is made.
        timeout: total time limit for the request, in seconds.

    Returns:
        ``(url, status)`` where status is ``"exists"``, ``"ok"`` or a string
        starting with ``"error"``.
    """
    fname = get_filename_from_url(url)
    dst_path = os.path.join(download_dir, fname)

    if not overwrite and os.path.exists(dst_path):
        return url, "exists"

    # Write to a temporary name and rename afterwards. A failed write must not
    # leave a truncated file that a later run would report as "exists".
    tmp_path = dst_path + ".part"

    try:
        request_timeout = aiohttp.ClientTimeout(total=timeout)
        async with session.get(url, timeout=request_timeout) as resp:
            resp.raise_for_status()
            content = await resp.read()

        with open(tmp_path, "wb") as f:
            f.write(content)
        os.replace(tmp_path, dst_path)
        return url, "ok"
    except Exception as e:
        # Best-effort cleanup of the partial file; it may not exist at all.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        # Include the exception type: some exceptions (e.g. a bare timeout)
        # have an empty message and would otherwise produce just "error: ".
        return url, f"error: {type(e).__name__}: {e}"


async def download_all_images_async(
    df: pd.DataFrame,
    download_dir: str,
    concurrency: int = 8,   # number of simultaneous requests
    overwrite: bool = False,
):
    """
    Download every URL from ``df['downloadUrl']`` into ``download_dir``.

    Downloads run concurrently, at most ``concurrency`` at a time, with a
    progress bar. Failed downloads are printed as they complete.

    Args:
        df: table with a ``downloadUrl`` column; missing values are skipped.
        download_dir: target directory, created if it does not exist.
        concurrency: maximum number of simultaneous downloads (>= 1).
        overwrite: passed through to ``download_one_async``.

    Returns:
        List of ``(url, status)`` tuples in completion order, which is not
        the order of the rows in ``df``.

    Raises:
        ValueError: if ``concurrency`` is less than 1.
    """
    # A semaphore created with 0 permits would block every task forever.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    os.makedirs(download_dir, exist_ok=True)

    # Skip missing values: a NaN URL would raise outside the per-URL error
    # handling and abort the whole batch.
    urls = df['downloadUrl'].dropna().tolist()

    connector = aiohttp.TCPConnector(limit=concurrency)  # global connection limit

    async with aiohttp.ClientSession(connector=connector) as session:
        sem = asyncio.Semaphore(concurrency)  # limit on simultaneously active tasks

        async def sem_task(url):
            async with sem:
                return await download_one_async(url, download_dir, session, overwrite)

        tasks = [sem_task(url) for url in urls]

        # Results are consumed as they finish so the progress bar advances live.
        results = []
        for coro in tqdm_async.as_completed(tasks, total=len(tasks), desc="Downloading"):
            res = await coro
            results.append(res)
            url, status = res
            if status.startswith("error"):
                print(f"{url} -> {status}")

    return results

In [None]:
# Labelled ("markup") data is used as the test set; `.iloc[:-1]` drops its last row.
# NOTE(review): presumably the last row of the file is a trailing/invalid record — confirm.
test_df = pd.read_csv('hw_3_markup_data.txt', sep="\t", header=0).iloc[:-1]
# Unlabelled ("no markup") data is used as the train set.
train_df = pd.read_csv('hw_3_no_markup_data.txt', sep="\t", header=0)
# Last expression of the cell: the notebook displays the first rows.
train_df.head()

In [None]:
# Download the test images into ./test_images, up to 32 at a time.
# Top-level `await` is valid in a notebook cell, not in a plain Python script.
results = await download_all_images_async(test_df, "test_images", concurrency=32)

In [None]:
# Download the train images into ./train_images, up to 64 at a time.
# This rebinds `results`, replacing any value it held before.
results = await download_all_images_async(train_df, "train_images", concurrency=64)

##  Upscaling изображений

Настроим все для дальнейшего исследования методов upscaling

Будем визуально сравнивать на первых 5 изображениях

In [None]:
# File names of the first five test images: one grid column each.
image_names = [get_filename_from_url(url) for url in test_df['downloadUrl'][:5]]
# Column captions: the `is_conifer` value of the same five rows.
col_labels = list(test_df['is_conifer'][:5])

# Grid rows: only the original images at this point.
dir_list = [
    "test_images"
]

row_labels = [
    "original test"
]

# The trailing ';' keeps the notebook from echoing the returned (fig, axes).
show_image_grid(
    image_names=image_names,
    dir_list=dir_list,
    row_labels=row_labels,
    col_labels=col_labels,
    figsize_scale=3.0,
    top_title="Сравнение разных методов апскейлинга",
);

## Бейзлайны

Специфика задачи — хвойные деревья и их отличие от остальных (признак `is_conifer`).

1. Bilinear upscaling
2. Lanczos upscaling

In [None]:

def upscale_image(img: Image.Image, scale: float, method: str) -> Image.Image:
    """
    Return ``img`` resized by a factor of ``scale`` with the given filter.

    Args:
        img: source image.
        scale: positive size multiplier. The target width and height are
            rounded to whole pixels, with a minimum of one pixel each.
        method: resampling filter name, ``"bilinear"`` or ``"lanczos"``.

    Returns:
        A new, resized image.

    Raises:
        ValueError: if ``scale`` is not positive or ``method`` is unknown.
    """
    # Fail early with a clear message instead of an error from inside resize().
    if scale <= 0:
        raise ValueError(f"scale must be positive, got {scale}")

    resample_filters = {
        "bilinear": Image.BILINEAR,
        "lanczos": Image.LANCZOS,
    }
    if method not in resample_filters:
        raise ValueError(f"Unknown method: {method}")

    w, h = img.size
    # round() leaves integer scales unchanged and keeps the size integral
    # when a fractional scale is passed.
    new_size = (max(1, round(w * scale)), max(1, round(h * scale)))

    return img.resize(new_size, resample=resample_filters[method])


def upscale_directory(
    input_dir: str,
    output_dir: str,
    scale: int = 2,
    methods=("bilinear", "lanczos"),
):
    """
    Upscale every image in ``input_dir`` with each of the given methods.

    Results are saved as ``output_dir/<method>/<original file name>``. Files
    whose names end with .jpg, .jpeg or .png (case-insensitive) are processed.
    A file that cannot be opened, or a method that fails for a file, is
    reported with ``print`` and skipped; processing continues.

    Args:
        input_dir: directory with the source images.
        output_dir: root directory for the results; one sub-directory per
            method is created.
        scale: size multiplier passed to ``upscale_image``.
        methods: method names passed to ``upscale_image``.
    """
    input_dir = os.path.abspath(input_dir)
    output_dir = os.path.abspath(output_dir)

    # `methods` is walked once per file, so a one-shot iterable (a generator)
    # would be exhausted after the directories are created. Materialize it.
    methods = tuple(methods)

    # One output sub-directory per method.
    method_dirs = {m: os.path.join(output_dir, m) for m in methods}
    for m_dir in method_dirs.values():
        os.makedirs(m_dir, exist_ok=True)

    # os.listdir returns names in arbitrary order; sort for reproducible runs.
    filenames = sorted(
        f for f in os.listdir(input_dir)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    )

    for fname in tqdm(filenames, desc=f"Upscaling {os.path.basename(input_dir)}"):
        in_path = os.path.join(input_dir, fname)

        try:
            # The context manager closes the file even if decoding fails.
            with Image.open(in_path) as src:
                img = src.convert("RGB")
        except Exception as e:
            print(f"Failed to open {in_path}: {e}")
            continue

        for m in methods:
            try:
                img_up = upscale_image(img, scale=scale, method=m)
                out_path = os.path.join(method_dirs[m], fname)
                img_up.save(out_path, quality=95)
            except Exception as e:
                print(f"Failed to upscale ({m}) {in_path}: {e}")


In [None]:
# Upscale the train images x2 with both baseline methods.
upscale_directory(
    input_dir="train_images",
    output_dir="train_upscaled",
    scale=2,
    methods=("bilinear", "lanczos"),
)

# Upscale the test images the same way.
upscale_directory(
    input_dir="test_images",
    output_dir="test_upscaled",
    scale=2,
    methods=("bilinear", "lanczos"),
)

In [None]:
# Grid rows: the originals plus the two baseline upscaling results.
dir_list = [
    "test_images",
    "test_upscaled/bilinear",
    "test_upscaled/lanczos",
]

# Row captions, in the same order as dir_list.
row_labels = [
    "original test",
    "bilinear",
    "lanczos"
]

# `image_names` and `col_labels` are reused from the earlier preview cell.
show_image_grid(
    image_names=image_names,
    dir_list=dir_list,
    row_labels=row_labels,
    col_labels=col_labels,
    figsize_scale=3.0,
    top_title="Сравнение разных методов апскейлинга",
);

### Использование SR-модели

In [None]:
# Install the Real-ESRGAN wrapper that is imported below as `py_real_esrgan`.
# NOTE(review): huggingface-hub is pinned to 0.25.2, presumably for
# compatibility with py-real-esrgan — confirm before changing the version.
!pip install py-real-esrgan
!pip install huggingface-hub==0.25.2

In [None]:
import torch
from py_real_esrgan.model import RealESRGAN


def create_realesrgan_x2plus_model(
    weights_path: str | None = None,
    device: str | None = None,
) -> RealESRGAN:
    """
    Build a RealESRGAN model with scale=2 and load its weights.

    Args:
        weights_path: when None, ``load_weights`` is called with the name
            'RealESRGAN_x2plus' and ``download=True``; otherwise it is called
            with this path and ``download=False``.
        device: device for the model; when None, "cuda" is used if CUDA is
            available and "cpu" otherwise.

    Returns:
        The model with weights loaded.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    model = RealESRGAN(device, scale=2)

    # Downloading is enabled only when no explicit weights file was given.
    use_local_file = weights_path is not None
    model.load_weights(
        weights_path if use_local_file else "RealESRGAN_x2plus",
        download=not use_local_file,
    )
    return model


In [None]:
def upscale_image_realesrgan_x2plus_py(
    img: Image.Image,
    model: RealESRGAN,
) -> Image.Image:
    """
    Upscale one image by passing it to ``model.predict``.

    Returns whatever ``model.predict(img)`` returns.
    """
    return model.predict(img)

def upscale_directory_realesrgan_x2plus_py(
    input_dir: str,
    output_dir: str,
    model: RealESRGAN,
    method_name: str = "realesrgan_x2plus",
):
    """
    Upscale every image in ``input_dir`` with the given RealESRGAN model.

    Results are saved as ``output_dir/<method_name>/<original file name>``.
    Files whose names end with .jpg, .jpeg or .png (case-insensitive) are
    processed. A file that cannot be opened or upscaled is reported with
    ``print`` and skipped; processing continues.

    Args:
        input_dir: directory with the source images.
        output_dir: root directory for the results.
        model: model passed to ``upscale_image_realesrgan_x2plus_py``.
        method_name: name of the sub-directory created under ``output_dir``.
    """
    input_dir = os.path.abspath(input_dir)
    out_method_dir = os.path.join(os.path.abspath(output_dir), method_name)
    os.makedirs(out_method_dir, exist_ok=True)

    # os.listdir returns names in arbitrary order; sort for reproducible runs.
    filenames = sorted(
        f for f in os.listdir(input_dir)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    )

    for fname in tqdm(filenames, desc=f"RealESRGAN x2 (py): {os.path.basename(input_dir)}"):
        in_path = os.path.join(input_dir, fname)

        try:
            # The context manager closes the file even if decoding fails.
            with Image.open(in_path) as src:
                img = src.convert("RGB")
        except Exception as e:
            print(f"Failed to open {in_path}: {e}")
            continue

        try:
            sr_img = upscale_image_realesrgan_x2plus_py(img, model)
            out_path = os.path.join(out_method_dir, fname)
            sr_img.save(out_path, quality=95)
        except Exception as e:
            print(f"Failed to upscale (RealESRGAN_x2plus, py) {in_path}: {e}")

In [None]:
from pathlib import Path

def count_files_pathlib(directory_path):
    """
    Return the number of regular files directly inside ``directory_path``.

    Sub-directories are neither counted nor descended into.
    """
    n_files = 0
    for child in Path(directory_path).iterdir():
        if child.is_file():
            n_files += 1
    return n_files

# Sanity check: count the files in the downloaded train set.
directory_to_check = "train_images" # directory the train images were downloaded into
num_files = count_files_pathlib(directory_to_check)
print(f"Number of files: {num_files}")


In [None]:
# 1. Build the model.
# A weights path is passed here, so create_realesrgan_x2plus_model loads it
# with download=False. Pass weights_path=None to request a download instead.
realesrgan_model = create_realesrgan_x2plus_model(
    weights_path='RealESRGAN_x2plus.pth',  # local .pth file with the weights
)

# 2. Upscale the train images.
upscale_directory_realesrgan_x2plus_py(
    input_dir="train_images",
    output_dir="train_upscaled",
    model=realesrgan_model,
    method_name="realesrgan_x2plus",
)

# 3. Upscale the test images.
upscale_directory_realesrgan_x2plus_py(
    input_dir="test_images",
    output_dir="test_upscaled",
    model=realesrgan_model,
    method_name="realesrgan_x2plus",
)

In [None]:
# Grid rows: the originals, both baselines and the RealESRGAN result.
dir_list = [
    "test_images",
    "test_upscaled/bilinear",
    "test_upscaled/lanczos",
    "test_upscaled/realesrgan_x2plus",
]

# Row captions, in the same order as dir_list.
row_labels = [
    "original test",
    "bilinear",
    "lanczos",
    "realesrgan"
]

# `image_names` and `col_labels` are reused from the earlier preview cell.
show_image_grid(
    image_names=image_names,
    dir_list=dir_list,
    row_labels=row_labels,
    col_labels=col_labels,
    figsize_scale=3.0,
    top_title="Сравнение разных методов апскейлинга",
);

## Сохраняем результат

In [None]:
# Mount Google Drive at /content/drive (requires the Google Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from pathlib import Path

def count_files_pathlib(directory_path):
    """
    Return the number of regular files directly inside ``directory_path``.

    Sub-directories are neither counted nor descended into.
    """
    n_files = 0
    for child in Path(directory_path).iterdir():
        if child.is_file():
            n_files += 1
    return n_files

# Count the files in the RealESRGAN train output stored on Google Drive.
# NOTE(review): the path is relative; it matches the /content/drive mount only
# when the working directory is /content — confirm.
directory_to_check = "./drive/MyDrive/crowd_sourcing_yasda/train_upscaled/realesrgan_x2plus" # RealESRGAN train output on Drive
num_files = count_files_pathlib(directory_to_check)
print(f"Number of files: {num_files}")


In [None]:
# Copy the upscaled test images to Google Drive.
# NOTE: if the destination directory already exists, `cp -r` puts the source
# inside it (…/test_upscaled/test_upscaled) instead of merging into it.
! cp -r ./test_upscaled ./drive/MyDrive/crowd_sourcing_yasda/test_upscaled

In [None]:
# Copy the upscaled train images to Google Drive.
# NOTE: if the destination directory already exists, `cp -r` puts the source
# inside it (…/train_upscaled/train_upscaled) instead of merging into it.
! cp -r ./train_upscaled ./drive/MyDrive/crowd_sourcing_yasda/train_upscaled