In [None]:
#DALI
import torch, torchvision
import copy
import numpy as np
import time
import os.path
from torchvision import models

#安装
!pip install --extra-index-url https://developer.download.nvidia.com/compute/redist nvidia-dali-cuda110
#!pip install --extra-index-url https://developer.download.nvidia.com/compute/redist nvidia-dali-tf-plugin-cuda110

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import nvidia.dali.fn as fn
#DALIClassificationIterator(pipelines, reader_name)  ==  DALIGenericIterator(pipelines, ["data", "label"], reader_name)
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy#（这个是用于填充作用）
from nvidia.dali.plugin.pytorch import DALIGenericIterator

#自定义增益函数
#from dali_augment import * 

In [None]:
# Dataset locations on Kaggle; every path below is derived from the dataset root.
root = '/kaggle/input/10-monkey-species/'
txt_path = f'{root}monkey_labels.txt'
training_data_path = f'{root}training/training/'
valid_data_path = f'{root}validation/validation/'

# Presumably per-channel mean/std of the dataset on the 0-255 scale -- TODO confirm.
# The pipelines visible in this notebook normalise with ImageNet statistics instead.
means = [110.508858, 109.552668, 84.623747]
stds = [67.212821, 66.229520, 66.544232]

In [None]:
# Write the annotation text files consumed by ExternalInputIterator.
# Each line has the form "<class_dir>/<filename> <class_index>".
# The class directories are sorted so that the class index is deterministic
# (os.listdir returns entries in arbitrary order).
label = sorted(os.listdir(training_data_path))


def _write_annotations(txt_file, data_root, class_names):
    """Write one "<class_dir>/<filename> <class_index>" line per file under data_root.

    Args:
        txt_file: path of the annotation file to create (overwritten if present).
        data_root: directory that contains one sub-directory per class.
        class_names: ordered class directory names; the position is the class index.
    """
    with open(txt_file, 'w') as f:
        for class_index, class_name in enumerate(class_names):
            class_dir = os.path.join(data_root, class_name)
            for filename in sorted(os.listdir(class_dir)):
                f.write(f'{class_name}/{filename} {class_index}\n')


# The same class list is used for both splits so the indices agree.
# The class name is part of each line; it could also be left out.
_write_annotations('train.txt', training_data_path, label)
_write_annotations('valid.txt', valid_data_path, label)

# 自己定义数据来源

In [2]:
#构造自己的数据集类
from random import shuffle
#数据源生成器
class ExternalInputIterator(object):
    """Batch source for DALI's ``fn.external_source``.

    Reads an annotation file whose lines are ``"<relative_path> <label>"`` and
    yields ``(batch, labels)`` pairs: ``batch`` is a list of 1-D ``uint8`` numpy
    arrays holding the raw (still encoded) file bytes, ``labels`` is a list of
    one-element ``torch.uint8`` tensors.

    Args:
        batch_size: number of samples returned by every ``__next__`` call.
        device_id: index of the shard owned by this iterator.
        num_gpus: total number of shards the annotation list is split into.
        ann_file: path of the annotation text file.
        img_prex: prefix prepended to every relative path from the annotation file.
    """

    def __init__(self, batch_size,
                 device_id,
                 num_gpus,
                 ann_file="/kaggle/working/train.txt",
                 img_prex='/kaggle/input/10-monkey-species/training/training/'):
        self.ann_file = ann_file
        self.batch_size = batch_size
        self.img_prex = img_prex
        # Read the annotation file, skipping blank lines.
        with open(self.ann_file, 'r') as f:
            self.files = [line.rstrip() for line in f if line.strip()]
        # Length of the whole data set, before sharding.
        self.data_set_len = len(self.files)
        # Keep only the contiguous slice that belongs to this shard.
        shard_begin = self.data_set_len * device_id // num_gpus
        shard_end = self.data_set_len * (device_id + 1) // num_gpus
        self.files = self.files[shard_begin:shard_end]
        self.n = len(self.files)

    def __iter__(self):
        """Start a new epoch: rewind the cursor and reshuffle this shard."""
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        """Return the next ``(batch, labels)`` pair.

        Raises:
            StopIteration: once every sample of the shard has been served.
        """
        if self.i >= self.n:
            raise StopIteration
        batch = []
        labels = []
        # Always produce a full batch; the last batch of an epoch wraps around to
        # the beginning of the list. The cursor itself is NOT wrapped, so the
        # end-of-epoch check above can actually trigger.
        for _ in range(self.batch_size):
            # Split on the last space only, so paths containing spaces survive.
            jpeg_filename, label = self.files[self.i % self.n].rsplit(' ', 1)
            # Raw encoded bytes as a numpy array ...
            batch.append(np.fromfile(self.img_prex + jpeg_filename, dtype=np.uint8))
            # ... and the label as a PyTorch tensor.
            labels.append(torch.tensor([int(label)], dtype=torch.uint8))
            self.i += 1
        return (batch, labels)

    def __len__(self):
        """Number of annotation lines in the whole file (before sharding)."""
        return self.data_set_len

#生成pipeline
def ExternalSourcePipeline(batch_size,
                           num_threads,
                           device_id,
                           external_data,
                           is_training=True,
                           dali_cpu=False):
    """Build a DALI pipeline fed by a Python iterable through ``fn.external_source``.

    Args:
        batch_size: batch size handed to ``Pipeline``.
        num_threads: number of CPU worker threads handed to ``Pipeline``.
        device_id: GPU index handed to ``Pipeline``.
        external_data: iterable yielding ``(encoded_images, labels)`` batches.
        is_training: if True, decode with a random crop and mirror randomly;
            otherwise decode the full image and never mirror.
        dali_cpu: if True, decode with the ``'cpu'`` backend instead of ``'mixed'``.

    Returns:
        The (not yet built) ``Pipeline`` whose outputs are ``(images, labels)``.
    """
    pipe = Pipeline(batch_size, num_threads, device_id)

    with pipe:
        # external_data yields two lists per batch: encoded bytes and labels.
        jpegs, labels = fn.external_source(source=external_data,
                                           num_outputs=2)

        # Decoder backend and the memory padding hints used with the mixed decoder.
        decoder_device = 'cpu' if dali_cpu else 'mixed'
        device_memory_padding = 211025920 if decoder_device == 'mixed' else 0
        host_memory_padding = 140544512 if decoder_device == 'mixed' else 0

        if is_training:
            # Augmentation: decode together with a random crop.
            images = fn.decoders.image_random_crop(jpegs,
                                                   device=decoder_device,
                                                   output_type=types.RGB,
                                                   device_memory_padding=device_memory_padding,
                                                   host_memory_padding=host_memory_padding,
                                                   random_aspect_ratio=[0.8, 1.25],
                                                   random_area=[0.1, 1.0],
                                                   num_attempts=100)
            # images = augment(images)
            images = fn.resize(images, resize_x=240, resize_y=240)
            mirror = fn.random.coin_flip(probability=0.5)
        else:
            # Honour dali_cpu here as well (this branch used to hard-code 'mixed').
            images = fn.decoders.image(jpegs,
                                       device=decoder_device,
                                       output_type=types.RGB)
            images = fn.resize(images, resize_x=240, resize_y=240)
            mirror = False

        # Cast to float, normalise with ImageNet mean/std (0-255 scale), emit CHW.
        images = fn.crop_mirror_normalize(images.gpu(),
                                          dtype=types.FLOAT,
                                          output_layout="CHW",
                                          mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
                                          std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
                                          mirror=mirror)
        labels = labels.gpu()
        pipe.set_outputs(images, labels)
    return pipe


