In [4]:
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose
import matplotlib.pyplot as plt
from pica.ica import * 

In [5]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [6]:
# Download the FashionMNIST training split into the local "data" directory.
# NOTE(review): neither FashionMNIST split is referenced by the cells visible
# here -- confirm they are still needed before keeping the download.
training_data = datasets.FashionMNIST(root="data", train=True, download=True, transform=ToTensor())

# Download the matching FashionMNIST test split.
test_data = datasets.FashionMNIST(root="data", train=False, download=True, transform=ToTensor())

In [47]:
class ChebyshevDataset(Dataset):
    """Sliding-window dataset over an equal-weight mixture of generated sequences.

    One sequence is generated per entry of ``init_vals`` by calling
    ``chebyt_samples(2, init_val, length)``; the sequences are then summed
    sample-by-sample into a single mixed signal.  Item ``idx`` pairs ``delay``
    consecutive mixed samples with the sample of the *first* sequence at
    position ``idx + delay`` (the value right after the window).

    Args:
        init_vals: Array with one initial value per source sequence ("user").
        length: Number of samples requested from ``chebyt_samples`` per sequence.
        delay: Number of mixed samples in each input window.
        target_dtype: dtype the source sequences are stored as.  The default
            (``torch.long``) reproduces the original behaviour.
            NOTE(review): converting to an integer dtype discards the
            fractional part of every sample; if ``chebyt_samples`` returns
            real values in [-1, 1] (presumably -- confirm in ``pica.ica``)
            almost every target becomes 0.  Pass ``torch.float`` to keep the
            real-valued targets.

    Raises:
        ValueError: If ``init_vals`` is empty, or if ``length``/``delay`` leave
            no room for at least one window.
    """

    def __init__(self, init_vals: np.ndarray, length: int = 1000, delay: int = 3,
                 target_dtype: torch.dtype = torch.long):
        if init_vals.ndim == 0 or init_vals.shape[0] == 0:
            raise ValueError("init_vals must contain at least one initial value")
        if delay < 1 or length - delay - 1 < 1:
            raise ValueError(
                f"need delay >= 1 and length >= delay + 2, got length={length}, delay={delay}"
            )

        self.init_vals = init_vals
        self.length = length
        self.delay = delay
        self.users = init_vals.shape[0]

        # users x length: one generated sequence per initial value.
        source_signals = np.array(
            [chebyt_samples(2, init_val, self.length) for init_val in self.init_vals]
        )
        # Equal-weight sum over the user axis -> one mixed signal.
        mixed_signal = np.ones(self.users) @ source_signals

        self.data = torch.from_numpy(source_signals).type(target_dtype)
        self.mixed_data = torch.from_numpy(mixed_signal).type(torch.float)

    def __len__(self):
        # NOTE(review): if chebyt_samples returns exactly `length` samples,
        # `length - delay` windows would fit; this keeps the original, more
        # conservative count -- confirm whether the extra -1 is intentional.
        return self.length - self.delay - 1

    def __getitem__(self, idx):
        # Input: `delay` mixed samples starting at idx.
        # Target: the first source sequence's sample right after that window.
        window = self.mixed_data[idx: idx + self.delay]
        target = self.data[0, idx + self.delay]
        return window, target

# Training loader: mixture of two sequences started from 0.1 and 0.4.
train_cdl = DataLoader(
    dataset=ChebyshevDataset(np.array([0.1, 0.4])),
    batch_size=64,
    shuffle=True,
)
# Evaluation loader: mixture built from different initial values (0.2 and 0.3).
test_cdl = DataLoader(
    dataset=ChebyshevDataset(np.array([0.2, 0.3])),
    batch_size=64,
    shuffle=True,
)


In [42]:
class NeuralNetwork(nn.Module):
    """Small fully connected network: two hidden ReLU layers and a linear output.

    Args:
        in_features: Width of each input sample.  The default (3) matches the
            window size annotated as "delay" in the original code.
        hidden_features: Width of both hidden layers.
        out_features: Number of raw output values per sample (annotated as
            "users" in the original code).
    """

    def __init__(self, in_features: int = 3, hidden_features: int = 5, out_features: int = 1):
        super().__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(in_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, hidden_features),
            nn.ReLU(),
            # No activation after the last layer: the result is returned as
            # raw logits.  A trailing ReLU would clamp every output to >= 0
            # and stop gradients whenever the pre-activation is negative.
            nn.Linear(hidden_features, out_features),
        )

    def forward(self, x):
        """Return the raw (unnormalised) outputs for a batch ``x``."""
        return self.linear_relu_stack(x)

# Build the network, then move its parameters to the selected device.
model = NeuralNetwork()
model = model.to(device)

In [48]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Args:
        dataloader: Iterable of ``(X, y)`` batches; ``dataloader.dataset`` must
            support ``len()`` (used only for the progress message).
        model: Module being trained; updated in place.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer holding ``model``'s parameters.

    Prints the loss of every 100th batch.
    """
    size = len(dataloader.dataset)
    # test() switches the model to eval mode; switch back so every epoch after
    # the first evaluation really trains in training mode.
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Forward pass and loss computation
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model):
    size = len(dataloader.dataset)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= size
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# NOTE(review): the network's last layer has a single output, and cross-entropy
# over a single class is identically zero -- consistent with the 0.000000 loss
# and 100% accuracy in the recorded output below.  A regression loss on
# real-valued targets is probably what is wanted; confirm before trusting
# these numbers.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=1e-3)

# Alternate one training pass and one evaluation pass per epoch.
epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(dataloader=train_cdl, model=model, loss_fn=loss_fn, optimizer=optimizer)
    test(dataloader=test_cdl, model=model)
print("Done!")

Epoch 1
-------------------------------
loss: 0.000000  [    0/  996]
Test Error: 
 Accuracy: 100.0%, Avg loss: 0.000000 

Epoch 2
-------------------------------
loss: 0.000000  [    0/  996]
Test Error: 
 Accuracy: 100.0%, Avg loss: 0.000000 

Epoch 3
-------------------------------
loss: 0.000000  [    0/  996]
Test Error: 
 Accuracy: 100.0%, Avg loss: 0.000000 

Epoch 4
-------------------------------
loss: 0.000000  [    0/  996]
Test Error: 
 Accuracy: 100.0%, Avg loss: 0.000000 

Epoch 5
-------------------------------
loss: 0.000000  [    0/  996]
Test Error: 
 Accuracy: 100.0%, Avg loss: 0.000000 

Done!
