In [None]:
import os
import gzip
import shutil
import random
import asyncio
import aiohttp
import pycountry

import numpy as np
import pandas as pd
from functools import lru_cache
from PIL import Image, UnidentifiedImageError

import nest_asyncio
from io import BytesIO
from tqdm.asyncio import tqdm_asyncio
from urllib.parse import quote, urlsplit, urlunsplit

from torch.utils.data import Subset
from torchvision.datasets import ImageFolder
from sklearn.model_selection import train_test_split

Загружаем снова датасет

In [None]:
# Load the gzip-compressed, tab-separated links table into a dataframe.
file_path = 'links.tsv.gz'
with gzip.open(file_path, mode='rt', encoding='utf-8') as tsv_stream:
    df = pd.read_csv(tsv_stream, delimiter='\t')

Снова выполним предобработку, чтобы сохранить датасет в виде картинок, а не ссылок

In [None]:
# Start with the automatic approach: look the name up in the pycountry database.
@lru_cache(maxsize=None)
def map_to_country(name):
    """Resolve a free-form nationality string to a pycountry country name.

    Results are memoised, so each distinct input is looked up only once.

    Args:
        name: Raw nationality value. Anything that is not a non-blank string
            (for example ``None`` or ``NaN``) is treated as "no match".

    Returns:
        The ``name`` attribute of the first ``search_fuzzy`` hit, or ``None``
        when the input is unusable or pycountry raises ``LookupError``.
    """
    # Only LookupError is handled below, so anything search_fuzzy cannot treat
    # as text must be filtered out here rather than allowed to propagate.
    if not isinstance(name, str) or not name.strip():
        return None
    try:
        return pycountry.countries.search_fuzzy(name)[0].name
    except LookupError:
        return None

