In [1]:
# %run lime.ipynb

In [2]:
import pydicom as dicom
import PIL

In [3]:
import matplotlib.pyplot as plt
from PIL import Image
import torch.nn as nn
import numpy as np

import torch
from torchvision import models, transforms
from torch.autograd import Variable
import torch.nn.functional as F

from tensorflow.keras.utils import load_img

import anvil.server, anvil.media

2023-04-09 07:44:44.230346: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [4]:
import matplotlib.pyplot as plt
from PIL import Image
import torch.nn as nn
import numpy as np
import os, json, io

from lime import lime_image
import skimage.segmentation as segmentation

import anvil

In [5]:
def denormalize(inp, title=None):
    """denormalize inp"""
    # inp = inp.numpy().transpose((1, 2, 0))
    # Inverse of the initial normalization operation.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    return inp
    # plt.figure()
    # plt.imshow(inp)
    # if title is not None:
    #     plt.title(title)

In [6]:
def explain(image, batch_predict, num_samples=1000):
    """Returns a byte array representing the image + explanations
    in PNG format."""
    explainer = lime_image.LimeImageExplainer()
    # print("try to explain")
    # print(image)
    explanation = explainer.explain_instance(image, 
                                         batch_predict, # classification function
                                         top_labels=2, 
                                         hide_color=0, 
                                         num_samples=num_samples) # number of images that will be sent to classification function
    # print("explained")
    temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], positive_only=False, num_features=2, hide_rest=False)
    # print(temp, mask)
    temp = denormalize(temp) * 255
    # print(temp)
    image_boundary = segmentation.mark_boundaries(temp, mask)
    # print(("image_boundary", image_boundary))
    # np_array_for_explain = transform_img(filename).numpy()
    img = Image.fromarray((image_boundary).astype('uint8'), mode='RGB')
    # print(img)
    img_byte_arr = io.BytesIO()
    img.save(img_byte_arr, format='PNG')
    img_byte_arr = img_byte_arr.getvalue()
    return img_byte_arr

In [7]:
anvil.server.connect("server_Z2TJZAQV66QSSNBWMV4L4S4A-SDDDJFHQEVJ3JDQW")

Connecting to wss://anvil.works/uplink
Anvil websocket open
Connected to "Published" as SERVER


In [8]:
import time
import os
import copy

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, transforms

# Pennylane
import pennylane as qml
from pennylane import numpy as np

In [9]:
n_qubits = 4                # Number of qubits
step = 0.0004               # Learning rate
batch_size = 4              # Number of samples for each training step
num_epochs = 3              # Number of training epochs
q_depth = 6                 # Depth of the quantum circuit (number of variational layers)
gamma_lr_scheduler = 0.1    # Learning rate reduction applied every 10 epochs.
q_delta = 0.01              # Initial spread of random quantum weights
start_time = time.time()    # Start of the computation timer

In [10]:
dev = qml.device("default.qubit", wires=n_qubits)

In [11]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [12]:
def H_layer(nqubits):
    """Layer of single-qubit Hadamard gates.
    """
    for idx in range(nqubits):
        qml.Hadamard(wires=idx)


def RY_layer(w):
    """Layer of parametrized qubit rotations around the y axis.
    """
    for idx, element in enumerate(w):
        qml.RY(element, wires=idx)


def entangling_layer(nqubits):
    """Layer of CNOTs followed by another shifted layer of CNOT.
    """
    # In other words it should apply something like :
    # CNOT  CNOT  CNOT  CNOT...  CNOT
    #   CNOT  CNOT  CNOT...  CNOT
    for i in range(0, nqubits - 1, 2):  # Loop over even indices: i=0,2,...N-2
        qml.CNOT(wires=[i, i + 1])
    for i in range(1, nqubits - 1, 2):  # Loop over odd indices:  i=1,3,...N-3
        qml.CNOT(wires=[i, i + 1])

