# Generate json flie

In [3]:
import json

input_list = list(range(150))
dict_json_data = {'input_data': input_list}


with open('input_data.json', 'w') as f:
    json_object = json.dumps(dict_json_data)
    f.write(json_object)

# Torch train

## Create dataset

In [60]:
from torch.utils.data import Dataset
import numpy as np

class DatasetGenerator(Dataset):
    def __init__(self, data:dict):
        self.x = data["input_data"]
        self.y = data["output_data"]
        
        self.x = np.array(self.x, dtype=np.float32).reshape(-1, 1)
        self.y = np.array(self.y, dtype=np.float32).reshape(-1, 1)
        
    def __len__(self):
        return len(self.x)
    
    def __getitem__(self, index):
        return self.x[index], self.y[index]

In [80]:
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor
from torchvision import datasets
from torch import nn
import python_pachyderm
import json

def get_data():
    client_pachyderm = python_pachyderm.Client()
    
    dict_input = {"project": "pach_ray",
                 "repo": "data_preprocessing",
                 "branch": "master"}
    
    data_binary = client_pachyderm.get_file(dict_input, "output_data.json")
    
    str_json = data_binary.read()
    str_json = str_json.decode("utf-8")
    dict_data = json.loads(str_json)

    return DatasetGenerator(dict_data)

class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear_model = nn.Sequential(
            nn.Linear(1, 2),
            nn.SELU(),
            nn.Linear(2,1)
        )
        
    def forward(self, inputs):
        return self.linear_model(inputs)

In [115]:
from torch.optim import SGD

LEARNINIG_RATE = 2 * 1e-6
EPOCHES = 40
BATCH_SIZE = 150

def train_model():
    dataset_train = get_data()
    dataloader = DataLoader(dataset_train, batch_size=BATCH_SIZE)
    model = MyModel()
    
    criterion = nn.MSELoss()
    optimizer = SGD(model.parameters(), lr=LEARNINIG_RATE)

    for epoch in range(EPOCHES):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch:\t{epoch}\tloss:\t{loss.item()}")

In [116]:
train_model()

epoch:	0	loss:	477928.625
epoch:	1	loss:	470770.5625
epoch:	2	loss:	463455.03125
epoch:	3	loss:	454283.625
epoch:	4	loss:	441476.40625
epoch:	5	loss:	422921.15625
epoch:	6	loss:	396105.40625
epoch:	7	loss:	358392.9375
epoch:	8	loss:	307926.84375
epoch:	9	loss:	245363.46875
epoch:	10	loss:	175913.0625
epoch:	11	loss:	109653.890625
epoch:	12	loss:	57618.73828125
epoch:	13	loss:	25123.4765625
epoch:	14	loss:	9177.46875
epoch:	15	loss:	2904.857177734375
epoch:	16	loss:	832.61328125
epoch:	17	loss:	224.9871826171875
epoch:	18	loss:	59.271053314208984
epoch:	19	loss:	15.869817733764648
epoch:	20	loss:	4.745353698730469
epoch:	21	loss:	1.925251841545105
epoch:	22	loss:	1.2144887447357178
epoch:	23	loss:	1.0358177423477173
epoch:	24	loss:	0.9909423589706421
epoch:	25	loss:	0.9796741604804993
epoch:	26	loss:	0.9768619537353516
epoch:	27	loss:	0.97613525390625
epoch:	28	loss:	0.9759318828582764
epoch:	29	loss:	0.9758701324462891
epoch:	30	loss:	0.9758339524269104
epoch:	31	loss:	0.97580462694168

In [107]:
from ray.train import torch as torch_ray
from ray.air import session, Checkpoint

def train_model_distributed():
    
    dataset_train = get_data()
    dataloader = DataLoader(dataset_train, batch_size=BATCH_SIZE)
    model = MyModel()
    
    # formatiing to ray train
    dataloader = torch_ray.prepare_data_loader(dataloader)
    model = torch_ray.prepare_model(model)
    # -------------------------
    criterion = nn.MSELoss()
    optimizer = SGD(model.parameters(), lr=LEARNINIG_RATE)

    for epoch in range(EPOCHES):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        # -------------
        session.report({"loss": loss.item(), "epoch": epoch},
                      checkpoint=Checkpoint.from_dict(
                      dict(epoch=epoch, model_state=model.state_dict())))
        # --------------
        
        print(f"epoch:\t{epoch}\tloss:\t{loss.item()}")

In [117]:
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

use_gpu = False

trainer = TorchTrainer(train_model_distributed,
                      scaling_config=ScalingConfig(num_workers=5, use_gpu=use_gpu))

results = trainer.fit()

0,1
Current time:,2023-07-10 12:09:12
Running for:,00:00:13.32
Memory:,13.1/31.1 GiB

Trial name,status,loc,iter,total time (s),loss,epoch
TorchTrainer_67a58_00000,TERMINATED,192.168.20.185:1668740,40,8.63318,0.10257,39


[2m[36m(TorchTrainer pid=1668740)[0m 2023-07-10 12:09:03,514	INFO backend_executor.py:137 -- Starting distributed worker processes: ['1668866 (192.168.20.185)', '1668867 (192.168.20.185)', '1668875 (192.168.20.185)', '1668877 (192.168.20.185)', '1668878 (192.168.20.185)']
[2m[36m(RayTrainWorker pid=1668866)[0m 2023-07-10 12:09:05,065	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=5]


[2m[36m(RayTrainWorker pid=1668875)[0m epoch:	0	loss:	463995.34375[32m [repeated 200x across cluster][0m


[2m[36m(RayTrainWorker pid=1668866)[0m 2023-07-10 12:09:06,171	INFO train_loop_utils.py:286 -- Moving model to device: cpu
[2m[36m(RayTrainWorker pid=1668866)[0m 2023-07-10 12:09:06,171	INFO train_loop_utils.py:346 -- Wrapping provided model in DistributedDataParallel.


Trial name,date,done,epoch,experiment_tag,hostname,iterations_since_restore,loss,node_ip,pid,should_checkpoint,time_since_restore,time_this_iter_s,time_total_s,timestamp,training_iteration,trial_id
TorchTrainer_67a58_00000,2023-07-10_12-09-10,True,39,0,kbp1-lhp-a11064,40,0.10257,192.168.20.185,1668740,True,8.63318,0.0825279,8.63318,1688980150,40,67a58_00000


2023-07-10 12:09:12,268	INFO tune.py:1111 -- Total run time: 13.33 seconds (13.31 seconds for the tuning loop).


In [118]:
results.metrics

{'loss': 0.10256955772638321,
 'epoch': 39,
 'timestamp': 1688980150,
 'time_this_iter_s': 0.08252787590026855,
 'should_checkpoint': True,
 'done': True,
 'training_iteration': 40,
 'trial_id': '67a58_00000',
 'date': '2023-07-10_12-09-10',
 'time_total_s': 8.633183479309082,
 'pid': 1668740,
 'hostname': 'kbp1-lhp-a11064',
 'node_ip': '192.168.20.185',
 'config': {},
 'time_since_restore': 8.633183479309082,
 'iterations_since_restore': 40,
 'experiment_tag': '0'}