In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install torch torchvision


Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl (121.6 MB)
Collecting nvidia-curand-cu12==10.3.2.106 (from torch)
  Using cached nvidia_curand_cu12-10.3.2.106-py3-none-manylinux1_x86_64.whl (56.5 MB)
Collectin

In [None]:
import os
import json
import concurrent.futures
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

class RipCurrentDataset(Dataset):
    def __init__(self, image_dir, annotation_dir, transform=None, num_samples=20000):
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        self.transform = transform

        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]

        # Filter out images without corresponding json files
        self.image_files = [f for f in self.image_files if os.path.exists(os.path.join(self.annotation_dir, f.replace('.jpg', '.json')))]

        self.image_files = self.image_files[:num_samples]  # Limit to num_samples

        self.annotations = self.load_annotations(self.image_files)

    def load_annotations(self, image_files):
        annotations = []
        def load_json(img_name):
            annotation_path = os.path.join(self.annotation_dir, img_name.replace('.jpg', '.json'))
            with open(annotation_path) as f:
                return json.load(f)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = list(executor.map(load_json, image_files))

        return results

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        annotation = self.annotations[idx]
        boxes = annotation['annotations']['drawing']

        # Print raw annotation for debugging
        # print(f'Raw annotation for {img_name}: {annotation}')

        if annotation['annotations']['class'] == 0:
            # If class is 0, there are no bounding boxes
            boxes = torch.empty((0, 4), dtype=torch.float32)
            labels = torch.tensor([], dtype=torch.int64)
        else:
            # Convert points to [x_min, y_min, x_max, y_max] format
            valid_boxes = []
            for box in boxes:
                x_min = min(point[0] for point in box)
                y_min = min(point[1] for point in box)
                x_max = max(point[0] for point in box)
                y_max = max(point[1] for point in box)
                if x_max > x_min and y_max > y_min:
                    valid_boxes.append([x_min, y_min, x_max, y_max])

            boxes = torch.tensor(valid_boxes, dtype=torch.float32)
            labels = torch.tensor([annotation['annotations']['class']] * len(boxes), dtype=torch.int64)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels

        # Print diagnostic information
        # print(f'Image {img_name} has {len(boxes)} valid boxes')

        if self.transform:
            image = self.transform(image)

        return image, target

transform = transforms.Compose([
    transforms.ToTensor(),
])

image_dir = '/content/drive/MyDrive/RIPcurrent/TS_1.image_1.Haeundae_3.PARA2'
annotation_dir = '/content/drive/MyDrive/RIPcurrent/TL_1.JSON_1.Haeundae_3.PARA2'

dataset = RipCurrentDataset(image_dir=image_dir,
                            annotation_dir=annotation_dir,
                            transform=transform,
                            num_samples=20000)

def collate_fn(batch):
    # Filter out samples with no bounding boxes
    filtered_batch = [sample for sample in batch if sample[1]['boxes'].size(0) > 0]
    filtered_out_count = len(batch) - len(filtered_batch)
    if filtered_out_count > 0:
        print(f'Filtered out {filtered_out_count} samples with no valid bounding boxes')
    if len(filtered_batch) == 0:
        return [], []
    return tuple(zip(*filtered_batch))

dataloader = DataLoader(dataset, batch_size=4, shuffle=True, num_workers=2, collate_fn=collate_fn)


In [None]:
#Dataset 최신 이안류있는 이미지 없는 이미지를 분리
import os
import json
import concurrent.futures
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

class RipCurrentDataset(Dataset):
    def __init__(self, image_dir, annotation_dir, transform=None, num_samples=20000, include_negative=True):
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        self.transform = transform
        self.include_negative = include_negative

        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]

        # Filter out images without corresponding json files
        self.image_files = [f for f in self.image_files if os.path.exists(os.path.join(self.annotation_dir, f.replace('.jpg', '.json')))]

        self.image_files = self.image_files[:num_samples]  # Limit to num_samples

        self.annotations = self.load_annotations(self.image_files)

    def load_annotations(self, image_files):
        annotations = []
        def load_json(img_name):
            annotation_path = os.path.join(self.annotation_dir, img_name.replace('.jpg', '.json'))
            with open(annotation_path) as f:
                return json.load(f)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = list(executor.map(load_json, image_files))

        return results

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        annotation = self.annotations[idx]
        boxes = annotation['annotations']['drawing']

        if annotation['annotations']['class'] == 0:
            # If class is 0, there are no bounding boxes
            boxes = torch.empty((0, 4), dtype=torch.float32)
            labels = torch.tensor([], dtype=torch.int64)
        else:
            # Convert points to [x_min, y_min, x_max, y_max] format
            valid_boxes = []
            for box in boxes:
                x_min = min(point[0] for point in box)
                y_min = min(point[1] for point in box)
                x_max = max(point[0] for point in box)
                y_max = max(point[1] for point in box)
                if x_max > x_min and y_max > y_min:
                    valid_boxes.append([x_min, y_min, x_max, y_max])

            boxes = torch.tensor(valid_boxes, dtype=torch.float32)
            labels = torch.tensor([annotation['annotations']['class']] * len(boxes), dtype=torch.int64)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels

        if self.transform:
            image = self.transform(image)

        return image, target

