In [0]:
%spark.pyspark
# Echo the SparkContext handle. `sc` is not assigned anywhere in this notebook, so it
# is presumably injected by the %spark.pyspark interpreter -- TODO confirm.
sc
# Get (or create) the session under the application name "Pytorch_test".
# NOTE(review): SparkSession is only imported in a later cell; this line relies on the
# interpreter having pre-imported it -- confirm on a fresh interpreter.
spark = SparkSession.builder.appName("Pytorch_test").getOrCreate()


In [1]:
%spark.pyspark

import os
# Display the SPARK_LOCAL_DIRS environment variable of this interpreter process.
# Indexing (rather than .get) raises KeyError when the variable is unset.
os.environ["SPARK_LOCAL_DIRS"]

In [2]:
%spark.pyspark

# Base directory for experiment output: train() writes its per-rank log files beneath
# it and the analysis cells further down read them back from here.
results_dir = "/users/brenton/pyspark-ml/mnist-pytorch"


In [3]:
%spark.pyspark

import os
from pyspark.sql import SparkSession
from app import main

# Point PySpark at the Python interpreter inside the shipped archive; the "#environment"
# suffix below is the directory name that "./environment/bin/python" refers to.
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
# NOTE(review): a session was already requested in the first cell, so getOrCreate()
# presumably returns that one; whether "spark.archives" still takes effect on an
# existing session should be confirmed.
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "/mnt/pyenv/mnist_pytorch.tar.gz#environment"
    #).config("spark.pyspark.virtualenv.enabled", True
    #).config("spark.pyspark.virtualenv.type", "native"
    ).getOrCreate()
# The imported app.main entry point is left disabled here.
#main(spark)


In [4]:
%spark.pyspark

# Report the virtualenv-related settings. A default of None is supplied so the lookup
# does not raise when a key is unset (both keys are commented out in the session
# builder cell), and each value is printed so neither is lost to "last expression only"
# echoing. The original f-strings had no placeholders, so plain literals are used.
for conf_key in ("spark.pyspark.virtualenv.enabled", "spark.pyspark.virtualenv.type"):
    print(conf_key, "=", spark.conf.get(conf_key, None))


In [5]:
%spark.pyspark

# Ask the SparkContext to install pyarrow.
# NOTE(review): install_packages does not appear to be part of stock PySpark's
# SparkContext; it is presumably provided by the hosting platform -- confirm.
sc.install_packages("pyarrow")



In [6]:
%spark.pyspark

