In [None]:
!ls
!unzip dataset.zip

In [41]:
import numpy as np
import cv2
import os

def load_img(img_path):
    img = cv2.imread(img_path)
    img = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
    img = cv2.resize(img,(224,224))
    img = np.reshape(img,(1,224,224))
    img = img.astype("float32")/255
    return img

classify_list = ['8PSK','BPSK' ,'PAM4' ,'QAM16','QAM64','QPSK']

def load_dataset():
    # train:val 8:2 也就是40:10
    dataset_path = './dataset/'
    classes = os.listdir(dataset_path)
    classify = {
        '8PSK' :0,
        'BPSK' :1,
        'PAM4' :2,
        'QAM16':3,
        'QAM64':4,
        'QPSK' :5,
        }
    x_train = []
    y_train = []
    x_test = []
    y_test = []
    for clazz in classes:
        img_dir = dataset_path+clazz+'/'
        imgs = os.listdir(img_dir)
        group = dict()
        print(f'loading gorup:{clazz}...')
        for name in imgs:
            dB = int(name.split('dB')[0])
            if group.__contains__(dB):
                group[dB].append(name)
            else:
                group[dB] = [name]
        for dB,img_ns in group.items():
            img_train = img_ns[:40]
            img_test = img_ns[-10:-5]
            for img_p in img_train:
                img = load_img(img_dir + img_p)
                x_train.append(img)
                y_train.append(classify[clazz])
                # 数据增强
                if dB >= 10 :
                    tmp = np.flip(img,1)
                    x_train.append(tmp)
                    y_train.append(classify[clazz])
                    tmp = np.flip(tmp,2)
                    x_train.append(tmp)
                    y_train.append(classify[clazz])
                    tmp = np.flip(tmp,1)
                    x_train.append(tmp)
                    y_train.append(classify[clazz])

            for img_p in img_test:
                img = load_img(img_dir + img_p)
                x_test.append(img)
                y_test.append(classify[clazz])
    x_train = np.stack(x_train)
    y_train = np.reshape(y_train,(len(y_train),1))
    x_test = np.stack(x_test)
    y_test = np.reshape(y_test,(len(y_test),1))
    return (x_train,y_train),(x_test,y_test)

In [42]:
(x_train,y_train),(x_val,y_val) = load_dataset()
print(x_train.shape)
print(y_train.shape)

loading gorup:BPSK...
loading gorup:QPSK...
loading gorup:PAM4...
loading gorup:QAM64...
loading gorup:QAM16...
loading gorup:8PSK...
(6000, 1, 224, 224)
(6000, 1)


In [43]:
validate_dataset = NumpyDataset(x_test,y_test)
k,v = validate_dataset.__getitem__(0)
print(k.shape)
print(v.shape)

torch.Size([1, 224, 224])
torch.Size([1])


In [1]:
import torch
print(torch.cuda.is_available())

True


In [48]:
import torch.nn as nn
import torch

# Author https://github.com/WZMIAOMIAO/deep-learning-for-image-processing/blob/master/pytorch_classification/Test2_alexnet/
class AlexNet(nn.Module):
    def __init__(self, num_classes=1000, init_weights=False):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 48, kernel_size=11, stride=4, padding=2),  # input[1, 224, 224]  output[48, 55, 55]
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                  # output[48, 27, 27]
            nn.Conv2d(48, 128, kernel_size=5, padding=2),           # output[128, 27, 27]
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                  # output[128, 13, 13]
            nn.Conv2d(128, 192, kernel_size=3, padding=1),          # output[192, 13, 13]
            nn.ReLU(inplace=True),
            nn.Conv2d(192, 192, kernel_size=3, padding=1),          # output[192, 13, 13]
            nn.ReLU(inplace=True),
            nn.Conv2d(192, 128, kernel_size=3, padding=1),          # output[128, 13, 13]
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                  # output[128, 6, 6]
        )
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(128 * 6 * 6, 2048),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(2048, 2048),
            nn.ReLU(inplace=True),
            nn.Linear(2048, num_classes),
        )
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, start_dim=1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

In [25]:
import torch
from torch.utils.data import Dataset

class NumpyDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # 将 NumPy 数组转换为 PyTorch 张量
        image = torch.from_numpy(self.data[idx])
        label = torch.from_numpy(self.labels[idx])
        return image, label