# First pass: run the automatic lookup over every nationality value.
df['country'] = df['nationality'].map(map_to_country)
# Manual overrides keyed by the raw `nationality` value.
# Entries tagged with `###` appear to be deliberate approximations (a territory
# folded into another country) — NOTE(review): confirm with the author.
manual_mapping = {
    'united_kingdom': 'United Kingdom',
    'kingdom_of_the_netherlands': 'Netherlands',
    'united_states_of_america': 'United States',
    'new_zealand': 'New Zealand',
    'socialist_federal_republic_of_yugoslavia': 'Yugoslavia',
    'czech_republic': 'Czech Republic',
    'south_africa': 'South Africa',
    "people's_republic_of_china": 'China',
    'south_korea': 'Korea', ###
    'soviet_union': 'Russian Federation',
    'republic_of_china_(1912–1949)': 'China',
    'turkey': 'Turkey',
    'bosnia_and_herzegovina': 'Bosnia and Herzegovina',
    'democratic_republic_of_the_congo': 'Democratic Republic of the Congo',
    'western_sahara': 'Western Sahara',
    'republic_of_macedonia': 'North Macedonia',
    'yugoslavia': 'Yugoslavia',
    'trinidad_and_tobago': 'Trinidad and Tobago',
    'ivory_coast': 'Côte d\'Ivoire',
    'greenland': 'Canada',
    'puerto_rico': 'Puerto Rico',
    'northern_ireland': 'United Kingdom',
    'saudi_arabia': 'Saudi Arabia',
    'republic_of_the_congo': 'Congo',
    'palau': 'Philippines', ###
    'montserrat': 'Cuba', ###
    'north_korea': 'Korea', ###
    'antigua_and_barbuda': 'Antigua and Barbuda',
    'liechtenstein': 'Israel', ###
    'czechoslovakia': 'Czech Republic',
    'sri_lanka': 'Sri Lanka',
    'san_marino': 'San Marino',
    'dominican_republic': 'Dominican Republic',
    'state_of_palestine': 'Palestine, State of',
    'burkina_faso': 'Burkina Faso',
    'federated_states_of_micronesia': 'Micronesia, Federated States of',
    'second_polish_republic': 'Poland',
    'costa_rica': 'Costa Rica',
    'british_raj': 'India',
    'german_democratic_republic': 'Germany',
    'united_arab_emirates': 'United Arab Emirates',
    # Spelled exactly like every other 'United Kingdom' value so these rows do
    # not end up under a separate, differently-cased label.
    'faroe_islands': 'United Kingdom', ###
    'saint_kitts_and_nevis': 'Saint Kitts and Nevis',
    'hong_kong': 'China', ###
    "people's_republic_of_poland": 'Poland',
    'serbia_and_montenegro': 'Serbia',
    'nazi_germany': 'Germany',
    'el_salvador': 'El Salvador',
    'central_african_republic': 'Central African Republic',
    'kingdom_of_yugoslavia': 'Yugoslavia',
    'weimar_republic': 'Germany',
    'kingdom_of_denmark': 'Denmark',
    'saint_vincent_and_the_grenadines': 'Saint Vincent and the Grenadines',
    'papua_new_guinea': 'Papua New Guinea',
    'cape_verde': 'Cabo Verde',
    'palestinian_national_authority': 'Palestine, State of',
    'empire_of_japan': 'Japan',
    'sierra_leone': 'Sierra Leone',
    'east_timor': 'Indonesia', ###
    'russian_soviet_federative_socialist_republic': 'Russian Federation',
    'kingdom_of_serbs,_croatians_and_slovenes': 'Yugoslavia',
    'tibet': 'China',
    'mandatory_palestine': 'Palestine, State of',
    'kingdom_of_italy': 'Italy',
    'turkish_republic_of_northern_cyprus': 'Cyprus',
    'kingdom_of_romania': 'Romania',
    'guernsey': 'United Kingdom', ###
    'equatorial_guinea': 'Equatorial Guinea',
    'transnistria': 'Romania',
    'great_britain': 'United Kingdom',
    'kingdom_of_iraq': 'Iraq',
    'south_sudan': 'Sudan',
    'ukrainian_soviet_socialist_republic': 'Ukraine',
    'são_tomé_and_príncipe': 'São Tomé and Príncipe',
    'artsakh': 'Azerbaijan',
    'federal_republic_of_yugoslavia': 'Yugoslavia',
    'armenian_soviet_socialist_republic': 'Armenia',
    'kingdom_of_egypt': 'Egypt',
    'francoist_spain': 'Spain',
    'protectorate_of_bohemia_and_moravia': 'Czech Republic',
    'west_germany': 'Germany',
    'solomon_islands': 'Solomon Islands',
    'saint_lucia': 'Saint Lucia',
    'colonial_nigeria': 'Nigeria',
    'kingdom_of_hungary': 'Hungary',
    "people's_republic_of_hungary": 'Hungary',
    'south_vietnam': 'Vietnam',
    'isle_of_man': 'United Kingdom', ###
    'manchukuo': 'China',
    'laos': 'Laos',
    'czechoslovak_socialist_republic': 'Czech Republic',
    'british_hong_kong': 'China',
    'slovak_state_(1939-1945)': 'Slovakia',
    'kingdom_of_bulgaria': 'Bulgaria',
    "people's_republic_of_bulgaria": 'Bulgaria',
    'sultanate_of_zanzibar': 'Tanzania, United Republic of', ###
    'dutch_east_indies': 'Indonesia',
    'french_algeria': 'Algeria',
    'marshall_islands': 'Marshall Islands',
    'byelorussian_soviet_socialist_republic': 'Belarus',
    'japanese_people': 'Japan',
    'welsh_people': 'United Kingdom',
    'british_people': 'United Kingdom',
    'rhodesia': 'Zimbabwe',
    'hungarian': 'Hungary',
    'federation_of_rhodesia_and_nyasaland': 'Zimbabwe',
    'socialist_republic_of_romania': 'Romania',
    'kingdom_of_albania': 'Albania',
    'iraqi_kurdistan': 'Iraq',
    'union_of_south_africa': 'South Africa',
    'indian_people': 'India',
    'cook_islands': 'New Zealand', ###
    'niue': 'New Zealand', ###
    'georgian_soviet_socialist_republic': 'Georgia',
    'southern_rhodesia': 'Zimbabwe',
    'british_virgin_islands': 'United States',
    'american_samoa': 'United States',
    'danish': 'Denmark',
    'kingdom_of_afghanistan': 'Afghanistan',
    'first_republic_of_austria': 'Austria',
    'british_empire': 'United Kingdom',
    'kingdom_of_greece': 'Greece',
    'belgian_congo': 'Democratic Republic of the Congo',
    'macau': 'China',
    "people's_socialist_republic_of_albania": 'Albania',
    'yemen_arab_republic': 'Yemen',
    'vatican_city': 'Italy',
    'kenya_colony': 'Kenya',
    'tibet_from_1912_to_1951': 'China',
    'ruanda-urundi': 'Rwanda',
    'german_empire': 'Germany',
    'nepali': 'Nepal',
    'united_kingdom_of_great_britain_and_ireland': 'United Kingdom',
    'tuva_republic': 'Russian Federation',
    'austrians': 'Austria',
    'british_national_(overseas)': 'United Kingdom',
    'filipino_people': 'Philippines',
    'lithuanian_soviet_socialist_republic': 'Lithuania',
    'country_of_the_kingdom_of_the_netherlands': 'Netherlands',
    'netherlands_antilles': 'Netherlands',
    'republic_of_upper_volta': 'Burkina Faso',
    'first_portuguese_republic': 'Portugal',
    "romanian_people's_republic": 'Romania',
    "mongolian_people's_republic": 'Mongolia',
    'democratic_republic_of_georgia': 'Georgia',
    'azerbaijani': 'Azerbaijan',
    'bangladeshis': 'Bangladesh',
    'bulgarian': 'Bulgaria',
    'italians': 'Italy',
    'american_occupation_zone': 'Germany',
    'republic_of_cuba_(1902–59)': 'Cuba',
    'south_yemen': 'Yemen',
    'irish_republic': 'Ireland',
    'british_somaliland': 'Somalia',
    'chinese_taipei': 'China',
    'bosniaks': 'Bosnia and Herzegovina',
    'tibetan_people': 'China',
    'kingdom_of_mysore': 'India',
    'beiyang_government': 'China',
    'afrika': 'South Africa',
    'americans': 'United States',
    'chileans': 'Chile',
    'sint_maarten': 'Netherlands',
    'hungarians': 'Hungary',
    'norwegian': 'Norway',
    'irish': 'Ireland',
    'czechoslovak_republic': 'Czech Republic',
    'mexicana': 'Mexico',
    'cayman_islands': 'Cuba',
    'são_paulo': 'Brazil',
    'québec-comté': 'Canada',
    'israelis': 'Israel',
    'range_of_andia': 'Spain',
    'anguilla': 'Cuba',
    'marítimo': 'Portugal',
    'chilena': 'Chile',
    'canadian_french': 'Canada',
    'egyptians': 'Egypt',
    'francia': 'France',
    'ukrainians': 'Ukraine',
    'dominicana': 'Dominican Republic',
    'kurdistan': 'Turkey',
    'germans': 'Germany',
    'the_republic_of_abkhazia': 'Georgia', #### oops...
    'united_federation_of_planets': 'United States', # a legend
    'katun': 'Russian Federation',
    'siciliana': 'Italy',
    'soviètic': 'Russian Federation',
    'first_hungarian_republic': 'Hungary',
    'staffanstorp_municipality': 'Sweden',
    'nuu-chah-nulth': 'Canada',
    'croacia': 'Croatia',
    'liberland': 'Czech Republic',
    'spain_under_the_restoration': 'Spain',
    'venezolano.': 'Venezuela, Bolivarian Republic of',
    'estado_novo': 'Portugal',
    'ivanteyevskaya_street': 'Russian Federation',
    'kuwait_city': 'Kuwait',
    'florence': 'Italy',
    'monterrey': 'Mexico',
    'moldova': 'Romania', ###
    'colombiana': 'Colombia',
    'ss_france': 'France',
    'francais_objective_specifique': 'France',
    'mexico_city': 'Mexico',
    'morocco_pavilion': 'Morocco',
    'brazil–uruguay_relations': 'Brazil',
    'ecuador_national_football_team': 'Ecuador',
    'langnau_am_albis': 'Switzerland',
    "federal_people's_republic_of_yugoslavia": 'Yugoslavia',
    'third_czechoslovak_republic': 'Czech Republic',
    'plastin': 'Romania',
    'nazareth': 'Israel',
    'korea': 'Korea', ###
    'suisse_romande': 'Switzerland',
    'republika_srpska': 'Bosnia and Herzegovina',
    'san_luis_potosí': 'Mexico',
    'república_de_síria': 'Syrian Arab Republic',
    'tamil_eelam': 'Sri Lanka',
    'sockel_fm2+': 'Spain', # not quite clear what a CPU socket has to do with it...
    'canadian_nationality_law': 'Canada',
    'bicycle_kick': 'Chile',
    'santo_domingo': 'Dominican Republic',
    'québécois': 'Canada',
}
# Manual overrides win; rows without one keep the value already in `country`.
manual_country = df['nationality'].map(manual_mapping)
df['country'] = manual_country.combine_first(df['country'])