def init_logging(fname):
    """Return the 'Pytorch_test' logger, appending INFO-and-above records to ``fname``.

    The call is idempotent per target file: if the logger already has a FileHandler
    for ``fname``, no second handler is attached, so repeated calls in the same
    process do not duplicate every record.

    Args:
        fname: Path of the log file (opened in append mode).

    Returns:
        The configured ``logging.Logger`` named 'Pytorch_test'.

    Raises:
        OSError: If ``fname`` cannot be opened (e.g. its directory does not exist).
    """
    # Imported locally so the function is self-contained when shipped to workers.
    import logging
    import os

    logger = logging.getLogger('Pytorch_test')
    logger.setLevel(logging.INFO)

    # FileHandler stores the absolute path in baseFilename, so compare against that.
    target = os.path.abspath(fname)
    already_attached = any(
        isinstance(existing, logging.FileHandler) and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        formatter = logging.Formatter('%(asctime)s|%(msecs)04d|%(name)s|%(levelname)s|%(message)s', datefmt='%H:%M:%S')
        handler = logging.FileHandler(fname, mode='a')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger


In [7]:
%spark.pyspark

import torch
import torch.nn as nn
import torch.nn.functional as F
from pyspark.ml.torch.distributor import TorchDistributor

class Net(nn.Module):
    """Small convolutional classifier: two conv layers followed by two linear layers.

    ``forward`` returns log-probabilities (log-softmax over dim 1) for 10 classes.
    """

    def __init__(self):
        super().__init__()
        # 1 -> 32 -> 64 channels, 3x3 kernels, stride 1.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1)
        self.dropout1 = nn.Dropout(p=0.25)
        self.dropout2 = nn.Dropout(p=0.5)
        # 9216 = 64 * 12 * 12, the flattened size produced for a 1x28x28 input.
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Map a batch of single-channel images to per-class log-probabilities."""
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        logits = self.fc2(self.dropout2(hidden))
        return F.log_softmax(logits, dim=1)

In [8]:
%spark.pyspark
from torchvision import datasets
from typing import Union, Optional, Callable, Tuple, Any
from pathlib import Path
import codecs
import numpy as np
from PIL import Image

def get_int(b: bytes) -> int:
    """Interpret ``b`` as an unsigned big-endian integer via its hex representation."""
    hex_digits = b.hex()
    return int(hex_digits, 16)


# Maps the type code parsed from a file header to the torch dtype used to decode the
# payload; looked up by LazyLoad_MNIST.read_sn3_pascalvincent_tensor, which asserts
# the code lies in 8..14 before indexing.
SN3_PASCALVINCENT_TYPEMAP = {
    8: torch.uint8,
    9: torch.int8,
    11: torch.int16,
    12: torch.int32,
    13: torch.float32,
    14: torch.float64,
}

class LazyLoad_MNIST(datasets.VisionDataset):
    """MNIST-style dataset backed by one per-rank pair of IDX files.

    The files ``<rank+1>-mnist-patterns-idx3-ubyte`` and
    ``<rank+1>-mnist-labels-idx1-ubyte`` are read from ``root`` when the object is
    constructed (despite the class name, loading happens in ``__init__``).

    Args:
        root: Directory holding the split IDX files.
        train: Accepted for signature compatibility; not used.
        transform: Optional callable applied to each PIL image.
        target_transform: Optional callable applied to each integer label.
        download: Accepted for signature compatibility; not used.
        rank: Worker rank as an int or numeric string. One is added to it to form
            the file-name prefix, so rank 0 reads the ``1-...`` files.
    """

    def __init__(
        self,
        root: Union[str, Path],
        train: bool = True,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        download: bool = False,
        rank: int = -1
    ) -> None:
        super().__init__(root, transform=transform, target_transform=target_transform)
        self.folder_path = root
        self.rank = int(rank)+1
        self.data, self.targets = self._load_data()

    def _load_data(self):
        """Read this rank's image and label files and return ``(data, targets)``."""
        image_file = str(self.rank)+"-mnist-patterns-idx3-ubyte"
        data = self.read_image_file(os.path.join(self.folder_path,image_file))

        label_file = str(self.rank)+"-mnist-labels-idx1-ubyte"
        targets = self.read_label_file(os.path.join(self.folder_path,label_file))

        return data, targets

    @staticmethod
    def _flip_byte_order(t: torch.Tensor) -> torch.Tensor:
        """Return ``t`` with the bytes of every element reversed (endianness swap)."""
        return (
            t.contiguous().view(torch.uint8).view(*t.shape, t.element_size()).flip(-1).view(*t.shape[:-1], -1).view(t.dtype)
        )

    def read_sn3_pascalvincent_tensor(self, path: str, strict: bool = True) -> torch.Tensor:
        """Read a SN3 file in "Pascal Vincent" format (Lush file 'libidx/idx-io.lsh').

        Args:
            path: Filesystem path of an uncompressed file; it is opened with
                ``open(path, "rb")`` and read fully into memory.
            strict: When true, assert that the number of decoded elements equals
                the product of the header dimensions.

        Returns:
            A tensor of the dtype named by the header, viewed with the header's shape.

        Raises:
            AssertionError: If the header's dimension count or type code is out of
                range, or (with ``strict``) the element count does not match.
        """
        # Imported locally: no cell of this notebook imports `sys` at module level.
        import sys

        # read
        with open(path, "rb") as f:
            data = f.read()

        # parse
        if sys.byteorder == "little":
            magic = get_int(data[0:4])
            nd = magic % 256
            ty = magic // 256
        else:
            nd = get_int(data[0:1])
            ty = get_int(data[1:2]) + get_int(data[2:3]) * 256 + get_int(data[3:4]) * 256 * 256

        assert 1 <= nd <= 3
        assert 8 <= ty <= 14
        torch_type = SN3_PASCALVINCENT_TYPEMAP[ty]
        s = [get_int(data[4 * (i + 1) : 4 * (i + 2)]) for i in range(nd)]

        if sys.byteorder == "big":
            for i in range(len(s)):
                s[i] = int.from_bytes(s[i].to_bytes(4, byteorder="little"), byteorder="big", signed=False)

        # bytearray() gives frombuffer a writable buffer; the payload starts after the
        # 4-byte magic and the nd 4-byte dimension sizes.
        parsed = torch.frombuffer(bytearray(data), dtype=torch_type, offset=(4 * (nd + 1)))

        # The MNIST format uses the big endian byte order, while `torch.frombuffer` uses whatever the system uses. In case
        # that is little endian and the dtype has more than one byte, we need to flip them.
        if sys.byteorder == "little" and parsed.element_size() > 1:
            parsed = self._flip_byte_order(parsed)

        assert parsed.shape[0] == np.prod(s) or not strict
        return parsed.view(*s)

    def read_label_file(self, path: str) -> torch.Tensor:
        """Read a label file and return a 1-D int64 tensor.

        Raises:
            TypeError: If the stored dtype is not uint8.
            ValueError: If the stored tensor is not 1-dimensional.
        """
        x = self.read_sn3_pascalvincent_tensor(path, strict=False)
        if x.dtype != torch.uint8:
            raise TypeError(f"x should be of dtype torch.uint8 instead of {x.dtype}")
        if x.ndimension() != 1:
            raise ValueError(f"x should have 1 dimension instead of {x.ndimension()}")
        return x.long()

    def read_image_file(self, path: str) -> torch.Tensor:
        """Read an image file and return a 3-D uint8 tensor.

        Raises:
            TypeError: If the stored dtype is not uint8.
            ValueError: If the stored tensor is not 3-dimensional.
        """
        x = self.read_sn3_pascalvincent_tensor(path, strict=False)
        if x.dtype != torch.uint8:
            raise TypeError(f"x should be of dtype torch.uint8 instead of {x.dtype}")
        if x.ndimension() != 3:
            raise ValueError(f"x should have 3 dimension instead of {x.ndimension()}")
        return x

    def __getitem__(self, index: int) -> Tuple[Any, Any]:
        """
        Args:
            index (int): Index

        Returns:
            tuple: (image, target) where target is index of the target class.
        """
        img, target = self.data[index], int(self.targets[index])

        # doing this so that it is consistent with all other datasets
        # to return a PIL Image
        img = Image.fromarray(img.numpy(), mode="L")

        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target

    def __len__(self) -> int:
        """Number of images loaded for this rank."""
        return len(self.data)


In [9]:
%spark.pyspark
# `transforms` is otherwise only imported by a later cell; import it here so this
# smoke test also runs on a fresh interpreter, top to bottom.
from torchvision import transforms

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])
# Load the shard for rank "0" to check that the split files parse.
# NOTE(review): getTrainDataset reads from '/mnt/mnist-split/mnist-split' while this
# cell uses '/mnt/infimnist/mnist-split' -- confirm which root is current.
LazyLoad_MNIST('/mnt/infimnist/mnist-split', train=True, download=False, transform=transform, rank="0")

