In [1]:
from nvidia.dali.pipeline import pipeline_def
import nvidia.dali.types as types
import nvidia.dali.fn as fn
from nvidia.dali.plugin.pytorch import DALIGenericIterator
import os
import sys
import pickle
import numpy as np

In [5]:
#### 加载cifar10数据集 ####
def load_cifar10(batch_size, train=True, root='/data/cifar10'):
    '''该函数返回的结果与torchvision.datasets.CIFAR10()函数取self.data和self.targets返回的结果相同'''
    base_folder = 'cifar-10-batches-py'
    train_list = [
        ['data_batch_1', 'c99cafc152244af753f735de768cd75f'],
        ['data_batch_2', 'd4bba439e000b95fd0a9bffe97cbabec'],
        ['data_batch_3', '54ebc095f3ab1f0389bbae665268c751'],
        ['data_batch_4', '634d18415352ddfa80567beed471001a'],
        ['data_batch_5', '482c414d41f54cd18b22e5b47cb7c3cb'],
    ]

    test_list = [
        ['test_batch', '40351d587109b95175f43aff81a1287e'],
    ]
    
    if train:
        downloaded_list = train_list
    else:
        downloaded_list = test_list

    data = []
    targets = []
    for file_name, checksum in downloaded_list:
        file_path = os.path.join(root, base_folder, file_name)
        with open(file_path, 'rb') as f:
            if sys.version_info[0] == 2:
                entry = pickle.load(f)
            else:
                entry = pickle.load(f, encoding='latin1')
            data.append(entry['data'])
            if 'labels' in entry:
                targets.extend(entry['labels'])
            else:
                targets.extend(entry['fine_labels'])

    data = np.vstack(data).reshape(-1, 3, 32, 32)
    data = data.transpose((0, 2, 3, 1))  # convert to HWC
    targets = np.vstack(targets)
#     np.save("cifar.npy", data)
#     data = np.load('cifar.npy')  # to serialize, increase locality
    return data, targets

In [7]:
image_dir = "../data/"
batch_size = 256
data, targets = load_cifar10(batch_size, train=True, root=image_dir)

# @pipeline_def
# def simple_pipeline():
#     jpegs, labels = fn.readers.file(file_root=image_dir)
#     images = fn.decoders.image(jpegs, device='cpu')

#     return images, labels

In [None]:
print(f"data的size为:{data.shape}") # [50000, 32, 32, 3]
print(f"targets的size为:{targets.shape}") # [50000, 1]

In [9]:
def partition_data(training_data, labels, num_client, num_class, partition = 'noniid', beta=0.4): 
    '''按照Dirichlet分布划分原始数据集 '''
    # 参数num_client表示client的数量
    # training_data和labels是numpy数组
    training_data_subset_list = []
    training_label_subset_list = []
    if partition == "homo" or partition == "iid":
        idxs = np.random.permutation(N)   #在训练集的条数范围内生成随机序列
        batch_idxs = np.array_split(idxs, num_client)
        net_dataidx_map = {i: batch_idxs[i] for i in range(num_client)}

    elif partition == "noniid-labeldir" or partition == "noniid":
        min_size = 0 
        min_require_size = 10   # 每个client至少要有10条数据
        K = num_class
            # min_require_size = 100

        N = labels.shape[0]
        net_dataidx_map = {}  #用于存放每个client拥有的样本的idx数组

        #min_size表示所有client中样本数量最少的client对应的样本数量。如果存在某个client的样本数量没达到min_require_size，则继续为client分配样本。
        while min_size < min_require_size:
            idx_batch = [[] for _ in range(num_client)]  # idx_batch存放num_client个client对应的样本idx
            for k in range(K): #遍历所有类别，将每个类别按Dirichlet分布的比例分配给各个client。
                idx_k = np.where(labels == k)[0]  #idx_k表示训练集中label为k的所有样本的idx集合
                np.random.shuffle(idx_k) #上面选出来的idx是按顺序的，现在把顺序打乱。
                proportions = np.random.dirichlet(np.repeat(beta, num_client)) 
                #proportions的长度为num_client
                proportions = np.array([p * (len(idx_j) < N / num_client) for p, idx_j in zip(proportions, idx_batch)])  # 取出第j个client拥有的所有sample下标和第j个client的idx
                proportions = proportions / proportions.sum() #将剩下的client的划分比例重新归一化
                proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1] #
                idx_batch = [idx_j + idx.tolist() for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))]  #为第j个client分配类别k的样本
                min_size = min([len(idx_j) for idx_j in idx_batch]) #min_size表示所有client中样本数量最少的client对应的样本数量
                # if K == 2 and num_client <= 10:
                #     if np.min(proportions) < 200:
                #         min_size = 0
                #         break


        for j in range(num_client):
            #分配完之后，由于idx_batch中的样本idx是按类别顺序存放的，所以要打乱。
            np.random.shuffle(idx_batch[j]) 
            net_dataidx_map[j] = idx_batch[j] # 用net_dataidx_map记录每个client拥有的样本。
            # 封装为dataloader
            training_data_subset = training_data[idx_batch[j]]
            training_label_subset = labels[idx_batch[j]]