# Per-person corrections, matched on the exact `label` value.
for _label, _country in (
    ("Eliana Rubashkyn", "Colombia"),
    ("Glen L Roberts", "United States"),
    ("Denis Pécic", "France"),
):
    df.loc[df["label"] == _label, "country"] = _country

# Discard rows that still have no country assigned.
df = df.dropna(subset=['country'])

In [None]:
# Lookup table used to fold rare countries into broader regional groups:
# group label -> list of country names that belong to it.
country_groups = {
    'Caribbean': [
        'Antigua and Barbuda',
        'Bahamas',
        'Barbados',
        'Grenada',
        'Saint Kitts and Nevis',
        'Saint Lucia',
        'Saint Vincent and the Grenadines',
        'Aruba',
        'Bermuda',
        'Belize',
        'Guyana',
    ],
    'Pacific Islands': [
        'Kiribati',
        'Marshall Islands',
        'Micronesia, Federated States of',
        'Nauru',
        'Guam',
        'Solomon Islands',
        'Tuvalu',
        'Vanuatu',
        'Papua New Guinea',
        'Samoa',
    ],
    'African Small States': [
        'Botswana',
        'Burundi',
        'Cabo Verde',
        'Comoros',
        'Djibouti',
        'Equatorial Guinea',
        'Eswatini',
        'Gabon',
        'Gambia',
        'Eritrea',
        'Guinea-Bissau',
        'Lesotho',
        'Malawi',
        'Mauritania',
        'Mauritius',
        'Mozambique',
        'São Tomé and Príncipe',
        'Seychelles',
        'Togo',
        'Madagascar',
        'Central African Republic',
        'Chad',
        'Sierra Leone',
        'Liberia',
    ],
    'Central Asia': [
        'Kyrgyzstan',
        'Tajikistan',
        'Turkmenistan',
        'Uzbekistan',
    ],
    'Middle East Small States': [
        'Brunei Darussalam',
        'Oman',
        'Qatar',
        'Western Sahara',
        'Yemen',
    ],
    'Other Europe': [
        'Gibraltar',
        'Monaco',
        'San Marino',
    ],
    'Other Asia': [
        'Bhutan',
        'Maldives',
        'Laos',
        'Vietnam',
    ],
}