In [10]:
%spark.pyspark

import os
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import torch.distributed as dist

def getTrainDataset(rank, root='/mnt/mnist-split/mnist-split', batch_size=3125):
    """Build the per-rank training dataset and the DataLoader kwargs to use with it.

    Args:
        rank: Worker rank (int or numeric string), forwarded to LazyLoad_MNIST to
            select which split files are read.
        root: Directory containing the split IDX files. Defaults to the path that was
            previously hard-coded.
        batch_size: Batch size placed in the returned kwargs. Defaults to the
            previously hard-coded 3125. Note that train() separately caps its
            per-batch sample sizes at 3125.

    Returns:
        Tuple ``(dataset, dataloader_kwargs)``.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])

    train8m_kwargs = {'batch_size': batch_size}
    trainset8m = LazyLoad_MNIST(root, train=True, download=False, transform=transform, rank=rank)

    return trainset8m, train8m_kwargs

def getTestLoader(root='/mnt/MNIST/', batch_size=1000):
    """Build a DataLoader over the MNIST test split.

    Args:
        root: Root directory handed to ``datasets.MNIST`` (the data must already be
            present; ``download=False``). Defaults to the previously hard-coded path.
        batch_size: Evaluation batch size. Defaults to the previously hard-coded 1000.

    Returns:
        A ``torch.utils.data.DataLoader`` over the test set.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])

    test8m_kwargs = {'batch_size': batch_size}
    testset8m = datasets.MNIST(root, train=False, download=False, transform=transform)
    test_loader = torch.utils.data.DataLoader(testset8m, **test8m_kwargs)

    return test_loader


In [11]:
%spark.pyspark

