In [1]:
import numpy as np
import cv2
import albumentations as A
import torch
from pathlib import Path
import json
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm
import pandas as pd

  check_for_updates()


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
#Assisted by ChaGPT
import shutil
from pathlib import Path
def extract_drive_dataset(
    zip_path='/content/drive/MyDrive/TKPR221/traffic_signs_dataset.zip',
    extract_path='/content/dataset'
):

    zip_path = Path(zip_path)
    if not zip_path.exists():
        raise FileNotFoundError(f"ZIP file not found at: {zip_path}")

    temp_dir = Path('/content/temp')
    temp_dir.mkdir(exist_ok=True)
    temp_zip = temp_dir / 'dataset.zip'

    print(f"\nCopying ZIP from Drive to Colab...")
    print(f"Source: {zip_path}")
    print(f"Destination: {temp_zip}")
    shutil.copy2(zip_path, temp_zip)

    if temp_zip.exists():
        zip_size = temp_zip.stat().st_size / (1024 * 1024)
        print(f"ZIP copied successfully! Size: {zip_size:.2f} MB")
    else:
        raise RuntimeError("Failed to copy ZIP file")

    extract_dir = Path(extract_path)
    extract_dir.mkdir(parents=True, exist_ok=True)

    print(f"\nExtracting ZIP to: {extract_dir}")
    print("This might take a while...")
    shutil.unpack_archive(str(temp_zip), str(extract_dir))

    print("\nCleaning up temporary files...")
    temp_zip.unlink()
    temp_dir.rmdir()

    crops_dir = extract_dir / 'crops'
    csv_file = crops_dir / 'dataset.csv'

    print("\nVerifying extracted files:")
    print(f"Crops directory exists: {crops_dir.exists()}")
    print(f"Dataset CSV exists: {csv_file.exists()}")

    if crops_dir.exists():
        batch_folders = list(crops_dir.glob('batch_*'))
        print(f"Number of batch folders found: {len(batch_folders)}")


        total_images = sum(len(list(folder.glob('*.jpg'))) for folder in batch_folders)
        print(f"Total number of images found: {total_images}")


        print("\nFolder structure:")
        print(f"└── {crops_dir.name}/")
        print(f"    ├── dataset.csv")
        for i, batch in enumerate(sorted(batch_folders)):
            is_last = i == len(batch_folders) - 1
            prefix = "    └── " if is_last else "    ├── "
            n_images = len(list(batch.glob('*.jpg')))
            print(f"{prefix}{batch.name}/ ({n_images} images)")

    return str(crops_dir)


try:
    dataset_dir = extract_drive_dataset()
    print(f"\nDataset extracted successfully to: {dataset_dir}")
except Exception as e:
    print(f"Error: {str(e)}")


Copying ZIP from Drive to Colab...
Source: /content/drive/MyDrive/TKPR221/traffic_signs_dataset.zip
Destination: /content/temp/dataset.zip
ZIP copied successfully! Size: 126.81 MB

Extracting ZIP to: /content/dataset
This might take a while...

Cleaning up temporary files...

Verifying extracted files:
Crops directory exists: False
Dataset CSV exists: False

Dataset extracted successfully to: /content/dataset/crops


In [4]:
'''
json_data = json.load(open('/content/drive/TKPR221/crops/dataset.json'))
csv_data = []
for image_id, crops in json_data.items():
    for crop in crops:
        csv_data.append({
            'filename': crop['filename'],
            'label': crop['label']
        })

df = pd.DataFrame(csv_data)
df.to_csv('dataset.csv', index=False)'''

"\njson_data = json.load(open('/content/drive/TKPR221/crops/dataset.json'))\ncsv_data = []\nfor image_id, crops in json_data.items():\n    for crop in crops:\n        csv_data.append({\n            'filename': crop['filename'],\n            'label': crop['label']\n        })\n\ndf = pd.DataFrame(csv_data)\ndf.to_csv('dataset.csv', index=False)"

In [5]:

