In [None]:
import time
# pandas: reads the CSV and holds the tabular data before it is converted to tensors
import pandas as pd
import matplotlib.pyplot as plt
import random
import torch
import pytorch_lightning as pl
from pytorch_lightning.metrics import Precision
from pytorch_lightning.metrics import Recall
from torchmetrics import (
    Accuracy,
    AUROC,
)
from typing import List, Dict
from torch import nn
from torch.nn import functional as F
from torch.utils.data import (
    DataLoader,
    Dataset,
)

# from sklearn.metrics import classification_report

# from torch.utils.data import random_split

# %load_ext nb_black


In [None]:
# Read the clinical data; the "diagnosis" column holds the class label ("B" or "M").
bc_df = pd.read_csv("data.csv")  # breast cancer DataFrame

# The two classes are kept in separate frames so that each one can be split into
# training/validation on its own and the malignant class can be oversampled.
bc_benign_diags = bc_df.loc[bc_df["diagnosis"] == "B"]
bc_malignant_diags = bc_df.loc[bc_df["diagnosis"] == "M"]

# The "id" and "Unnamed: 32" columns are not used as model inputs, so they are dropped.
# The label is re-encoded for binary classification: B -> 0.0, M -> 1.0.
# `assign` builds a new frame; the previous `frame["diagnosis"].replace(..., inplace=True)`
# was a chained in-place call on a column selection, which does not modify the frame
# under pandas Copy-on-Write. Every row of each frame carries the same label (see the
# filters above), so assigning the constant is equivalent to the replacement.
benign_examples = bc_benign_diags.drop(columns=["id", "Unnamed: 32"]).assign(
    diagnosis=0.0
)
malignant_examples_orig = bc_malignant_diags.drop(
    columns=["id", "Unnamed: 32"]
).assign(diagnosis=1.0)

# Oversample the malignant class by including every malignant row twice.
# Only do this when needed: duplicated rows make overfitting easier.
# Note: the copies keep their original index labels, so each label appears twice.
malignant_examples = pd.concat([malignant_examples_orig, malignant_examples_orig])

print(f"malignant_examples size={len(malignant_examples)}")
print(f"benign_examples size={len(benign_examples)}")

# Every remaining column except the label is a model input.
input_variable_names = [
    column for column in benign_examples.columns if column != "diagnosis"
]

# Show the first 5 rows of each table. Only the last expression of a cell is rendered,
# so both previews are combined into a single frame.
pd.concat([malignant_examples.head(), benign_examples.head()])


# print(f"malignant_examples = {malignant_examples}")
# xxx = list(range(100))
# dd = {
#     "x": xxx,
#     "y": [x * x + 2000 * random.random() - 1000 for x in xxx],
# #     "y": [x * x for x in xxx],

# }



# examples_df = pd.DataFrame(data=dd)
# examples_df.head()
# print(examples_df.iloc[10, :])



# plt.plot(examples_df.x, examples_df.y)



In [None]:
class ResNetBlock(nn.Module):
    """Fully connected residual block: ``dropout(shortcut(x) + lin_2(softplus(lin_1(x))))``.

    Args:
        num_inputs: size of the last dimension of the input.
        layer_size: width of the hidden layer.
        num_output: size of the last dimension of the output.
        dropout_rate: probability passed to dropout on the block output.

    The shortcut is the identity when ``num_inputs == num_output``; otherwise it is a
    bias-free linear map (``lin_map``) that projects the input to the output size.
    """

    def __init__(self, num_inputs, layer_size, num_output, dropout_rate=0.0):
        super().__init__()

        # TODO: discussion point
        self.dropout_rate = dropout_rate

        self.lin_1 = nn.Linear(num_inputs, layer_size)
        self.lin_2 = nn.Linear(layer_size, num_output)

        # Residual shortcut: project the input only when the sizes differ.
        if num_inputs != num_output:
            self.lin_map = nn.Linear(num_inputs, num_output, bias=False)
        else:
            self.lin_map = None

    def forward(self, x):
        """Propagate ``x`` through the block and return the (optionally dropped-out) sum."""
        hidden = F.softplus(self.lin_1(x))
        residual = self.lin_2(hidden)

        # Explicit None check instead of relying on the truthiness of a module.
        shortcut = x if self.lin_map is None else self.lin_map(x)

        # TODO: discussion point
        # Dropout must follow the module's train/eval mode. A Dropout module created
        # inside forward() is always in training mode, so it kept dropping activations
        # even after .eval(); the functional form takes the mode explicitly.
        return F.dropout(
            shortcut + residual, p=self.dropout_rate, training=self.training
        )