def train(model, device, getTrainDataset, optimizer, epoch, scheduler):
    """Run one pass over this rank's data with gloo-backed DistributedDataParallel.

    Meant to be executed once per worker process (dst_train hands it to
    TorchDistributor.run). Every batch is truncated to a randomly drawn size, and
    timestamps around the backward pass are appended to a per-rank log file under
    ``results_dir``.

    Args:
        model: Module to wrap in DistributedDataParallel.
        device: Device the batches are moved to.
        getTrainDataset: Callable that receives the RANK environment value (a string)
            and returns ``(dataset, dataloader_kwargs)``.
        optimizer: Optimizer stepped once per batch.
        epoch: Epoch number; used only in log and progress messages.
        scheduler: Passed through unchanged (it is not stepped here).

    Returns:
        Tuple ``(model, optimizer, scheduler)`` where ``model`` is the unwrapped
        module (``ddp_model.module``).
    """
    # Local imports keep the function self-contained when it is shipped to workers.
    import math
    import os
    import time
    import warnings

    import numpy as np
    import torch
    import torch.distributed as dist
    import torch.nn.functional as F
    from torch.nn.parallel import DistributedDataParallel as DDP

    warnings.filterwarnings("ignore")

    rank = os.environ['RANK']

    mu, sigma = 1562.5, 400  # mean and standard deviation
    sample_size = np.random.normal(mu, sigma, 3125)
    # Clamp to [1, 3125]. The lower bound is 1 rather than 0 because an empty batch
    # makes the mean-reduced NLL loss NaN, and skipping the step on one rank would
    # leave the other ranks waiting in the gradient all-reduce.
    sample_size = [max(1, min(math.ceil(i), 3125)) for i in sample_size]

    print("Running distributed training")
    dist.init_process_group("gloo")

    logger = None
    hook_handler = None
    try:
        log_dir = results_dir + '/data_split_logs_1562_400_25'
        # FileHandler does not create missing directories.
        os.makedirs(log_dir, exist_ok=True)
        fname = log_dir + '/log_rank_{}.log'.format(rank)
        logger = init_logging(fname)

        ddp_model = DDP(model)
        ddp_model.train()

        def _backward_hook(module, grad_input, grad_output):
            # Fires when conv1's backward runs; batch_idx is read from the enclosing
            # loop at call time.
            logger.info("{}|Loss Grad End|Rank {}|Epoch {}|Iteration {}".format(time.time(), rank, epoch, batch_idx))
        hook_handler = ddp_model.module.conv1.register_full_backward_hook(_backward_hook)

        # No DistributedSampler: the dataset returned for this rank is already a
        # rank-specific shard.
        train_dataset, train8m_kwargs = getTrainDataset(rank)
        train_loader = torch.utils.data.DataLoader(train_dataset, **train8m_kwargs)

        for batch_idx, (data, target) in enumerate(train_loader):
            # Wrap around so a loader with more batches than drawn sizes cannot
            # index out of range.
            keep = sample_size[batch_idx % len(sample_size)]
            data, target = data[:keep], target[:keep]
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = ddp_model(data)
            loss = F.nll_loss(output, target)
            logger.info("{}|Loss Start|Rank {}|Epoch {}|Iteration {}".format(time.time(), rank, epoch, batch_idx))
            loss.backward()
            logger.info("{}|Sync End|Rank {}|Epoch {}|Iteration {}".format(time.time(), rank, epoch, batch_idx))
            optimizer.step()

            if batch_idx % 10 == 0:
                print('[Rank{}]Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(rank,
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))

        non_ddp_model = ddp_model.module
    finally:
        # Always release the hook, the log file and the process group, including
        # when training raised part-way through.
        if hook_handler is not None:
            hook_handler.remove()
        if logger is not None:
            for handler in logger.handlers[:]:
                logger.removeHandler(handler)
                handler.close()
        dist.destroy_process_group()  # DDP anything cannot exist after this point

    return (non_ddp_model, optimizer, scheduler)


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


In [12]:
%spark.pyspark