# Countries with 50 or more rows keep their own label; rarer ones are folded
# into a regional group, or into 'Other' when no group lists them.
country_counts = df['country'].value_counts()
keep_individual = country_counts[country_counts > 49].index.tolist()

# Hash-based lookups built once, so the per-row call below does not scan a
# list and every group for each of the dataframe's rows.
_keep_individual_set = frozenset(keep_individual)
_country_to_group = {}
for _group, _members in country_groups.items():
    for _member in _members:
        # setdefault: if a country were listed twice, the first group wins.
        _country_to_group.setdefault(_member, _group)


def group_country(country):
    """Return the final class label for a country name.

    Args:
        country: Country name as stored in the ``country`` column.

    Returns:
        ``country`` itself when it is frequent enough to stand alone,
        otherwise the name of the group that lists it, otherwise ``'Other'``.
    """
    if country in _keep_individual_set:
        return country
    return _country_to_group.get(country, 'Other')  # fallback, just in case


# Work on an explicit copy before overwriting the column in place.
df = df.copy()
df.loc[:, 'country'] = df['country'].apply(group_country)

Подготавливаем почву для загрузки картинок по ссылкам

Изображений больше 300 тыс., поэтому будем загружать их асинхронно (конкурентно, через `asyncio`, а не в нескольких потоках) и по батчам датасета, чтобы уменьшить количество ошибок из-за перегрузки сервера

При этом будем сразу сохранять строки с валидными ссылками, чтобы знать, какие строки валидны, а какие нет

In [None]:
# Root output directory; images are written into one sub-directory per class.
dataset_root = "dataset"
# Size of the asyncio semaphore that caps how many requests run concurrently.
SEM_LIMIT = 40