In [13]:
@qml.qnode(dev, interface="torch")
def quantum_net(q_input_features, q_weights_flat):
    """
    The variational quantum circuit.
    """

    # Reshape weights
    q_weights = q_weights_flat.reshape(q_depth, n_qubits)

    # Start from state |+> , unbiased w.r.t. |0> and |1>
    H_layer(n_qubits)

    # Embed features in the quantum node
    RY_layer(q_input_features)

    # Sequence of trainable variational layers
    for k in range(q_depth):
        entangling_layer(n_qubits)
        RY_layer(q_weights[k])

    # Expectation values in the Z basis
    exp_vals = [qml.expval(qml.PauliZ(position)) for position in range(n_qubits)]
    return tuple(exp_vals)

In [14]:
class DressedQuantumNet(nn.Module):
    """
    Torch module implementing the *dressed* quantum net.
    """

    def __init__(self):
        """
        Definition of the *dressed* layout.
        """

        super().__init__()
        self.pre_net = nn.Linear(512, n_qubits)
        self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))
        self.post_net = nn.Linear(n_qubits, 2)

    def forward(self, input_features):
        """
        Defining how tensors are supposed to move through the *dressed* quantum
        net.
        """

        # obtain the input features for the quantum circuit
        # by reducing the feature dimension from 512 to 4
        pre_out = self.pre_net(input_features)
        q_in = torch.tanh(pre_out) * np.pi / 2.0

        # Apply the quantum circuit to each element of the batch and append to q_out
        q_out = torch.Tensor(0, n_qubits)
        q_out = q_out.to(device)
        for elem in q_in:
            q_out_elem = quantum_net(elem, self.q_params).float().unsqueeze(0)
            q_out = torch.cat((q_out, q_out_elem))

        # return the two-dimensional prediction from the postprocessing layer
        return self.post_net(q_out)

In [15]:
# model_hybrid = torchvision.models.resnet18(pretrained=True)
from torchvision.models import resnet18, ResNet18_Weights
# model_hybrid = torchvision.models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
model_hybrid = resnet18() # weights=ResNet18_Weights.IMAGENET1K_V1)

for param in model_hybrid.parameters():
    param.requires_grad = False


# Notice that model_hybrid.fc is the last layer of ResNet18
model_hybrid.fc = DressedQuantumNet()

# Use CUDA or CPU according to the "device" object.
model_hybrid = model_hybrid.to(device)

model_hybrid.load_state_dict(torch.load("../model_hybrid_sim.pth", map_location=device))

<All keys matched successfully>

In [16]:
def transform_img(img): # also reads image
    data_transforms = transforms.Compose(
        [
            # transforms.RandomResizedCrop(224),     # uncomment for data augmentation
            # transforms.RandomHorizontalFlip(),     # uncomment for data augmentation
            # transforms.ToPILImage(), # convert to PIL image
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            # Normalize input channels using mean values and standard deviations of ImageNet.
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            # transforms.ToPILImage(), # convert to PIL image
        ]
    )
    image = dicom.dcmread(img) # unsafe?
    arr = image.pixel_array
    image_2d = arr.astype(float)
    image_2d = image_2d / image.LargestImagePixelValue * 255
    image_2d = PIL.Image.fromarray(image_2d) # , mode="L")
    image_2d = image_2d.convert("RGB")
    # image_2d = transform_img(image_2d)
    image_2d = data_transforms(image_2d)
    return image_2d
    # return data_transforms(img)

In [17]:
def model_predict(model, img):
    # print("Type of img:", type(img))
    # print(img.shape)
    # image_2d = transform_img(img)
    image_2d = img
    # image_2d = torch.Tensor(image_2d)
    with torch.no_grad():
        inputs = image_2d.to(device)
        # print(inputs)
        inputs = inputs.unsqueeze(0)
        outputs = model(inputs)
        # print(outputs)
        prob = F.softmax(outputs, dim=1)
        prob = prob[0,1].item() # always get prob of cancer, which is the 1 label
        _, preds = torch.max(outputs, 1)
        # rint(preds)
        #     for j in range(inputs.size()[0]):
        # ground_truth = labels[j].item() # "MALIGNANT" if labels[j].item() else "BENIGN"
        predicted = preds.item()
        # y_true.append(ground_truth)
        # y_pred.append(predicted)
        # print(predicted)
        # print(prob[0,1].item()) # always get prob of cancer, which is the 1 label
    return predicted, prob

