<a href="https://colab.research.google.com/github/canoztas/CMP684-Neural-Networks-supernovae/blob/main/supernovae.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import warnings
warnings.filterwarnings('ignore')

In [4]:
import gzip
import pickle
import pandas as pd
import numpy as np

def read_data(filename): #Read data from a pickled file to a pandas DataFrame.
    with gzip.open(filename, 'rb') as f:
        data = pickle.load(f)

    X = to_dataframe(data)
    y = pd.get_dummies(X['type'] == 0, prefix='SNIa', drop_first=True)
    X = X.drop(columns=['comment', 'type'])

    return X, y

def to_dataframe(data): #Converts from a Python dictionary to a pandas DataFrame.
    for idx, sn in data.items():
        for filt in 'griz':
            sn[f'mjd_{filt}'] = np.array(sn[filt]['mjd'])
            sn[f'fluxcal_{filt}'] = np.array(sn[filt]['fluxcal'])
            sn[f'fluxcalerr_{filt}'] = np.array(sn[filt]['fluxcalerr'])
            del sn[filt]
        sn.update(sn['header'])
        del sn['header']

    return pd.DataFrame.from_dict(data, orient='index')

In [63]:
X_train_raw, y_train = read_data('/content/drive/MyDrive/dataset/neuralnetwork/des_train.pkl')
X_test_raw, y_test = read_data('/content/drive/MyDrive/dataset/neuralnetwork/des_test.pkl')

In [6]:
def bazin(time, A, B, t0, tfall, trise):
    X = np.exp(-(time - t0) / tfall) / (1 + np.exp((time - t0) / trise))
    return A * X + B

In [7]:
from scipy.optimize import least_squares

def lightcurve_fit(time, flux):
    scaled_time = time - time.min()
    t0 = scaled_time[flux.argmax()]
    guess = (0, 0, t0, 40, -5)

    errfunc = lambda params: abs(flux - bazin(scaled_time, *params))

    result = least_squares(errfunc, guess, method='lm')

    return result.x

In [8]:
def uncertain_mean(flux, flux_err, w):
    UM = [x / y if y != 0 else x for x, y in zip(flux, flux_err)]

    if w != 0:
        return sum(UM) / w
    return sum(UM)

In [9]:
def uncertain_moving_average(flux, flux_err, w):
    l = len(flux)
    UMA = []

    if l <= 2 * w:
        UMA = flux
    else:
        for i in range(l):
            if i <= l - w:
                um = uncertain_mean(flux[i:i + w], flux_err[i:i + w], w)
            else:
                um = uncertain_mean(flux[i:l], flux_err[i:l], l - i)

            UMA.append(um)

    return UMA

In [10]:
def znormalization(UMA):
    array_UMA = np.array(UMA)
    m = np.mean(array_UMA)
    std = np.std(array_UMA)
    zUMA = [(x - m)/std  for x in UMA]
    return zUMA

In [11]:
def preprocessing(data):
    #Window length for computing uncertain moving average filtering
    w = 10
    DES_FILTERS = 'griz'
    # Create palceholder for output matrix
    full_params = np.zeros((len(data), 5 * len(DES_FILTERS)))
    # Iterate over supernovae
    for idx, snid in enumerate(data.index):
        params = np.zeros((len(DES_FILTERS), 5))
        # Iterate over filters
        for id_f, f in enumerate(DES_FILTERS):
            time = data.loc[snid, 'mjd_%s' % f]
            flux = data.loc[snid, 'fluxcal_%s' % f]
            flux_err = data.loc[snid, 'fluxcalerr_%s' % f]

            flux_uma = uncertain_moving_average(flux, flux_err, w)
            flux_zuma = znormalization(flux_uma)
            flux_zuma = np.array(flux_zuma)
            try:
                params[id_f] = lightcurve_fit(time,  flux_zuma)
            except ValueError:
                # If fit does not converge leave zeros
                continue
        full_params[idx] = params.ravel()

    return full_params

