In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


import torch
from torch import optim
from torch import nn
from torch.utils.data import DataLoader
from tqdm import tqdm

# !pip install torchvision
import torchvision

import torch.nn.functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms

In [None]:
import os
import shutil

# 1. Configuration: the folder holding every pneumonia image, plus one
#    destination folder per pneumonia subtype.
source_folder = "/kaggle/input/3classes-pneumonia-x-ray-classification/pneumonia-kaggle/train/PNEUMONIA"
bacteria_folder = "/kaggle/working/train/bacteria"
virus_folder = "/kaggle/working/train/virus"

# 2. Create the destination folders (no error if they already exist).
for folder in (bacteria_folder, virus_folder):
    os.makedirs(folder, exist_ok=True)

# 3. Route each regular file by the keyword found in its (lower-cased) name.
#    The table is ordered: "bacteria" is tested before "virus", and only the
#    first matching keyword is used. Files matching neither keyword are not
#    copied anywhere.
keyword_to_folder = (
    ("bacteria", bacteria_folder),
    ("virus", virus_folder),
)

for filename in os.listdir(source_folder):
    src = os.path.join(source_folder, filename)

    # Only plain files are of interest; sub-directories are ignored.
    if not os.path.isfile(src):
        continue

    lowered = filename.lower()
    for keyword, target_folder in keyword_to_folder:
        if keyword in lowered:
            shutil.copy(src, os.path.join(target_folder, filename))
            break

print("Classification complete.")

In [None]:
import os
import shutil

# 1. Configuration: source folder with the NORMAL images and the folder
#    they are copied into.
source_folder = "/kaggle/input/3classes-pneumonia-x-ray-classification/pneumonia-kaggle/train/NORMAL"
destination_folder = "/kaggle/working/train/normal"

# 2. Create the destination folder (no error if it already exists).
os.makedirs(destination_folder, exist_ok=True)

# 3. Copy every regular file across, keeping its original file name.
for filename in os.listdir(source_folder):
    src = os.path.join(source_folder, filename)

    # Skip anything that is not a plain file (e.g. sub-directories).
    if not os.path.isfile(src):
        continue

    shutil.copy(src, os.path.join(destination_folder, filename))

print("Copy complete.")

In [None]:
# Per-channel normalisation; these values match the commonly used ImageNet
# channel means and standard deviations.
normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)

# Pre-processing pipelines. Both resize to 224x224 and normalise; only the
# 'train' pipeline adds random augmentation (shear/scale affine + horizontal
# flip). The 'validation' pipeline is the deterministic variant.
data_transforms = dict(
    train=transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomAffine(degrees=0, scale=(0.8, 1.2), shear=10),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ]),
    validation=transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ]),
)

# ImageFolder derives one class per sub-folder of the training root.
image_datasets = dict(
    train=datasets.ImageFolder(
        root='/kaggle/working/train',
        transform=data_transforms['train'],
    ),
)

# NOTE(review): batch_size=784 is unusually large for 224x224 RGB images —
# confirm it is intentional and fits in memory on the target device.
dataloaders = dict(
    train=DataLoader(
        image_datasets['train'],
        batch_size=784,
        shuffle=True,
        num_workers=0,
    ),
)

In [None]:
class CNN(nn.Module):
    """Small two-stage convolutional image classifier.

    Architecture: conv(3x3, 8) -> ReLU -> max-pool(2) -> conv(3x3, 16) ->
    ReLU -> max-pool(2) -> flatten -> linear(num_classes).

    The 3x3 convolutions use padding=1 and so keep the spatial size; each
    pooling step halves it. The linear layer is sized for a 56x56 feature
    map, i.e. the network expects 224x224 inputs.

    Args:
        in_channels: Number of channels in the input images.
        num_classes: Number of output logits (one per class).
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()

        # 1st convolutional layer
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=8, kernel_size=3, padding=1)
        # Max pooling layer, shared by both stages (it has no parameters)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # 2nd convolutional layer
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, padding=1)
        # Fully connected layer: 16 channels x 56 x 56 after two poolings of a
        # 224x224 input. The output width now honours `num_classes` instead
        # of a hard-coded 3.
        self.fc1 = nn.Linear(16 * 56 * 56, num_classes)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes).

        Args:
            x: Image batch of shape (batch, in_channels, 224, 224).
        """
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        # Flatten everything except the batch dimension.
        x = x.reshape(x.shape[0], -1)
        x = self.fc1(x)
        return x

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Three-class classifier over 3-channel (RGB) images, placed on `device`.
model = CNN(in_channels=3, num_classes=3)
model = model.to(device)

# Cross-entropy over the raw logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

print(model)

In [None]:
# Train for a fixed number of passes over the training set.
num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch [{epoch + 1}/{num_epochs}]")

    # Explicitly enter training mode: the inference cell calls model.eval(),
    # so re-running this cell afterwards would otherwise train in eval mode.
    model.train()

    # Accumulate the summed per-sample loss so a mean can be reported.
    running_loss = 0.0
    num_samples = 0

    # Iterate over training batches
    for data, targets in tqdm(dataloaders['train']):
        data = data.to(device)
        targets = targets.to(device)

        # Forward pass
        scores = model(data)
        loss = criterion(scores, targets)

        # Backward pass: clear stale gradients, backpropagate, update weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # `loss` is the batch mean, so weight it by the batch size.
        batch_size = data.size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size

    # Guard against an empty dataloader to avoid dividing by zero.
    if num_samples:
        print(f"  mean training loss: {running_loss / num_samples:.4f}")

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report
from PIL import Image

# Switch the model to inference mode before predicting on the test images.
model.eval()
pred = []       # predicted class index per image (-1 when the image fails to load)
filenames = []  # file name without extension, aligned index-for-index with `pred`
test_folder = "/kaggle/input/3classes-pneumonia-x-ray-classification/pneumonia-kaggle/test"

# NOTE(review): predicted indices follow ImageFolder's alphabetical class
# ordering of the training folders (bacteria=0, normal=1, virus=2) — confirm
# this matches the label encoding expected downstream.
for filename in tqdm(os.listdir(test_folder)):
    # Case-insensitive extension match so files such as "x.JPG" are not
    # silently skipped.
    if not filename.lower().endswith((".jpeg", ".png", ".jpg")):
        continue

    filenames.append(os.path.splitext(filename)[0])
    img_path = os.path.join(test_folder, filename)

    try:
        # The context manager closes the underlying file handle promptly;
        # convert() returns an independent, fully loaded RGB image.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert('RGB')
    except (FileNotFoundError, OSError) as e:
        # Keep `pred` aligned with `filenames` by recording a sentinel.
        print(f"Error loading image {filename}: {e}")
        pred.append(-1)
        continue

    # Apply the deterministic pipeline and add a batch dimension of 1.
    img = data_transforms['validation'](img).unsqueeze(0).to(device)

    # No gradients are needed for inference.
    with torch.no_grad():
        output = model(img)

    # Index of the largest logit is the predicted class.
    pred.append(output.argmax(dim=1).item())

In [None]:
# One row per test image: its Id (file name without extension) and the
# predicted Category, ordered by Id.
results = (
    pd.DataFrame({'Id': filenames, 'Category': pred})
    .sort_values(by='Id')
)
results

In [None]:
# Write the predictions table to disk without the DataFrame index column.
results.to_csv(
    'predictions.csv',
    index=False,
)
print("Prediction complete.")