In [18]:
def model_predict_transform(model, img):
    return model_predict(transform_img(img))

In [19]:
def batch_predict(np_array):
    # hacky - lime expects x,y,3 and classifier expects 3,x,y
    # rotate things around
    np_array = [torch.tensor(np.transpose(a, (2, 0, 1))) for a in np_array] # array of arrays?
    return [model_predict(model_hybrid, a) for a in np_array]
    # convert np_array to tensor
    img_tensor = torch.tensor(np_array)
    # img_tensor = img_tensor.apply_(transform_img)
    model = model_hybrid
    with torch.no_grad():
        inputs = img_tensor.to(device)
        # print(inputs)
        # inputs = inputs.unsqueeze(0)
        outputs = model(inputs)
        # print(outputs)
        prob = F.softmax(outputs, dim=1)
        # need to always get the cancer prob
        prob = prob[0,1].item() # always get prob of cancer, which is the 1 label
        _, preds = torch.max(outputs, 1)
        # rint(preds)
        #     for j in range(inputs.size()[0]):
        # ground_truth = labels[j].item() # "MALIGNANT" if labels[j].item() else "BENIGN"
        predicted = preds.item()
        # y_true.append(ground_truth)
        # y_pred.append(predicted)
        # print(predicted)
        # print(prob[0,1].item()) # always get prob of cancer, which is the 1 label
    return predicted, prob

In [20]:
@anvil.server.callable
def classify(file, give_explanation=True, num_samples=100): 
    # Classify
    # print(type(file))
    with anvil.media.open(file) as filename:
        # print(type(filename))
        ti = transform_img(filename)
        # print(ti)
        _, pred_cancer = model_predict(model_hybrid, ti)
    # model.predict
    # Give explanation
        if give_explanation:
            # with anvil.media.open(file) as filename:
                # img = load_img(filename)
            # print(filename)
            # np_array_for_explain = transform_img(filename).numpy()
            np_array_for_explain = ti.numpy()
            np_array_for_explain = np.transpose(np_array_for_explain, (1, 2, 0))
            # print(np_array_for_explain)
            img_byte_arr = explain(np_array_for_explain, batch_predict, num_samples=100)
            anvil_img = anvil.BlobMedia(content_type="image/png", content=img_byte_arr, name="explain.png")
            return pred_cancer, anvil_img
        return pred_cancer

In [21]:
import io
import torchvision.io
@anvil.server.callable
def to_png(file):
    with anvil.media.open(file) as filename:
        # print(filename)
        image = dicom.dcmread(filename)
        arr = image.pixel_array
        image_2d = arr.astype(float)
        image_2d = image_2d / image.LargestImagePixelValue * 255
        image_2d = PIL.Image.fromarray(image_2d) # , mode="L")
        image_2d = image_2d.convert("RGB")
        # print(image_2d)
        img_byte_arr = io.BytesIO()
        image_2d.save(img_byte_arr, format='PNG')
        img_byte_arr = img_byte_arr.getvalue()
        # image_2d = transform_img(image_2d) # tensor
        # print(image_2d)
        # image_2d = image_2d.to(torch.uint8)
        # print(image_2d)
        # image_2d = torchvision.io.encode_png(image_2d)
        # print(image_2d)
        # image_2d.seek(0)
        # image_2d = image_2d.read()
        # is a tensor
        # buff = io.BytesIO()
        # torch.save(image_2d, buff)
        # buff.seek(0)
        # print(buff.read())
        # buff.seek(0)
        # img_byte_arr = explain(pill_transf(img), batch_predict, num_samples=100)
        # anvil_img = anvil.BlobMedia(content_type="image/png", content=img_byte_arr, name="explain.png")
        anvil_img = anvil.BlobMedia(content_type="image/png", content=img_byte_arr, name="preview.png")
        return anvil_img

In [22]:
example = "../tciaDownload/1.3.6.1.4.1.9590.100.1.2.100018879311824535125115145152454291132/1-1.dcm"
t=transform_img(example)

In [23]:
predicted, prob = model_predict(model_hybrid, t)
print(predicted)
print(prob)

0
0.4761468768119812