transform = transforms.Compose([
    transforms.ToTensor(),
])

image_dir = '/content/drive/MyDrive/RIPcurrent/TS_1.image_1.Haeundae_3.PARA2'
annotation_dir = '/content/drive/MyDrive/RIPcurrent/TL_1.JSON_1.Haeundae_3.PARA2'

dataset = RipCurrentDataset(image_dir=image_dir,
                            annotation_dir=annotation_dir,
                            transform=transform,
                            num_samples=20000,
                            include_negative=True)

def collate_fn(batch):
    # Separate samples with and without bounding boxes
    positive_samples = [sample for sample in batch if sample[1]['boxes'].size(0) > 0]
    negative_samples = [sample for sample in batch if sample[1]['boxes'].size(0) == 0]

    # Ensure all samples are processed
    combined_batch = positive_samples + negative_samples

    return combined_batch

dataloader = DataLoader(dataset, batch_size=4, shuffle=True, num_workers=2, collate_fn=collate_fn)


In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model(num_classes):
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

model = get_model(num_classes=2)



In [None]:
import torch
import torch.optim as optim
import torchvision.transforms as T
import os

# Define the device and move the model to the device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Define optimizer and learning rate scheduler
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Load the checkpoint
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch0708PARA2GLORY_10.pth'
if os.path.exists(checkpoint_path):
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch']
    print(f'Checkpoint loaded: start_epoch = {start_epoch}, loss = {checkpoint["loss"]:.4f}')
else:
    start_epoch = 0
    print('No checkpoint found. Training from scratch.')

num_epochs = 5
save_dir = '/content/drive/MyDrive/RIPcurrent/model_checkpoints'
if not os.path.exists(save_dir):
    os.makedirs(save_dir)

for epoch in range(start_epoch, start_epoch + num_epochs):
    model.train()
    i = 0
    epoch_losses = []

    for images, targets in dataloader:
        if len(images) == 0:
            print(f'Skipping empty batch at step {i}')
            continue  # Skip empty batches

        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        epoch_losses.append(losses.item())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if i % 10 == 0:
            print(f'Epoch [{epoch+1}/{start_epoch + num_epochs}], Step [{i}/{len(dataloader)}], Loss: {losses.item():.4f}')
        i += 1

    lr_scheduler.step()

    # Save the model and optimizer state
    if epoch_losses:
        avg_loss = sum(epoch_losses) / len(epoch_losses)
    else:
        avg_loss = float('inf')

    checkpoint = {
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': avg_loss,
    }
    torch.save(checkpoint, os.path.join(save_dir, f'RCNN_checkpoint_epoch0708PARA2GLORY_{epoch + 1}.pth'))
    print(f'Model saved at epoch {epoch + 1}, Loss: {avg_loss:.4f}')


SyntaxError: invalid decimal literal (<ipython-input-10-79ebbb38bb9e>, line 1)

In [None]:
#Training loop 최신 이안류 없는 경우도 학습하도록
import torch
import torch.optim as optim
import torchvision.transforms as T
import os

# Define the device and move the model to the device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Define optimizer and learning rate scheduler
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Load the checkpoint
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch0708PARA2GLORY_10.pth'
if os.path.exists(checkpoint_path):
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch']
    print(f'Checkpoint loaded: start_epoch = {start_epoch}, loss = {checkpoint["loss"]:.4f}')
else:
    start_epoch = 0
    print('No checkpoint found. Training from scratch.')

num_epochs = 5
save_dir = '/content/drive/MyDrive/RIPcurrent/model_checkpoints'
if not os.path.exists(save_dir):
    os.makedirs(save_dir)

# Define DataLoader with collate_fn
dataloader = DataLoader(dataset, batch_size=4, shuffle=True, num_workers=2, collate_fn=collate_fn)

# Training loop
for epoch in range(start_epoch, start_epoch + num_epochs):
    model.train()
    i = 0
    epoch_losses = []

    for combined_batch in dataloader:
        if len(combined_batch) == 0:
            print(f'Skipping empty batch at step {i}')
            continue  # Skip empty batches

        images, targets = zip(*combined_batch)
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        epoch_losses.append(losses.item())

        # Backward pass
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if i % 10 == 0:
            print(f'Epoch [{epoch+1}/{start_epoch + num_epochs}], Step [{i}/{len(dataloader)}], Loss: {losses.item():.4f}')
        i += 1

    lr_scheduler.step()

    # Save the model and optimizer state
    if epoch_losses:
        avg_loss = sum(epoch_losses) / len(epoch_losses)
    else:
        avg_loss = float('inf')

    checkpoint = {
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': avg_loss,
    }
    torch.save(checkpoint, os.path.join(save_dir, f'RCNN_checkpoint_epoch0708PARA2GLORY_{epoch + 1}.pth'))
    print(f'Model saved at epoch {epoch + 1}, Loss: {avg_loss:.4f}')