# from pytorch-lightning
class BreastCancerClassifier(pl.LightningModule):
    """Binary classifier built from a stack of ``ResNetBlock`` layers ending in a sigmoid.

    Args:
        hparams: dictionary with the keys ``num_inputs``, ``num_outputs``,
            ``layer_size``, ``dropout_rate``, ``num_layers`` and ``lr``. It is stored
            through ``save_hyperparameters`` and read back via ``self.hparams``.

    Training and validation minimise the mean squared error between the sigmoid output
    and the target, and track accuracy and AUROC per epoch.
    """

    def __init__(self, hparams: Dict):
        super().__init__()

        self.save_hyperparameters(hparams)

        self.val_accuracy = Accuracy(compute_on_step=False)
        self.val_auroc = AUROC(compute_on_step=False)

        self.train_accuracy = Accuracy(compute_on_step=False)
        self.train_auroc = AUROC(compute_on_step=False)

        # Hidden blocks all map layer_size -> layer_size.
        self.temp_mod_lst = [
            ResNetBlock(
                self.hparams.layer_size,
                self.hparams.layer_size,
                self.hparams.layer_size,
                self.hparams.dropout_rate,
            )
            for _ in range(self.hparams.num_layers)
        ]

        # input block -> hidden blocks -> output block (no dropout) -> sigmoid
        self.model = nn.Sequential(
            ResNetBlock(
                self.hparams.num_inputs,
                self.hparams.layer_size,
                self.hparams.layer_size,
                self.hparams.dropout_rate,
            ),
            *self.temp_mod_lst,
            ResNetBlock(
                self.hparams.layer_size,
                self.hparams.layer_size,
                self.hparams.num_outputs,
            ),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return the sigmoid output of the network for input ``x``."""
        return self.model(x)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters with learning rate ``hparams.lr``."""
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)

    def _loss_and_update_metrics(self, batch, accuracy, auroc):
        """Run the model on ``batch``, update both metrics and return the MSE loss."""
        x, y = batch
        y_hat = self(x)
        loss = F.mse_loss(y_hat, y)

        int_targets = y.to(torch.int32)
        accuracy.update(y_hat, int_targets)
        auroc.update(y_hat, int_targets)
        return loss

    def _log_epoch_metric(self, name, metric, **log_kwargs):
        """Log ``metric.compute()`` under ``name`` and always reset the metric.

        Computing stays best-effort: a failure is reported as a warning instead of
        aborting the run. The reset happens in ``finally`` so that a failed compute
        does not leave stale state to be mixed into the next epoch.
        """
        import warnings

        try:
            self.log(name, metric.compute(), **log_kwargs)
        except Exception as err:  # not a bare except: Ctrl-C must still interrupt
            warnings.warn(f"could not compute {name}: {err!r}")
        finally:
            metric.reset()

    def training_step(self, train_batch, batch_idx):
        """Compute the MSE training loss for one batch and log it per epoch."""
        loss = self._loss_and_update_metrics(
            train_batch, self.train_accuracy, self.train_auroc
        )
        # No enable_graph=True: the logged value is only reported, so there is no
        # reason to keep its autograd graph attached.
        self.log("train_loss", loss, on_epoch=True, on_step=False)
        return loss

    def training_epoch_end(self, val_outputs):
        """Log and reset the training accuracy and AUROC for the finished epoch."""
        self._log_epoch_metric("train_acc_epoch", self.train_accuracy, on_epoch=True)
        self._log_epoch_metric("train_auroc_epoch", self.train_auroc)

    def validation_step(self, val_batch, batch_idx):
        """Compute the MSE validation loss for one batch and log it."""
        loss = self._loss_and_update_metrics(
            val_batch, self.val_accuracy, self.val_auroc
        )
        self.log("val_loss", loss)
        return loss

    def validation_epoch_end(self, val_outputs):
        """Log and reset the validation accuracy and AUROC for the finished epoch."""
        self._log_epoch_metric("valid_acc_epoch", self.val_accuracy, on_epoch=True)
        self._log_epoch_metric("valid_auroc_epoch", self.val_auroc)


In [None]:
class ExamplesDataset(Dataset):
    """Map-style dataset over a DataFrame of examples.

    Args:
        data: frame containing the input columns and a ``"diagnosis"`` target column.
        input_variable_names: names of the columns used as model inputs, in order.

    Each item is a pair ``(inputs, target)`` of float32 tensors with shapes
    ``(len(input_variable_names),)`` and ``(1,)``.
    """

    def __init__(self, data: pd.DataFrame, input_variable_names: list):
        self.data = data
        self.input_variable_names = input_variable_names

        # Convert once here instead of on every fetch. Only the columns that are
        # actually used are cast, so an unrelated non-numeric column in `data` no
        # longer breaks item access.
        feature_columns = list(input_variable_names)
        self._inputs = torch.tensor(data[feature_columns].to_numpy(dtype="float32"))
        self._targets = torch.tensor(
            data["diagnosis"].to_numpy(dtype="float32")
        ).unsqueeze(1)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        # Positional access, like the previous `iloc[index, :]`. Clones are returned
        # so that callers never receive a view into the cached tensors.
        return self._inputs[index].clone(), self._targets[index].clone()