# Training data source (the generator feeding external_source).
train_eii = ExternalInputIterator(batch_size=256, device_id=0, num_gpus=1)

# Training pipeline built on top of that source.
train_pipe = ExternalSourcePipeline(
    external_data=train_eii,
    batch_size=256,
    num_threads=2,
    device_id=0,
    dali_cpu=False,
)
train_pipe.build()

# Generic iterator; use this class for tasks other than plain classification.
trainloader = DALIGenericIterator(
    train_pipe,
    ["images", "labels"],
    size=len(train_eii),
    auto_reset=True,
    last_batch_policy=LastBatchPolicy.PARTIAL,
)


# Validation data source.
valids_eii = ExternalInputIterator(
    batch_size=64,
    device_id=0,
    num_gpus=1,
    ann_file="/kaggle/working/valid.txt",
    img_prex='/kaggle/input/10-monkey-species/validation/validation/',
)

# Validation pipeline (no random augmentation).
valid_pipe = ExternalSourcePipeline(
    external_data=valids_eii,
    batch_size=64,
    num_threads=2,
    device_id=0,
    is_training=False,
    dali_cpu=False,
)
valid_pipe.build()

validloader = DALIGenericIterator(
    valid_pipe,
    ["images", "labels"],
    size=len(valids_eii),
    last_batch_policy=LastBatchPolicy.PARTIAL,
)

NameError: name 'RandAugment' is not defined

# 使用内置的读取函数

In [None]:
#和上面是一样的, 区别是没有写生成数据，而是直接调用fn.file_reader，并且整合了InputIterator和PIPELINE两步操作

def create_dali_pipeline(batch_size,
                         num_threads,
                         device_id,
                         data_dir,
                         crop,
                         size,
                         shard_id,
                         num_shards,
                         dali_cpu=False,
                         is_training=True):
    """Build a classification pipeline that reads images with ``fn.readers.file``.

    Args:
        batch_size: batch size handed to ``Pipeline``.
        num_threads: number of CPU worker threads handed to ``Pipeline``.
        device_id: GPU index; also added to the base seed (12).
        data_dir: root directory handed to the reader as ``file_root``.
        crop: side length of the final square image.
        size: target size for the validation resize (``mode="not_smaller"``).
        shard_id: index of the shard this pipeline reads.
        num_shards: total number of shards the data set is split into.
        dali_cpu: if True, decode and resize on the CPU backend.
        is_training: if True, shuffle, random-crop and randomly mirror.

    Returns:
        The (not yet built) ``Pipeline`` whose outputs are ``(images, labels)``.
    """
    pipeline = Pipeline(batch_size,
                        num_threads,
                        device_id, seed=12 + device_id)
    with pipeline:
        # Forward the caller's sharding (these used to be hard-coded to 0 / 1,
        # silently ignoring the shard_id / num_shards arguments).
        images, labels = fn.readers.file(file_root=data_dir,
                                         shard_id=shard_id,
                                         num_shards=num_shards,
                                         random_shuffle=is_training,
                                         pad_last_batch=True, name="Reader")
        # Select the backends.
        dali_device = 'cpu' if dali_cpu else 'gpu'
        decoder_device = 'cpu' if dali_cpu else 'mixed'
        device_memory_padding = 211025920 if decoder_device == 'mixed' else 0
        host_memory_padding = 140544512 if decoder_device == 'mixed' else 0

        if is_training:
            # Decode with a random crop (runs on decoder_device).
            images = fn.decoders.image_random_crop(images,
                                                   device=decoder_device,
                                                   output_type=types.RGB,
                                                   device_memory_padding=device_memory_padding,
                                                   host_memory_padding=host_memory_padding,
                                                   random_aspect_ratio=[0.8, 1.25],
                                                   random_area=[0.1, 1.0],
                                                   num_attempts=100)
            # Everything after decoding runs on dali_device.
            images = fn.resize(images,
                               device=dali_device,
                               resize_x=crop,
                               resize_y=crop,
                               interp_type=types.INTERP_TRIANGULAR)
            mirror = fn.random.coin_flip(probability=0.5)
        else:
            images = fn.decoders.image(images,
                                       device=decoder_device,
                                       output_type=types.RGB)
            images = fn.resize(images,
                               device=dali_device,
                               size=size,
                               mode="not_smaller",
                               interp_type=types.INTERP_TRIANGULAR)
            mirror = False

        # Key step: does the work of ToTensor + Normalize (float cast, CHW layout,
        # ImageNet mean/std on the 0-255 scale) plus the crop and optional mirror.
        images = fn.crop_mirror_normalize(images.gpu(),
                                          dtype=types.FLOAT,
                                          output_layout="CHW",
                                          crop=(crop, crop),
                                          mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
                                          std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
                                          mirror=mirror)
        labels = labels.gpu()
        pipeline.set_outputs(images, labels)
    return pipeline

In [7]:
# Loader setup for an ordinary folder-per-class classification data set.
crop_size = 224
val_size = 224

# Training pipeline and its iterator.
pipe = create_dali_pipeline(
    data_dir=training_data_path,
    is_training=True,
    batch_size=128,
    crop=crop_size,
    size=val_size,
    num_threads=1,
    device_id=0,
    shard_id=0,
    num_shards=1,
    dali_cpu=False,
)
pipe.build()
trainloader = DALIClassificationIterator(
    pipe,
    reader_name="Reader",
    last_batch_policy=LastBatchPolicy.PARTIAL,
)

# Validation pipeline and its iterator (the name `pipe` is reused on purpose,
# exactly as before).
pipe = create_dali_pipeline(
    data_dir=valid_data_path,
    is_training=False,
    batch_size=64,
    crop=crop_size,
    size=val_size,
    num_threads=1,
    device_id=0,
    shard_id=0,
    num_shards=1,
    dali_cpu=False,
)
pipe.build()
validloader = DALIClassificationIterator(
    pipe,
    reader_name="Reader",
    last_batch_policy=LastBatchPolicy.PARTIAL,
)

NameError: name 'create_dali_pipeline' is not defined

In [8]:
# Build the classifier: pretrained ResNet-18 with a fresh 10-way head.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Enable the cuDNN autotuner (author's note: the input size stays constant).
torch.backends.cudnn.benchmark = True

model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
# Replace the classification head.
model.fc = torch.nn.Linear(num_ftrs, 10)
model = model.to(device)

NameError: name 'torch' is not defined

In [9]:
# Training setup: loss, optimizer and learning-rate schedule.
criterion = torch.nn.CrossEntropyLoss()

# All parameters are optimised with AdamW; the learning rate follows cosine
# annealing with warm restarts (first restart after T_0=10 scheduler steps).
# A previous SGD(lr=0.01, momentum=0.9) + StepLR(step_size=7, gamma=0.1) pair was
# assigned to the same names and overwritten immediately, so it never took
# effect; it has been removed to avoid suggesting otherwise.
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4, amsgrad=False)
exp_lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10)