In [64]:
X_train = preprocessing(X_train_raw)

In [65]:
X_test = preprocessing(X_test_raw)

In [66]:
y_train = y_train.to_numpy()

In [67]:
y_test = y_test.to_numpy()

In [16]:
# Sigmoid and tanh activation functions
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def tanh(x):
    return np.tanh(x)

In [140]:
def neural_network(isZincluded,epoch,lr,X_train,y_train,X_test):
  # Assuming 4 filters (g, r, i, z) are used in the light curve. If used input size = 20 if not input size = 16
  input_size = 14
  if isZincluded:
    input_size = 20

  # Initialize weights and biases for the hidden layer
  hidden_weights = np.random.randn(input_size, 500)
  hidden_biases = np.zeros((1, 500))

  # Initialize weights and biases for the output layer
  output_weights = np.random.randn(500, 1)
  output_biases = np.zeros((1, 1))

  # Training parameters
  learning_rate = lr
  epochs = epoch
  batch_size = 16
  validation_split = 0.1
  start_training(X_train,y_train,epoch,lr,validation_split,hidden_weights,hidden_biases,output_weights,output_biases,X_test)

In [174]:
def start_training(X_train,y_train,epochs,learning_rate,validation_split,hidden_weights,hidden_biases,output_weights,output_biases,X_test):
  # Training loop
  for epoch in range(epochs):
      # Forward pass
      hidden_layer_input = np.dot(X_train, hidden_weights) + hidden_biases
      hidden_layer_output = tanh(hidden_layer_input)

      output_layer_input = np.dot(hidden_layer_output, output_weights) + output_biases
      output_layer_output = sigmoid(output_layer_input)

      # Compute loss (assuming binary cross-entropy)
      loss = -np.mean(y_train * np.log(output_layer_output) + (1 - y_train) * np.log(1 - output_layer_output))

      # Backward pass
      output_error = output_layer_output - y_train
      hidden_error = np.dot(output_error, output_weights.T) * (1 - np.square(hidden_layer_output))

      # Update weights and biases
      output_weights -= learning_rate * np.dot(hidden_layer_output.T, output_error) / len(X_train)
      output_biases -= learning_rate * np.sum(output_error, axis=0, keepdims=True) / len(X_train)

      hidden_weights -= learning_rate * np.dot(X_train.T, hidden_error) / len(X_train)
      hidden_biases -= learning_rate * np.sum(hidden_error, axis=0, keepdims=True) / len(X_train)

      # Validation loss
      if epoch % 10 == 0:
          val_hidden_layer_output = tanh(np.dot(X_train[int(X_train.shape[0] * (1 - validation_split)):], hidden_weights) + hidden_biases)
          val_output_layer_output = sigmoid(np.dot(val_hidden_layer_output, output_weights) + output_biases)
          val_loss = -np.mean(y_train[int(X_train.shape[0] * (1 - validation_split)):] * np.log(val_output_layer_output) +
                              (1 - y_train[int(X_train.shape[0] * (1 - validation_split)):]) * np.log(1 - val_output_layer_output))
          print(f"Epoch {epoch}/{epochs}, Loss: {loss:.4f}, Validation Loss: {val_loss:.4f}")
  prediction(hidden_weights,hidden_biases,output_weights,output_biases,X_test)


In [19]:
def prediction(hidden_weights,hidden_biases,output_weights,output_biases,X_test):
  # Prediction
  test_hidden_layer_output = tanh(np.dot(X_test, hidden_weights) + hidden_biases)
  y_pred = sigmoid(np.dot(test_hidden_layer_output, output_weights) + output_biases)
  calculate_results(y_pred)

In [71]:
from sklearn.metrics import confusion_matrix

