In [1]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [2]:
df = pd.read_csv('data/wine_data.csv')
features = df.drop('Class', axis=1)
labels = df[['Class']]

In [3]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=0)

Xtrain_ = torch.from_numpy(X_train.values).float()
Xtest_ = torch.from_numpy(X_test.values).float()
ytrain_ = torch.from_numpy(y_train.values).long().view(1, -1)[0]
ytest_ = torch.from_numpy(y_test.values).long().view(1, -1)[0]

input_size = 13
output_size = 3
hidden_size = 100

class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
    

    def forward(self, X):
        X = torch.sigmoid(self.fc1(X))
        X = torch.sigmoid(self.fc2(X))
        X = self.fc3(X)

        return F.log_softmax(X, dim=-1)

In [4]:
# The train function will be run in parallel across the workers and will use data placed in the shared memory
def train(model, Xtrain_, ytrain_, optimizer):
    model.train()
    for epoch in range(1, 1001):

        pid = os.getpid() # get the process id

        optimizer.zero_grad()
        ypred = model(Xtrain_)

        loss = nn.NLLLoss()(ypred, ytrain_)
        loss.backward()

        optimizer.step()

        if epoch % 100 == 0:
            print('Process: {} | Epoch: {} | Loss: {}'.format(pid, epoch, loss.item()))

In [5]:
import torch.multiprocessing as mp
mp.get_all_sharing_strategies()
# If we're using GPU, sharing GPU tensors use the CUDA API

{'file_system'}

In [6]:
mp.get_sharing_strategy() # default is 'file_system'

'file_system'

In [7]:
if __name__ == '__main__':
    mp.set_start_method('spawn')
    # 'spawn' starts a fresh python interpreter.Child process inherits only some resources 
    # 'fork' strategy =  Forks the Python interpreter. Child process is identical to the parent process, shares resources of the parent process
    
    Xtrain_.share_memory_()
    ytrain_.share_memory_()
    Xtest_.share_memory_()
    ytest_.share_memory_()

    model = Net()
    model.share_memory()

    optimizer = optim.Adam(model.parameters(), lr=0.001)

    processes = []

    for rank in range(4):

        p = mp.Process(target=train, args=(model, Xtrain_, ytrain_, optimizer))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
    
    if (processes[0].is_alive() == False):
        model.eval()

        with torch.no_grad():
            predict_out = model(Xtest_)
            _, predict_y = torch.max(predict_out, 1)

            print('\n')
            print(f'Accuracy: {accuracy_score(ytest_.data, predict_y.data):.2f}')
            print(f'Precision: {precision_score(ytest_.data, predict_y.data, average="micro"):.2f}')
            print(f'Recall: {recall_score(ytest_.data, predict_y.data, average="micro"):.2f}')
            print(f'F1 Score: {f1_score(ytest_.data, predict_y.data, average="micro"):.2f}')



Accuracy: 0.44
Precision: 0.44
Recall: 0.44
F1 Score: 0.44
