# Test for the CNN Model

This notebook will be used to create and train a dummy version of the model that will be used for classifying the transit light curves.

In [21]:
# conda install pytorch torchvision -c pytorch


# Imports
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from bokeh.plotting import figure, show
from bokeh.io import output_notebook

import torch
import torch.nn.functional as F
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
from torch import nn

In [22]:
# Check whether GPU is available and choose a device to run the model on
gpu_available = torch.cuda.is_available()
device_name = "cuda" if gpu_available else "cpu"
device = torch.device(device_name)

print(f"Using {device_name}")

Using cpu


We will need to create a dummy dataset that resembles our final data.


In [23]:
def normal(dimensions, mean=0., stddev=1.):
    """Torch tensor of samples from a normal distribution, with given shape.

    Attributes:
        dimensions (tuple): shape of the output tensor 
        mean (float, optional): mean of the sampled normal distribution
        stddev (float, optional): standard deviation of the normal distribution
    
    Returns:
        torch.Tensor: PyTorch multi-dimensional matrix with the samples

    """

    return torch.from_numpy(stddev * np.random.randn(*dimensions) + mean)

def uniform(dimensions, mini=0., maxi=1.):
    """Torch tensor of samples from an uniform distribution, with given shape.

    Attributes:
        dimensions (tuple): shape of the output tensor 
        mini (float, optional): lower limit of the sampled uniform distribution
        maxi (float, optional): upper limit of the sampled uniform distribution
    
    Returns:
        torch.Tensor: PyTorch multi-dimensional matrix with the samples

    """
    return torch.from_numpy(np.random.random(dimensions)*(maxi-mini) + mini)

In [24]:
def create_transit_lightcurve(len_lightcurve, transit_duration, contact_ratio,
                              noise_power=0., time_view=(-1,)):
    """Creates a transit-like normalized lightcurve.

    Normalization means that the curve outside the transit is centered around 0 
    and the depth of the transit is set to -1. The transit is in the middle of
    the vector, with the "greatest transit" exactly in the center.

    It can work with broadcasting.
    
    Attributes:
        len_lightcurve (int): number of points in the lightcurves
        transit_duration (float): normalized duration of the transit with 
            respect to the orbital period
        contact_ratio (float): ratio between ingress or egress and the transit
        noise (float): variance of gaussian noise to add to the lightcurve
        time_view (tuple, optional): description of dimension along which the
            lightcurve vector will be set. 1D by default

    Returns:
        torch.Tensor: PyTorch multi-dimensional matrix

    """
    assert type(len_lightcurve) == int
    
    # Normalized time tensor, from 0 to 1 inclusive
    time = torch.linspace(0., 1., len_lightcurve).view(*time_view)

    # Calculate the times of the contact points at each lightcurve, from 0 to 1
    contact_1 = 0.5 - transit_duration/2 - contact_ratio * transit_duration
    contact_2 = 0.5 - transit_duration/2
    contact_3 = 0.5 + transit_duration/2
    contact_4 = 0.5 + transit_duration/2 + contact_ratio * transit_duration

    # Calculate masks for each section of the light curve
    mask_ingress = (time > contact_1) & (time <= contact_2)
    mask_transit = (time > contact_2) & (time <= contact_3)
    mask_egress  = (time > contact_3) & (time <= contact_4)

    # Calculate normalized light values by section of the lightcurve
    ingress = torch.cos((time-contact_1)/(contact_2-contact_1)*np.pi)* 0.5 - 0.5
    transit = -1.
    egress  = torch.cos((time-contact_3)/(contact_4-contact_3)*np.pi)*-0.5 - 0.5

    # Sum all sections
    lightcurve = (ingress * mask_ingress +
                  transit * mask_transit +
                  egress  * mask_egress)
    
    # Calculate random noise
    noise = normal(tuple(lightcurve.size()), stddev=(noise_power**0.5).numpy())

    return lightcurve + noise