def dst_train(getTrainDataset, getTestLoader, num_proc, num_epochs=25):
    """Train Net across ``num_proc`` processes and evaluate after every epoch.

    Args:
        getTrainDataset: Callable forwarded to train(), which calls it on each worker.
        getTestLoader: Zero-argument callable returning the evaluation DataLoader; it
            is called once, in this process.
        num_proc: Number of processes handed to TorchDistributor.
        num_epochs: Number of epochs to run. Defaults to the previously hard-coded 25.

    Returns:
        The string "Finished".
    """
    torch.manual_seed(1)
    device = torch.device("cpu")
    model = Net().to(device)

    optimizer = optim.Adadelta(model.parameters(), lr=1)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    test_ldr = getTestLoader()

    for epoch in range(1, num_epochs + 1):
        # Each epoch is its own distributed run; the objects it returns replace the
        # local ones before evaluation and the scheduler step.
        model, optimizer, scheduler = TorchDistributor(num_processes=num_proc, local_mode=False, use_gpu=False).run(train, model, device, getTrainDataset, optimizer, epoch, scheduler)
        test(model, device, test_ldr)
        scheduler.step()

    return "Finished"


Packing a virtual environment to distribute to the cluster

Many options explained here:
https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html

I'm using virtual environments because I'm familiar with them, and I'm not using conda or anything else at the moment.

python -m venv mnist_pytorch
source mnist_pytorch/bin/activate
pip install pyarrow pandas venv-pack


In [14]:
%spark.pyspark

# dst_train expects the factory callables: it calls getTestLoader() itself and forwards
# getTrainDataset to the workers. `dataset1` / `test_loader` are not defined in the
# visible notebook, so the factories are passed instead.
dst_train(getTrainDataset, getTestLoader, 4)

Potential ways to make it faster:
1. Keep the data on local disk (/mnt/) instead of on the NAS (/users/brenton/pyspark-ml/mnist-pytorch).
2. Each worker has 16 cores, but only 1 is used per machine.


In [16]:
%spark.pyspark

# Run the distributed training with 16 processes, passing the dataset/loader factories.
dst_train(getTrainDataset, getTestLoader, 16)

In [17]:
%spark.pyspark

# dst_train expects the factory callables: it calls getTestLoader() itself and forwards
# getTrainDataset to the workers. `dataset1` / `test_loader` are not defined in the
# visible notebook, so the factories are passed instead.
dst_train(getTrainDataset, getTestLoader, 3)

In [18]:
%spark.pyspark

# dst_train expects the factory callables: it calls getTestLoader() itself and forwards
# getTrainDataset to the workers. `dataset1` / `test_loader` are not defined in the
# visible notebook, so the factories are passed instead.
dst_train(getTrainDataset, getTestLoader, 2)

In [19]:
%spark.pyspark

# dst_train expects the factory callables: it calls getTestLoader() itself and forwards
# getTrainDataset to the workers. `dataset1` / `test_loader` are not defined in the
# visible notebook, so the factories are passed instead.
dst_train(getTrainDataset, getTestLoader, 1)

In [20]:
%spark.pyspark

# Run `main` (imported from the `app` module in an earlier cell) on 4 processes and
# keep whatever TorchDistributor.run returns.
output_dist = TorchDistributor(num_processes=4, local_mode=False, use_gpu=False).run(main)


In [21]:
%spark.pyspark

def dst_train_and_test(train_dataset, test_ldr):
    """Run 4 epochs, each as a separate 4-process TorchDistributor job.

    From the second epoch on, model/optimizer/scheduler are replaced by whatever
    loadModelFromCheckpoint() returns before the job is launched.

    NOTE(review): neither `loadModelFromCheckpoint` nor `train_and_test` is defined in
    the visible notebook; this function looks like it belongs to an earlier,
    checkpoint-based variant -- confirm those helpers exist before running it.

    Args:
        train_dataset: Forwarded to train_and_test.
        test_ldr: Forwarded to train_and_test.

    Returns:
        The string "Finished".
    """

    torch.manual_seed(1)
    device = torch.device("cpu")
    model = Net().to(device)
    
    optimizer = optim.Adadelta(model.parameters(), lr=1)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)

    for epoch in range(1, 4 + 1):
        # Epoch 1 uses the freshly built objects; later epochs reload them.
        model, optimizer, scheduler = loadModelFromCheckpoint() if epoch > 1 else (model, optimizer, scheduler)
        # The return value of run() is discarded here.
        TorchDistributor(num_processes=4, local_mode=False, use_gpu=False).run(train_and_test, model, device, train_dataset, optimizer, epoch,test_ldr, scheduler)
    return "Finished"

In [22]:
%spark.pyspark

# NOTE(review): `dataset1` and `test_loader` are not defined in the visible notebook;
# this call presumably depends on state from an earlier session -- confirm.
dst_train_and_test(dataset1, test_loader)

In [23]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Measured points; presumably x is the number of processes and y the total run time in
# seconds -- TODO confirm.
x = [1, 2, 3, 4]
y = [40.33, 24.33, 18.36, 15.88]
plt.scatter(x, y)

