In [1]:
import torch
import pandas as pd
from torch import nn
from torch import optim
from torch.utils.data import Dataset, DataLoader, random_split

In [2]:
class CustomDataset(Dataset):
    def __init__(self, file_path):
        df = pd.read_csv(file_path)
        self.x1 = df.iloc[:, 0].values
        self.x2 = df.iloc[:, 1].values
        self.y = df.iloc[:, 2].values
        self.length = len(df)

    def __getitem__(self, index):
        x = torch.FloatTensor([self.x1[index], self.x2[index]])
        y = torch.FloatTensor([self.y[index]])
        return x, y

    def __len__(self):
        return self.length

class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()

        self.layer1 = nn.Sequential(
            nn.Linear(2, 2),
            nn.Sigmoid()
        )
        self.layer2 = nn.Sequential(
            nn.Linear(2, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        return x

In [3]:
dataset = CustomDataset("./perceptron.csv")
train_dataset, test_dataset = random_split(dataset, [int(len(dataset)*0.8), int(len(dataset)*0.2)], torch.manual_seed(4))
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=True, drop_last=True)

In [4]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device = "mps" if torch.backends.mps.is_available() else "cpu"
model = CustomModel().to(device)
criterion = nn.BCELoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)

In [5]:
def train(model, train_dataloader, optimizer, criterion, device, epoch):
    model.train()
    cost = 0
    for x, y in train_dataloader:
        x = x.to(device)
        y = y.to(device)

        output = model(x)
        loss = criterion(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        cost += loss

    cost = cost / len(train_dataloader)

    if (epoch + 1) % 1000 == 0:
        print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}")

def evaluate(model, test_dataloader, criterion, device):
    model.eval()  
    cost = 0

    with torch.no_grad():  
        for x, y in test_dataloader:
            x = x.to(device)
            y = y.to(device)

            output = model(x)
            loss = criterion(output, y)
            cost += loss

    cost /= len(test_dataloader)
    return cost


In [6]:
for epoch in range(10000):
    train(model, train_dataloader, optimizer, criterion, device, epoch)
    
    if (epoch + 1) % 1000 == 0:
        test_loss = evaluate(model, test_dataloader, criterion, device)
        print("\n[EPOCH: {}], Test cost: {:.4f}\n".format(epoch + 1, test_loss))


  x = torch.FloatTensor([self.x1[index], self.x2[index]])
  y = torch.FloatTensor([self.y[index]])


Epoch : 1000, Cost : 0.690

[EPOCH: 1000], Test cost: 0.6908

Epoch : 2000, Cost : 0.606

[EPOCH: 2000], Test cost: 0.6064

Epoch : 3000, Cost : 0.501

[EPOCH: 3000], Test cost: 0.5069

Epoch : 4000, Cost : 0.444

[EPOCH: 4000], Test cost: 0.4592

Epoch : 5000, Cost : 0.192

[EPOCH: 5000], Test cost: 0.1934

Epoch : 6000, Cost : 0.069

[EPOCH: 6000], Test cost: 0.0697

Epoch : 7000, Cost : 0.039

[EPOCH: 7000], Test cost: 0.0397

Epoch : 8000, Cost : 0.027

[EPOCH: 8000], Test cost: 0.0272

Epoch : 9000, Cost : 0.021

[EPOCH: 9000], Test cost: 0.0205

Epoch : 10000, Cost : 0.016

[EPOCH: 10000], Test cost: 0.0165



In [7]:
torch.save(model, 'model.pth')

In [8]:
model = torch.load('model.pth')