def sample_transit_lightcurves(nof_lightcurves, len_lightcurves, 
                               transit_duration_range = (0.001, 0.01),
                               contact_ratio_range = (0.1, 1.0),
                               noise_power_range = (0.001, 0.01),
                               ):
    """Creates a series of transit-like normalized lightcurves.

    Normalization means that the curve outside the transit is centered around 0 
    and the depth of the transit is set to -1. The transit is in the middle of
    the vector, with the "greatest transit" exactly in the center.
    
    Attributes:
        nof_lightcurves (int): number of lightcurves that will be created, 
            which will be stacked along the first dimension of the tensor
        len_lightcurves (int): number of points in the lightcurves, which will
            be set along the third dimension of the tensor
        transit_duration_range (tuple): range of the uniform distribution from
            which transit durations will be sampled for each light curve
        contact_ratio_range (tuple): range of the uniform distribution from
            which contact ratios will be sampled for each light curve
        noise_power_range (tuple): range of the uniform distribution from
            which noise powers will be sampled for each light curve

    Returns:
        torch.Tensor: PyTorch multi-dimensional matrix of size (nof_lightcurves,
            1, len_lightcurves), with different light curves along the third
            dimension

    """
    assert type(nof_lightcurves) == int
    assert type(len_lightcurves) == int

    # Random distribution for the normalized transit duration, defined as the 
    # ratio of the time between second and third contacts and the period
    transit_duration = uniform((nof_lightcurves, 1, 1), *transit_duration_range)

    # Random distribution for the time between first and second contacts divided 
    # by the transit duration, dependent on the relative size of planet and star
    contact_ratio = uniform((nof_lightcurves, 1, 1), *contact_ratio_range)
    
    # Random distribution for the noise power, measured as variance, for the 
    # gaussian distributions from which it will be sampled
    noise_power = uniform((nof_lightcurves, 1, 1), *noise_power_range)

    lightcurves = create_transit_lightcurve(len_lightcurves, transit_duration, 
                                            contact_ratio, 
                                            noise_power=noise_power,
                                            time_view=(1, 1, -1))

    return lightcurves

In [25]:
def create_binary_lightcurve(len_lightcurve, transit_duration, contact_duration,
                             noise_power=0., time_view=(-1,)):
    """Creates a binary eclipse-like normalized lightcurve.

    Normalization means that the curve outside the transit is centered around 0 
    and the depth of the transit is set to -1. The transit is in the middle of
    the vector, with the "greatest transit" exactly in the center.

    It can work with broadcasting.
    
    Attributes:
        len_lightcurve (int): number of points in the lightcurves
        transit_duration (float): normalized duration of the transit with 
            respect to the orbital period
        contact_duration (float): normalized duration of ingress or egress with
            respect to the orbital period
        noise (float): variance of gaussian noise to add to the lightcurve
        time_view (tuple, optional): description of dimension along which the
            lightcurve vector will be set. 1D by default

    Returns:
        torch.Tensor: PyTorch multi-dimensional matrix

    """
    assert type(len_lightcurve) == int
    
    # Normalized time tensor, from 0 to 1 inclusive
    time = torch.linspace(0., 1., len_lightcurve).view(*time_view)

    # Parameters of the model
    contact_ratio = contact_duration/transit_duration
    contact_depth = contact_ratio * np.exp(-contact_ratio)*0.2-1

    # Calculate the times of the contact points at each lightcurve, from 0 to 1
    contact_1 = 0.5 - transit_duration/2 - contact_duration
    contact_2 = 0.5 - transit_duration/2
    contact_3 = 0.5 + transit_duration/2
    contact_4 = 0.5 + transit_duration/2 + contact_duration

    # Calculate masks for each section of the light curve
    mask_ingress = (time > contact_1) & (time <= contact_2)
    mask_transit = (time > contact_2) & (time <= contact_3)
    mask_egress  = (time > contact_3) & (time <= contact_4)

    # Calculate normalized light values by section of the lightcurve
    ingress = (time-contact_1)/(contact_2-contact_1)*contact_depth
    transit = -(1+contact_depth)*torch.cos(
               (time-0.5)*np.pi/(contact_3-contact_2))+contact_depth
    egress  = contact_depth-(time-contact_3)/(contact_4-contact_3)*contact_depth 

    # Sum all sections
    lightcurve = (ingress * mask_ingress +
                  transit * mask_transit +
                  egress  * mask_egress)
    
    # Calculate random noise
    try:
        noise = normal(tuple(lightcurve.size()), 
                       stddev=(noise_power**0.5).numpy())
    except:
        noise = normal(tuple(lightcurve.size()), 
                       stddev=(noise_power**0.5))
    return lightcurve + noise