# Overlay the curve u * (v - 5) = 40 as a single contour level.
# There seems to be a 5sec constant added to the total times, presumably because of
# the test step (which takes place in one machine only).
u, v = np.meshgrid(np.linspace(0.5, 9, 400), np.linspace(0.5, 60, 400))
plt.contour(u, v, u * (v - 5), [40], colors='k')

plt.show()

# Task Wait Times -  
Sync_End[i+1] - Loss_End[i]

## Data Multi Split Wait Times

In [26]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Parse each rank's log into '|'-separated fields, newest record first. With the
# format written by init_logging()/train(), field 4 is the time.time() stamp and
# field 5 the event name.
logs_arr = []
for rank in range(16):
    # `with` closes the file even if reading fails.
    with open(results_dir+"/data_split_logs/log_rank_{}.log".format(rank), "r") as logfile:
        logs = [line.split('|') for line in logfile]
    logs_arr.append(list(reversed(logs)))

# Wait time: stamp of each "Sync End" record minus the stamp of the record logged
# immediately before it.
datapoints = []
for logs in logs_arr:
    # Stop one short of the end: the oldest record has no predecessor to subtract.
    for i in range(len(logs) - 1):
        if logs[i][5]=="Sync End":
            datapoints.append(float(logs[i][4])-float(logs[i+1][4]))

# plt.style.use('_mpl-gallery')

# make data:
y = datapoints
n = len(y)
x = 0.5+np.arange(n)

plt.figure(figsize=(20,5))
plt.plot(x,y, '-o')
plt.xlabel("data points")
plt.ylabel("Time in seconds")
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50)
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50, log=True)
plt.show()

## Norm Dist of Data - Wait times

In [28]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Parse each rank's log into '|'-separated fields, newest record first. With the
# format written by init_logging()/train(), field 4 is the time.time() stamp,
# field 5 the event name and field 7 the "Epoch <n>" text.
logs_arr = []
for rank in range(16):
    # `with` closes the file even if reading fails.
    with open(results_dir+"/norm_dist_logs/log_rank_{}.log".format(rank), "r") as logfile:
        logs = [line.split('|') for line in logfile]
    logs_arr.append(list(reversed(logs)))

datapoints = []
print(logs_arr[0][0])

def is_same_epoch(log1, log2):
    """Return True when two parsed log records carry the same epoch number."""
    log1_epoch = int(log1[7].split(" ")[1])
    log2_epoch = int(log2[7].split(" ")[1])
    return log1_epoch==log2_epoch

# Wait time: stamp of each "Sync End" record minus the stamp of the record logged
# immediately before it, restricted to pairs from the same epoch.
for logs in logs_arr:
    # Stop one short of the end: the oldest record has no predecessor to subtract.
    for i in range(len(logs) - 1):
        if logs[i][5]=="Sync End" and is_same_epoch(logs[i], logs[i+1]):
            datapoints.append(float(logs[i][4])-float(logs[i+1][4]))

# plt.style.use('_mpl-gallery')

# make data:
y = datapoints
n = len(y)
x = 0.5+np.arange(n)

plt.figure(figsize=(20,5))
plt.plot(x,y, '-o')
plt.xlabel("data points")
plt.ylabel("Time in seconds")
plt.show()


plt.figure(figsize=(20, 5))
plt.hist(y, bins=50)
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50, log=True)
plt.show()


## Norm Dist of data (Mu-512, SD-100, Epoch-25)- Wait times

In [30]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Parse each rank's log into '|'-separated fields, newest record first. With the
# format written by init_logging()/train(), field 4 is the time.time() stamp,
# field 5 the event name and field 7 the "Epoch <n>" text.
logs_arr = []
for rank in range(16):
    # `with` closes the file even if reading fails.
    with open(results_dir+"/data_split_logs_512_100_25/log_rank_{}.log".format(rank), "r") as logfile:
        logs = [line.split('|') for line in logfile]
    logs_arr.append(list(reversed(logs)))

datapoints = []
print(logs_arr[0][0])

def is_same_epoch(log1, log2):
    """Return True when two parsed log records carry the same epoch number."""
    log1_epoch = int(log1[7].split(" ")[1])
    log2_epoch = int(log2[7].split(" ")[1])
    return log1_epoch==log2_epoch