class TrafficSignDataset(torch.utils.data.Dataset):
    def __init__(self, image_dir, csv, transformations=None, size=None, flatten=False, label_encoder=None):
        self.image_dir = Path(image_dir)
        self.csv = csv
        self.transformations = transformations
        self.size = size
        self.flatten = flatten

        if label_encoder is None:
            self.label_encoder = LabelEncoder()
            self.csv['label'] = self.label_encoder.fit_transform(self.csv['label'])
        else:
            self.label_encoder = label_encoder
            self.csv['label'] = self.label_encoder.transform(self.csv['label'])


    def __len__(self):
        return len(self.csv)

    def __getitem__(self, idx):
        image_data = self.csv.iloc[idx]
        image = cv2.imread(str(self.image_dir / image_data['filename']), cv2.IMREAD_GRAYSCALE)

        if self.transformations:
            transformed = self.transformations(image=image)
            image = transformed['image']

        if self.size is not None:
            image = self.resize_with_pad(image, (self.size, self.size))

        if self.flatten:
            image = image.flatten()

        return image, image_data['label']

    def transform(self, image):
        return self.transformations(image)


    def resize_with_pad(self,image: np.array,
                    new_shape: tuple[int, int],
                    padding_color: tuple[int, ...] = (255, 255, 255)) -> np.array:  # Fixed type hint

        ## COPIED FROM : https://gist.github.com/IdeaKing/11cf5e146d23c5bb219ba3508cca89ec

        original_shape = (image.shape[1], image.shape[0])
        ratio = float(max(new_shape))/max(original_shape)
        new_size = tuple([int(x*ratio) for x in original_shape])
        image = cv2.resize(image, new_size)
        delta_w = new_shape[0] - new_size[0]
        delta_h = new_shape[1] - new_size[1]
        top, bottom = delta_h//2, delta_h-(delta_h//2)
        left, right = delta_w//2, delta_w-(delta_w//2)
        image = cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=padding_color)
        return image

    def get_label_encoded_classes(self):
        classes = self.label_encoder.classes_
        encoded_classes = self.label_encoder.transform(classes)
        return encoded_classes



In [6]:
!pip install neptune

Collecting neptune
  Downloading neptune-1.13.0-py3-none-any.whl.metadata (16 kB)
Collecting boto3>=1.28.0 (from neptune)
  Downloading boto3-1.35.97-py3-none-any.whl.metadata (6.7 kB)
Collecting bravado<12.0.0,>=11.0.0 (from neptune)
  Downloading bravado-11.0.3-py2.py3-none-any.whl.metadata (5.9 kB)
Collecting swagger-spec-validator>=2.7.4 (from neptune)
  Downloading swagger_spec_validator-3.0.4-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting botocore<1.36.0,>=1.35.97 (from boto3>=1.28.0->neptune)
  Downloading botocore-1.35.97-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3>=1.28.0->neptune)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3>=1.28.0->neptune)
  Downloading s3transfer-0.10.4-py3-none-any.whl.metadata (1.7 kB)
Collecting bravado-core>=5.16.1 (from bravado<12.0.0,>=11.0.0->neptune)
  Downloading bravado-core-6.1.1.tar.gz (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [12]:
from torch.utils.data import random_split
import neptune
from sklearn.linear_model import SGDClassifier
import time

crops_dir = "/content/drive/MyDrive/TKPR221/crops/"
dataset_csv = "/content/drive/MyDrive/TKPR221/dataset.csv"
original_df = pd.read_csv(dataset_csv).iloc[:]
df = original_df.groupby('label').sample(n=round(10000/399), random_state=221, replace=True)

print(original_df.describe())
print(df.describe())


n_epochs = 5
batch_size = 32
train_size = 0.8
val_size = 0.1
test_size = 0.1
image_size = 256 # 256x256

transformations = A.Compose([
    A.RandomRotate90(),
    A.HorizontalFlip(),
    A.VerticalFlip(),
    A.RandomBrightnessContrast()
])


dataset = TrafficSignDataset(crops_dir, df, size=image_size, flatten=True, transformations=transformations)


train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator()
)


train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=2048, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=2048)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=2048)


sgd_clf = SGDClassifier(
    loss='log_loss',
    penalty='l1',
    alpha=0.001,
    max_iter=1000,
    random_state=221
)

run = neptune.init_run(
    project="muguryalcin/TKPR221",
    api_token="eyJhcGlfYWRkcmVzcyI6Imh0dHBzOi8vYXBwLm5lcHR1bmUuYWkiLCJhcGlfdXJsIjoiaHR0cHM6Ly9hcHAubmVwdHVuZS5haSIsImFwaV9rZXkiOiIyMjhjOTdkOS03YjZiLTRlYWMtOWYzMi05MGIwNmYzZTMxZmMifQ==",
    name=f"SGDClassifier_{time.strftime('%Y%m%d_%H%M%S')}",
    description="Description of the experiment",
    tags=["SGDClassifier", "l1_losg_loss_alpha0001"]
)