Checkpoint loaded: start_epoch = 10, loss = 0.2339
Epoch [11/15], Step [0/5000], Loss: 0.2089
Epoch [11/15], Step [10/5000], Loss: 0.2011
Epoch [11/15], Step [20/5000], Loss: 0.2372
Epoch [11/15], Step [30/5000], Loss: 0.1982
Epoch [11/15], Step [40/5000], Loss: 0.2154
Epoch [11/15], Step [50/5000], Loss: 0.2229
Epoch [11/15], Step [60/5000], Loss: 0.2596
Epoch [11/15], Step [70/5000], Loss: 0.1954
Epoch [11/15], Step [80/5000], Loss: 0.1807
Epoch [11/15], Step [90/5000], Loss: 0.1459
Epoch [11/15], Step [100/5000], Loss: 0.1580
Epoch [11/15], Step [110/5000], Loss: 0.1029
Epoch [11/15], Step [120/5000], Loss: 0.2213
Epoch [11/15], Step [130/5000], Loss: 0.2083
Epoch [11/15], Step [140/5000], Loss: 0.1495
Epoch [11/15], Step [150/5000], Loss: 0.2497
Epoch [11/15], Step [160/5000], Loss: 0.1587
Epoch [11/15], Step [170/5000], Loss: 0.2407
Epoch [11/15], Step [180/5000], Loss: 0.1962
Epoch [11/15], Step [190/5000], Loss: 0.2459
Epoch [11/15], Step [200/5000], Loss: 0.3012
Epoch [11/15], 

In [None]:

#예측만 도출 밑 셀은 정답까지 비교
import os
import json
import torch
import torchvision
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_ResNet50_FPN_Weights
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Load trained model checkpoint
def load_model(checkpoint_path, num_classes):
    weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Define the dataset class for evaluation
class RipCurrentTestDataset(Dataset):
    def __init__(self, image_dir, annotation_dir, transform=None, num_samples=10):
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        self.transform = transform

        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]
        self.image_files = self.image_files[:num_samples]  # Limit to num_samples

        self.annotations = self.load_annotations(self.image_files)

    def load_annotations(self, image_files):
        annotations = []
        for img_name in image_files:
            annotation_path = os.path.join(self.annotation_dir, img_name.replace('.jpg', '.json'))
            with open(annotation_path) as f:
                annotations.append(json.load(f))
        return annotations

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        annotation = self.annotations[idx]
        boxes = annotation['annotations']['drawing']
        if annotation['annotations']['class'] == 0:
            # If class is 0, there are no bounding boxes
            boxes = torch.empty((0, 4), dtype=torch.float32)
            labels = torch.tensor([], dtype=torch.int64)
        else:
            # Convert points to [x_min, y_min, x_max, y_max] format
            valid_boxes = []
            for box in boxes:
                x_min = min(point[0] for point in box)
                y_min = min(point[1] for point in box)
                x_max = max(point[0] for point in box)
                y_max = max(point[1] for point in box)
                if x_max > x_min and y_max > y_min:
                    valid_boxes.append([x_min, y_min, x_max, y_max])

            boxes = torch.tensor(valid_boxes, dtype=torch.float32)
            labels = torch.tensor([annotation['annotations']['class']] * len(boxes), dtype=torch.int64)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels
        target['image_id'] = img_name

        return image, target

# Define collate_fn to handle batch of targets correctly
def collate_fn(batch):
    images, targets = zip(*batch)
    return list(images), list(targets)

transform = transforms.Compose([
    transforms.ToTensor(),
])

test_image_dir = '/content/drive/MyDrive/RIPcurrent/TEST_image_Haeundae_1.GLORY'
test_annotation_dir = '/content/drive/MyDrive/RIPcurrent/TEST_json_Haeundae_1.GLORY'
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch_5.pth'

test_dataset = RipCurrentTestDataset(image_dir=test_image_dir,
                                     annotation_dir=test_annotation_dir,
                                     transform=transform,
                                     num_samples=10)

test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = load_model(checkpoint_path, num_classes=2)
model.to(device)
model.eval()

# Function to plot the bounding boxes
def plot_image_with_boxes(image, boxes, labels):
    plt.imshow(image.permute(1, 2, 0).cpu().numpy())
    ax = plt.gca()
    for box, label in zip(boxes, labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='red', linewidth=2)
        ax.add_patch(rect)
        ax.text(x_min, y_min, f'Class: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    plt.show()

# Initialize lists to store evaluation metrics
all_preds = []
all_gts = []

# Evaluate the model and save predictions
results = []

for images, targets in test_dataloader:
    images = list(image.to(device) for image in images)
    outputs = model(images)

    for image, output, target in zip(images, outputs, targets):
        if isinstance(target, dict):
            gt_boxes = target['boxes']
            gt_labels = target['labels']
        else:
            raise TypeError(f"Unexpected target type: {type(target)} with content: {target}")

        # Collect predictions and ground truths
        pred_boxes = output['boxes'].detach().cpu()
        pred_labels = output['labels'].detach().cpu()

        all_preds.extend(pred_labels.numpy())
        all_gts.extend(gt_labels.numpy())

        # Save predictions to results list
        result = {
            'image_id': target['image_id'],
            'predictions': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(pred_boxes, pred_labels)],
            'ground_truths': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(gt_boxes, gt_labels)]
        }
        results.append(result)

        # Plot the image with bounding boxes
        plot_image_with_boxes(image, pred_boxes, pred_labels)

