# Step-by-step: Write your own neural networks with `pytorch`

In [None]:
# load data
import numpy as np
import joblib
flux, flux_err,labels = joblib.load("data.bz2")
flux[~np.isfinite(flux)] = 0
flux[flux > 2] = 2
flux[flux < 0] = 0
labels[:,0] /= 1000

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(5, 4))
plt.scatter(labels[:, 0], labels[:, 1], s=10, c=labels[:, 2], cmap=plt.cm.jet, edgecolor="k", lw=.1)
plt.colorbar()
plt.xlim(12, 3.5)
plt.ylim(5.5, 0.5)
plt.xlabel("Teff/K")
plt.ylabel("logg/dex")

In [None]:
import torch
import torch.nn as nn

In [None]:
# convert to tensor
flux_tensor = torch.from_numpy(flux.astype(np.float32))
flux_err_tensor = torch.from_numpy(flux_err.astype(np.float32))
labels_tensor = torch.from_numpy(labels.astype(np.float32))
flux_tensor.size(), labels_tensor.size(), flux_tensor.dtype, labels_tensor.dtype

## data loader

In [None]:
def data_loader(*args, f_train=0.7, batch_size=50, train=True):
    n_data = len(args[0])
    n_train = int(n_data * f_train)
    
    # shuffle data
    index = np.arange(0, n_data, dtype=int)
    # np.random.seed(0)
    np.random.shuffle(index)
        
    if train:
        # get training set
        for i in range(0, n_train, batch_size):
            yield (_[index[i:min(i + batch_size, n_train)]] for _ in args)
    else:
        # get test set
        for i in range(n_train, n_data, batch_size):
            yield (_[index[i:min(i + batch_size, n_data)]] for _ in args)

print("get training set")    
for x, y in data_loader(flux_tensor, labels_tensor, train=True):
    print(x.size(), y.size())
    
print("get training set")    
for x, y in data_loader(flux_tensor, labels_tensor, train=False):
    print(x.size(), y.size())

In [None]:
# constants
NPIX = 1500
NLBL = 3
BS = 50

## 1. Backward model (Multi-Layer Perceptron)

$(T_\mathrm{eff}, \log{g}, \mathrm{[Fe/H]}) = f(F_\lambda)$

In [None]:
mlp = nn.Sequential(
    nn.BatchNorm1d(num_features=NPIX),
    nn.Linear(in_features=NPIX, out_features=100),
    nn.BatchNorm1d(num_features=100),
    nn.Tanh(),
    nn.Linear(in_features=100, out_features=3),
)
mlp

# train the model

In [None]:
def train(data, model, lr=1e-4, n_epoch=100, batch_size=50, step=10):
    training_loss_history = []
    test_loss_history = []
    
    loss_fn = nn.L1Loss()
    optimizer = torch.optim.RAdam(model.parameters(), lr=1e-4)
    
    for i_epoch in range(n_epoch):
        # in each epoch
        training_data_counts = 0
        training_loss_value = 0
        for batch_X, batch_y in data_loader(*data, batch_size=batch_size, train=True):
            # for each batch
            model.zero_grad()
            batch_y_pred = model(batch_X)
            batch_loss = loss_fn(batch_y_pred, batch_y)
            training_data_counts += len(batch_X)
            training_loss_value += batch_loss.detach() * training_data_counts
            # print(batch_loss.detach())
            batch_loss.backward()
            optimizer.step()
            #print(batch_loss)
        training_loss_history.append(training_loss_value / training_data_counts)
        
        with torch.no_grad():
            test_data_counts = 0
            test_loss_value = 0
            for batch_X, batch_y in data_loader(*data, batch_size=batch_size, train=False):
                batch_y_pred = model(batch_X)
                batch_loss = loss_fn(batch_y_pred, batch_y)
                test_data_counts += len(batch_X)
                test_loss_value += batch_loss.detach() * test_data_counts
            test_loss_history.append(test_loss_value / test_data_counts)
        if i_epoch % step == 0:
            print(f"Epoch {i_epoch:05d}: training loss = {training_loss_history[-1]}, test loss = {test_loss_history[-1]}")
    plt.figure(figsize=(6, 4))
    plt.plot(np.log10(training_loss_history), label="training loss")
    plt.plot(np.log10(test_loss_history), label="test loss")
    plt.legend()
    plt.xlabel("Epoch")
    plt.ylabel("log10(loss)")
    return training_loss_history, test_loss_history