def safe_url(url):
    """Return ``url`` with its path component percent-encoded.

    Only the path is rewritten; scheme, host, query and fragment are passed
    through untouched. Sequences that are already percent-encoded are kept
    as they are.

    Args:
        url: URL string to sanitise.

    Returns:
        The re-assembled URL, or ``url`` unchanged when it cannot be parsed
        (for example when it is not a string).
    """
    try:
        parts = urlsplit(url)
        # '%' is declared safe so an already-encoded '%20' is not turned into
        # '%2520' by a second round of quoting.
        safe_path = quote(parts.path, safe="/%")
        return urlunsplit((parts.scheme, parts.netloc, safe_path, parts.query, parts.fragment))
    except (TypeError, ValueError, AttributeError):
        # Best effort: hand the value back and let the HTTP layer report it.
        return url

async def is_url_accessible(session, url, semaphore):
    """Check whether a GET request to ``url`` answers with HTTP 200.

    Args:
        session: Object whose ``get`` is used as an async context manager
            (an ``aiohttp.ClientSession`` in this notebook).
        url: URL to probe; it is passed through ``safe_url`` first.
        semaphore: Async context manager limiting concurrent requests.

    Returns:
        ``True`` when the response status is 200, ``False`` for any other
        status or when the request fails or times out.
    """
    async with semaphore:
        try:
            url = safe_url(url)
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                return resp.status == 200
        except Exception:
            # Deliberately broad: any request failure means "not accessible".
            # Unlike a bare `except:`, this lets task cancellation and
            # KeyboardInterrupt propagate.
            return False

async def download_image(session, row, index, semaphore, failed_rows):
    """Download one image and store it as ``<dataset_root>/<class>/<index>.jpg``.

    The class directory name is the row's ``country`` value, stripped,
    lower-cased and with spaces replaced by underscores.

    Args:
        session: Object whose ``get`` is used as an async context manager
            (an ``aiohttp.ClientSession`` in this notebook).
        row: Mapping-like row providing ``'image'`` (URL) and ``'country'``.
        index: Value used as the output file name and in log messages.
        semaphore: Async context manager limiting concurrent downloads.
        failed_rows: List that ``row`` is appended to on any failure
            (non-200 status, unrecognised image data, or any other exception).

    Returns:
        None. Results are the file written to disk and/or ``failed_rows``.
    """
    url = safe_url(row['image'])
    class_name = str(row['country']).strip().lower().replace(" ", "_")
    class_dir = os.path.join(dataset_root, class_name)

    async with semaphore:
        # Small random delay to spread the requests out.
        await asyncio.sleep(random.uniform(0.1, 0.3))

        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                if resp.status == 200:
                    content = await resp.read()
                    try:
                        image = Image.open(BytesIO(content)).convert('RGB')
                        # Create the class directory only once there is an
                        # image to put in it, so failed downloads never leave
                        # an empty class folder behind.
                        os.makedirs(class_dir, exist_ok=True)
                        image.save(os.path.join(class_dir, f"{index}.jpg"))
                    except UnidentifiedImageError:
                        failed_rows.append(row)
                        print(f"[{index}] Нераспознанный формат изображения")
                else:
                    failed_rows.append(row)
                    print(f"[{index}] HTTP статус: {resp.status}")
        except Exception as e:
            # Network, timeout, decoding and filesystem errors all mark the
            # row as failed instead of aborting the whole batch.
            failed_rows.append(row)
            print(f"[{index}] Ошибка скачивания: {e}")