# Save results to a JSON file
output_file = '/content/drive/MyDrive/RIPcurrent/predictions.json'
with open(output_file, 'w') as f:
    json.dump(results, f)
print(f'Saved predictions to {output_file}')




NameError: name 'asdadada' is not defined

In [None]:
#예측과 정답까지 비교
import os
import json
import torch
import torchvision
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_ResNet50_FPN_Weights
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Load trained model checkpoint
def load_model(checkpoint_path, num_classes):
    weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Define the dataset class for evaluation
class RipCurrentTestDataset(Dataset):
    def __init__(self, image_dir, annotation_dir, transform=None, num_samples=20):
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        self.transform = transform

        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]
        self.image_files = self.image_files[:num_samples]  # Limit to num_samples

        self.annotations = self.load_annotations(self.image_files)

    def load_annotations(self, image_files):
        annotations = []
        for img_name in image_files:
            annotation_path = os.path.join(self.annotation_dir, img_name.replace('.jpg', '.json'))
            with open(annotation_path) as f:
                annotations.append(json.load(f))
        return annotations

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        annotation = self.annotations[idx]
        boxes = annotation['annotations']['drawing']
        if annotation['annotations']['class'] == 0:
            # If class is 0, there are no bounding boxes
            boxes = torch.empty((0, 4), dtype=torch.float32)
            labels = torch.tensor([], dtype=torch.int64)
        else:
            # Convert points to [x_min, y_min, x_max, y_max] format
            valid_boxes = []
            for box in boxes:
                x_min = min(point[0] for point in box)
                y_min = min(point[1] for point in box)
                x_max = max(point[0] for point in box)
                y_max = max(point[1] for point in box)
                if x_max > x_min and y_max > y_min:
                    valid_boxes.append([x_min, y_min, x_max, y_max])

            boxes = torch.tensor(valid_boxes, dtype=torch.float32)
            labels = torch.tensor([annotation['annotations']['class']] * len(boxes), dtype=torch.int64)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels
        target['image_id'] = img_name

        return image, target

# Define collate_fn to handle batch of targets correctly
def collate_fn(batch):
    images, targets = zip(*batch)
    return list(images), list(targets)

transform = transforms.Compose([
    transforms.ToTensor(),
])

test_image_dir = '/content/drive/MyDrive/RIPcurrent/TS_1.Image_4.Daecheon_2.ZIPTR'
test_annotation_dir = '/content/drive/MyDrive/RIPcurrent/TL_1.JSON_4.Daecheon_2.ZIPTR'
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch_9.pth'

test_dataset = RipCurrentTestDataset(image_dir=test_image_dir,
                                     annotation_dir=test_annotation_dir,
                                     transform=transform,
                                     num_samples=20)

test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = load_model(checkpoint_path, num_classes=2)
model.to(device)
model.eval()

# Function to plot the bounding boxes
def plot_image_with_boxes(image, pred_boxes, pred_labels, gt_boxes, gt_labels):
    fig, ax = plt.subplots(1, 2, figsize=(12, 6))

    # Plot predicted boxes
    ax[0].imshow(image.permute(1, 2, 0).cpu().numpy())
    for box, label in zip(pred_boxes, pred_labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='red', linewidth=2)
        ax[0].add_patch(rect)
        ax[0].text(x_min, y_min, f'Pred: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    ax[0].set_title('Predicted Bounding Boxes')

    # Plot ground truth boxes
    ax[1].imshow(image.permute(1, 2, 0).cpu().numpy())
    for box, label in zip(gt_boxes, gt_labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='green', linewidth=2)
        ax[1].add_patch(rect)
        ax[1].text(x_min, y_min, f'GT: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    ax[1].set_title('Ground Truth Bounding Boxes')

    plt.show()

# Initialize lists to store evaluation metrics
all_preds = []
all_gts = []

# Evaluate the model and save predictions
results = []

for images, targets in test_dataloader:
    images = list(image.to(device) for image in images)
    outputs = model(images)

    for image, output, target in zip(images, outputs, targets):
        if isinstance(target, dict):
            gt_boxes = target['boxes']
            gt_labels = target['labels']
        else:
            raise TypeError(f"Unexpected target type: {type(target)} with content: {target}")

        # Collect predictions and ground truths
        pred_boxes = output['boxes'].detach().cpu()
        pred_labels = output['labels'].detach().cpu()

        all_preds.extend(pred_labels.numpy())
        all_gts.extend(gt_labels.numpy())

        # Save predictions to results list
        result = {
            'image_id': target['image_id'],
            'predictions': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(pred_boxes, pred_labels)],
            'ground_truths': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(gt_boxes, gt_labels)]
        }
        results.append(result)

        # Plot the image with predicted and ground truth bounding boxes
        plot_image_with_boxes(image.cpu(), pred_boxes, pred_labels, gt_boxes, gt_labels)