NameError: name 'torch' is not defined

In [10]:
# Training loop: one pass over trainloader per epoch, validation every 3rd epoch.
epochs = 30
for epoch in range(epochs):
    epoch_loss = 0.0     # sum of the per-batch training losses
    epoch_count = 0      # correctly classified training samples
    epoch_samples = 0    # training samples actually seen this epoch
    model.train()
    print(f'Epoch {epoch}/{epochs}')
    print('----------------------------')
    start_time = time.time()
    for i, data in enumerate(trainloader):

        optimizer.zero_grad()

        # DALI iterators yield a list with one dict per pipeline; the keys are
        # the output names given when the iterator was created.
        img = data[0]["images"]
        label = data[0]["labels"].squeeze(-1).cuda().long()

        # The tensors are already on the GPU, no .to(device) needed.
        prediction = model(img)
        _, correct = torch.max(prediction, dim=1)
        loss = criterion(prediction, label)

        epoch_loss += loss.item()
        epoch_count += (correct == label).sum().item()
        epoch_samples += label.size(0)
        loss.backward()
        optimizer.step()

    # Measured once after the loop so the name is bound even for an empty loader.
    one_epoch_time = time.time() - start_time
    # Accuracy is computed over the samples really seen instead of assuming a
    # fixed batch size of 256 for every batch.
    train_acc = epoch_count / max(epoch_samples, 1) * 100
    print (f'one_epoch_time :{one_epoch_time:.4f},lr : {optimizer.param_groups[0]["lr"]},train Loss: {epoch_loss:.4f} Acc:{train_acc:.4f} ')

    # Step the schedule once per epoch: its period (T_0 / step_size) is expressed
    # in epochs, so stepping after every batch made it cycle far too quickly.
    exp_lr_scheduler.step()

    if (epoch + 1) % 3 == 0:
        model.eval()
        eval_loss = 0.0
        eval_accuracy = 0.0
        eval_samples = 0

        with torch.no_grad():
            for data in validloader:

                image = data[0]["images"]
                label = data[0]["labels"].squeeze(-1).cuda().long()

                # The data is already on the GPU.
                pred = model(image)
                loss = criterion(pred, label)
                _, count = torch.max(pred, dim=1)
                eval_accuracy += (count == label).sum().item()
                eval_samples += label.size(0)
                eval_loss += loss.item()
        print (f'valid Loss: {eval_loss:.4f} Acc:{eval_accuracy / max(eval_samples, 1) * 100:.4f}')

        # Rewind the validation iterator only after it has been consumed.
        validloader.reset()
        model.train()

    trainloader.reset()


NameError: name 'model' is not defined

# FP16训练 

In [None]:
from torch.cuda.amp import autocast, GradScaler

# Template for mixed-precision (FP16) training with torch.cuda.amp.
# NOTE(review): Net, optim, epochs, data and loss_fn are placeholders that are not
# defined in this cell, and the `...` argument stands for the optimizer settings.
# AMP relies on Tensor Cores, so the model parameters must be CUDA tensors.
model = Net().cuda()
optimizer = optim.SGD(model.parameters(), ...)
# The GradScaler object performs the gradient scaling automatically.
scaler = GradScaler()

for epoch in epochs:
    for input, target in data:
        optimizer.zero_grad()
        # Run the forward pass inside the autocast-enabled region.
        with autocast():
            # Author's note: the model does the forward pass on an FP16 copy.
            output = model(input)
            loss = loss_fn(output, target)
        # Scale the loss with the scaler; backward then yields scaled gradients.
        scaler.scale(loss).backward()
        # The scaler updates the parameters and unscales the gradients first;
        # if they contain nan or inf the step is skipped automatically.
        scaler.step(optimizer)
        # Update the scale factor.
        scaler.update()

# 定位 
画矩形
import matplotlib.patches as patches
# Outline-only rectangle with corner (l, t), width r - l and height b - t, added to `ax`.
# NOTE(review): l, t, r, b and ax are not defined in this snippet.
rect = patches.Rectangle((l, t), width=(r - l), height=(b - t),linewidth=1, edgecolor='#76b900', facecolor='none')
ax.add_patch(rect)

In [2]:
@pipeline_def
def create_coco_pipeline(default_boxes, args):
    """Detection training pipeline over a COCO-format data set.

    Args:
        default_boxes: anchor container; ``default_boxes.as_ltrb_list()`` supplies
            the anchors for ``fn.box_encoder``.
        args: options object; reads ``train_coco_root``, ``train_annotate`` and
            ``fp16``.

    Returns:
        ``(images, bboxes, labels)`` data nodes. Boxes and labels are encoded
        against the anchors and moved to the GPU.
    """
    # Shard by distributed rank when a process group exists, else a single shard.
    # Checking is_initialized() avoids depending on which exception type
    # get_rank() raises when no process group has been set up.
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        shard_id = torch.distributed.get_rank()
        num_shards = torch.distributed.get_world_size()
    else:
        shard_id = 0
        num_shards = 1

    # Reader. (A leftover second reader that used the undefined names file_root /
    # annotations_file and whose outputs were never used has been removed.)
    images, bboxes, labels = fn.readers.coco(file_root=args.train_coco_root,
                                             annotations_file=args.train_annotate,
                                             skip_empty=True,
                                             shard_id=shard_id,
                                             num_shards=num_shards,
                                             ratio=True,   # relative coordinates
                                             ltrb=True,    # left, top, right, bottom
                                             random_shuffle=False,
                                             shuffle_after_epoch=True,
                                             name="Reader")

    # Works on the boxes only: picks a crop window and adjusts boxes and labels.
    # bbox_layout "xyXY" is [start_x, start_y, end_x, end_y]; the alternative
    # "xyWH" is [start_x, start_y, width, height].
    crop_begin, crop_size, bboxes, labels = fn.random_bbox_crop(
        bboxes, labels,
        device="cpu",
        aspect_ratio=[0.5, 2.0],
        thresholds=[0, 0.1, 0.3, 0.5, 0.7, 0.9],  # IoU thresholds for the crop
        scaling=[0.3, 1.0],
        bbox_layout="xyXY",
        allow_no_crop=True,
        num_attempts=50)                          # number of attempts

    # Apply that crop window to the image while decoding.
    images = fn.decoders.image_slice(images, crop_begin, crop_size, device="mixed", output_type=types.RGB)
    flip_coin = fn.random.coin_flip(probability=0.5)
    images = fn.resize(images,
                       resize_x=300,
                       resize_y=300,
                       min_filter=types.DALIInterpType.INTERP_TRIANGULAR)

    # Random colour-jitter parameters (fn.random.uniform, matching the rest of
    # this notebook, instead of the older fn.uniform spelling).
    saturation = fn.random.uniform(range=[0.5, 1.5])
    contrast = fn.random.uniform(range=[0.5, 1.5])
    brightness = fn.random.uniform(range=[0.875, 1.125])
    hue = fn.random.uniform(range=[-0.5, 0.5])

    # Colour adjustment; float is used to avoid clipping and quantizing the
    # intermediate result.
    images = fn.hsv(images, dtype=types.FLOAT, hue=hue, saturation=saturation)
    images = fn.brightness_contrast(images,
                                    contrast_center=128,  # input is in float, but in 0..255 range
                                    dtype=types.UINT8,
                                    brightness=brightness,
                                    contrast=contrast)

    dtype = types.FLOAT16 if args.fp16 else types.FLOAT

    # Flip boxes and image together, driven by the same coin flip.
    bboxes = fn.bb_flip(bboxes, ltrb=True, horizontal=flip_coin)
    images = fn.crop_mirror_normalize(images,
                                      crop=(300, 300),
                                      mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
                                      std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
                                      mirror=flip_coin,
                                      dtype=dtype,
                                      output_layout="CHW",
                                      pad_output=False)

    # Encode the boxes against the given anchors.
    bboxes, labels = fn.box_encoder(bboxes, labels,
                                    criteria=0.5,
                                    anchors=default_boxes.as_ltrb_list())

    labels = labels.gpu()
    bboxes = bboxes.gpu()

    return images, bboxes, labels