#             print(f"labels的维度为:{labels.shape}")
#             print(f"idx_batch[{j}]:{idx_batch[j]}")
            training_data_subset_list.append(training_data_subset)
            training_label_subset_list.append(training_label_subset)
#     print(net_dataidx_map)
    #traindata_cls_counts：数据分布情况（每个client拥有的所有类别及其数量）
    traindata_cls_counts = record_net_data_stats(labels, net_dataidx_map) 

    return training_data_subset_list, training_label_subset_list, traindata_cls_counts

def record_net_data_stats(y_train, net_dataidx_map):
    '''用于记录每个client的数据分布(拥有的所有样本类别，及该类别出现的次数)'''
    net_cls_counts = {}

    for net_i, dataidx in net_dataidx_map.items(): # dict.items()返回(key, value)元组组成的列表
    # net_i表示第i个client, dataidx为其拥有的样本idx
        unq, unq_cnt = np.unique(y_train[dataidx], return_counts=True) #返回unique的类别数组
        tmp = {unq[i]: unq_cnt[i] for i in range(len(unq))} # 字典,存放第i个client拥有的类别及其数量
        net_cls_counts[net_i] = tmp #字典，存放所有client的类别信息

    data_list=[]
    for net_id, data in net_cls_counts.items(): # net_id表示client编号，data表示该client拥有的类别的次数信息
        n_total=0
        for class_id, n_data in data.items(): # class_id表示类别编号，n_data表示该类别在该client中的出现次数
            n_total += n_data  # 计算该client拥有的数据条数
        data_list.append(n_total) #data_list保存每个client拥有的数据条数
    print('mean:', np.mean(data_list)) #打印每个client的平均数据条数和方差，以显示异质程度
    print('std:', np.std(data_list))

    return net_cls_counts


In [10]:
training_data_list, training_label_list, traindata_cls_counts = partition_data(data,targets,num_client=10,num_class=10, partition = 'noniid', beta=0.4)

mean: 5000.0
std: 2147.5605695765603


In [None]:
print(training_data_list[0].shape) #[(item_num, 32, 32, 3)*10]
# print(training_label_list[0].shape) # [(item_num, 1)*10]
# print(training_data_list[0].shape==training_label_list[0].shape)

In [64]:
# 创建num_client个pipeline.
# 在pipeline中定义数据增强的流程

CIFAR10_MEAN=[0.49139968 * 255., 0.48215827 * 255., 0.44653124 * 255.]
CIFAR10_STD=[0.24703233 * 255., 0.24348505 * 255., 0.26158768 * 255.]
@pipeline_def(num_threads=4, device_id=0)
def get_dali_pipeline(images, labels):
#     images, labels = fn.readers.file(
#         file_root=images_dir, random_shuffle=True, name="Reader")
    # decode data on the GPU
#     images = fn.decoders.image_random_crop(
#         images, device="gpu", output_type=types.RGB, random_area=[0.14,1])
#     # the rest of processing happens on the GPU as well
#     images = fn.resize(images, resize_x=224, resize_y=224)
    images_1 = fn.random_resized_crop(images, size = 224, random_area =[0.14,1] )
    images_2 = fn.random_resized_crop(images, size=96, random_area=[0.05,0.14] )
    coin = fn.random.coin_flip(probability=0.5 ) #随机镜像对称变换?
    images = fn.flip(images, horizontal=coin)
    sat = fn.random.uniform(range=[0.2, 1.8]) # 亮度,饱和度,对比度变换的变化范围
    hue = fn.random.uniform(range=[-0.2, 0.2]) # 色调 设置hue=0.2
    images = fn.color_twist(images, brightness =sat, saturation =sat,contrast =sat, hue=hue)
    #颜色变换
    images = random_grayscale(images,probability=0.2)