# Print lengths of all_preds and all_gts for debugging
print(f"Length of all_gts: {len(all_gts)}")
print(f"Length of all_preds: {len(all_preds)}")

# Save results to a JSON file
output_file = '/content/drive/MyDrive/RIPcurrent/predictions.json'
with open(output_file, 'w') as f:
    json.dump(results, f)
print(f'Saved predictions to {output_file}')

SyntaxError: invalid decimal literal (<ipython-input-5-3faf10fe643c>, line 2)

In [None]:
import os
import json
import torch
import torchvision
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_ResNet50_FPN_Weights
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Load trained model checkpoint
def load_model(checkpoint_path, num_classes):
    weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Define the dataset class for evaluation
class RipCurrentTestDataset(Dataset):
    def __init__(self, image_dir, annotation_dir, transform=None, num_samples=20):
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        self.transform = transform

        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]
        self.image_files = self.image_files[:num_samples]  # Limit to num_samples

        self.annotations = self.load_annotations(self.image_files)

    def load_annotations(self, image_files):
        annotations = []
        for img_name in image_files:
            annotation_path = os.path.join(self.annotation_dir, img_name.replace('.jpg', '.json'))
            with open(annotation_path) as f:
                annotations.append(json.load(f))
        return annotations

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        annotation = self.annotations[idx]
        boxes = annotation['annotations']['drawing']
        if annotation['annotations']['class'] == 0:
            # If class is 0, there are no bounding boxes
            boxes = torch.empty((0, 4), dtype=torch.float32)
            labels = torch.tensor([], dtype=torch.int64)
        else:
            # Convert points to [x_min, y_min, x_max, y_max] format
            valid_boxes = []
            for box in boxes:
                x_min = min(point[0] for point in box)
                y_min = min(point[1] for point in box)
                x_max = max(point[0] for point in box)
                y_max = max(point[1] for point in box)
                if x_max > x_min and y_max > y_min:
                    valid_boxes.append([x_min, y_min, x_max, y_max])

            boxes = torch.tensor(valid_boxes, dtype=torch.float32)
            labels = torch.tensor([annotation['annotations']['class']] * len(boxes), dtype=torch.int64)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels
        target['image_id'] = img_name

        return image, target

# Define collate_fn to handle batch of targets correctly
def collate_fn(batch):
    images, targets = zip(*batch)
    return list(images), list(targets)

transform = transforms.Compose([
    transforms.ToTensor(),
])

test_image_dir = '/content/drive/MyDrive/RIPcurrent/TS_1.image_1.Haeundae_2.PARA1'
test_annotation_dir = '/content/drive/MyDrive/RIPcurrent/TL_1.JSON_1.Haeundae_2.PARA1'
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch0708PARA2GLORY_15.pth'

test_dataset = RipCurrentTestDataset(image_dir=test_image_dir,
                                     annotation_dir=test_annotation_dir,
                                     transform=transform,
                                     num_samples=20)

test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = load_model(checkpoint_path, num_classes=2)
model.to(device)
model.eval()

# Function to plot the bounding boxes
def plot_image_with_boxes(image, pred_boxes, pred_labels, gt_boxes, gt_labels):
    fig, ax = plt.subplots(1, 2, figsize=(12, 6))

    # Plot predicted boxes
    ax[0].imshow(image.permute(1, 2, 0).cpu().numpy())
    for box, label in zip(pred_boxes, pred_labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='red', linewidth=2)
        ax[0].add_patch(rect)
        ax[0].text(x_min, y_min, f'Pred: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    ax[0].set_title('Predicted Bounding Boxes')

    # Plot ground truth boxes
    ax[1].imshow(image.permute(1, 2, 0).cpu().numpy())
    for box, label in zip(gt_boxes, gt_labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='green', linewidth=2)
        ax[1].add_patch(rect)
        ax[1].text(x_min, y_min, f'GT: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    ax[1].set_title('Ground Truth Bounding Boxes')

    plt.show()

# Initialize lists to store evaluation metrics
all_preds = []
all_gts = []

# Evaluate the model and save predictions
results = []

for images, targets in test_dataloader:
    images = list(image.to(device) for image in images)
    outputs = model(images)

    for image, output, target in zip(images, outputs, targets):
        if isinstance(target, dict):
            gt_boxes = target['boxes']
            gt_labels = target['labels']
        else:
            raise TypeError(f"Unexpected target type: {type(target)} with content: {target}")

        # Collect predictions and ground truths
        pred_boxes = output['boxes'].detach().cpu()
        pred_labels = output['labels'].detach().cpu()

        all_preds.extend(pred_labels.numpy().tolist())
        all_gts.extend(gt_labels.numpy().tolist())

        # Save predictions to results list
        result = {
            'image_id': target['image_id'],
            'predictions': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(pred_boxes, pred_labels)],
            'ground_truths': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(gt_boxes, gt_labels)]
        }
        results.append(result)

        # Plot the image with predicted and ground truth bounding boxes
        plot_image_with_boxes(image.cpu(), pred_boxes, pred_labels, gt_boxes, gt_labels)

