In [None]:
import os
import torch, torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
from loadCIFAR import *
from model import *

import warnings
warnings.filterwarnings('ignore')
%load_ext memory_profiler

In [None]:
print("torch version: ", torch.__version__)
print("torchvision version: ", torchvision.__version__)

In [None]:
batch_size = 4
transform = transforms.Compose(
    [transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

train_list = ["data_batch_1", "data_batch_2", "data_batch_3", "data_batch_4", "data_batch_5"]
test_list = ["test_batch"]
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [None]:
!du -sh ./data/cifar-10-batches-py/*_batch*

# Explore pytorch Dataset and DataLoader memory usage

In [None]:
%memit torchvision.datasets.CIFAR10(root='./data', train=False, download=False, transform=transform)

In [None]:
%memit torchvision.datasets.CIFAR10(root='./data', train=True, download=False, transform=transform)

In [None]:
%mprun -f load_cifar trainloader, testloader = load_cifar(batch_size)

In [None]:
# functions to show an image
def imshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

# get some random training images
%memit dataiter = iter(trainloader)
images, labels = dataiter.next()

# show images
imshow(torchvision.utils.make_grid(images))
# print labels
print(' '.join(f'{classes[labels[j]]:5s}' for j in range(batch_size)))

# Baseline: Load CIFAR10 from local disk

In [None]:
%%timeit -r 3
net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

for epoch in range(2):  # loop over the dataset multiple times
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 2000 == 1999:    # print every 2000 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

print('Finished Training')

In [None]:
evaluate(net, testloader)

# Exp 1: Load data from S3

In [None]:
import boto3

bucket = 'zhuangwei-bucket'
client = boto3.client('s3')
s3 = boto3.resource('s3')
bucket_obj = s3.Bucket(bucket)

In [None]:
# create the bucket if not exist
bnames = [item['Name'] for item in client.list_buckets()['Buckets']]
if bucket not in bnames:
    response = client.create_bucket(Bucket=bucket)
    print(response)

In [None]:
def uploadDirectory(path, bucketname):
    for root,dirs,files in os.walk(path):
        for file in files:
            client.upload_file(os.path.join(root,file),bucketname,file)
uploadDirectory('./data', bucket)

In [None]:
for obj in bucket_obj.objects.all():
    key = obj.key
    if 'data_batch' in key:
        # operation: directly load data to memory 
        print('reading %s, size: %fMB' % (key, obj.size/1024/1024))
        %timeit body = obj.get()['Body'].read()
        print('-------------')

In [None]:
%%timeit -r 3

import torch, torchvision
from datasets.S3CIFAR10 import S3CIFAR10
import torchvision.transforms as transforms

net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

trainset = S3CIFAR10(bucket_obj.Object('batches.meta'), transform)
for epoch in range(2):  # loop over the dataset multiple times
    running_loss = 0.0
    chunk_index = 0
    for obj in bucket_obj.objects.all():
        key = obj.key
        if key not in train_list: continue
        trainset.load_data(obj)
        trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0

print('Finished Training')
PATH = './cifar_net.pth'
torch.save(net.state_dict(), PATH)

In [None]:
testset = S3CIFAR10(bucket_obj.Object('batches.meta'), transform)
testset.load_data(bucket_obj.Object(test_list[0]))
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

In [None]:
net = Net()
net.load_state_dict(torch.load(PATH))

In [None]:
evaluate(net, testloader)

In [None]:
keys = []
for obj in bucket_obj.objects.all():
    key = obj.key
    if 'data_batch' in key:
        # operation: download file then load data to memory 
        print('downloading %s, size: %fMB' % (key, obj.size/1024/1024))
        %timeit bucket_obj.download_file(key, './temp/%s' % key)
        print('-------------')

# Exp 2. load data from Hazelcast cluster

In [None]:
import hazelcast as hz
from hazelcast.config import InMemoryFormat, EvictionPolicy

# Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1
hz_client = hz.HazelcastClient(
    cluster_name="dev",
    cluster_members=[
        "10.145.41.32:5701",
        "10.145.41.33:5701"
    ],
    use_public_ip=True,
    smart_routing=True,
    client_name='hz.client_0',
    lifecycle_listeners=[
        lambda state: print("Lifecycle event >>>", state),
    ],
    connection_timeout=30,
    # near_caches={
    #     "mostly-read-map": {
    #         "invalidate_on_change": True,
    #         "in_memory_format": InMemoryFormat.OBJECT,
    #         "eviction_policy": EvictionPolicy.LRU,
    #         "eviction_max_size": 5000,
    #     }
    # }
)

In [None]:
# Get the Distributed Map from Cluster.
asyc = True
if asyc:
    my_map = hz_client.get_map("cifar10-map")
else:
    my_map = hz_client.get_map("cifar10-map").blocking()

In [None]:
# %%time
# Standard Put and Get
def put_callback(f):
    print("map.put", f.result())
for obj in bucket_obj.objects.all():
    key = obj.key
    # operation: directly load data to memory 
    body = obj.get()['Body'].read().decode('latin1')
    my_map.put(key, body).add_done_callback(put_callback)

In [None]:
my_rep_map = hz_client.get_replicated_map('cifar10-rep-map')
def put_callback(f):
    print("map.put", f.result())
for obj in bucket_obj.objects.all():
    key = obj.key
    # operation: directly load data to memory 
    body = obj.get()['Body'].read().decode('latin1')
    my_rep_map.put(key, body).add_done_callback(put_callback)

In [None]:
# Shutdown this Hazelcast Client
hz_client.shutdown()

In [None]:
%%timeit -r 3

import torch, torchvision
from datasets.HZCIFAR10 import HZCIFAR10
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

trainset = None
for epoch in range(2):  # loop over the dataset multiple times
    running_loss = 0.0
    chunk_index = 0
    for chunk_index in range(len(train_list)):
        if trainset is None:
            trainset = HZCIFAR10(my_map, transform)
        trainset.load_data(train_list[chunk_index])
        trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0
    np.random.shuffle(train_list)

print('Finished Training')
PATH = './cifar_net.pth'
torch.save(net.state_dict(), PATH)

In [None]:
from datasets.HZCIFAR10 import HZCIFAR10
testset = HZCIFAR10(hz_obj=my_map, transform=transform, asyc=asyc)
testset.load_data(test_list[0])
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

In [None]:
net = Net()
net.load_state_dict(torch.load(PATH))

In [None]:
evaluate(net, testloader)

# Exp 3. Load data from Redis cluster

In [None]:
import redis
client = redis.RedisCluster(host="10.145.41.33", port=30007)

In [None]:
%%timeit -r 3

import torch, torchvision
from datasets.RedisCIFAR10 import RedisCIFAR10
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

trainset = None
for epoch in range(2):  # loop over the dataset multiple times
    running_loss = 0.0
    chunk_index = 0
    for chunk_index in range(len(train_list)):
        if trainset is None:
            trainset = RedisCIFAR10(client, transform)
        trainset.load_data(train_list[chunk_index])
        trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0
    np.random.shuffle(train_list)

print('Finished Training')
PATH = './cifar_net.pth'
torch.save(net.state_dict(), PATH)

In [None]:
testset = RedisCIFAR10(client=client, transform=transform)
testset.load_data(test_list[0])
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

In [None]:
net = Net()
net.load_state_dict(torch.load('./cifar_net.pth'))

In [None]:
evaluate(net, testloader)

# Others

In [None]:
%%timeit

# load from disk
torchvision.datasets.CIFAR10(root='./data', train=True, download=False, transform=transform)

In [None]:
%%timeit

trainset = S3CIFAR10(bucket_obj.Object('batches.meta'), transform)
for obj in bucket_obj.objects.all():
    key = obj.key
    if key not in train_list: continue
    trainset.load_data(obj)
    time.sleep(5)

In [None]:
%%timeit

trainset = None
for chunk_index in range(len(train_list)):
    if trainset is None:
        trainset = HZCIFAR10(my_map, transform, asyc=asyc)
    trainset.load_data(train_list[chunk_index])