def calculate_metrics(true_labels, predicted_probs, threshold=0.5, penalty_factor=3):
    # Convert probabilities to class labels based on the threshold
    predicted_labels = (predicted_probs > threshold).astype(int)

    # Confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)

    # Extract values from the confusion matrix
    N_total_Ia = cm[1, :].sum()
    N_true_Ia = cm[1, 1]
    N_false_Ia = cm[0, 1]

    # Completeness
    completeness_Ia = N_true_Ia / N_total_Ia

    # Purity
    purity_Ia = N_true_Ia / (N_true_Ia + N_false_Ia)

    # SNPCC Figure of Merit
    FIa = (N_true_Ia ** 2) / (N_total_Ia * (N_true_Ia + penalty_factor * N_false_Ia))

    return completeness_Ia, purity_Ia, FIa

def calculate_results(y_pred):
  true_labels = y_test
  predicted_probs = y_pred

  threshold = 0.5
  penalty_factor = 3

  completeness, purity, figure_of_merit = calculate_metrics(true_labels, predicted_probs, threshold, penalty_factor)

  print("Completeness:", completeness)
  GLOBAL_COMPLETENESS.append(completeness)
  print("Purity:", purity)
  GLOBAL_PURITY.append(purity)
  print("SNPCC Figure of Merit:", figure_of_merit)
  GLOBAL_FOM.append(figure_of_merit)

In [179]:
GLOBAL_SAMPLE_SIZE = []
GLOBAL_Z_USED = []
GLOBAL_COMPLETENESS = []
GLOBAL_PURITY = []
GLOBAL_FOM = []

In [180]:
# Calculate the number of elements to select
percentages = [5,10,20,30,40,50,100]
for percentage in percentages:
  num_elements_to_select = int(len(X_train) * (percentage / 100.0))

  # Use numpy.random.choice to get random indices
  random_indices = np.random.choice(len(X_train), size=num_elements_to_select, replace=False)

  # Get the selected elements from the original array
  X_train_local = X_train[random_indices]
  tmp = pd.DataFrame(X_train_local).iloc[:,:9]
  tmp = tmp.join(pd.DataFrame(X_train_local).iloc[:,12])
  tmp = tmp.join(pd.DataFrame(X_train_local).iloc[:,14:18])
  X_train_local = tmp.to_numpy()

  y_train_local = y_train[random_indices]

  X_test_local = X_test
  tmp = pd.DataFrame(X_test_local).iloc[:,:9]
  tmp = tmp.join(pd.DataFrame(X_test_local).iloc[:,12])
  tmp = tmp.join(pd.DataFrame(X_test_local).iloc[:,14:18])
  X_test_local = tmp.to_numpy()

  GLOBAL_SAMPLE_SIZE.append(percentage)
  GLOBAL_Z_USED.append('no')
  neural_network(False,500,0.3,X_train_local,y_train_local,X_test_local)
  print(percentage)

Epoch 0/500, Loss: nan, Validation Loss: 1.9424
Epoch 10/500, Loss: nan, Validation Loss: 2.5785
Epoch 20/500, Loss: nan, Validation Loss: 0.8464
Epoch 30/500, Loss: 1.6705, Validation Loss: 2.2209
Epoch 40/500, Loss: 2.6152, Validation Loss: 1.1359
Epoch 50/500, Loss: 2.3152, Validation Loss: 2.8670
Epoch 60/500, Loss: 0.7698, Validation Loss: 1.3195
Epoch 70/500, Loss: 1.1931, Validation Loss: 1.5157
Epoch 80/500, Loss: 0.7663, Validation Loss: 1.2323
Epoch 90/500, Loss: 0.8791, Validation Loss: 0.4068
Epoch 100/500, Loss: 0.5818, Validation Loss: 0.6922
Epoch 110/500, Loss: 1.4066, Validation Loss: 2.0736
Epoch 120/500, Loss: 0.2947, Validation Loss: 0.1092
Epoch 130/500, Loss: 0.5270, Validation Loss: 0.5913
Epoch 140/500, Loss: 0.5686, Validation Loss: 1.0757
Epoch 150/500, Loss: 0.2991, Validation Loss: 0.4188
Epoch 160/500, Loss: 0.2001, Validation Loss: 0.2374
Epoch 170/500, Loss: 0.2428, Validation Loss: 0.3228
Epoch 180/500, Loss: 0.2347, Validation Loss: 0.2961
Epoch 190/500