# Print lengths of all_preds and all_gts for debugging
print(f"Length of all_gts: {len(all_gts)}")
print(f"Length of all_preds: {len(all_preds)}")

# Calculate evaluation metrics
precision = precision_score(all_gts, all_preds, average='weighted', zero_division=1)
recall = recall_score(all_gts, all_preds, average='weighted')
f1 = f1_score(all_gts, all_preds, average='weighted')
accuracy = accuracy_score(all_gts, all_preds)

print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Accuracy: {accuracy:.4f}')

# Save results to a JSON file
output_file = '/content/drive/MyDrive/RIPcurrent/predictions.json'
with open(output_file, 'w') as f:
    json.dump(results, f)
print(f'Saved predictions to {output_file}')


Output hidden; open in https://colab.research.google.com to view.

In [None]:
#0707 너무 많은 박스를 치는 것에 대한 해결방법 시험중
import os
import json
import torch
import torchvision
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_ResNet50_FPN_Weights
from torchvision.ops import nms
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Load trained model checkpoint
def load_model(checkpoint_path, num_classes):
    weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Define the dataset class for evaluation
class RipCurrentTestDataset(Dataset):
    def __init__(self, image_dir, annotation_dir, transform=None, num_samples=20):
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        self.transform = transform

        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]
        self.image_files = self.image_files[:num_samples]  # Limit to num_samples

        self.annotations = self.load_annotations(self.image_files)

    def load_annotations(self, image_files):
        annotations = []
        for img_name in image_files:
            annotation_path = os.path.join(self.annotation_dir, img_name.replace('.jpg', '.json'))
            with open(annotation_path) as f:
                annotations.append(json.load(f))
        return annotations

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        annotation = self.annotations[idx]
        boxes = annotation['annotations']['drawing']
        if annotation['annotations']['class'] == 0:
            # If class is 0, there are no bounding boxes
            boxes = torch.empty((0, 4), dtype=torch.float32)
            labels = torch.tensor([], dtype=torch.int64)
        else:
            # Convert points to [x_min, y_min, x_max, y_max] format
            valid_boxes = []
            for box in boxes:
                x_min = min(point[0] for point in box)
                y_min = min(point[1] for point in box)
                x_max = max(point[0] for point in box)
                y_max = max(point[1] for point in box)
                if x_max > x_min and y_max > y_min:
                    valid_boxes.append([x_min, y_min, x_max, y_max])

            boxes = torch.tensor(valid_boxes, dtype=torch.float32)
            labels = torch.tensor([annotation['annotations']['class']] * len(boxes), dtype=torch.int64)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels
        target['image_id'] = img_name

        return image, target

# Define collate_fn to handle batch of targets correctly
def collate_fn(batch):
    images, targets = zip(*batch)
    return list(images), list(targets)

transform = transforms.Compose([
    transforms.ToTensor(),
])

test_image_dir = '/content/drive/MyDrive/RIPcurrent/TS_1.image_1.Haeundae_2.PARA1'
test_annotation_dir = '/content/drive/MyDrive/RIPcurrent/TL_1.JSON_1.Haeundae_2.PARA1'
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch0708PARA2GLORY_15.pth'

test_dataset = RipCurrentTestDataset(image_dir=test_image_dir,
                                     annotation_dir=test_annotation_dir,
                                     transform=transform,
                                     num_samples=20)

test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = load_model(checkpoint_path, num_classes=2)
model.to(device)
model.eval()