# Instantiate the detection pipeline; the keyword arguments are consumed by
# the pipeline_def decorator.
train_pipe = create_coco_pipeline(
    default_boxes,
    args,
    seed=local_seed,
    device_id=args.local_rank,
    num_threads=args.num_workers,
    batch_size=args.batch_size,
)

# Generic iterator used for detection: three named outputs per batch.
train_loader = DALIGenericIterator(
    train_pipe,
    ["images", "boxes", "labels"],
    last_batch_policy=LastBatchPolicy.FILL,
    reader_name="Reader",
)

NameError: name 'pipeline_def' is not defined

# segmentation

In [None]:
#通用过程就是先创建一个Pipeline实例,在实例里完成整个管道构建
#pipe.build()
#pipe.run()

# Segmentation example: crop a random window around a (mostly foreground) pixel.
# NOTE(review): batch_size, num_threads, device_id, file_root and annotations_file
# are not defined in this cell.
pipe = Pipeline(batch_size=batch_size, num_threads=num_threads, device_id=device_id, seed=11)
with pipe:
    # COCO reader, with pixelwise masks
    inputs, bboxes, labels, masks = fn.readers.coco(
        file_root=file_root,
        annotations_file=annotations_file,
        pixelwise_masks=True # Load segmentation pixelwise mask data
    )
    # Decode the images
    images = fn.decoders.image(inputs)

    # COCO reader produces three dimensions (H, W, 1). Here we are just removing the trailing dimension
    # rel_shape=(1, 1) means keep the first two dimensions as they are.
    masks = fn.reshape(masks, rel_shape=(1, 1))

    # Select random foreground pixels with 70% probability and random pixels with 30% probability
    # Foreground pixels are by default those with value higher than 0.
    center = fn.segmentation.random_mask_pixel(
        masks, foreground=fn.random.coin_flip(probability=0.7)
    )

    # Random crop shape (can also be constant)
    crop_h = fn.cast(fn.random.uniform(range=(200, 300), shape=(1,), device='cpu'), dtype=types.INT64)
    crop_w = fn.cast(fn.random.uniform(range=(200, 300), shape=(1,), device='cpu'), dtype=types.INT64)
    crop_shape = fn.cat(crop_h, crop_w, axis=0)

    # Calculating anchor for slice (top-left corner of the cropping window)
    crop_anchor = center - crop_shape // 2

    # Slicing image and mask.
    # Note that we are allowing padding when sampling out of bounds, since a foreground pixel can appear
    # near the edge of the image.
    out_image = fn.slice(images, crop_anchor, crop_shape, axis_names="HW", out_of_bounds_policy='pad')
    out_mask = fn.slice(masks, crop_anchor, crop_shape, axis_names="HW", out_of_bounds_policy='pad')
    
    # Final step: declaring the outputs completes the pipeline definition
    pipe.set_outputs(images, masks, center, crop_anchor, crop_shape, out_image, out_mask)
    
# Build, run one batch, and pull sample `i` out of each of the seven outputs
# (same order as in pipe.set_outputs).
pipe.build()
outputs = pipe.run()
i = 16
image, mask, center, anchor, shape, out_image, out_mask = (
    outputs[output_index].at(i) for output_index in range(7)
)


# Visualization: original image + mask with the crop window and its centre.
fig, ax = plt.subplots(dpi=160)
ax.imshow(image)
ax.imshow(mask, cmap='jet', alpha=0.5)
# anchor / shape / center are indexed [0] = vertical, [1] = horizontal here,
# while matplotlib expects (x, y), hence the swapped indices.
window_x, window_y = anchor[1], anchor[0]
rect = patches.Rectangle(
    (window_x, window_y),
    width=shape[1],
    height=shape[0],
    linewidth=1,
    edgecolor='#76b900',
    facecolor='none',
)
ax.add_patch(rect)
ax.scatter(center[1], center[0], s=10, edgecolor='#76b900')
ax.set_title('Original Image/Mask with random crop window and center')
plt.show()

# Second figure: the cropped image with its cropped mask overlaid.
fig, ax = plt.subplots(dpi=160)
ax.imshow(out_image)
ax.imshow(out_mask, cmap='jet', alpha=0.5)
ax.set_title('Cropped Image/Mask')
plt.show()

# 测试管道



In [None]:
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
from nvidia.dali import pipeline_def
%matplotlib inline