run["hyperparameters"] = {
    "n_epochs": n_epochs,

    "model": {
        "type": "SGDClassifier",
        "loss": sgd_clf.get_params()['loss'],
        "penalty": sgd_clf.get_params()['penalty'],
        "alpha": sgd_clf.get_params()['alpha'],
        "max_iter": sgd_clf.get_params()['max_iter']
    },

    "dataset": {
        "batch_size": batch_size,
        "total_size": len(dataset),
        "train_size": train_size,
        "val_size": val_size,
        "test_size": test_size,
        "n_classes": len(dataset.get_label_encoded_classes()),
        "image_size": image_size,
        "flattened": True,
        "transformations": transformations
    },
}

                                    filename       label
count                                  59534       59534
unique                                 59534         399
top     batch_5/--FValmNpFJ8yo8X7uWODA_0.jpg  other-sign
freq                                       1       39121
                                    filename                             label
count                                   9975                              9975
unique                                  6130                               399
top     batch_0/HTi48u68aWbnKR1R6Y7kew_5.jpg  complementary--accident-area--g3
freq                                      25                                25
[neptune] [info   ] Neptune initialized. Open in the app: https://app.neptune.ai/muguryalcin/TKPR221/e/TKPR-201


In [None]:
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score

all_classes = dataset.get_label_encoded_classes()

for epoch in range(n_epochs):
    train_loop = tqdm(train_loader, desc=f'Training Epoch {epoch+1} / {n_epochs}')
    for batch_X, batch_y in train_loop:
        X = batch_X.numpy()
        y = batch_y.numpy()
        sgd_clf.partial_fit(X, y, classes=all_classes)


    train_preds = sgd_clf.predict(X)

    run["train/accuracy"].append(accuracy_score(y, train_preds))
    run["train/f1"].append(f1_score(y, train_preds, average='weighted'))
    run["train/recall"].append(recall_score(y, train_preds, average='weighted'))
    run["train/precision"].append(precision_score(y, train_preds, average='weighted'))
    run["train/loss"].append(sgd_clf.get_params()['loss'])


    val_preds = []
    val_true = []
    for batch_X, batch_y in val_loader:
        X = batch_X.numpy()
        val_preds.extend(sgd_clf.predict(X))
        val_true.extend(batch_y.numpy())

    val_acc = accuracy_score(val_true, val_preds)
    val_f1 = f1_score(val_true, val_preds, average='weighted')
    val_recall = recall_score(val_true, val_preds, average='weighted')
    val_precision = precision_score(val_true, val_preds, average='weighted')

    print(f"Epoch {epoch+1} validation metrics:")
    print(f"Accuracy: {val_acc:.4f}")
    print(f"F1 Score: {val_f1:.4f}")
    print(f"Recall: {val_recall:.4f}")
    print(f"Precision: {val_precision:.4f}")


    run["val/accuracy"].append(val_acc)
    run["val/f1"].append(val_f1)
    run["val/recall"].append(val_recall)
    run["val/precision"].append(val_precision)


test_preds = []
test_true = []
for batch_X, batch_y in test_loader:
    X = batch_X.numpy()
    test_preds.extend(sgd_clf.predict(X))
    test_true.extend(batch_y.numpy())

test_acc = accuracy_score(test_true, test_preds)
test_f1 = f1_score(test_true, test_preds, average='weighted')
test_recall = recall_score(test_true, test_preds, average='weighted')
test_precision = precision_score(test_true, test_preds, average='weighted')

print(f"\nFinal test metrics:")
print(f"Accuracy: {test_acc:.4f}")
print(f"F1 Score: {test_f1:.4f}")
print(f"Recall: {test_recall:.4f}")
print(f"Precision: {test_precision:.4f}")


run["test/accuracy"].append(test_acc)
run["test/f1"].append(test_f1)
run["test/recall"].append(test_recall)
run["test/precision"].append(test_precision)

run.stop()

# Final Remarks: My group and I wrote the majority of this code snippet, and ChatGPT was primarily utilized for debugging.  