# Function to plot the bounding boxes
def plot_image_with_boxes(image, pred_boxes, pred_labels, gt_boxes, gt_labels):
    fig, ax = plt.subplots(1, 2, figsize=(12, 6))

    # Plot predicted boxes
    ax[0].imshow(image.permute(1, 2, 0).cpu().numpy())
    for box, label in zip(pred_boxes, pred_labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='red', linewidth=2)
        ax[0].add_patch(rect)
        ax[0].text(x_min, y_min, f'Pred: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    ax[0].set_title('Predicted Bounding Boxes')

    # Plot ground truth boxes
    ax[1].imshow(image.permute(1, 2, 0).cpu().numpy())
    for box, label in zip(gt_boxes, gt_labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='green', linewidth=2)
        ax[1].add_patch(rect)
        ax[1].text(x_min, y_min, f'GT: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    ax[1].set_title('Ground Truth Bounding Boxes')

    plt.show()

# Initialize lists to store evaluation metrics
all_preds = []
all_gts = []

# Evaluate the model and save predictions
results = []

# Set the confidence threshold and IoU threshold for NMS
conf_threshold = 0.5
iou_threshold = 0.5

for images, targets in test_dataloader:
    images = list(image.to(device) for image in images)
    outputs = model(images)

    for image, output, target in zip(images, outputs, targets):
        if isinstance(target, dict):
            gt_boxes = target['boxes']
            gt_labels = target['labels']
        else:
            raise TypeError(f"Unexpected target type: {type(target)} with content: {target}")

        # Collect predictions and ground truths
        pred_boxes = output['boxes'].detach().cpu()
        pred_labels = output['labels'].detach().cpu()
        pred_scores = output['scores'].detach().cpu()

        # Apply confidence threshold
        high_conf_indices = pred_scores > conf_threshold
        pred_boxes = pred_boxes[high_conf_indices]
        pred_labels = pred_labels[high_conf_indices]
        pred_scores = pred_scores[high_conf_indices]

        # Apply NMS
        keep = nms(pred_boxes, pred_scores, iou_threshold)
        pred_boxes = pred_boxes[keep]
        pred_labels = pred_labels[keep]

        all_preds.extend(pred_labels.numpy().tolist())
        all_gts.extend(gt_labels.numpy().tolist())

        # Save predictions to results list
        result = {
            'image_id': target['image_id'],
            'predictions': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(pred_boxes, pred_labels)],
            'ground_truths': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(gt_boxes, gt_labels)]
        }
        results.append(result)

        # Plot the image with predicted and ground truth bounding boxes
        plot_image_with_boxes(image.cpu(), pred_boxes, pred_labels, gt_boxes, gt_labels)

# Print lengths of all_preds and all_gts for debugging
print(f"Length of all_gts: {len(all_gts)}")
print(f"Length of all_preds: {len(all_preds)}")

# Calculate evaluation metrics
precision = precision_score(all_gts, all_preds, average='weighted', zero_division=1)
recall = recall_score(all_gts, all_preds, average='weighted')
f1 = f1_score(all_gts, all_preds, average='weighted')
accuracy = accuracy_score(all_gts, all_preds)

print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'Accuracy: {accuracy:.4f}')

# Save results to a JSON file
output_file = '/content/drive/MyDrive/RIPcurrent/predictions.json'
with open(output_file, 'w') as f:
    json.dump(results, f)
print(f'Saved predictions to {output_file}')


Output hidden; open in https://colab.research.google.com to view.

In [None]:
import os
import json
import torch
import torchvision
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_ResNet50_FPN_Weights
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import cv2
import time

# Load trained model checkpoint
def load_model(checkpoint_path, num_classes):
    weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Function to plot the bounding boxes
def plot_image_with_boxes(image, boxes, labels):
    plt.imshow(image.permute(1, 2, 0).cpu().numpy())
    ax = plt.gca()
    for box, label in zip(boxes, labels):
        x_min, y_min, x_max, y_max = box
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, fill=False, color='red', linewidth=2)
        ax.add_patch(rect)
        ax.text(x_min, y_min, f'Class: {label}', bbox={'facecolor': 'yellow', 'alpha': 0.5})
    plt.show()

# Initialize lists to store evaluation metrics
all_preds = []
all_gts = []

# Set device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load model
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch_9.pth'
model = load_model(checkpoint_path, num_classes=2)
model.to(device)
model.eval()

# Set up video capture
cap = cv2.VideoCapture('rtmps://wsb.live.smilecdn.com/wsbrtsp15/stream15.stream')

if not cap.isOpened():
    print("Can't open video.")
    exit()

# Set the interval to 1 second
interval = 1

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        break

    # Convert frame to PIL image
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)

    # Transform the image
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    image = transform(pil_image).unsqueeze(0).to(device)

    # Perform prediction
    outputs = model(image)

    # Process outputs
    output = outputs[0]
    pred_boxes = output['boxes'].detach().cpu()
    pred_labels = output['labels'].detach().cpu()

    # Plot the image with bounding boxes
    plot_image_with_boxes(image[0], pred_boxes, pred_labels)

    # Save predictions to results list
    result = {
        'image_id': f'frame_{int(start_time)}',
        'predictions': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(pred_boxes, pred_labels)]
    }
    print(result)

    # Wait for the next interval
    while (time.time() - start_time) < interval:
        time.sleep(0.01)

cap.release()
cv2.destroyAllWindows()


SyntaxError: invalid syntax (<ipython-input-6-ae743a50e911>, line 1)

In [None]:
import os
import json
import torch
import torchvision
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_ResNet50_FPN_Weights
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import cv2
import time
from google.colab.patches import cv2_imshow

# Load trained model checkpoint
def load_model(checkpoint_path, num_classes):
    weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Function to draw bounding boxes on the frame