def show_images(image_batch):
    """Plot every image of a CPU batch on a 4-column grid.

    Args:
        image_batch: batch object supporting ``len()`` and ``.at(index)``
            (e.g. the result of ``.as_cpu()`` on a pipeline output).

    The grid height follows the batch size instead of assuming exactly 16
    images, so smaller batches no longer index past the end and larger ones are
    shown in full. A batch of 16 still produces the same 4x4 grid.
    """
    columns = 4
    num_images = len(image_batch)
    if num_images == 0:
        return  # nothing to draw
    rows = (num_images + columns - 1) // columns  # ceiling division
    fig = plt.figure(figsize=(32, (32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(num_images):
        plt.subplot(gs[j])
        plt.axis("off")
        plt.imshow(image_batch.at(j))

In [8]:
#定义管道的方法
#  并行化参数(batch_size=batch_size, num_threads=2, device_id=0, py_num_workers=4, py_start_method='spawn')
@pipeline_def
def random_rotated_gpu_pipeline():
    """Read, decode and randomly rotate training images on the GPU.

    Returns:
        ``(rotated_images, labels, angle)`` data nodes; ``angle`` is the rotation
        drawn uniformly from (-10.0, 10.0) for every sample.
    """
    # Read the files; shuffling happens in the reader. The reader's parallel
    # option is left disabled, as before.
    encoded, class_ids = fn.readers.file(
        file_root=training_data_path,
        random_shuffle=True,
        initial_fill=21,
    )
    decoded = fn.decoders.image(encoded, device='mixed')
    rotation = fn.random.uniform(range=(-10.0, 10.0))
    # .gpu() places the images on the GPU before rotating.
    rotated = fn.rotate(decoded.gpu(), angle=rotation, fill_value=0)

    return rotated, class_ids, rotation

NameError: name 'ExternalInputIterator' is not defined

In [None]:
# Instantiate the decorated pipeline (batch of 16, 4 threads, GPU 0) and build it.
pipe = random_rotated_gpu_pipeline(batch_size=16, num_threads=4, device_id=0)
pipe.build()

In [None]:
# Run one batch and unpack the three pipeline outputs.
pipe_out = pipe.run()
print(pipe_out)
images, labels,angle = pipe_out
# The images must be copied to the CPU for plotting; author's note: this is a
# copy, the data cannot be moved from the GPU to the CPU.
show_images(images.as_cpu())

In [1]:
# Serialize the pipeline
s = pipe.serialize()
# Deserialize it into a second pipeline object and build that one
# NOTE(review): batch_size is not defined in this cell.
pipe2 = Pipeline(batch_size = batch_size, num_threads = 2, device_id = 0, seed = 12)

pipe2.deserialize_and_build(s)

NameError: name 'pipe' is not defined

注意事项

*Since decoders.image does not accept data on the GPU we need to decode it outside DALI on the CPU and then move it to the GPU.

*data[0].at(5)只能用at方法来切片

*data[0].as_cpu().at(5)顺序不能改

*as_array()是一个关键函数

*angle = fn.random.uniform(range=(-10.0, 10.0)) #生成随机数,是增益的重要函数
        saturation = fn.uniform(range=[0.5, 1.5])
        contrast = fn.uniform(range=[0.5, 1.5])
        brightness = fn.uniform(range=[0.875, 1.125])
        hue = fn.uniform(range=[-0.5, 0.5])

*nvidia.dali.fn 包中有所有的处理函数Operations

*nvidia.dali.math 支持的数学计算

# 自定义计算OP

In [1]:
#创建一个OP头文件
#!cat customdummy/dummy.h

    #ifndef EXAMPLE_DUMMY_H_
    #define EXAMPLE_DUMMY_H_

    #include <vector>

    #include "dali/pipeline/operator/operator.h"

    namespace other_ns {

    // Example custom operator: derives from ::dali::Operator<Backend> and is
    // neither copyable nor movable. RunImpl is only declared here.
    template <typename Backend>
    class Dummy : public ::dali::Operator<Backend> {
     public:
      inline explicit Dummy(const ::dali::OpSpec &spec) :
        ::dali::Operator<Backend>(spec) {}

      virtual inline ~Dummy() = default;

      Dummy(const Dummy&) = delete;
      Dummy& operator=(const Dummy&) = delete;
      Dummy(Dummy&&) = delete;
      Dummy& operator=(Dummy&&) = delete;

     protected:
     
      // Whether output information can be inferred, i.e. (per the author's note)
      // the indicator that SetupImpl is used.
      bool CanInferOutputs() const override {
        return true;
      }
      
      
      // Compute the output shape and type from the input: the single output
      // gets exactly the shape and type of input 0.
      bool SetupImpl(std::vector<::dali::OutputDesc> &output_desc,
                     const ::dali::workspace_t<Backend> &ws) override {
        const auto &input = ws.template InputRef<Backend>(0);
        output_desc.resize(1);
        output_desc[0] = {input.shape(), input.type()};
        return true;
      }
       
      // Main execution function; implemented in the .cc / .cu files.
      void RunImpl(::dali::workspace_t<Backend> &ws) override;
    };

    }  // namespace other_ns

    #endif  // EXAMPLE_DUMMY_H_

In [2]:
#CPU上的实现
#!cat customdummy/dummy.cc

    //头文件导入
    #include "dummy.h"

    namespace other_ns {


    // CPU implementation of the operator; the input is the whole batch.
    // One copy task per sample is queued on the workspace thread pool and
    // RunAll() is called before the function returns.
    template <>
    void Dummy<::dali::CPUBackend>::RunImpl(::dali::HostWorkspace &ws) {
      const auto &input = ws.InputRef<::dali::CPUBackend>(0);
      auto &output = ws.OutputRef<::dali::CPUBackend>(0);

      ::dali::TypeInfo type = input.type();
      auto &tp = ws.GetThreadPool();
      const auto &in_shape = input.shape();
      for (int sample_id = 0; sample_id < in_shape.num_samples(); sample_id++) {
        // sample_id is captured by value, everything else by reference.
        tp.AddWork(
            [&, sample_id](int thread_id) {
              type.Copy<::dali::CPUBackend, ::dali::CPUBackend>(output.raw_mutable_tensor(sample_id),
                                                                input.raw_tensor(sample_id),
                                                                in_shape.tensor_size(sample_id), 0);
            },
            in_shape.tensor_size(sample_id));
      }
      tp.RunAll();
    }

    }  // namespace other_ns
    
    
    // Register the operator; remember to change the name for your own operator.
    DALI_REGISTER_OPERATOR(CustomDummy, ::other_ns::Dummy<::dali::CPUBackend>, ::dali::CPU);
    
    // Short schema description: one input, one output.
    DALI_SCHEMA(CustomDummy)
        .DocStr("Make a copy of the input tensor")
        .NumInput(1)
        .NumOutput(1);

In [3]:
#GPU版本的实现
#! cat customdummy/dummy.cu

    #include <cuda_runtime_api.h>
    #include "dummy.h"

    namespace other_ns {
    
    
    // GPU implementation: a single asynchronous device-to-device copy of
    // input.nbytes() bytes, issued on the workspace stream.
    template<>
    void Dummy<::dali::GPUBackend>::RunImpl(::dali::DeviceWorkspace &ws) {
      const auto &input = ws.Input<::dali::GPUBackend>(0);
      auto &output = ws.Output<::dali::GPUBackend>(0);
      CUDA_CALL(cudaMemcpyAsync(
              output.raw_mutable_data(),
              input.raw_data(),
              input.nbytes(),
              cudaMemcpyDeviceToDevice,
              ws.stream()));
    }

    }  // namespace other_ns
    
    
    // Register the GPU implementation under the same operator name.
    DALI_REGISTER_OPERATOR(CustomDummy, ::other_ns::Dummy<::dali::GPUBackend>, ::dali::GPU);

In [4]:
#用CMake来编译，不是用pybind11
#!cat customdummy/CMakeLists.txt

    # Build script for the custom operator plugin (shared library "customdummy").
    cmake_minimum_required(VERSION 3.5)
    project(custom_dummy_plugin)
    find_package(CUDA 9.0 REQUIRED)

    # Ask the installed nvidia.dali Python package for its library directory.
    execute_process(
            COMMAND python -c "import nvidia.dali as dali; print(dali.sysconfig.get_lib_dir())"
            OUTPUT_VARIABLE DALI_LIB_DIR)
    string(STRIP ${DALI_LIB_DIR} DALI_LIB_DIR)

    # Ask it for the compile flags as well, joined into one space-separated string.
    execute_process(
            COMMAND python -c "import nvidia.dali as dali; print(\" \".join(dali.sysconfig.get_compile_flags()))"
            OUTPUT_VARIABLE DALI_COMPILE_FLAGS)
    string(STRIP ${DALI_COMPILE_FLAGS} DALI_COMPILE_FLAGS)

    # Use C++14 plus the DALI flags for both the host compiler and nvcc.
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 ${DALI_COMPILE_FLAGS} ")
    set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -std=c++14 ${DALI_COMPILE_FLAGS} ")
    link_directories( "${DALI_LIB_DIR}" )

    # Compile the CPU (.cc) and GPU (.cu) sources into one shared library and link it against dali.
    cuda_add_library(customdummy SHARED dummy.cc dummy.cu )
    target_link_libraries(customdummy dali)

    !rm -rf customdummy/build
    !mkdir -p customdummy/build
    !cd customdummy/build && \
      cmake .. && \
      make
      
      
     编译后生成了 .so 文件，可用 `! ls customdummy/build/*.so` 查看

In [5]:
import nvidia.dali.plugin_manager as plugin_manager
# Load the compiled plugin library.
plugin_manager.load_library('./customdummy/build/libcustomdummy.so')
# Look up the operator through the fn API and show its help text.
help(fn.custom_dummy)

ModuleNotFoundError: No module named 'nvidia'

# PYTHON函数定义操作，因为python线程问题，只适合测试

# CPU

In [8]:
import nvidia.dali.plugin.pytorch as dalitorch
import torch
import torch.utils.dlpack as torch_dlpack
import torchvision.transforms as transforms


# torchvision transform chain: tensor -> PIL image -> random perspective warp
# (always applied, p=1.) -> tensor.
transform = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.RandomPerspective(p=1.),
        transforms.ToTensor(),
    ]
)


def perspective(t):
    """Apply ``transform`` to ``t`` and reorder the result's axes (0, 1, 2) -> (1, 2, 0)."""
    warped = transform(t)
    # Equivalent to .transpose(2, 0).transpose(0, 1).
    return warped.permute(1, 2, 0)



def dlpack_manipulation(dlpacks):
    """Scale each sample by 1/255, take the square root, and reverse the batch order.

    Args:
        dlpacks: sequence of DLPack capsules, one per sample.

    Returns:
        List of DLPack capsules holding float32 tensors, in reversed sample order.
    """
    tensors = [torch_dlpack.from_dlpack(capsule) for capsule in dlpacks]
    return [
        torch_dlpack.to_dlpack((sample.to(torch.float32) / 255.).sqrt())
        for sample in reversed(tensors)
    ]



# Pipeline demonstrating Python-function operators on the CPU. It is created
# with exec_async=False and exec_pipelined=False, as the author notes is
# required for these operators.
# NOTE(review): batch_size and image_dir are not defined in this cell.
torch_function_pipe = Pipeline(batch_size=batch_size,
                               num_threads=4,
                               device_id=0,
                               exec_async=False,
                               exec_pipelined=False,
                               seed=99)

with torch_function_pipe:

    # `encoded` instead of `input`, so the builtin is not shadowed.
    encoded, _ = fn.readers.file(file_root=image_dir, random_shuffle=True)

    im = fn.decoders.image(encoded, device='cpu', output_type=types.RGB)

    res = fn.resize(im, resize_x=300, resize_y=300)

    norm = fn.crop_mirror_normalize(res, std=255., mean=0.)

    # Process the batch with a Python (PyTorch) function. The result gets its own
    # name so the `perspective` function is not overwritten by the graph node.
    perspective_out = dalitorch.fn.torch_python_function(norm, function=perspective)

    sqrt_color = fn.dl_tensor_python_function(res, function=dlpack_manipulation)

    torch_function_pipe.set_outputs(perspective_out, sqrt_color)

ModuleNotFoundError: No module named 'nvidia'

# GPU这个才是核心
   因为cupy和numpy是一样的,所以可以不用修改API，直接用

In [None]:
import cupy

# Custom operation. The inputs must not be modified; work on copies instead.
def edit_images(image1, image2):
    """Exchange the pixels outside a centred circle between two images.

    Both inputs must have the same 3-D shape. Pixels whose squared distance from
    the image centre exceeds ``h * w / 9`` are swapped between the two copies.

    Returns:
        ``(result1, result2)``: edited copies of ``image1`` and ``image2``.
    """
    assert image1.shape == image2.shape
    height, width, _ = image1.shape
    rows, cols = cupy.ogrid[0:height, 0:width]
    outside = (cols - width / 2) ** 2 + (rows - height / 2) ** 2 > height * width / 9

    swapped1 = cupy.copy(image1)
    swapped2 = cupy.copy(image2)
    swapped1[outside] = image2[outside]
    swapped2[outside] = image1[outside]
    # Optional explicit synchronisation, left disabled as in the original:
    # cupy.cuda.get_current_stream().synchronize()
    return swapped1, swapped2


# Alternatively, write a simple element-wise kernel.
mix_channels_kernel = cupy.ElementwiseKernel(
    in_params='uint8 x, uint8 y',
    out_params='uint8 z',
    # `i` is the element index: y is taken where i is a multiple of 3, x elsewhere.
    operation='z = (i % 3) ? x : y',
    name='mix_channels',
)


# NOTE(review): `res1` / `res2` are not defined in this cell; the same two calls
# appear inside the `with python_function_pipe:` block of the following cell.
# edit_images returns a pair, hence num_outputs=2.
out1, out2 = fn.python_function(res1, res2, device='gpu', function=edit_images, num_outputs=2)


# Single-output call that uses the element-wise kernel as the function.
out3       = fn.python_function(res1, res2, device='gpu', function=mix_channels_kernel)

In [1]:
# Specifying device='gpu' is all that is needed to run these operators on the GPU.
image_dir = '../data/images'
batch_size = 4

# exec_async=False and exec_pipelined=False are required here.
python_function_pipe = Pipeline(batch_size=batch_size, 
                                num_threads=4, 
                                device_id=0,
                                exec_async=False, 
                                exec_pipelined=False, 
                                seed=99)

with python_function_pipe:
    # Two independently shuffled readers over the same directory give two image streams.
    input1, _ = fn.readers.file(file_root=image_dir, random_shuffle=True)
    input2, _ = fn.readers.file(file_root=image_dir, random_shuffle=True)
    # 'mixed' decoding followed by a GPU resize of both streams to 300x300.
    im1, im2 = fn.decoders.image([input1, input2], device='mixed', output_type=types.RGB)
    res1, res2 = fn.resize([im1, im2], device='gpu', resize_x=300, resize_y=300)
    
    # edit_images returns two arrays, so num_outputs=2.
    out1, out2 = fn.python_function(res1, res2, device='gpu', function=edit_images, num_outputs=2)
    
    
    # The element-wise kernel is passed directly as the function.
    out3 = fn.python_function(res1, res2, device='gpu', function=mix_channels_kernel)
    
    
    python_function_pipe.set_outputs(out1, out2, out3)

NameError: name 'Pipeline' is not defined

# CUPY使用

## 单一元素运算

In [None]:
import cupy as cp

In [3]:
squared_diff = cp.ElementwiseKernel(
    in_params='T x, T y',  # input parameter list
    out_params='T z',  # output parameter list
    operation='''
    
    z = (x - y) * (x - y);
    
    ''',  # per-element body
    name='squared_diff',  # kernel name
)
# NOTE(review): `a` and `b` are not defined in this cell; they must already
# exist in the kernel namespace when this line runs.
squared_diff(a,b)

NameError: name 'cp' is not defined

## 使用索引

In [4]:
# `raw` passes y as the whole array instead of one element per iteration, so
# the kernel body has to index it explicitly.
add_reverse = cp.ElementwiseKernel(
    in_params='T x, raw T y',
    out_params='T z',
    operation='z = x + y[_ind.size() - i - 1]',
    name='add_reverse',
)

NameError: name 'cp' is not defined

# Reduction kernels

In [5]:
# Note what each positional slot of ReductionKernel does; they are spelled out
# by keyword here.
l2norm_kernel = cp.ReductionKernel(
    in_params='T x',
    out_params='T y',
    map_expr='x * x',  # applied to every element before reducing
    reduce_expr='a + b',  # combines two partial results
    post_map_expr='y = sqrt(a)',  # applied to the reduced value
    identity='0',  # identity value of the reduction
    name='l2norm',
)

NameError: name 'cp' is not defined

In [6]:
# Raw CUDA kernel `my_add`: each thread computes a global index `tid` and adds
# one pair of floats. The kernel performs no bounds check on `tid`.
add_kernel = cp.RawKernel(
r'''

extern "C" __global__
void my_add(const float* x1, const float* x2, float* y) {
    int tid = blockDim.x * blockIdx.x + threadIdx.x;
     y[tid] = x1[tid] + x2[tid];
 }
 
 ''',
'my_add')
# Three 5x5 float32 arrays: two inputs holding 0..24 and a zeroed output.
x1 = cp.arange(25, dtype=cp.float32).reshape(5, 5)
x2 = cp.arange(25, dtype=cp.float32).reshape(5, 5)
y = cp.zeros((5, 5), dtype=cp.float32)
# 5 blocks x 5 threads = 25 threads, one per array element.
add_kernel((5,), (5,), (x1, x2, y))  # grid, block and arguments

NameError: name 'cp' is not defined

# 单机多卡

In [None]:
import os
import numpy as np
import time
import matplotlib.pyplot as plt
from PIL import Image
import torch, torchvision
from torch.utils.data import Dataset,DataLoader
from torchvision.transforms import Compose
from torchvision import transforms
from torchvision.models import resnet50,resnet18
from torchvision.datasets import ImageFolder
from torch.nn import functional as F

# NOTE(review): the result of this availability check is discarded.
torch.cuda.is_available()
# Generic CUDA device handle.
device=torch.device('cuda')
from torchvision import models
from torch import nn
import argparse
import  torch.distributed as DDP
#DALI

import os.path
from torchvision import models
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import nvidia.dali.fn as fn
#DALIClassificationIterator(pipelines, reader_name)  ==  DALIGenericIterator(pipelines, ["data", "label"], reader_name)
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy#（这个是用于填充作用）
from nvidia.dali.plugin.pytorch import DALIGenericIterator
from random import shuffle

# Dataset locations under the Kaggle "10-monkey-species" input directory.
txt_path='/kaggle/input/10-monkey-species/monkey_labels.txt'
root='/kaggle/input/10-monkey-species/'
training_data_path='/kaggle/input/10-monkey-species/training/training/'
valid_data_path='/kaggle/input/10-monkey-species/validation/validation/'
# Presumably per-channel pixel mean / std on a 0-255 scale — TODO confirm.
means=[110.508858,109.552668, 84.623747]
stds=[67.212821,66.229520,66.544232]

class ExternalInputIterator(object):
    """Batch generator that feeds encoded images and labels to a DALI external source.

    The annotation file holds one ``<relative image path> <integer label>``
    entry per line. Each instance keeps only the slice of entries assigned to
    shard ``device_id`` out of ``num_gpus``.

    Args:
        batch_size: number of samples returned by every ``__next__`` call.
        device_id: index of the shard this instance serves.
        num_gpus: total number of shards.
        ann_file: path of the annotation text file.
        img_prex: prefix prepended to every relative image path.
    """

    def __init__(self, batch_size,
                 device_id,
                 num_gpus,
                 ann_file="/kaggle/working/train.txt",
                 img_prex='/kaggle/input/10-monkey-species/training/training/'):
        self.ann_file = ann_file
        self.batch_size = batch_size
        self.img_prex = img_prex
        # Read the annotation file. Blank / whitespace-only lines are skipped
        # (the former `line is not ''` identity test never filtered anything).
        with open(self.ann_file, 'r') as f:
            self.files = [line.rstrip() for line in f if line.strip()]
        # Number of entries before sharding; this is what __len__ reports.
        self.data_set_len = len(self.files)
        shard_begin = self.data_set_len * device_id // num_gpus
        shard_end = self.data_set_len * (device_id + 1) // num_gpus
        self.files = self.files[shard_begin:shard_end]
        self.n = len(self.files)
        # Start position, so __next__ also works before the first __iter__.
        self.i = 0

    def __iter__(self):
        """Restart from the first entry after reshuffling this shard."""
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        """Return ``(batch, labels)``: two lists of ``batch_size`` items.

        ``batch`` holds the raw file bytes as uint8 numpy arrays and ``labels``
        holds one-element uint8 torch tensors.

        Raises:
            StopIteration: when the shard is empty.
        """
        if self.i >= self.n:
            raise StopIteration
        batch = []
        labels = []
        for _ in range(self.batch_size):
            # Split on the last space only, so paths containing spaces parse.
            jpeg_filename, label = self.files[self.i].rsplit(' ', 1)
            # Encoded image bytes as a numpy array ...
            batch.append(np.fromfile(self.img_prex + jpeg_filename, dtype=np.uint8))
            # ... and the label as a native PyTorch tensor.
            labels.append(torch.tensor([int(label)], dtype=torch.uint8))
            # Wrap around the shard so every batch is full.
            self.i = (self.i + 1) % self.n

        return (batch, labels)

    def __len__(self):
        """Return the number of annotation entries before sharding."""
        return self.data_set_len

# Build the DALI pipeline.
def ExternalSourcePipeline(batch_size,
                           num_threads,
                           device_id,
                           external_data,
                           is_training=True,
                           dali_cpu=False):
    """Create a DALI pipeline that decodes, resizes and normalizes images.

    Encoded images and labels come from ``external_data`` through an external
    source with two outputs. When ``is_training`` is true the images are
    decoded with a random crop on the CPU and mirrored with probability 0.5;
    otherwise they are decoded with the 'mixed' decoder and never mirrored.
    Both paths resize to 240x240 and end in a float, CHW crop-mirror-normalize
    step on the GPU. ``dali_cpu`` is accepted but not consulted here.

    Returns the pipeline with its outputs set to ``(images, labels)``.
    """
    pipe = Pipeline(batch_size, num_threads, device_id)

    # Normalization statistics passed to crop_mirror_normalize.
    channel_mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
    channel_std = [0.229 * 255, 0.224 * 255, 0.225 * 255]

    with pipe:
        # The source yields two lists per batch, so two outputs are requested.
        jpegs, labels = fn.external_source(source=external_data, num_outputs=2)

        if is_training:
            # Augmenting decode: random crop with bounded aspect ratio / area.
            decoded = fn.decoders.image_random_crop(
                jpegs,
                device='cpu',
                output_type=types.RGB,
                device_memory_padding=211025920,
                host_memory_padding=140544512,
                random_aspect_ratio=[0.8, 1.25],
                random_area=[0.1, 1.0],
                num_attempts=100,
            )
        else:
            decoded = fn.decoders.image(jpegs, device='mixed', output_type=types.RGB)

        resized = fn.resize(decoded, resize_x=240, resize_y=240)
        # Random horizontal flip only while training.
        mirror = fn.random.coin_flip(probability=0.5) if is_training else False

        normalized = fn.crop_mirror_normalize(
            resized.gpu(),
            dtype=types.FLOAT,
            output_layout="CHW",
            mean=channel_mean,
            std=channel_std,
            mirror=mirror,
        )
        pipe.set_outputs(normalized, labels.gpu())
    return pipe


def train(local_rank,train_eii,valids_eii,criterion,model,optimizer,exp_lr_scheduler):
    """Run the training / validation loop for one process of a DDP job.

    Args:
        local_rank: GPU index used for the model, the DALI pipelines and tensors.
        train_eii: external-source iterator supplying training batches; its
            ``len()`` is used as the epoch size.
        valids_eii: same as ``train_eii`` but for validation data.
        criterion: loss callable invoked as ``criterion(prediction, target)``.
        model: network to train; it is wrapped in DistributedDataParallel here.
        optimizer: optimizer, stepped once per batch.
        exp_lr_scheduler: learning-rate scheduler, also stepped once per batch.

    Side effects: prints per-epoch statistics and, on rank 0, writes
    ``test.pt`` every tenth epoch (starting with epoch 0).
    """
    model = torch.nn.parallel.DistributedDataParallel(model.to(local_rank),device_ids=[local_rank])

    # The pipeline batch size has to agree with what the external source
    # yields, so take it from the source; the previous constants remain the
    # fallback for sources without a `batch_size` attribute.
    train_batch_size = getattr(train_eii, 'batch_size', 256)
    valid_batch_size = getattr(valids_eii, 'batch_size', 64)

    train_pipe = ExternalSourcePipeline(batch_size=train_batch_size,
                                        num_threads=2,
                                        device_id=local_rank,
                                        external_data=train_eii,
                                        dali_cpu=False)
    train_pipe.build()
    # auto_reset=True: the iterator rewinds itself at the end of every epoch.
    train_loader = DALIGenericIterator(train_pipe,
                                       ["images","labels"],
                                       size=len(train_eii),
                                       auto_reset=True,
                                       last_batch_policy=LastBatchPolicy.PARTIAL)
    valid_pipe = ExternalSourcePipeline(batch_size=valid_batch_size,
                                        num_threads=2,
                                        device_id=local_rank,
                                        external_data=valids_eii,
                                        is_training=False,
                                        dali_cpu=False)
    valid_pipe.build()
    # No auto_reset here: the loader is reset explicitly after each validation pass.
    valid_loader = DALIGenericIterator(valid_pipe,
                                       ["images","labels"],
                                       size=len(valids_eii),
                                       last_batch_policy=LastBatchPolicy.PARTIAL)
    epochs= 50
    for epoch in range(epochs):
        epoch_loss = 0.0
        epoch_correct = 0
        epoch_seen = 0
        model.train()
        print(f'Epoch {epoch}/{epochs}')
        print('----------------------------')
        start_time = time.time()

        for data in train_loader:
            optimizer.zero_grad()
            img = data[0]["images"].cuda(local_rank)
            # Labels arrive with a trailing singleton dimension; drop it.
            label_train = data[0]["labels"].squeeze(-1).cuda(local_rank).long()
            prediction = model(img)
            _ , correct = torch.max(prediction,dim=1)
            loss = criterion (prediction,label_train)

            epoch_loss += loss.item()
            epoch_correct += (correct==label_train).sum().item()
            # Count real samples: the last batch may be partial, so
            # len(loader) * batch_size would overstate the denominator.
            epoch_seen += label_train.size(0)
            loss.backward()
            optimizer.step()
            # Stepped per batch, so the scheduler period is measured in batches.
            exp_lr_scheduler.step()
        # Measured once after the loop so it is defined even for an empty loader.
        one_epoch_time = time.time()-start_time
        train_acc = epoch_correct / max(epoch_seen, 1) * 100
        print (f'one_epoch_time :{one_epoch_time:.4f},lr : {optimizer.param_groups[0]["lr"]},train Loss: {epoch_loss:.4f} Acc:{train_acc:.4f} ')

        if (epoch+1) % 3 == 0:
            model.eval()
            eval_loss = 0.0
            eval_correct = 0
            eval_seen = 0

            with torch.no_grad():
                for data in valid_loader:
                    image = data[0]["images"].cuda(local_rank)
                    label = data[0]["labels"].squeeze(-1).cuda(local_rank).long()

                    pred = model(image)
                    loss = criterion(pred,label)
                    _ , count = torch.max(pred,dim=1)
                    eval_correct += (count==label).sum().item()
                    eval_seen += label.size(0)
                    eval_loss += loss.item()
            # Rewind only after the loader has actually been consumed.
            valid_loader.reset()
            eval_acc = eval_correct / max(eval_seen, 1) * 100
            print (f'local_rank is {local_rank},valid Loss: {eval_loss:.4f} Acc:{eval_acc:.4f}')

            model.train()

        # Only rank 0 writes the checkpoint (the unwrapped module's weights).
        if local_rank==0 and epoch%10==0:
            torch.save(model.module.state_dict(),'test.pt')
def main(resume=True):
    """Entry point of one DDP worker: set up the process group, model and data,
    then call ``train``.

    Args:
        resume: when true, weights are loaded from ``test.pt`` if that file
            exists; a missing checkpoint is reported and training starts from
            the pretrained backbone instead of crashing.
    """
    parser = argparse.ArgumentParser(description='Train a model')
    # Accept both spellings used by the PyTorch launchers and fall back to the
    # LOCAL_RANK environment variable when the flag is not passed.
    parser.add_argument('--local_rank', '--local-rank', dest='local_rank', type=int,
                        default=int(os.environ.get('LOCAL_RANK', 0)))
    args = parser.parse_args()

    n_gpu=2
    # Bind this process to its GPU before NCCL is initialised.
    torch.cuda.set_device(args.local_rank)
    DDP.init_process_group(backend='nccl',world_size=n_gpu,rank = args.local_rank)
    model =resnet18(pretrained=True)
    ins = model.fc.in_features
    # Replace the classifier head with a 10-way linear layer.
    model.fc =nn.Linear(ins,10)

    checkpoint_path = 'test.pt'
    if resume:
        if os.path.isfile(checkpoint_path):
            # map_location='cpu' keeps every rank from materialising the
            # checkpoint on the GPU it was saved from; train() moves the model.
            model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
        else:
            print(f'checkpoint {checkpoint_path} not found, starting without resume')
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(),lr=0.01,weight_decay=1e-4, amsgrad=False)
    exp_lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10)

    # Instantiate the data sources (generators).
    # NOTE(review): every rank is given shard 0 of 1, i.e. the full dataset —
    # confirm whether per-rank sharding was intended.
    train_eii = ExternalInputIterator(256, 0, 1)
    valids_eii = ExternalInputIterator(64, 0, 1,
                    ann_file="/kaggle/working/valid.txt",
                    img_prex='/kaggle/input/10-monkey-species/validation/validation/')

    train(args.local_rank, train_eii,valids_eii,criterion,model,optimizer,exp_lr_scheduler)
if __name__ == '__main__':
    main()
