# Clasificador de Pulsares

In [None]:
import seaborn as sns

from src.common import *

%matplotlib inline

# Exploratory Data Analysis

Data load and feature naming

In [None]:
filename = "https://raw.githubusercontent.com/charitarthchugh/PulsarIdentification/master/HTRU_2.csv"
features = [
    "Mean of the integrated profile",
    "Standard deviation of the integrated profile",
    "Excess kurtosis of the integrated profile",
    "Skewness of the integrated profile",
    "Mean of the DM-SNR curve",
    "Standard deviation of the DM-SNR curve",
    "Excess kurtosis of the DM-SNR curve",
    "Skewness of the DM-SNR curve"
]
target = "target_class"

In [None]:
df = pd.read_csv(
    filename,
    names=features+[target]
)

In [None]:
df.info()

In [None]:
df

In [None]:
df.describe()

### Features distribution and correlations

In [None]:
df["target_class"].value_counts()

In [None]:
df["target_class"].value_counts(normalize=True)*100

In [None]:
sns.pairplot(
    data = df,
    hue = "target_class",
    corner = True
);

In [None]:
# Compute the correlation matrix
corr = df[features].corr()

# Generate a mask for the upper triangle
mask = np.triu(np.ones_like(corr, dtype=bool))

# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))

# Draw the heatmap with the mask and correct aspect ratio
sns.heatmap(corr,
    mask=mask, 
    square=True,
    linewidths=.5,
    cbar_kws={"shrink": .5}
)

In [None]:
corr

In [None]:
# feature selection
features = [
    "Mean of the integrated profile",
    "Standard deviation of the integrated profile",
    #"Excess kurtosis of the integrated profile",
    "Excess kurtosis of the integrated profile",
    "Skewness of the integrated profile",
    "Mean of the DM-SNR curve",
    "Standard deviation of the DM-SNR curve",
    #"Excess kurtosis of the DM-SNR curve",
    "Excess kurtosis of the DM-SNR curve",
    "Skewness of the DM-SNR curve"
]

df = df[features+[target]]
df

In [None]:
# Transformation to features
df = feature_standardization(original_df = df, features=features, target=target)

In [None]:
df

#### Dimensionallity reduction visualization

In [None]:
X = df.drop(columns=["target_class"])
y = df["target_class"]

# Standardize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Apply PCA to reduce to 2 components
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

# Plot the PCA-reduced data
plt.figure(figsize=(8, 6))
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='tab10', alpha=0.7)
plt.xlabel('PCA Component 1')
plt.ylabel('PCA Component 2')
plt.title('2D PCA Projection of Clusters')
plt.legend(*scatter.legend_elements(), title="Target")
plt.grid(True)
plt.tight_layout()
plt.show()


### Pytorch dataset creation

In [None]:
# To numpy arrays
inputs_df = df.drop("target_class",axis=1)
inputs_arr = inputs_df.to_numpy() 
targets_df = df["target_class"] 
targets_arr = targets_df.to_numpy()
# To torch tensors
inputs=torch.from_numpy(inputs_arr).type(torch.float64)
targets=torch.from_numpy(targets_arr).type(torch.long)
inputs.shape, targets.shape

In [None]:
dataset=TensorDataset(inputs, targets)

### Train, validation, test split

In [None]:
num_rows=df.shape[0]
val_percent = 0.1 #Porcentaje de validaicón
test_percent = 0.1 #Porcentaje de test
val_size = int(num_rows*val_percent)
test_size = int(num_rows*test_percent)
train_size = num_rows - val_size-test_size

In [None]:
torch.manual_seed(2)#Nos aseguramos de que consigamos la misma validación cada vez.
train_ds, val_ds, test_ds = random_split(dataset, (train_size, val_size, test_size))
print("Muestras de entrenamiento: ",len(train_ds))
print("Muestras de validation:", len(val_ds))
print("Muestras test: ",len(test_ds))

In [None]:
batch_size=256

In [None]:
# PyTorch data loaders
train_dl = DataLoader(train_ds, batch_size, num_workers=3, pin_memory=True)
val_dl = DataLoader(val_ds, batch_size, num_workers=3, pin_memory=True)
test_dl = DataLoader(test_ds, batch_size, num_workers=3, pin_memory=True)

GPU/CPU Configuration

In [None]:
# corroboramos el dispositivo
device=get_default_device()
device

In [None]:
train_dl = DeviceDataLoader(train_dl, device)
val_dl = DeviceDataLoader(val_dl, device)
test_dl = DeviceDataLoader(test_dl, device)

## NN architecture visualization