# Wait time: stamp of each "Sync End" record minus the stamp of the record logged
# immediately before it, restricted to pairs from the same epoch.
for logs in logs_arr:
    # Stop one short of the end: the oldest record has no predecessor to subtract.
    for i in range(len(logs) - 1):
        if logs[i][5]=="Sync End" and is_same_epoch(logs[i], logs[i+1]):
            datapoints.append(float(logs[i][4])-float(logs[i+1][4]))

# plt.style.use('_mpl-gallery')

# make data:
y = datapoints
n = len(y)
x = 0.5+np.arange(n)

plt.figure(figsize=(20,5))
plt.plot(x,y, '-o')
plt.xlabel("data points")
plt.ylabel("Time in seconds")
plt.show()


plt.figure(figsize=(20, 5))
plt.hist(y, bins=250)
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50, log=True)
plt.show()


## Norm Dist of data (Mu-1562, SD-400, Epoch-25)- Wait times

In [32]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Parse each rank's log into '|'-separated fields, newest record first. With the
# format written by init_logging()/train(), field 4 is the time.time() stamp,
# field 5 the event name and field 7 the "Epoch <n>" text.
logs_arr = []
for rank in range(16):
    # `with` closes the file even if reading fails.
    with open(results_dir+"/data_split_logs_1562_400_25/log_rank_{}.log".format(rank), "r") as logfile:
        logs = [line.split('|') for line in logfile]
    logs_arr.append(list(reversed(logs)))

datapoints = []
print(logs_arr[0][0])

def is_same_epoch(log1, log2):
    """Return True when two parsed log records carry the same epoch number."""
    log1_epoch = int(log1[7].split(" ")[1])
    log2_epoch = int(log2[7].split(" ")[1])
    return log1_epoch==log2_epoch

# Wait time: stamp of each "Sync End" record minus the stamp of the record logged
# immediately before it, restricted to pairs from the same epoch.
for logs in logs_arr:
    # Stop one short of the end: the oldest record has no predecessor to subtract.
    for i in range(len(logs) - 1):
        if logs[i][5]=="Sync End" and is_same_epoch(logs[i], logs[i+1]):
            datapoints.append(float(logs[i][4])-float(logs[i+1][4]))

# plt.style.use('_mpl-gallery')

# make data:
y = datapoints
n = len(y)
x = 0.5+np.arange(n)

plt.figure(figsize=(20,5))
plt.plot(x,y, '-o')
plt.xlabel("data points")
plt.ylabel("Time in seconds")
plt.show()


plt.figure(figsize=(20, 5))
plt.hist(y, bins=500)
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50, log=True)
plt.show()


# Task Service Times -  
Loss_Grad_End[i+1] - Sync_End[i]

## Norm Dist of data - Service times

In [35]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Parse each rank's log into '|'-separated fields, newest record first. With the
# format written by init_logging()/train(), field 4 is the time.time() stamp,
# field 5 the event name and field 7 the "Epoch <n>" text.
logs_arr = []
for rank in range(16):
    # `with` closes the file even if reading fails.
    with open(results_dir+"/norm_dist_logs/log_rank_{}.log".format(rank), "r") as logfile:
        logs = [line.split('|') for line in logfile]
    logs_arr.append(list(reversed(logs)))

datapoints = []

def is_same_epoch(log1, log2):
    """Return True when two parsed log records carry the same epoch number."""
    log1_epoch = int(log1[7].split(" ")[1])
    log2_epoch = int(log2[7].split(" ")[1])
    return log1_epoch==log2_epoch

# Service time: stamp of each "Loss Grad End" record minus the stamp of the record
# logged two entries before it, restricted to pairs from the same epoch.
for logs in logs_arr:
    for i in range(len(logs) - 2):
        if logs[i][5]=="Loss Grad End" and is_same_epoch(logs[i], logs[i+2]):
            datapoints.append(float(logs[i][4])-float(logs[i+2][4]))

# plt.style.use('_mpl-gallery')

# make data:
y = [i for i in datapoints if i<100] # Removing some outliers
n = len(y)
x = 0.5+np.arange(n)

plt.figure(figsize=(20,5))
plt.plot(x,y, '-o')
plt.xlabel("data points")
plt.ylabel("Time in seconds")
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50)
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50, log=True)
plt.show()

## Norm Dist of data (Mu-512, SD-100, Epoch-25)- Service times