#     images = fn.crop_mirror_normalize(
#         images,
#         device="gpu",
#         crop_h=224,
#         crop_w=224,
#         mean=CIFAR10_MEAN,
#         std=CIFAR10_STD)
#     transforms.RandomHorizontalFlip(p=0.5),
#                 transforms.Compose(color_transform),
#                 transforms.ToTensor(),
#                 transforms.Normalize(mean=mean, std=std)])
#             ] * nmb_crops[i])
    return images, labels

def random_grayscale(images, probability):
    saturate = fn.random.coin_flip(probability=1-probability)
    saturate = fn.cast(saturate, dtype=types.FLOAT)
    return fn.hsv(images, saturation=saturate)

In [65]:
pipeline = get_dali_pipeline(training_data_list[0], training_label_list[0], batch_size=256)

In [66]:
# pipeline.build()

In [67]:
# 定义dataloader
train_data = DALIGenericIterator(
    [pipeline],
    ['data', 'label']
)

In [None]:
for i, data in enumerate(train_data):
    x, y = data[0]['data'], data[0]['label']
    print(f"size of x is :{x.size()}")
    print(f"size of y is: {y.size()}")
#     pred = model(x)
#     loss = loss_func(pred, y)
#     backward(loss, model)

In [76]:
class CIFAR_INPUT_ITER():
    def __init__(self, data, targets, batch_size, train = True):
        self.data = data
        self.targets = targets
        self.batch_size = batch_size
        self.train = train

    def __iter__(self):
        self.i = 0
        self.n = len(self.data)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
            if self.train and self.i % self.n == 0:
                print("执行shuffle")
                self.data, self.targets = shuffle(self.data, self.targets, random_state=0)
            img, label = self.data[self.i], self.targets[self.i]
            batch.append(img)
            labels.append(label)
            self.i = (self.i + 1) % self.n
        return (batch, labels)

    next = __next__


In [111]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import cupy as cp
class HybridTrainPipe_CIFAR(Pipeline): # 这个pipeline就相当于是在构造数据集
    def __init__(self, images, labels, batch_size, 
                 num_threads, device_id, data_dir, crop=32, dali_cpu=False, 
                 local_rank=0,world_size=1,cutout=0):
        super(HybridTrainPipe_CIFAR, self).__init__(batch_size, num_threads, device_id, seed=12 + device_id)
        self.iterator = iter(CIFAR_INPUT_ITER(images, labels,batch_size ))
        dali_device = "gpu"
        self.input = ops.ExternalSource()
        self.input_label = ops.ExternalSource()
#         self.image= images
#         self.label = labels
        self.pad = ops.Paste(device=dali_device, ratio=1.25, fill_value=0)
        self.uniform = ops.Uniform(range=(0., 1.))
        self.crop = ops.Crop(device=dali_device, crop_h=crop, crop_w=crop)
        self.cmnp = ops.CropMirrorNormalize(device=dali_device,
                                            dtype=types.FLOAT,
                                            output_layout=types.NCHW,
                                            image_type=types.RGB,
                                            mean=[0.49139968 * 255., 0.48215827 * 255., 0.44653124 * 255.],
                                            std=[0.24703233 * 255., 0.24348505 * 255., 0.26158768 * 255.]
                                            )
        self.coin = ops.CoinFlip(device=dali_device, probability=0.5)
        self.rrc_1 = ops.RandomResizedCrop(device=dali_device, size = 224, random_area =[0.14,1] )
        self.rrc_2 = ops.RandomResizedCrop(device=dali_device, size=96, random_area=[0.05,0.14] )
        self.flip = ops.Flip(device=dali_device, horizontal=self.coin())
        self.sat = ops.random.Uniform(device=dali_device,range=[0.2, 1.8]) # 亮度,饱和度,对比度变换的变化范围
        self.hue = ops.random.Uniform(device=dali_device,range=[-0.2, 0.2]) # 色调 设置hue=0.2
        self.ColorJitter = ops.ColorTwist(device=dali_device, 
                                          brightness =self.sat(), 
                                          saturation =self.sat(),contrast =self.sat(), hue=self.hue())
        
        
        self.hsv = self.random_grayscale(probability=0.2)

    def iter_setup(self):
        (images, labels) = self.iterator.next()
        self.feed_input(self.jpegs, images, layout="HWC")
        self.feed_input(self.labels, labels)

    def define_graph(self): #这个函数是定义怎么对图像进行变换.
        rng = self.coin()
        self.images = self.input()
        self.labels = self.input_label()
        output = self.images.gpu()
        output = self.rrc_1(output)