In [181]:
# Calculate the number of elements to select
percentages = [5,10,20,30,40,50,100]
for percentage in percentages:
  num_elements_to_select = int(len(X_train) * (percentage / 100.0))

  # Use numpy.random.choice to get random indices
  random_indices = np.random.choice(len(X_train), size=num_elements_to_select, replace=False)

  # Get the selected elements from the original array
  X_train_local = X_train[random_indices]
  y_train_local = y_train[random_indices]
  X_test_local = X_test
  GLOBAL_SAMPLE_SIZE.append(percentage)
  GLOBAL_Z_USED.append('yes')
  neural_network(True,500,0.3,X_train_local,y_train_local,X_test_local)
  print(percentage)

Epoch 0/500, Loss: 4.2217, Validation Loss: 13.9801
Epoch 10/500, Loss: 0.4889, Validation Loss: 2.1256
Epoch 20/500, Loss: 0.1119, Validation Loss: 0.3967
Epoch 30/500, Loss: 0.0684, Validation Loss: 0.1508
Epoch 40/500, Loss: 0.0397, Validation Loss: 0.0473
Epoch 50/500, Loss: 0.0356, Validation Loss: 0.0325
Epoch 60/500, Loss: 0.0339, Validation Loss: 0.0282
Epoch 70/500, Loss: 0.0328, Validation Loss: 0.0252
Epoch 80/500, Loss: 0.0320, Validation Loss: 0.0229
Epoch 90/500, Loss: 0.0314, Validation Loss: 0.0209
Epoch 100/500, Loss: 0.0309, Validation Loss: 0.0193
Epoch 110/500, Loss: 0.0305, Validation Loss: 0.0179
Epoch 120/500, Loss: 0.0301, Validation Loss: 0.0167
Epoch 130/500, Loss: 0.0298, Validation Loss: 0.0156
Epoch 140/500, Loss: 0.0296, Validation Loss: 0.0147
Epoch 150/500, Loss: 0.0293, Validation Loss: 0.0139
Epoch 160/500, Loss: 0.0291, Validation Loss: 0.0132
Epoch 170/500, Loss: 0.0290, Validation Loss: 0.0125
Epoch 180/500, Loss: 0.0288, Validation Loss: 0.0119
Epo

In [187]:
res = pd.DataFrame(columns=['Sample Size','Z Used','COMPLENETENESS','PURITY','FOM','name'])

In [193]:
names = ['d0','d1','d2','d3','d4','d5','S']

In [194]:
res['Sample Size'] = GLOBAL_SAMPLE_SIZE
res['Z Used'] = GLOBAL_Z_USED
res['COMPLENETENESS'] = GLOBAL_COMPLETENESS
res['PURITY'] = GLOBAL_PURITY
res['FOM'] = GLOBAL_FOM
res['name'] = names*2

In [195]:
res

Unnamed: 0,Sample Size,Z Used,COMPLENETENESS,PURITY,FOM,name
0,5,no,0.675867,0.250163,0.06764,d0
1,10,no,0.743873,0.235348,0.069216,d1
2,20,no,0.578494,0.280815,0.066622,d2
3,30,no,0.833297,0.248994,0.082928,d3
4,40,no,0.476706,0.29804,0.059102,d4
5,50,no,0.860234,0.244034,0.083572,d5
6,100,no,0.909251,0.243683,0.088182,S
7,5,yes,0.578715,0.258074,0.060129,d0
8,10,yes,0.688231,0.244835,0.067124,d1
9,20,yes,0.524178,0.28326,0.061015,d2