def sample_binary_lightcurves(nof_lightcurves, len_lightcurves, 
                              transit_duration_range = (0.001, 0.01),
                              contact_duration_range = (0.001, 0.01),
                              noise_power_range = (0.001, 0.01),
                              ):
    """Creates a binary eclipse-like normalized lightcurves.

    Normalization means that the curve outside the transit is centered around 0 
    and the depth of the transit is set to -1. The transit is in the middle of
    the vector, with the "greatest transit" exactly in the center.
    
    Attributes:
        nof_lightcurves (int): number of lightcurves that will be created, 
            which will be stacked along the first dimension of the tensor
        len_lightcurves (int): number of points in the lightcurves, which will
            be set along the third dimension of the tensor
        transit_duration_range (tuple): range of the uniform distribution from
            which transit durations will be sampled for each light curve
        contact_duration_range (tuple): range of the uniform distribution from
            which contact durations will be sampled for each light curve
        noise_power_range (tuple): range of the uniform distribution from
            which noise powers will be sampled for each light curve

    Returns:
        torch.Tensor: PyTorch multi-dimensional matrix of size (nof_lightcurves,
            1, len_lightcurves), with different light curves along the third
            dimension

    """
    assert type(nof_lightcurves) == int
    assert type(len_lightcurves) == int

    # Random distribution for the normalized transit duration, defined as the 
    # ratio of the time between second and third contacts and the period
    transit_duration = uniform((nof_lightcurves, 1, 1), *transit_duration_range)

    # Random distribution for the time between first and second contacts divided 
    # by the transit duration, dependent on the relative size of planet and star
    contact_duration = uniform((nof_lightcurves, 1, 1), *contact_duration_range)
    
    # Random distribution for the noise power, measured as variance, for the 
    # gaussian distributions from which it will be sampled
    noise_power = uniform((nof_lightcurves, 1, 1), *noise_power_range)

    lightcurves = create_binary_lightcurve(len_lightcurves, transit_duration, 
                                           contact_duration, 
                                           noise_power=noise_power,
                                           time_view=(1, 1, -1))

    return lightcurves

In [26]:
# Create dummy tensors for input

nof_transit_like = 5000  # Number of samples of transit-like curves
nof_binary_like = 5000  # Number of samples of binary eclipse-like curves

len_lightcurves = 2001  # Length of input array, should be odd

# Labels will be a categorical variable directly
transit_lightcurves = sample_transit_lightcurves(
                          nof_transit_like, len_lightcurves,
                          transit_duration_range = (0.001, 0.01),
                          contact_ratio_range = (0.1, 1.0),
                          noise_power_range = (0.0001, 0.001),
                          )
transit_label = torch.zeros(nof_transit_like)

binary_lightcurves = sample_binary_lightcurves(
                          nof_binary_like, len_lightcurves,
                          transit_duration_range = (0.001, 0.01),
                          contact_duration_range = (0.001, 0.01),
                          noise_power_range = (0.0001, 0.001),
                          )
binary_label  = torch.ones(nof_binary_like)

Let's plot from the planetary transit dataset.

In [27]:
output_notebook()
transit_plot = figure(x_axis_label=("Normalized time"), 
                y_axis_label=("Normalized light intensity"), 
                plot_width=800, plot_height=350)
transit_plot.scatter(x=np.linspace(0, 1, len_lightcurves), 
                  y=transit_lightcurves[0, 0, :].cpu().numpy().flatten()
                  )
show(transit_plot)

And now from the binary eclipse dataset.

In [28]:
output_notebook()
binary_plot = figure(x_axis_label=("Normalized time"), 
                y_axis_label=("Normalized light intensity"), 
                plot_width=800, plot_height=350)