async def process_all(df):
    """Check every URL in ``df['image']`` and download the reachable images.

    Runs in two phases inside one HTTP session: first every URL is probed
    with ``is_url_accessible``, then ``download_image`` is run for the rows
    that passed, using the row's ``'id'`` value as the file name.

    Args:
        df: DataFrame with at least the columns ``'image'``, ``'id'`` and
            ``'country'``.

    Returns:
        A ``(df_valid, df_failed_total)`` tuple. ``df_valid`` holds the rows
        whose URL check passed and whose download did not report a failure;
        ``df_failed_total`` holds the rows that failed the URL check followed
        by the rows that failed during download. Both have a fresh index.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Referer': 'https://google.com'
    }

    semaphore = asyncio.Semaphore(SEM_LIMIT)
    failed_rows = []

    async def _not_accessible():
        # Awaitable stand-in for rows without a URL: gather() can only await
        # awaitables, so a bare `False` in the task list would raise TypeError.
        return False

    async with aiohttp.ClientSession(headers=headers) as session:
        tasks_check = [
            is_url_accessible(session, url, semaphore) if pd.notna(url) else _not_accessible()
            for url in df['image']
        ]
        valid_mask = await tqdm_asyncio.gather(*tasks_check, position=0, desc="Проверка URL")
        # Explicit bool dtype keeps the mask usable even for an empty batch.
        valid_mask = pd.Series(valid_mask, index=df.index, dtype=bool)

        df_valid = df[valid_mask].reset_index(drop=True)
        df_failed_check = df[~valid_mask].reset_index(drop=True)

        tasks_download = [
            download_image(session, row, row["id"], semaphore, failed_rows)
            for _, row in df_valid.iterrows()
        ]
        await tqdm_asyncio.gather(*tasks_download, position=0, desc="Загрузка изображений")

    df_failed_download = pd.DataFrame(failed_rows)
    if failed_rows:
        # Each failed row is a Series taken from df_valid.iterrows(), so its
        # .name is its position in df_valid. Drop those rows so that no row is
        # reported as both valid and failed.
        failed_positions = [row.name for row in failed_rows]
        df_valid = df_valid.drop(index=failed_positions).reset_index(drop=True)

    df_failed_total = pd.concat([df_failed_check, df_failed_download], ignore_index=True)
    return df_valid, df_failed_total

async def process_all_in_batches(df, batch_size=10000, sleep_between_batches=30):
    """Run ``process_all`` over ``df`` in consecutive slices with pauses.

    Args:
        df: DataFrame to process; it is sliced positionally with ``iloc``.
        batch_size: Maximum number of rows per batch; must be positive.
        sleep_between_batches: Seconds to wait between two batches. No pause
            is made after the final batch.

    Returns:
        A ``(df_valid_final, df_failed_final)`` tuple with the per-batch
        results concatenated under a fresh index. For an empty ``df`` both
        frames are empty and keep ``df``'s columns.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    if len(df) == 0:
        # Nothing to do; pd.concat() on an empty list would raise.
        return df.iloc[0:0].copy(), df.iloc[0:0].copy()

    total_batches = (len(df) + batch_size - 1) // batch_size
    all_valid, all_failed = [], []

    for i in range(total_batches):
        start, end = i * batch_size, min((i + 1) * batch_size, len(df))
        df_batch = df.iloc[start:end]
        print(f"\n🔹 Обработка батча {i+1}/{total_batches} ({start}–{end})")

        df_valid, df_failed = await process_all(df_batch)
        all_valid.append(df_valid)
        all_failed.append(df_failed)

        if i + 1 < total_batches:
            print(f"✅ Батч {i+1} завершён, пауза {sleep_between_batches} сек...\n")
            await asyncio.sleep(sleep_between_batches)
        else:
            # Last batch: there is nothing left to throttle, so do not wait.
            print(f"✅ Батч {i+1} завершён\n")

    df_valid_final = pd.concat(all_valid, ignore_index=True)
    df_failed_final = pd.concat(all_failed, ignore_index=True)
    return df_valid_final, df_failed_final

In [None]:
# Patch asyncio so coroutines can be run inside the notebook's event loop
# (presumably required because the kernel already has a loop running — confirm).
nest_asyncio.apply()

In [20]:
# df_valid_1, df_fail_1 = await process_all_in_batches(df[:100000], batch_size=1000, sleep_between_batches=10)
# df_valid_1.to_csv("df_valid_1.csv", index=True)
# df_fail_1.to_csv("df_fail_1.csv", index=True)

In [18]:
# df_valid_2, df_fail_2 = await process_all_in_batches(df[100000:200000], batch_size=1000, sleep_between_batches=10)
# df_valid_2.to_csv("df_valid_2.csv", index=True)
# df_fail_2.to_csv("df_fail_2.csv", index=True)

In [None]:
# df_valid_3, df_fail_3 = await process_all_in_batches(df[200000:], batch_size=1000, sleep_between_batches=10)
# df_valid_3.to_csv("df_valid_3.csv", index=True)
# df_fail_3.to_csv("df_fail_3.csv", index=True)

Сохраним полученные результаты в `.csv` файлы, затем объединим полученные датафреймы и сохраним объединённый результат