def draw_boxes_on_frame(frame, boxes, labels):
    for box, label in zip(boxes, labels):
        x_min, y_min, x_max, y_max = map(int, box)
        cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 0, 255), 2)
        cv2.putText(frame, f'Class: {label}', (x_min, y_min-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 255), 2)
    return frame

# Set device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load model
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch_9.pth'
model = load_model(checkpoint_path, num_classes=2)
model.to(device)
model.eval()

# Set up video capture
cap = cv2.VideoCapture('rtmps://wsb.live.smilecdn.com/wsbrtsp3/stream3.stream')

if not cap.isOpened():
    print("Can't open video.")
    exit()

# Set the interval to 3 seconds
interval = 3

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        break

    # Skip frames to maintain the interval
    while (time.time() - start_time) < interval:
        cap.grab()

    # Convert frame to PIL image
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)

    # Transform the image
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    image = transform(pil_image).unsqueeze(0).to(device)

    # Perform prediction
    outputs = model(image)

    # Process outputs
    output = outputs[0]
    pred_boxes = output['boxes'].detach().cpu()
    pred_labels = output['labels'].detach().cpu()

    # Draw bounding boxes on the frame
    frame_with_boxes = draw_boxes_on_frame(frame, pred_boxes, pred_labels)

    # Display the frame with bounding boxes
    cv2_imshow(frame_with_boxes)

    # Save predictions to results list
    result = {
        'image_id': f'frame_{int(start_time)}',
        'predictions': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(pred_boxes, pred_labels)]
    }
    print(result)

    # Wait for the next interval
    while (time.time() - start_time) < interval:
        time.sleep(0.01)

cap.release()
cv2.destroyAllWindows()


SyntaxError: invalid syntax (<ipython-input-7-d31e15f93537>, line 1)

In [None]:
import os
import json
import torch
import torchvision
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN_ResNet50_FPN_Weights
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import cv2
import time
import numpy as np

# Load trained model checkpoint
def load_model(checkpoint_path, num_classes):
    weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Function to draw bounding boxes on the frame
def draw_boxes_on_frame(frame, boxes, labels):
    for box, label in zip(boxes, labels):
        x_min, y_min, x_max, y_max = map(int, box)
        cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 0, 255), 2)
        cv2.putText(frame, f'Class: {label}', (x_min, y_min-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 255), 2)
    return frame

# Function to plot the frame with bounding boxes using matplotlib
def plot_frame_with_boxes(frame, boxes, labels):
    frame_with_boxes = draw_boxes_on_frame(frame, boxes, labels)
    plt.imshow(cv2.cvtColor(frame_with_boxes, cv2.COLOR_BGR2RGB))
    plt.axis('on')
    plt.show()

# Set device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load model
checkpoint_path = '/content/drive/MyDrive/RIPcurrent/model_checkpoints/RCNN_checkpoint_epoch0707PARAGLORY_9.pth'
model = load_model(checkpoint_path, num_classes=2)
model.to(device)
model.eval()

# Set up video capture
cap = cv2.VideoCapture('rtmps://wsb.live.smilecdn.com/wsbrtsp3/stream3.stream')

if not cap.isOpened():
    print("Can't open video.")
    exit()

# Set the interval to 3 seconds
interval = 1

while True:
    start_time = time.time()
    ret, frame = cap.read()
    if not ret:
        break

    # Skip frames to maintain the interval
    while (time.time() - start_time) < interval:
        cap.grab()

    # Convert frame to PIL image
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)

    # Transform the image
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    image = transform(pil_image).unsqueeze(0).to(device)

    # Perform prediction
    outputs = model(image)

    # Process outputs
    output = outputs[0]
    pred_boxes = output['boxes'].detach().cpu()
    pred_labels = output['labels'].detach().cpu()

    # Plot the frame with bounding boxes
    plot_frame_with_boxes(frame, pred_boxes, pred_labels)

    # Save predictions to results list
    result = {
        'image_id': f'frame_{int(start_time)}',
        'predictions': [{'box': box.tolist(), 'label': label.item()} for box, label in zip(pred_boxes, pred_labels)]
    }
    print(result)

    # Wait for the next interval
    while (time.time() - start_time) < interval:
        time.sleep(0.01)

cap.release()
cv2.destroyAllWindows()


Output hidden; open in https://colab.research.google.com to view.

In [1]:
!pip install requests



In [None]:
import requests
import json
from requests.exceptions import HTTPError

url = "http://localhost:8080/"
data = {
    "key1": "value1",
    "key2": "value2"
}
headers = {
    "Content-Type": "application/json"
}

try:
    response = requests.post(url, data=json.dumps(data), headers=headers)
    response.raise_for_status()  # HTTPError가 발생할 수 있는 곳
    print("POST 요청 성공")
    print("응답 데이터:", response.json())
except HTTPError as http_err:
    print(f"HTTP 에러 발생: {http_err}")
except Exception as err:
    print(f"다른 에러 발생: {err}")