binary_plot.scatter(x=np.linspace(0, 1, len_lightcurves), 
                  y=binary_lightcurves[0, 0, :].cpu().numpy().flatten(),
                  color="red")
show(binary_plot)

In [29]:
# Concatenate tensors from both types and store them in GPU if available
X, y = (torch.cat((binary_lightcurves, transit_lightcurves)).to(
            device, dtype=torch.float), 
        torch.cat((transit_label, binary_label)).to(
            device, dtype=torch.long))

# Create an iterable dataset from the input and label tensors
dataset = TensorDataset(X, y)

In [30]:
# Perform the train/test/validation split
train_size = int(0.7 * len(y))
test_size  = int(0.2 * len(y))
val_size = len(y) - train_size - test_size
train_dataset, test_dataset, val_dataset = torch.utils.data.random_split(
                                             dataset, 
                                             [train_size, test_size, val_size])

In [39]:
# Choose a batch size and create a data loader from the dataset

bs = 50  # Batch size
train_dl = DataLoader(train_dataset, batch_size=bs, shuffle=True)  # Needs shuffling to prevent correlation between batches
test_dl  = DataLoader(test_dataset,  batch_size=bs * 2)  # Test can use higher batch size because it needs less memory
valid_dl = DataLoader(val_dataset, batch_size=bs * 2)  # Validation can use higher batch size because it needs less memory