In [None]:
# Reload the per-chunk results from disk. The files were written with
# index=True, so the first column is the saved index; index_col=0 reads it
# back as the index instead of as a stray "Unnamed: 0" data column.
df_valid_1 = pd.read_csv("df_valid_1.csv", index_col=0)
df_valid_2 = pd.read_csv("df_valid_2.csv", index_col=0)
df_valid_3 = pd.read_csv("df_valid_3.csv", index_col=0)
df_fail_1 = pd.read_csv("df_fail_1.csv", index_col=0)
df_fail_2 = pd.read_csv("df_fail_2.csv", index_col=0)
df_fail_3 = pd.read_csv("df_fail_3.csv", index_col=0)

In [None]:
# Stitch the three chunks back together under a fresh 0..n-1 index.
valid_parts = [df_valid_1, df_valid_2, df_valid_3]
failed_parts = [df_fail_1, df_fail_2, df_fail_3]
df_valid = pd.concat(valid_parts, ignore_index=True)
df_failed = pd.concat(failed_parts, ignore_index=True)

В теории тут должно быть создание архива для выгрузки его в Yandex DataSphere

P.S. Эту ячейку можно не выполнять, потому что далее создаётся отдельная директория с разделением, которую мы снова будем архивировать

In [None]:
# shutil.make_archive("dataset", 'zip', "dataset")

Разделяем датасет на тренировочные, валидационные и тестовые данные с сохранением баланса классов

In [None]:
# dataset = ImageFolder(root="dataset")
#
# file_paths = [sample[0] for sample in dataset.samples]
# labels = [sample[1] for sample in dataset.samples]
#
#
# # Сначала разделяем на train+val (80%) и test (20%)
# train_val_files, test_files, train_val_labels, test_labels = train_test_split(
#     file_paths,
#     labels,
#     test_size=0.2,
#     stratify=labels,
#     random_state=30
# )
# # Разделяем train_val на train и val
# train_files, val_files, train_labels, val_labels = train_test_split(
#     train_val_files,
#     train_val_labels,
#     test_size=0.2,
#     stratify=train_val_labels,
#     random_state=30
# )
#
# # Загружаем полный датасет с преобразованиями
# full_dataset = ImageFolder(root="dataset", transform=None)
#
# # Создаем словарь для индексов (чтобы сопоставить пути с индексами в full_dataset)
# file_to_index = {os.path.normpath(path): idx for idx, (path, _) in enumerate(full_dataset.samples)}
#
# # Получаем индексы для train, val, test
# train_indices = [file_to_index[os.path.normpath(path)] for path in train_files]
# val_indices = [file_to_index[os.path.normpath(path)] for path in val_files]
# test_indices = [file_to_index[os.path.normpath(path)] for path in test_files]
#
#
# train_dataset = Subset(full_dataset, train_indices)
# val_dataset = Subset(full_dataset, val_indices)
# test_dataset = Subset(full_dataset, test_indices)
#
# # Создаем поддиректории
# os.makedirs("split_dataset/train", exist_ok=True)
# os.makedirs("split_dataset/val", exist_ok=True)
# os.makedirs("split_dataset/test", exist_ok=True)
#
# def copy_files(files, target_dir):
#     for file in files:
#         class_name = os.path.basename(os.path.dirname(file))
#         dest_dir = os.path.join(target_dir, class_name)
#         os.makedirs(dest_dir, exist_ok=True)
#         shutil.copy(file, dest_dir)
#
# copy_files(train_files, "split_dataset/train")
# copy_files(val_files, "split_dataset/val")
# copy_files(test_files, "split_dataset/test")

Проверяем распределение классов

In [None]:
# train_dataset = ImageFolder("split_dataset/train", transform=None)
# val_dataset = ImageFolder("split_dataset/val", transform=None)
# test_dataset = ImageFolder("split_dataset/test", transform=None)
#
# def print_class_distribution(dataset, name):
#     if isinstance(dataset, Subset):
#         labels = [dataset.dataset.targets[i] for i in dataset.indices]
#     else:
#         labels = dataset.targets
#     unique, counts = np.unique(labels, return_counts=True)
#     print(f"{name} distribution:")
#     for cls, count in zip(unique, counts):
#         print(f"Class {cls}: {count} samples ({count / len(labels):.2%})")
#
# print_class_distribution(train_dataset, "Train")
# print_class_distribution(val_dataset, "Validation")
# print_class_distribution(test_dataset, "Test")

Опять-таки архивируем разделённый датасет для последующей загрузки в Yandex DataSphere

In [None]:
# shutil.make_archive("split_dataset", 'zip', "split_dataset")