<a href="https://colab.research.google.com/github/RaincallerMei/Galaxy-Zoo-Solution/blob/main/Astro_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Mount Google Drive inside the Colab VM at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Extract the competition archive stored on Drive into /content/dataset.
!unzip /content/drive/MyDrive/AstroCNN/galaxy-zoo-the-galaxy-challenge.zip -d /content/dataset


Archive:  /content/drive/MyDrive/AstroCNN/galaxy-zoo-the-galaxy-challenge.zip
  inflating: /content/dataset/all_ones_benchmark.zip  
  inflating: /content/dataset/all_zeros_benchmark.zip  
  inflating: /content/dataset/central_pixel_benchmark.zip  
  inflating: /content/dataset/images_test_rev1.zip  
  inflating: /content/dataset/images_training_rev1.zip  
  inflating: /content/dataset/training_solutions_rev1.zip  


In [None]:
import os
import glob

# Go to the directory where the 6 zips live
os.chdir("/content/dataset")

# Find all the .zip files
zip_files = glob.glob("*.zip")

# Unzip each
for zipf in zip_files:
    folder_name = os.path.splitext(zipf)[0]  # e.g., "folder1" from "folder1.zip"
    !unzip -q {zipf} -d {folder_name}

replace all_zeros_benchmark/all_zeros_benchmark.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace training_solutions_rev1/training_solutions_rev1.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace images_test_rev1/images_test_rev1/100018.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace images_test_rev1/images_test_rev1/100037.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace images_test_rev1/images_test_rev1/100042.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace images_test_rev1/images_test_rev1/100052.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace images_test_rev1/images_test_rev1/100056.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: N
replace central_pixel_benchmark/central_pixel_benchmark.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace all_ones_benchmark/all_ones_benchmark.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: N
replace images_training_rev1/images_training_rev1/100008.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


In [None]:
!rm *.zip

# Match Images to CSV Rows

Load and Index the CSV

In [None]:
import pandas as pd

# Training labels: one row per galaxy, GalaxyID first, target values after it.
solutions_df = pd.read_csv("/content/dataset/training_solutions_rev1/training_solutions_rev1.csv")

# Lookup table from GalaxyID (as a string, so it can be compared with image
# file names) to the list of values in the remaining columns of that row.
id_to_probs = {
    str(record.GalaxyID): list(record[1:])
    for record in solutions_df.itertuples(index=False)
}

#Now id_to_probs["123456"] might be [0.123, 0.456, ..., 0.999], etc.

Custom Dataset Class:


1.   Lists all image files in training/.
2. For each file, extracts GalaxyID from the filename (e.g., 123456 from 123456.jpg).
3. Finds the matching probabilities from id_to_probs.
4. Loads the image, applies transformations, and returns (image_tensor, label_tensor).



In [None]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image

class GalaxyDataset(Dataset):
    """Labelled galaxy images, served as ``(image, label)`` pairs.

    Every file ending in ``.jpg`` (case-insensitive) directly inside
    ``images_dir`` is one sample. The file name without its extension is the
    GalaxyID used to look up the target vector in ``id_to_probs``.

    Args:
        images_dir: Directory containing the ``<GalaxyID>.jpg`` files.
        id_to_probs: Mapping from GalaxyID (str) to a sequence of floats.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self,
                 images_dir,
                 id_to_probs,
                 transform=None):
        self.images_dir = images_dir
        self.id_to_probs = id_to_probs
        self.transform = transform

        # os.listdir() returns entries in arbitrary order; sorting makes the
        # index -> file mapping identical on every run and every machine.
        self.image_paths = sorted(
            os.path.join(images_dir, f)
            for f in os.listdir(images_dir)
            if f.lower().endswith('.jpg')
        )

    def __len__(self):
        """Return the number of image files found in ``images_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        Returns:
            image: The RGB image, after ``transform`` if one was given.
            label: float32 tensor built from the entry in ``id_to_probs``.

        Raises:
            ValueError: If the file's GalaxyID has no entry in ``id_to_probs``.
        """
        img_path = self.image_paths[idx]
        # "/path/123456.jpg" -> "123456"
        galaxy_id = os.path.splitext(os.path.basename(img_path))[0]

        # Look the label up first so an unlabelled file fails before the
        # (comparatively expensive) image decode.
        probabilities = self.id_to_probs.get(galaxy_id)
        if probabilities is None:
            raise ValueError(f"GalaxyID {galaxy_id} not found in id_to_probs")

        # convert() returns a fully loaded copy, so the file handle can be
        # closed immediately instead of lingering until garbage collection.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform:
            image = self.transform(image)

        label = torch.tensor(probabilities, dtype=torch.float32)

        return image, label

Transform the jpg/PIL images into tensors!

In [None]:
import torchvision.transforms as T

# Per-image pipeline: scale the PIL image to a fixed 224x224 size, then turn it
# into a tensor. No T.Normalize(mean, std) step is applied here.
transformImage = T.Compose([T.Resize(size=(224, 224)), T.ToTensor()])

# 4. Creating a DataLoader


Create a Dataset and a DataLoader

In [None]:
from torch.utils.data import DataLoader

# Folder holding the extracted training JPGs.
train_dir = "/content/dataset/images_training_rev1/images_training_rev1"

# Labelled dataset: each JPG in train_dir paired with its entry in id_to_probs.
train_dataset = GalaxyDataset(train_dir, id_to_probs, transform=transformImage)

# Shuffled mini-batches of 16, loaded by 2 worker processes
# (raise num_workers if the environment allows it).
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)

# 5. Defining the CNN Model


CNN model implementation

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.v2 as v2
from tqdm import tqdm
import torch.optim as optim

class GalaxyCNN(nn.Module):
    """Three conv/pool stages followed by a three-layer classifier head.

    The network returns raw logits (no sigmoid), one per output.

    The fully connected head expects a 128 x 8 x 8 feature map, which is what a
    69x69 input produces after three 2x2 poolings (69 -> 34 -> 17 -> 8). An
    adaptive average pool in front of the head brings any other input size to
    8 x 8 as well, so the model no longer fails with a matmul shape error on
    e.g. 224x224 images. For 69x69 input that pool is the identity, and it has
    no parameters, so existing checkpoints stay loadable.

    Args:
        num_outputs: Number of logits produced per image (default 37).
    """

    def __init__(self, num_outputs=37):
        super().__init__()

        # Convolutional layers (3x3 kernels, padding keeps the spatial size)
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)

        # Shared 2x2 max-pool: halves height and width (rounding down)
        self.pool = nn.MaxPool2d(2, 2)

        # Forces the final feature map to the 8x8 grid that fc1 is sized for
        self.adaptive_pool = nn.AdaptiveAvgPool2d((8, 8))

        # Fully connected layers
        self.fc1 = nn.Linear(128 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_outputs)

        # Dropout for regularization
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Map a ``[batch, 3, H, W]`` image batch to ``[batch, num_outputs]`` logits."""
        # Shapes in the comments are for a 69x69 input.
        x = self.pool(F.relu(self.conv1(x)))  # [batch, 32, 34, 34]
        x = self.pool(F.relu(self.conv2(x)))  # [batch, 64, 17, 17]
        x = self.pool(F.relu(self.conv3(x)))  # [batch, 128, 8, 8]

        # No-op for 8x8 maps; resamples any other size to 8x8
        x = self.adaptive_pool(x)

        # Flatten the feature maps -> [batch, 128 * 8 * 8]
        x = torch.flatten(x, 1)

        # Fully connected layers with dropout after the first one
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.fc3(x)  # [batch, num_outputs] logits

        return x

# Example preprocessing function for rotation augmentation
# def augment_rotations(images, num_rotations=4):
#     """Generates multiple rotated versions of the input images."""
#     rotations = []
#     for i in range(num_rotations):
#         angle = i * (360 // num_rotations)  # Compute rotation angle
#         rotated_images = torch.rot90(images, k=i, dims=(-2, -1))  # Rotate images
#         rotations.append(rotated_images)
#     return torch.cat(rotations, dim=0)  # Combine rotated views into one batch


# Preprocessing function for cropping and downsampling
def preprocess_images(images):
    """Center-crop a batch of images to 207x207 and downsample it to 69x69.

    The work is done directly on the tensor batch. The previous version pushed
    every image through a PIL (uint8) round trip, which only represents values
    in [0, 1]; tensors that had already been normalised (and therefore contain
    negative values) came back corrupted.

    Args:
        images: A ``[batch, C, H, W]`` tensor, or an iterable of ``[C, H, W]``
            tensors of equal size (they are stacked into one batch).

    Returns:
        A float32 tensor of shape ``[batch, C, 69, 69]``. Floating-point input
        keeps its value range; integer input is rescaled to [0, 1].
    """
    if not torch.is_tensor(images):
        images = torch.stack(list(images))

    transform = v2.Compose([
        # Replaces the deprecated v2.ToTensor(): float input passes through,
        # integer input is scaled to [0, 1].
        v2.ToDtype(torch.float32, scale=True),
        v2.CenterCrop((207, 207)),
        v2.Resize((69, 69), antialias=True),
    ])
    return transform(images)

# 6. Training: Loss Function and Optimizer


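`BCEWithLogitsLoss` is typically used for multi-label classification. Here each of the 37 outputs is treated as an independent feature whose target is a probability between 0 (absent) and 1 (present). The loss applies the sigmoid internally, so the model outputs raw logits.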

In [None]:
# The model has to exist before the optimizer can be built from its parameters;
# no earlier cell creates it, so instantiate it here.
model = GalaxyCNN()

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
num_epochs = 5
print_every = 100  # report the mean loss once per this many batches

# Both pipelines are built once instead of on every batch.
# Random geometric augmentation. It is called on the whole batch tensor, so
# each call draws one set of random parameters for that batch.
augment = v2.Compose([
    v2.RandomResizedCrop(size=(207, 207), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),
])
# Normalisation is applied last, after preprocess_images, so the normalised
# values (which include negatives) reach the model without further processing.
normalize = v2.Compose([
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

model.train()

for epoch in range(num_epochs):
    running_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}")

    for i, (images, labels) in enumerate(progress_bar):
        # augment -> crop / downsample to 69x69 -> normalise
        images = augment(images)
        images = preprocess_images(images)
        images = normalize(images)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss

        # Show the current batch loss on the progress bar
        progress_bar.set_postfix(loss=f"{batch_loss:.4f}")

        # Report after every full window of `print_every` batches, so the
        # divisor always matches the number of losses that were summed.
        if (i + 1) % print_every == 0:
            avg_loss = running_loss / print_every
            print(f"[Epoch {epoch+1}, Batch {i+1}] loss: {avg_loss:.4f}")
            running_loss = 0.0

Epoch 1:   0%|          | 0/3849 [00:00<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (16x8192 and 100352x256)

# 7. Generating Predictions for Testing Images


After training is done:

1. Create a new dataset for your testing/ folder (no labels needed).
2. Forward pass each test image to get predicted probabilities.
3. Save them in the same format as your training CSV or the benchmark CSVs (GalaxyID + 37 columns).



In [None]:
class GalaxyTestDataset(Dataset):
    """Unlabelled galaxy images, served as ``(image, galaxy_id)`` pairs.

    Every file ending in ``.jpg`` (case-insensitive) directly inside
    ``images_dir`` is one sample; its GalaxyID is the file name without the
    extension.

    Args:
        images_dir: Directory containing the ``<GalaxyID>.jpg`` files.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, images_dir, transform=None):
        self.images_dir = images_dir
        self.transform = transform
        # os.listdir() order is arbitrary; sorting keeps the order of the
        # samples (and of anything written out from them) reproducible.
        self.image_paths = sorted(
            os.path.join(images_dir, f)
            for f in os.listdir(images_dir)
            if f.lower().endswith('.jpg')
        )

    def __len__(self):
        """Return the number of image files found in ``images_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, galaxy_id)`` for the sample at position ``idx``."""
        path = self.image_paths[idx]
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away.
        with Image.open(path) as img:
            image = img.convert("RGB")
        # "/path/123456.jpg" -> "123456"
        galaxy_id = os.path.splitext(os.path.basename(path))[0]

        if self.transform:
            image = self.transform(image)

        return image, galaxy_id


Generate Predictions

In [None]:
test_dir = "/content/dataset/images_test_rev1/images_test_rev1"
test_dataset = GalaxyTestDataset(test_dir, transform=transformImage)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Training passes every batch through preprocess_images (crop / downsample to
# 69x69) and normalises it with these constants, so inference must do the same
# for the model to see the same input size and scale. No random augmentation
# is applied at test time.
test_normalize = v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

model.eval()
predictions = []  # list of (galaxy_id, [37 probabilities])

with torch.no_grad():
    for images, galaxy_ids in test_loader:
        images = preprocess_images(images)
        images = test_normalize(images)

        outputs = model(images)         # logits, shape [batch, 37]
        probs = torch.sigmoid(outputs)  # logits -> probabilities, shape [batch, 37]

        # One (id, list-of-floats) entry per image, in loader order
        predictions.extend(zip(galaxy_ids, probs.cpu().tolist()))


Save results

In [None]:
import csv

output_csv = "/content/my_predictions.csv"

# Reuse the column names of the training CSV (GalaxyID followed by the target
# columns) so the output has the same format as that file, instead of
# hard-coding placeholder names.
header = list(solutions_df.columns)

with open(output_csv, mode="w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(header)
    # One row per test image: the id followed by its predicted probabilities
    writer.writerows([gid, *prob_list] for gid, prob_list in predictions)

print("Predictions saved to:", output_csv)


# 8. Test Correctness (Square Error)

1. Loss: root mean squared error (RMSE)
2. AUC (area under the ROC curve)

#9. Uh Sigma