In [60]:
import os,sys,json

from torchvision import transforms, datasets, utils
import torch.optim as optim
from tqdm import tqdm

def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("using {} device.".format(device))

    # TOOD
    train_dataset = NumpyDataset(x_train,y_train)
    train_num = len(train_dataset)

    # # {'daisy':0, 'dandelion':1, 'roses':2, 'sunflower':3, 'tulips':4}
    # flower_list = train_dataset.class_to_idx
    # cla_dict = dict((val, key) for key, val in flower_list.items())
    # # write dict into json file
    # json_str = json.dumps(cla_dict, indent=4)
    # with open('class_indices.json', 'w') as json_file:
    #     json_file.write(json_str)

    batch_size = 32
    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
    print('Using {} dataloader workers every process'.format(nw))

    train_loader = torch.utils.data.DataLoader(train_dataset,batch_size=batch_size, shuffle=True,num_workers=nw)

    # TODO
    validate_dataset = NumpyDataset(x_val,y_val)

    val_num = len(validate_dataset)
    validate_loader = torch.utils.data.DataLoader(validate_dataset,batch_size=4, shuffle=False,num_workers=nw)

    print("using {} images for training, {} images for validation.".format(train_num,val_num))

    net = AlexNet(num_classes=6, init_weights=True)

    net.to(device)
    loss_function = nn.CrossEntropyLoss()
    # pata = list(net.parameters())
    optimizer = optim.Adam(net.parameters(), lr=0.0002)

    epochs = 10
    save_path = './AlexNet2.pth'
    best_acc = 0.0
    train_steps = len(train_loader)
    for epoch in range(epochs):
        # train
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            images, labels = data
            labels = labels.squeeze(1)
            optimizer.zero_grad()
            outputs = net(images.to(device))
            # print(f'output={outputs.shape},labes={labels.shape}')
            loss = loss_function(outputs, labels.to(device))
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            train_bar.desc = "train epoch[{}/{}] loss:{:.3f}".format(epoch + 1,epochs,loss)

        # validate
        net.eval()
        acc = 0.0  # accumulate accurate number / epoch
        with torch.no_grad():
            val_bar = tqdm(validate_loader, file=sys.stdout)
            for val_data in val_bar:
                val_images, val_labels = val_data
                val_labes = val_labels.squeeze(1)
                outputs = net(val_images.to(device))
                predict_y = torch.max(outputs, dim=1)[1]
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()

        val_accurate = acc / val_num / validate_loader.batch_size
        print('[epoch %d] train_loss: %.3f  val_accuracy: %.3f ' %
              (epoch + 1, running_loss / train_steps, val_accurate))

        if val_accurate > best_acc:
            best_acc = val_accurate
            torch.save(net.state_dict(), save_path)

    print('Finished Training')


In [61]:
main()

using cuda:0 device.
Using 2 dataloader workers every process
using 6000 images for training, 300 images for validation.
train epoch[1/10] loss:0.594: 100%|██████████| 188/188 [00:04<00:00, 40.23it/s]
100%|██████████| 75/75 [00:00<00:00, 121.67it/s]
[epoch 1] train_loss: 1.319  val_accuracy: 0.657 
train epoch[2/10] loss:0.746: 100%|██████████| 188/188 [00:04<00:00, 43.05it/s]
100%|██████████| 75/75 [00:00<00:00, 186.55it/s]
[epoch 2] train_loss: 0.601  val_accuracy: 0.718 
train epoch[3/10] loss:0.823: 100%|██████████| 188/188 [00:04<00:00, 43.54it/s]
100%|██████████| 75/75 [00:00<00:00, 178.14it/s]
[epoch 3] train_loss: 0.473  val_accuracy: 0.767 
train epoch[4/10] loss:0.292: 100%|██████████| 188/188 [00:04<00:00, 37.86it/s]
100%|██████████| 75/75 [00:00<00:00, 190.67it/s]
[epoch 4] train_loss: 0.391  val_accuracy: 0.800 
train epoch[5/10] loss:0.385: 100%|██████████| 188/188 [00:04<00:00, 43.79it/s]
100%|██████████| 75/75 [00:00<00:00, 184.58it/s]
[epoch 5] train_loss: 0.340  val_a