In [None]:
train_loss, test_loss = train(data=(flux_tensor, labels_tensor), model=mlp, n_epoch=500, step=10)

In [None]:
# try a spectrum
mlp.eval()
flux_tensor[:5], mlp(flux_tensor[:5]), labels_tensor[:5]

In [None]:
def compare_labels(x1, x2):
    ndim = x1.shape[1]
    fig, axs = plt.subplots(1, ndim, figsize=(3*ndim, 3))
    for idim in range(ndim):
        axs[idim].plot(x1[:, idim], x2[:, idim], '.')
        _xlim = axs[idim].get_xlim()
        _ylim = axs[idim].get_ylim()
        _lim = min(_xlim[0], _ylim[0]), min(_xlim[1], _ylim[1])
        axs[idim].set_xlim(_lim)
        axs[idim].set_ylim(_lim)
        axs[idim].plot(_lim, _lim, 'k--')
        axs[idim].set_xlabel("truth")
        axs[idim].set_ylabel("prediction")
    fig.tight_layout()

compare_labels(labels_tensor, mlp(flux_tensor).detach())

# 2. Backward model (Convolutional neural network, CNN)

$(T_\mathrm{eff}, \log{g}, \mathrm{[Fe/H]}) = f(F_\lambda)$

In [None]:
cnn = nn.Sequential(
    
)
print(cnn)
cnn(torch.rand(size=(10, 1, 1500))).shape

In [None]:
train_loss, test_loss = train(data=(flux_tensor.unsqueeze(dim=1), labels_tensor), model=cnn, n_epoch=500, step=10)

In [None]:
compare_labels(labels_tensor, cnn(flux_tensor.unsqueeze(dim=1)).detach())

In [None]:
flux_tensor.unsqueeze(dim=1).shape

# 3. Backward model (MLP)

$F_\lambda = f(T_\mathrm{eff}, \log{g}, \mathrm{[Fe/H]})$

In [None]:
mlp_back = nn.Sequential(
    
)
mlp_back

In [None]:
train_loss, test_loss = train(data=(labels_tensor, flux_tensor), model=mlp_back, n_epoch=5000, step=100)
mlp_back.eval()

In [None]:
%pylab inline
ofst = 0.5
plt.plot(flux[::100].T+ np.arange(10)[None, :]*ofst, color="k")
plt.plot(mlp_back(labels_tensor[::100]).detach().T + np.arange(10)[None, :]*ofst)

In [None]:
torch.median(mlp_back(labels_tensor[:10]) - flux_tensor[:10], axis=1)

# 4. Autoencoder (MLP)

In [None]:
class AE(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
        )

        self.decoder = nn.Sequential(
        )
    
    def forward(self, x):
        return self.decoder(self.encoder(x))

In [None]:
ae = AE()
print(ae)

In [None]:
# try to init bias in the last layer
ae.decoder[-1].bias.data = torch.median(flux_tensor, axis=0).values

In [None]:
torch.median(ae.decoder[-1].bias.data)

In [None]:
train_loss, test_loss = train(data=(flux_tensor, flux_tensor), model=ae, n_epoch=5000, step=100)
ae.eval()

In [None]:
%pylab inline
ofst = 0.5
plt.plot(flux[::100].T+ np.arange(10)[None, :]*ofst, color="k")
plt.plot(ae(flux_tensor[::100]).detach().T + np.arange(10)[None, :]*ofst)

In [None]:
ae

In [None]:
ae.encoder(torch.rand(size=(10, 1500))).shape

In [None]:
plt.plot(*ae.encoder(flux_tensor).detach().T, '.')

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(10, 2.5))
for idim in range(3):
    h = axs[idim].scatter(*ae.encoder(flux_tensor).detach().T, s=10, c=labels[:,idim], cmap=plt.cm.jet)
    plt.colorbar(mappable=h, ax=axs[idim])