In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only Kaggle input tree and print the full path of every file found.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        full_path = os.path.join(root, name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/sweden-traffic-signs-classification/sample.csv
/kaggle/input/sweden-traffic-signs-classification/data.rar
/kaggle/input/sweden-traffic-signs-classification/data.zip
/kaggle/input/sweden-traffic-signs-classification/train.csv
/kaggle/input/sweden-traffic-signs-classification/test.csv
/kaggle/input/sweden-traffic-signs-classification/preprocessed.rar


# Сиды

In [2]:
import random

import numpy as np
import torch 
import os

# One seed shared by every random number generator used in the notebook.
seed=42

# PYTHONHASHSEED is read at interpreter start-up, so setting it here only
# affects Python processes launched after this point.
os.environ['PYTHONHASHSEED']=str(seed)

random.seed(seed)                 # Python's built-in RNG
np.random.seed(seed)              # NumPy global RNG
torch.manual_seed(seed)           # PyTorch CPU RNG
torch.cuda.manual_seed_all(seed)  # every CUDA device, including the current one

# Ask cuDNN for deterministic kernels and disable its auto-tuner, which may
# otherwise select different algorithms from run to run.
torch.backends.cudnn.deterministic=True
torch.backends.cudnn.benchmark=False




In [3]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device=torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Импорты

In [4]:
import timm
from torch import nn
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import tqdm 
from tqdm import tqdm 
from torchvision.transforms import v2 
from PIL import Image 
from sklearn.metrics import f1_score

# TEST MODE

In [5]:
# Debug switch: when True, a later cell replaces `train` with a 500-row random
# sample so the whole pipeline can be smoke-tested quickly.
TEST_MODE=False

# Загрузка данных

In [6]:
import zipfile
from pathlib import Path

# Read the three competition tables: labelled train set, unlabelled test set
# and the sample submission.
train, test, sample = (
    pd.read_csv(csv_path)
    for csv_path in (
        '/kaggle/input/sweden-traffic-signs-classification/train.csv',
        '/kaggle/input/sweden-traffic-signs-classification/test.csv',
        '/kaggle/input/sweden-traffic-signs-classification/sample.csv',
    )
)

# Unpack the image archive into the writable working directory.
zip_path = Path('/kaggle/input/sweden-traffic-signs-classification/data.zip')
out_dir  = Path('/kaggle/working/data')
out_dir.mkdir(parents=True, exist_ok=True)

with zipfile.ZipFile(zip_path) as archive:
    archive.extractall(path=out_dir)


In [7]:
# Directory passed to the datasets as the image root: the `data` folder inside
# the /kaggle/working/data extraction target.
# NOTE(review): presumably the archive's top-level folder — confirm after extraction.
dir_of_all='/kaggle/working/data/data'

In [8]:
# In TEST_MODE keep only a fixed 500-row random subset of the training table;
# otherwise `train` is left exactly as loaded.
if TEST_MODE:
    debug_subset=train.sample(n=500, random_state=seed)
    train=debug_subset.reset_index(drop=True)

In [9]:
train

Unnamed: 0,file_name,label
0,picture-009375.jpg,PEDESTRIAN_CROSSING
1,picture-511184.jpg,PEDESTRIAN_CROSSING
2,picture-616769.jpg,PEDESTRIAN_CROSSING
3,picture-963518.jpg,PEDESTRIAN_CROSSING
4,picture-486207.jpg,PEDESTRIAN_CROSSING
...,...,...
2498,picture-565027.jpg,PASS_RIGHT_SIDE
2499,picture-034604.jpg,OTHER
2500,picture-258637.jpg,PRIORITY_ROAD
2501,picture-586334.jpg,NO_PARKING


In [10]:
test

Unnamed: 0,file_name
0,picture-176290.jpg
1,picture-834444.jpg
2,picture-768882.jpg
3,picture-212331.jpg
4,picture-277367.jpg
...,...
605,picture-641767.jpg
606,picture-996017.jpg
607,picture-671413.jpg
608,picture-043377.jpg


In [11]:
# Stratified 80/20 hold-out split. Passing random_state pins the shuffle so the
# split is identical every time this cell is executed, not only on the first
# run after the global NumPy seed was set.
train_data, eval_data=train_test_split(
    train,
    test_size=0.2,
    stratify=train['label'],
    random_state=seed,
)

# Dataset

## Augmentations

In [17]:
# Preprocessing pipelines. Both currently perform the same deterministic steps;
# they are kept separate so training-only augmentations can be added to the
# first one without touching evaluation/inference.
#
# * Resize((224, 224)) forces an exact 224x224 output. A bare int would only
#   fix the shorter edge and keep the aspect ratio, so non-square images would
#   produce tensors of different shapes that the default collate cannot batch.
# * ToImage + ToDtype(float32, scale=True) is the documented replacement for
#   the deprecated v2.ToTensor (PIL image -> float tensor scaled to [0, 1]).
# * Normalize uses the channel mean/std values that were already in this cell.
trasforms_for_train=v2.Compose([
    v2.Resize((224, 224)),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

transforms_for_test=v2.Compose([
    v2.Resize((224, 224)),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])


In [None]:
class SwedenDataset(Dataset):
    """Map-style dataset that loads the images listed in a DataFrame.

    Args:
        df: table with a ``file_name`` column holding image file names
            (extension included) and, when ``with_labels`` is true, a
            ``label`` column.
        path_to_imgs: directory that contains the image files.
        transforms: callable applied to the RGB PIL image, or ``None`` to
            return the PIL image unchanged.
        with_labels: if true, items carry a ``label`` tensor; otherwise they
            carry the ``image_name`` so predictions can be matched to files.
        label_to_idx: optional mapping from raw label value to class index.
            When omitted and the ``label`` column is not numeric, a mapping is
            built from the sorted unique labels of ``df``. When omitted and
            the column is numeric, labels are used as class indices directly.

    Each item is a dict: ``{'image', 'label'}`` or ``{'image', 'image_name'}``.
    """

    def __init__(self, df, path_to_imgs, transforms, with_labels, label_to_idx=None):
        self.df=df
        self.path_to_imgs=path_to_imgs
        # Attribute name (including its spelling) is kept for backward compatibility.
        self.trasforms=transforms
        self.with_labels=with_labels
        if (
            with_labels
            and label_to_idx is None
            and not pd.api.types.is_numeric_dtype(df['label'])
        ):
            # Sorting makes the class order independent of row order.
            label_to_idx={
                name: idx for idx, name in enumerate(sorted(df['label'].unique()))
            }
        self.label_to_idx=label_to_idx

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row=self.df.iloc[idx]
        image_name=row['file_name']
        # `file_name` already includes the extension, so it is joined as-is.
        image_path=os.path.join(self.path_to_imgs, image_name)
        # Context manager closes the underlying file once the RGB copy exists.
        with Image.open(image_path) as img:
            image=img.convert('RGB')
        if self.trasforms is not None:
            image=self.trasforms(image)

        if not self.with_labels:
            return {
                'image': image,
                'image_name': image_name
            }

        raw_label=row['label']
        if self.label_to_idx is not None:
            class_idx=self.label_to_idx[raw_label]
        else:
            class_idx=int(raw_label)
        return {
            'image': image,
            'label': torch.tensor(class_idx, dtype=torch.long)
        }

        

## Создание датасетов

In [None]:
# Fix the class order once, from the full training table, so the train and
# validation splits share one label -> index encoding.
label_to_idx={name: idx for idx, name in enumerate(sorted(train['label'].unique()))}

# Encode into new frames so the original splits stay untouched and this cell
# can be re-run safely.
train_encoded=train_data.assign(label=train_data['label'].map(label_to_idx))
eval_encoded=eval_data.assign(label=eval_data['label'].map(label_to_idx))

# Each split gets its own rows: validation uses the held-out part with the
# inference transforms, and the test set is built from the unlabelled table.
train_dataset=SwedenDataset(train_encoded, dir_of_all, trasforms_for_train, with_labels=True)
eval_dataset=SwedenDataset(eval_encoded, dir_of_all, transforms_for_test, with_labels=True)
test_dataset=SwedenDataset(test, dir_of_all, transforms_for_test, with_labels=False)


## DataLoaders

In [None]:
# Settings shared by all three loaders; only the training loader reshuffles.
loader_kwargs=dict(batch_size=32, num_workers=8)

train_dataloader=DataLoader(train_dataset, shuffle=True, **loader_kwargs)
eval_dataloader=DataLoader(eval_dataset, shuffle=False, **loader_kwargs)
test_dataloader=DataLoader(test_dataset, shuffle=False, **loader_kwargs)

# Количество эпох

In [None]:
# Number of passes over the training set; also used as T_max of the cosine
# learning-rate schedule created below.
EPOCHS=10

# Model

In [None]:
# timm sizes the classifier head through `num_classes`. The head width is
# derived from the training labels rather than hard-coded so it always matches
# the data.
num_classes=train['label'].nunique()
model=timm.create_model(
    'ecaresnet50d.ra2_in1k',
    pretrained=True,
    num_classes=num_classes,
).to(device)

# Loss

In [None]:
# Cross-entropy over raw logits with integer class targets.
# (The variable name keeps its existing spelling because the training loop uses it.)
criteration=nn.CrossEntropyLoss()

# Optimizer

In [None]:
# SGD with momentum and weight decay over all model weights.
# nn.Module exposes its weights via the parameters() method; there is no
# `params` attribute.
optimizer=torch.optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=1e-4,
)

# Scheduler

In [None]:
# Cosine learning-rate decay over the whole run (one step per epoch).
# Schedulers live in torch.optim.lr_scheduler, not directly in torch.optim.
scheduler=torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

# Training loop

In [None]:
# Highest validation F1 seen so far. It starts below any attainable score so
# the first epoch always writes a checkpoint for the submission cell to load.
best_f1=-1.0

for epoch in range(1, EPOCHS+1):
    # ---------------- training pass ----------------
    model.train()
    running_loss, running_correct, n=0.0, 0, 0
    pbar=tqdm(train_dataloader, desc='train', leave=False)
    for batch in pbar:
        X=batch['image'].to(device)
        y=batch['label'].to(device)

        optimizer.zero_grad()
        logits=model(X)
        loss=criteration(logits, y)
        loss.backward()
        optimizer.step()

        # Accumulate the sample-weighted loss so the epoch average is exact
        # even when the last batch is smaller.
        running_loss+=loss.item()*X.size(0)
        preds=logits.argmax(dim=1)
        running_correct+=(preds==y).sum().item()
        n+=X.size(0)

        pbar.set_postfix(loss=running_loss/max(n, 1), acc=running_correct/max(n, 1))

    # One scheduler step per epoch, matching T_max=EPOCHS.
    scheduler.step()
    print(f'epoch {epoch}/{EPOCHS} train: loss={running_loss/max(n, 1):.4f}, acc={running_correct/max(n, 1):.4f}')

    # ---------------- validation pass ----------------
    model.eval()
    loss_sum, correct, n=0.0, 0, 0
    all_preds, all_targets=[], []

    with torch.no_grad():
        # `tqdm` here is the class imported by `from tqdm import tqdm`.
        for batch in tqdm(eval_dataloader, desc='validation', leave=False):
            X=batch['image'].to(device)
            y=batch['label'].to(device)

            logits=model(X)
            loss=criteration(logits, y)

            loss_sum+=loss.item()*X.size(0)
            preds=logits.argmax(dim=1)
            correct+=(preds==y).sum().item()
            n+=X.size(0)

            all_preds.append(preds.cpu().numpy())
            all_targets.append(y.cpu().numpy())

    all_preds=np.concatenate(all_preds) if len(all_preds) else np.array([], dtype=int)
    all_targets=np.concatenate(all_targets) if len(all_targets) else np.array([], dtype=int)

    # Multi-class problem, so score hard predictions with F1.
    # NOTE(review): macro averaging is assumed — confirm against the competition metric.
    val_f1=f1_score(all_targets, all_preds, average='macro')
    val_loss=loss_sum/max(n, 1)
    val_acc=correct/max(n, 1)
    print(f"epoch {epoch}/{EPOCHS} valid: loss={val_loss:.4f}, acc={val_acc:.4f}, F1={val_f1:.4f}")

    if val_f1 > best_f1:
        best_f1=val_f1
        # File name and the "model" key are what the submission cell loads.
        torch.save({"model": model.state_dict()}, "best_resnet50.pt")
        print(f"✓ Saved new best (F1 {best_f1:.4f})")

# Submission

In [None]:
# 1) Restore the best checkpoint written by the training loop.
ckpt_path="best_resnet50.pt"
state=torch.load(ckpt_path, map_location=device)
model.load_state_dict(state["model"])

model.eval()

# Class index -> label name, using the sorted unique training labels.
# NOTE(review): this must be the same ordering that was used to encode the
# training targets — confirm against the dataset-creation cell.
idx_to_label=sorted(train['label'].unique())

# 2) Predict one class per test image.
test_names, test_preds=[], []

with torch.no_grad():
    # `tqdm` here is the class imported by `from tqdm import tqdm`.
    for batch in tqdm(test_dataloader, desc="test", leave=False):
        x = batch["image"].to(device)
        logits = model(x)                                   # one logit per class
        test_preds.append(logits.argmax(dim=1).cpu().numpy())
        test_names.extend(batch["image_name"])

test_preds = np.concatenate(test_preds, axis=0)
pred_df = pd.DataFrame({
    "file_name": test_names,
    "label": [idx_to_label[int(i)] for i in test_preds],
})

# 3) Align to sample order & save
# NOTE(review): sample.csv's columns are not shown in this notebook; this
# assumes it shares the `file_name` key used by train.csv/test.csv and falls
# back to prediction order otherwise — confirm the required submission schema.
if "file_name" in sample.columns:
    submission = sample[["file_name"]].merge(pred_df, on="file_name", how="left")
else:
    submission = pred_df

n_missing = int(submission["label"].isna().sum())
if n_missing:
    print(f"WARNING: {n_missing} rows have no prediction")

submission.to_csv("submission.csv", index=False)
print(submission.head(), "\nSaved -> submission.csv")