#         output = self.crop(output, crop_pos_x=self.uniform(), crop_pos_y=self.uniform())
        output = self.flip(output)
#         output = self.cmnp(output, mirror=rng)
        output = self.ColorJitter(output)
        return [output, self.labels]
                                          
    def random_grayscale(self, probability):
        saturate = ops.random.CoinFlip(device = "gpu",probability=1-probability)
        saturate = ops.Cast(device = "gpu", dtype=types.FLOAT)(saturate())
        hsv = ops.Hsv(device = "gpu", saturation=saturate)
        return hsv


In [105]:
IMG_DIR = '../data'
TRAIN_BS = 256
TEST_BS = 200
NUM_WORKERS = 4
CROP_SIZE = 32


pip_train = HybridTrainPipe_CIFAR(images = training_data_list[0], 
                                  labels = training_label_list[0],
                                  batch_size=TRAIN_BS, 
                                  num_threads=NUM_WORKERS, 
                                  device_id=0, 
                                  data_dir=IMG_DIR, 
                                  crop=CROP_SIZE, 
                                  world_size=1, 
                                  local_rank=0, 
                                  cutout=0)

  self.cmnp = ops.CropMirrorNormalize(device=dali_device,
  op_instances.append(_OperatorInstance(input_set, self, **kwargs))


In [100]:
class DALIDataloader(DALIGenericIterator):
    def __init__(self, pipeline, size, batch_size, output_map=["data", "label"], auto_reset=True, onehot_label=False):
        super(DALIDataloader, self).__init__(pipelines=pipeline, size=size, auto_reset=auto_reset, output_map=output_map)
#         self.size = size #这一步赋值不能执行,不知道为什么...
        self.batch_size = batch_size
        self.onehot_label = onehot_label
        self.output_map = output_map #output_map是指定batch中的字典的两个key
        
    def __next__(self):
        if self._first_batch is not None: # 只有第一个batch走这里
            batch = self._first_batch
            self._first_batch = None
            pdb.set_trace()
            return batch[0]  #batch是一个list, list中有一个dict.
        # batch==[{'data': tensor([batch_size, channel_size, input_size, input_size]), 
        #               'labels': tensor([batch_size, 1])}]
        data = super().__next__()[0]
        
        if self.onehot_label:
            data[self.output_map[1]] = data[self.output_map[1]].squeeze().long()
#             return [data[self.output_map[0]], data[self.output_map[1]].squeeze().long()]
#         else:
#             return [data[self.output_map[0]], data[self.output_map[1]]]
        return data
    
    def __len__(self): #计算batch的数量
        if self._size % self.batch_size==0: 
         #self._size是调用DALIGenericIterator类的属性,该值的大小等于训练集的样本数量
            return self._size // self.batch_size
        else:
            return self._size // self.batch_size+1
        

In [112]:
CIFAR_IMAGES_NUM_TRAIN = 50000
train_loader = DALIDataloader(pipeline=pip_train, 
                              size=CIFAR_IMAGES_NUM_TRAIN, 
                              batch_size=TRAIN_BS, 
                              onehot_label=True)

  op_instances.append(_OperatorInstance(input_set, self, **kwargs))


TypeError: when calling operator RandomResizedCrop:
Input 0 is neither a DALI `DataNode` nor a list of data nodes but `ndarray`.
Attempt to convert it to a constant node failed.

In [46]:
class ExternalInputIterator(object):
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line != '']
        shuffle(self.files)

    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(' ')
            f = open(self.images_dir + jpeg_filename, 'rb')
            batch.append(np.frombuffer(f.read(), dtype = np.uint8))
            labels.append(np.array([label], dtype = np.uint8))
            self.i = (self.i + 1) % self.n
        return (batch, labels)