class BreastCancerClassifierDataLoader(pl.LightningDataModule):
    """Data module that splits each class into training and validation examples.

    Args:
        benign_examples: frame with the benign examples.
        malignant_examples: frame with the malignant examples.
        input_variable_names: names of the columns used as model inputs.
        batch_size: batch size of every dataloader (default 1, as before).
        train_frac: fraction of each class that goes to the training set.
        seed: optional ``random_state`` for the split; ``None`` keeps it random.

    The split is done per class and per *index label*: rows sharing an index label
    are kept on the same side of the split. With a plain ``sample`` followed by
    ``drop(index)``, a frame whose index contains repeated labels loses every copy of
    a sampled label from the validation frame, which shrinks the validation set far
    below ``1 - train_frac``. Splitting on unique labels avoids that and also prevents
    a copy of a training row from appearing in the validation set.
    """

    def __init__(
        self,
        benign_examples: pd.DataFrame,
        malignant_examples: pd.DataFrame,
        input_variable_names: List[str],
        batch_size: int = 1,
        train_frac: float = 0.8,
        seed=None,
    ):
        super().__init__()

        self.batch_size = batch_size
        self.benign_examples = benign_examples
        self.malignant_examples = malignant_examples
        self.input_variable_names = input_variable_names

        self.benign_train_df, self.benign_val_df = self._split_by_index_label(
            self.benign_examples, train_frac, seed
        )
        self.malignant_train_df, self.malignant_val_df = self._split_by_index_label(
            self.malignant_examples, train_frac, seed
        )

        # Training set: both classes concatenated.
        self.train_df = pd.concat([self.benign_train_df, self.malignant_train_df])
        self.train_ds = ExamplesDataset(self.train_df, self.input_variable_names)

        # Validation set: everything that was not selected for training.
        self.val_df = pd.concat([self.benign_val_df, self.malignant_val_df])
        self.val_ds = ExamplesDataset(self.val_df, self.input_variable_names)

    @staticmethod
    def _split_by_index_label(examples, train_frac, seed=None):
        """Return ``(train, val)`` frames; rows with the same index label stay together."""
        unique_labels = pd.Series(examples.index.unique())
        train_labels = unique_labels.sample(frac=train_frac, random_state=seed)
        in_train = examples.index.isin(train_labels)
        return examples.loc[in_train], examples.loc[~in_train]

    def prepare_data(self):
        # Nothing to download or prepare.
        pass

    def setup(self, stage=None):
        # The datasets are already built in __init__.
        pass

    def train_dataloader(self):
        return DataLoader(self.train_ds, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        # Evaluation data is not shuffled: the order does not change the metrics.
        return DataLoader(self.val_ds, batch_size=self.batch_size, shuffle=False)

    def test_dataloader(self):
        # There is no separate test split; the validation set is reused.
        return DataLoader(self.val_ds, batch_size=self.batch_size, shuffle=False)


# train_loader = DataLoader(train_df, batch_size=2)
# val_loader = DataLoader(val_df, batch_size=2)

In [None]:
# Seed python, numpy and torch before the split and the weight initialisation so that
# a Restart & Run All reproduces the same run.
SEED = 42
pl.seed_everything(SEED)

dm_loader = BreastCancerClassifierDataLoader(
    benign_examples, malignant_examples, input_variable_names
)

# model
hparams = {
    # derived from the feature list instead of a hard-coded 30
    "num_inputs": len(input_variable_names),
    "num_outputs": 1,
    "layer_size": 34,
    "dropout_rate": 0.04,
    "lr": 1e-4,
    "num_layers": 8,
}

model = BreastCancerClassifier(hparams)

# training
trainer = pl.Trainer(gpus=0, max_epochs=600, logger=True, progress_bar_refresh_rate=50)
# perf_counter is monotonic, so the measured duration is not affected by clock changes.
start_time = time.perf_counter()
trainer.fit(model, dm_loader)
print(f"training duration: {time.perf_counter()-start_time} seconds")
# trainer.fit(model, dm_loader.train_dataloader(), dm_loader.val_dataloader())
# print(model)


In [None]:
# xxxt = torch.tensor(range(100), dtype=torch.float32)
# xxxt = torch.unsqueeze(xxxt, 1)
# # print(xxxt)
# yyyp = model(xxxt)
# # print(yyyp)
# # plt.plot(xxxt.detach().numpy(), yyyp.detach().numpy())

# NOTE(review): this path points at one specific earlier run (version_6); it must be
# updated by hand after retraining, otherwise this cell fails or evaluates an old model.
trained_model = BreastCancerClassifier.load_from_checkpoint(
    "./lightning_logs/version_6/checkpoints/epoch=168-step=105624.ckpt"
)

# TODO: discussion point
# Switch the module to evaluation mode before predicting.
trained_model.eval()

print(f"model params: {trained_model.hparams}")

all_examples = pd.concat([malignant_examples, benign_examples])

# Predictions are only printed, so gradient tracking is switched off.
with torch.no_grad():
    for _, example in all_examples.iterrows():
        input_values = [example[a_label] for a_label in input_variable_names]
        target_value = example["diagnosis"]

        # A batch of one example: shape (1, number of inputs).
        input_tnsr = torch.tensor([input_values], dtype=torch.float32)
        print(f"model prediction: {trained_model(input_tnsr)}, actual diagnosis: {target_value}" )
    