In [None]:
layers = [inputs_df.shape[1], 16, 16, 2]
net = Network(layers)
net.graph(layers)

### Architecture definition in Pytorch

In [None]:
class HTRU2Model(nn.Module):
    def __init__(self,):
        super(HTRU2Model,self).__init__()
        self.layer1 = nn.Linear(8, 16)
        self.layer2 = nn.Linear(16, 16)
        self.layer3 = nn.Linear(16, 2)
        self.softmax = nn.Softmax(dim=1)
    def forward(self, x):
        x = x.float()
        x = self.layer1(x)
        x = F.relu(x)
        x = self.layer2(x)
        x = F.relu(x)
        x = self.layer3(x)
        x = self.softmax(x)
        return x
    def training_step(self, batch):
        inputs, targets = batch 
        out = self(inputs)                  # Generates predictions 
        loss = F.cross_entropy(out, targets) # Calculates loss
        return loss

    def predict_test(self, batch):
        inputs, targets = batch 
        out = self(inputs)                  # Gets predictions 
        return out
    
    def validation_step(self, batch):
        inputs, targets = batch 
        out = self(inputs)                    # Gets predictions
        loss = F.cross_entropy(out, targets)   # Calculates loss
        acc = accuracy(out, targets)           # Calculates accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc}
        
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Calculates cost
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      # Precision aggregation
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        print("Epoch [{}], last_lr: {:.5f}, train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(
            epoch, result['lrs'][-1], result['train_loss'], result['val_loss'], result['val_acc']))

In [None]:
model = to_device(HTRU2Model(), device)

In [None]:
summary(model, (1,8))

### Model initialization

In [None]:
history = [evaluate(model, val_dl)]
history

### Hyperparameters

In [None]:
epochs = 10
max_lr = 0.01
grad_clip = 0.1
weight_decay = 1e-4
opt_func = torch.optim.Adam

### Training

In [None]:
%%time
history += fit_one_cycle(epochs, max_lr, model, train_dl, val_dl, 
                             grad_clip=grad_clip, 
                             weight_decay=weight_decay, 
                             opt_func=opt_func)

### Precision and loss

In [None]:
plot_losses(history)

In [None]:
plot_accuracies(history)

In [None]:
plot_lrs(history)

## Model evaluation in test set

In [None]:
evaluate(model, test_dl)

In [None]:
# Unfolding batched predictions
test_probability_predictions_batch = list()
for batch in test_dl:
  test_probability_predictions_batch.append((model.predict_test(batch)))
#----------Empaquetamiento de predicciones-----------------------------------
test_target_probabilities = []
test_target_predictions = []
for i in range(len(test_probability_predictions_batch)):
  for j in range(len(test_probability_predictions_batch[i])):
    test_target_probabilities.append(
      (test_probability_predictions_batch[i][j].detach()).numpy()
    )
    test_target_predictions.append(
      1 if (test_probability_predictions_batch[i][j].detach()).numpy()[1] >= 0.5 else 0 #If second element is higher than 0.5, it's a Pulsar
    )

test_target_probabilities = np.array(test_target_probabilities)
test_target_predictions = np.array(test_target_predictions)

In [None]:
test_target_predictions

In [None]:
test_target_real = []
for i in range(len(test_ds)):
  test_target_real.append(np.squeeze(test_ds[i][1].numpy()))
test_target_real = np.array(test_target_real)

In [None]:
np.sum(test_target_predictions)

In [None]:
np.sum(test_target_real)

In [None]:
proba_true_df = pd.DataFrame(test_target_probabilities, columns=["not_pulsar", "is_pulsar"])
proba_true_df["true"] = test_target_real

In [None]:
fig, ax = plt.subplots(figsize=(6,4))
sns.histplot(
    proba_true_df,
    x="is_pulsar",
    hue="true",
    bins=16,
    kde=True,
    stat="density",
    ax=ax
)
#ax.set_yscale("log")

In [None]:
arr = confusion_matrix(test_target_real, test_target_predictions).T
arr

In [None]:
arr = confusion_matrix(test_target_real, test_target_predictions, normalize = 'true').T
arr = 100*arr
arr

In [None]:
df_cm = pd.DataFrame(
    arr, 
    index = ["Negative","Positive"],
    columns = ["Negative","Positive"]
)

In [None]:
df_cm

In [None]:
plt.figure(figsize = (10,7))
plt.rcParams.update({'font.size': 22})
ax = sns.heatmap(df_cm, annot=True, fmt=".1f")
ax.set(xlabel=r'Predicted', ylabel=r'Truth');