In [37]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Parse each rank's log into '|'-separated fields, newest record first. With the
# format written by init_logging()/train(), field 4 is the time.time() stamp,
# field 5 the event name and field 7 the "Epoch <n>" text.
logs_arr = []
for rank in range(16):
    # `with` closes the file even if reading fails.
    with open(results_dir+"/data_split_logs_512_100_25/log_rank_{}.log".format(rank), "r") as logfile:
        logs = [line.split('|') for line in logfile]
    logs_arr.append(list(reversed(logs)))

datapoints = []

def is_same_epoch(log1, log2):
    """Return True when two parsed log records carry the same epoch number."""
    log1_epoch = int(log1[7].split(" ")[1])
    log2_epoch = int(log2[7].split(" ")[1])
    return log1_epoch==log2_epoch

# Service time: stamp of each "Loss Grad End" record minus the stamp of the record
# logged two entries before it, restricted to pairs from the same epoch.
for logs in logs_arr:
    for i in range(len(logs) - 2):
        if logs[i][5]=="Loss Grad End" and is_same_epoch(logs[i], logs[i+2]):
            datapoints.append(float(logs[i][4])-float(logs[i+2][4]))

# plt.style.use('_mpl-gallery')

# make data:
# y = [i for i in datapoints if i<100] # Removing some outliers
y = datapoints
n = len(y)
x = 0.5+np.arange(n)

plt.figure(figsize=(20,5))
plt.plot(x,y, '-o')
plt.xlabel("data points")
plt.ylabel("Time in seconds")
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=500)
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50, log=True)
plt.show()

## Norm Dist of data (Mu-1562.5, SD-400, Epoch-25)- Service times

In [39]:
%spark.pyspark

import matplotlib.pyplot as plt
import numpy as np

# Parse each rank's log into '|'-separated fields, newest record first. With the
# format written by init_logging()/train(), field 4 is the time.time() stamp,
# field 5 the event name and field 7 the "Epoch <n>" text.
logs_arr = []
for rank in range(16):
    # `with` closes the file even if reading fails.
    with open(results_dir+"/data_split_logs_1562_400_25/log_rank_{}.log".format(rank), "r") as logfile:
        logs = [line.split('|') for line in logfile]
    logs_arr.append(list(reversed(logs)))

datapoints = []

def is_same_epoch(log1, log2):
    """Return True when two parsed log records carry the same epoch number."""
    log1_epoch = int(log1[7].split(" ")[1])
    log2_epoch = int(log2[7].split(" ")[1])
    return log1_epoch==log2_epoch

# Service time: stamp of each "Loss Grad End" record minus the stamp of the record
# logged two entries before it, restricted to pairs from the same epoch.
for logs in logs_arr:
    for i in range(len(logs) - 2):
        if logs[i][5]=="Loss Grad End" and is_same_epoch(logs[i], logs[i+2]):
            datapoints.append(float(logs[i][4])-float(logs[i+2][4]))

# plt.style.use('_mpl-gallery')

# make data:
# y = [i for i in datapoints if i<100] # Removing some outliers
y = datapoints
n = len(y)
x = 0.5+np.arange(n)

plt.figure(figsize=(20,5))
plt.plot(x,y, '-o')
plt.xlabel("data points")
plt.ylabel("Time in seconds")
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=500)
plt.show()

plt.figure(figsize=(20, 5))
plt.hist(y, bins=50, log=True)
plt.show()

In [40]:
%spark.pyspark
# Infimnist data generation script: prints, for shards 1..32, the two shell commands
# that write a shard's label file and pattern file. Shard i covers indices
# i*100000+10000 through (i+1)*100000+10000-1.
for i in range(1,33):
    start = str(i*100000+10000)
    end = str((i+1)*100000+10000-1)
    for subcommand, file_suffix in (("lab", "-mnist-labels-idx1-ubyte"), ("pat", "-mnist-patterns-idx3-ubyte")):
        print("./infimnist " + subcommand + " " + start + " " + end + " > mnist-split/" + str(i) + file_suffix)

In [41]:
%spark.pyspark
# Testing different Distributions
import math
# Draw 3125 values from a normal distribution, round each one up and clamp it to
# [0, 3125], then show how the resulting sizes are distributed.
mu, sigma = 1562.5, 400  # mean and standard deviation
raw_draws = np.random.normal(mu, sigma, 3125)
s = [max(0, min(math.ceil(draw), 3125)) for draw in raw_draws]
plt.figure(figsize=(20, 5))
plt.hist(s, bins=200)
plt.show()

In [42]:
%spark.pyspark