In [219]:
class Test_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 8, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(8, 8, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(8, 4, kernel_size=3, stride=1, padding=1)
        self.fc1   = nn.Linear(int(len_lightcurves/4)*4, 50)
        self.fc2   = nn.Linear(50, 2)
        
    def forward(self, xb):  # xb is of size (1, L)
        
        # Convolutions and pooling
        xb = F.relu(self.conv1(xb))  # Size (8, L)
        xb = F.relu(self.conv2(xb))  # Size (8, L)
        xb = F.relu(self.conv3(xb))  # Size (4, L)
        xb = F.max_pool1d(xb, 4)     # Size (4, floor(L/4))
        
        # Reshape to input the linear layer
        xb = xb.view(-1, int(len_lightcurves/4)*4)  # -1 infers the necessary dimesion from the rest

        # Apply fully connected layers
        xb = F.relu(self.fc1(xb))    # Size (1, 50)
        xb = self.fc2(xb)            # Size (1, 2)
        
        return xb

In [220]:
class AstroNET_v1(nn.Module):
    def __init__(self):
        super().__init__()
                
        self.conv_5_16_in  = nn.Conv1d(  1,  16, kernel_size=5, stride=1, padding=2)
        self.conv_5_16     = nn.Conv1d( 16,  16, kernel_size=5, stride=1, padding=2)
        
        self.conv_5_32_in  = nn.Conv1d( 16,  32, kernel_size=5, stride=1, padding=2)
        self.conv_5_32     = nn.Conv1d( 32,  32, kernel_size=5, stride=1, padding=2)
        
        self.conv_5_64_in  = nn.Conv1d( 32,  64, kernel_size=5, stride=1, padding=2)
        self.conv_5_64     = nn.Conv1d( 64,  64, kernel_size=5, stride=1, padding=2)
        
        self.conv_5_128_in = nn.Conv1d( 64, 128, kernel_size=5, stride=1, padding=2)
        self.conv_5_128    = nn.Conv1d(128, 128, kernel_size=5, stride=1, padding=2)
        
        self.conv_5_256_in = nn.Conv1d(128, 256, kernel_size=5, stride=1, padding=2)
        self.conv_5_256    = nn.Conv1d(256, 256, kernel_size=5, stride=1, padding=2)
        
        self.maxp_5_2     = nn.MaxPool1d(5, stride=2, padding=2)
        self.maxp_7_2     = nn.MaxPool1d(7, stride=2, padding=3)

        self.fc_512_1     = nn.Linear(63*256, 512)
        self.fc_512_2     = nn.Linear(512, 512)
        self.fc_512_3     = nn.Linear(512, 512)
        self.fc_512_4     = nn.Linear(512, 512)
        
        self.fc_out       = nn.Linear(512, 2)
        
    def forward(self, xb):  # xb is of size (1, L)
        
        # Convolutions and pooling
        xb = F.relu(self.conv_5_16_in (xb))
        xb = F.relu(self.conv_5_16    (xb))
        xb = self.maxp_5_2(xb)
        
        xb = F.relu(self.conv_5_32_in (xb))
        xb = F.relu(self.conv_5_32    (xb))
        xb = self.maxp_5_2(xb)
        
        xb = F.relu(self.conv_5_64_in (xb))
        xb = F.relu(self.conv_5_64    (xb))
        xb = self.maxp_5_2(xb)
        
        xb = F.relu(self.conv_5_128_in(xb))
        xb = F.relu(self.conv_5_128   (xb))
        xb = self.maxp_5_2(xb)
        
        xb = F.relu(self.conv_5_256_in(xb))
        xb = F.relu(self.conv_5_256   (xb))
        xb = self.maxp_5_2(xb)
        
        # Reshape to input the linear layer
        xb = xb.view(-1, 63*256)  # -1 infers the necessary dimesion from the rest

        # Apply fully connected layers
        xb = F.relu(self.fc_512_1(xb))
        xb = F.relu(self.fc_512_2(xb))
        xb = F.relu(self.fc_512_3(xb))
        xb = F.relu(self.fc_512_4(xb))

        # Output layer
        xb = self.fc_out(xb)
        
        return xb

In [221]:
def get_model():
    model = Test_CNN()
    return model, optim.SGD(model.parameters(), lr=lr)

In [222]:
def loss_batch(model, loss_func, xb, yb, opt=None):
    loss = loss_func(model(xb), yb)

    # If an optimizer is used, then run as if trianing, otherwise as if testing
    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()

    return loss.item(), len(xb)

In [284]:
def acc_batch(model, xb, yb):
    max_vals, max_indices = torch.max(model(xb), 1)  # Maximum along output dimension (0 is batch dimension)
    accuracy = (max_indices == yb).sum().float()/len(yb)
    return accuracy, len(yb)

In [285]:
def fit(epochs, model, loss_func, opt, train_dl, test_dl, verbose=True):
    for epoch in range(epochs):
        model.train()
        train_losses, nums = zip(
                *[loss_batch(model, loss_func, xb, yb, opt) for xb, yb in train_dl]
                )
        train_loss = np.sum(np.multiply(train_losses, nums)) / np.sum(nums)  # Average the individual losses in the batch

        model.eval()
        with torch.no_grad():
            test_losses, n_loss = zip(
                *[loss_batch(model, loss_func, xb, yb) for xb, yb in test_dl]
                )
            test_accs, n_acc = zip(*[acc_batch(model, xb, yb) for xb, yb in test_dl])
            
        test_loss = np.sum(np.multiply(test_losses, n_loss)) / np.sum(n_loss)  # Average the individual losses in the batch
        test_acc  = np.sum(np.multiply(test_accs,    n_acc)) / np.sum(n_acc )  # Average the individual accuracies in the batch
        
        if verbose:
            print(f"Epoch: {(epoch+1):3}    Train loss: {train_loss:7.5f}    Test loss: {test_loss:7.5f}    Test accuracy: {test_acc: 5.3f}")

In [286]:
learning_rate = 0.2
momentum      = 0.5
epochs        = 20

loss_func = F.cross_entropy
#model = AstroNET_v1().to(device)
model = AstroNET_v1().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)

In [287]:
fit(epochs, model, loss_func, optimizer, train_dl, test_dl)

Epoch:   1    Train loss: 0.69488    Test loss: 0.69649    Test accuracy:  0.496
Epoch:   2    Train loss: 0.69408    Test loss: 0.69441    Test accuracy:  0.496
Epoch:   3    Train loss: 0.69450    Test loss: 0.69653    Test accuracy:  0.496
Epoch:   4    Train loss: 0.69458    Test loss: 0.69324    Test accuracy:  0.496
Epoch:   5    Train loss: 0.69413    Test loss: 0.69344    Test accuracy:  0.496
