# Method 4(b): GAMA using DWave quantum annealer

In [None]:
!pip install dimod
!pip install dwave-neal
!pip install dwave-ocean-sdk
!pip install kaggle

from IPython.display import clear_output
clear_output()

In [None]:
# Import the Dwave packages dimod and neal
import dimod
import neal
# Import Matplotlib to generate plots
import matplotlib.pyplot as plt
# Import other necessary libraries
import numpy as np
from scipy.special import gamma
import math
from collections import Counter
import pandas as pd
from itertools import chain
import time
import networkx as nx

import os
import PIL
import PIL.Image
import tensorflow as tf
import tensorflow_datasets as tfds
from matplotlib import image
from matplotlib import pyplot
from os import listdir
from numpy import save
from tensorflow.keras.utils import load_img
from tensorflow.keras.utils import img_to_array

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/

In [None]:
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download pcbreviglieri/pneumonia-xray-images

Downloading pneumonia-xray-images.zip to /content
 99% 1.13G/1.14G [00:16<00:00, 73.6MB/s]
100% 1.14G/1.14G [00:16<00:00, 74.9MB/s]


In [None]:
! unzip pneumonia-xray-images
clear_output()

In [None]:
# resize opacity images from training set

folder = '/content/train/opacity'
photos_op1 = list()

n1 = 200
n2 = 200

# enumerate files in the directory
for file in listdir(folder):
    # load image
    photo = load_img( '/content/train/opacity/'+ file, target_size=(n1, n2), color_mode='grayscale')
    # convert to numpy array
    photo = img_to_array(photo)
    # store in the list of photos
    photos_op1.append(photo)

# convert list of photos and labels to numpy arrays
photos_op = np.array(photos_op1[:1000])

# label = -1 for opacity cases
labels_op = np.array([-1.0]*1000)

op_train = list()
for i in range(1000):
  op_train.append(photos_op[i].reshape(-1))
op_train = np.array(op_train)
op_train = op_train/255.0

print(op_train.shape)

(1000, 40000)


In [None]:
# resize normal images from training set
folder = '/content/train/normal'
photos_no = list()

# enumerate files in the directory
for file in listdir(folder):
    # load image
    photo = load_img( '/content/train/normal/'+ file, target_size=(n1, n2), color_mode='grayscale')
    # convert to numpy array
    photo = img_to_array(photo)
    # store in the list of photos
    photos_no.append(photo)

# convert list of photos and labels to numpy arrays
photos_no = np.array(photos_no[:1000])

# label = +1 for normal cases
labels_no = np.array([1.0]*1000)

no_train = list()
for i in range(1000):
  no_train.append(photos_no[i].reshape(-1))
no_train = np.array(no_train)
no_train = no_train/255.0

print(no_train.shape)

(1000, 40000)


In [None]:
# for normal case images in test data set
folder = '/content/val/normal'
photos_test_no = list()

# enumerate files in the directory
for file in listdir(folder):
    # load image
    photo = load_img( '/content/val/normal/'+ file, target_size=(n1, n2), color_mode='grayscale')
    # convert to numpy array
    photo = img_to_array(photo)
    # store in the list of photos
    photos_test_no.append(photo)

# convert list of photos and labels to numpy arrays
photos_test_no = np.array(photos_test_no)

# label = +1 for normal cases
labels_test_no = np.array([1.0]*photos_test_no.shape[0])

no_test = list()
for i in range(267):
  no_test.append(photos_test_no[i].reshape(-1))
no_test = np.array(no_test)
no_test = no_test/255.0

print(no_test.shape)

(267, 40000)


In [None]:
# for opacity case test data take extra images from train folder that are not used for training

# convert list of photos and labels to numpy arrays
photos_test_op = np.array(photos_op1[1000:2000])

# label = -1 for normal cases
labels_test_op = np.array([-1.0]*photos_test_op.shape[0])

op_test = list()
for i in range(1000):
  op_test.append(photos_test_op[i].reshape(-1))
op_test = np.array(op_test)
op_test = op_test/255.0

print(op_test.shape)

(1000, 40000)


In [None]:
# divide all images and labels in sets of 50 each
train = np.empty((40, 50, 40000))
lab = np.empty((40, 50))
for i in range(40):
    train[i] = np.append(op_train[25*i:25*(i+1)], no_train[25*i:25*(i+1)], axis = 0)
    lab[i] = np.append(labels_op[25*i:25*(i+1)], labels_no[25*i:25*(i+1)])

print(train.shape, lab.shape)

(40, 50, 40000) (40, 50)


In [None]:
# define RBF kernel
import math
def Kernel(mat1, mat2, sigma):
  norm=np.linalg.norm(mat1-mat2)
  k = -(0.5*norm**2)/(sigma**2)
  return np.exp(k)

In [None]:
# evaluate kernel matrix for all image sets
# kern is the list of all 40 kernel matrices
kern=list()
for k in range(40):
  a = np.empty((50,50))
  for i in range(50):
    for j in range(50):
      a[i][j]=Kernel(train[k][i],train[k][j],50)
  kern.append(a)
kern = np.array(kern)

In [None]:
# since the contraint equation is same for all sets, we have only one QUBO
I = np.identity(50)
p = [1,2]
E = np.kron(I,p)

YY = np.outer(lab[0], lab[0])

Q = np.matmul(np.matmul(E.T, YY), E)

In [None]:
!pip install dwave-ocean-sdk
!dwave setup
!dwave ping

In [None]:
import dwave_networkx as dnx
from dwave.system import (DWaveSampler, EmbeddingComposite,
                          FixedEmbeddingComposite)
from pprint import pprint

In [None]:
# Solve the QUBO on DWave quantum annealer
model = dimod.BinaryQuadraticModel.from_qubo(Q)
DWavesampler = EmbeddingComposite(DWaveSampler(solver='Advantage_system6.2'))
DWaveSamples = DWavesampler.sample(bqm=model, num_reads=1000,
                                   return_embedding=True,
                                  #  chain_strength=chain_strength,
                                  #  annealing_time=annealing_time
                                   )

X1 = np.array([[ 0 for j in range(100)] for k in range(1000)])
m = 0

# saving parameter values that give zero energy
for sample,energy, num_occ in DWaveSamples.data(['sample','energy','num_occurrences']):
    if (energy == 0):
        X1[m] = list(sample.values())
        m=m+1

#find the unique solutions
X1 = np.unique(X1, axis = 0)
indices = list()
for l in range(len(X1)):
    for j in range(len(X1)):
        m = 0
        if (l == j):
            m = 1
        else:
            for k in range(100):
                if (X1[l,k] >= X1[j,k]):
                    m += 1
        if (m == 0):
            indices.append(l)
            break
s = np.delete(X1,indices,0)

In [None]:
print('total graver elements:', len(s))

total graver elements: 127


In [None]:
from sympy import *
from typing import Callable
import itertools
import random

In [None]:
# Define rules to choose augmentation element. Here we use the greedy approach
def greedy(iterable):
    for i, val in enumerate(iterable):
        if val[1] != 0:
            return i, val
    else:
        return i, val

In [None]:
# We can just have a single step move (works well with greedy approach)
def single_move(i: int, g: np.ndarray, fun: Callable, x: np.ndarray, x_lo: np.ndarray = None, x_up: np.ndarray = None, laststep: np.ndarray = None) -> (float, int):
    if x_lo is None:
        x_lo = np.zeros_like(x)
    if x_up is None:
        x_up = np.ones_like(x)*max(x)*2

    alpha = 0

    if (x + g <= x_up).all() and (x + g >= x_lo).all():
        if fun(x + g, i) < fun(x, i):
            alpha = 1
    elif (x - g <= x_up).all() and (x - g >= x_lo).all():
        if fun(x - g, i) < fun(x, i) and fun(x - g, i) < fun(x + g, i):
            alpha = -1

    return (fun(x + alpha*g, i), alpha)

In [None]:
x0 = np.zeros(100)
x_lo = np.zeros(100)
x_up = np.ones(100)

In [None]:
# Objective function definition
def f(x_in, i):
    term1 = -np.sum(np.matmul(E, x_in))
    term2 = 0.5*np.matmul(np.matmul(x_in.T, E.T), np.matmul(np.multiply(np.matmul(kern[i].T, kern[i]), YY), np.matmul(E, x_in)))
    return term1+term2

# Constraints definition
def const(x_in):
    rhs = np.zeros(50)
    return np.array_equiv(np.dot(np.matmul(E, x_in), lab[0]),rhs.T) or np.array_equiv(np.dot(np.matmul(E, x_in), lab[0]),rhs)

In [None]:
def augmentation(i=0,grav = s, func = f, x = x0, x_lo = x_lo, x_up = x_up, OPTION: int = 3, VERBOSE: bool = True, itermax: int = 1000) -> (int, float, np.ndarray):
    # Let's perform the augmentation and return the number of steps and the best solution
    # OPTION = 1 # Best augmentation, select using bisection rule
    # OPTION = 2 # Greedy augmentation, select using bisection rule
    # OPTION = 3 # Greedy augmentation, select using first found

    dist = 1
    gprev = None
    k = 1
    if VERBOSE:
        print("Initial point:", x)
        print("Objective function:",func(i,x))
    while dist != 0 and k < itermax:
        if OPTION == 1:
            g1, (obj, dist) = argmin(
                bisection(g=e, fun=func, x=x, laststep=gprev, x_lo=x_lo, x_up=x_up) for e in grav)
        elif OPTION == 2:
            g1, (obj, dist) = greedy(
                bisection(g=e, fun=func, x=x, laststep=gprev, x_lo=x_lo, x_up=x_up) for e in grav)
        elif OPTION == 3:
            g1, (obj, dist) = greedy(
                single_move(i,g=e, fun=func, x=x, x_lo=x_lo, x_up=x_up) for e in grav)
        else:
            print("Option not implemented")
            break
        x = x + grav[g1]*dist
        gprev = grav[g1]
        if VERBOSE:
            print("Iteration ", k)
            print(g1, (obj, dist))
            print("Augmentation direction:", gprev)
            print("Distanced moved:", dist)
            print("Step taken:", grav[g1]*dist)
            print("Objective function:", obj)
            print(func(x))
            print("Current point:", x)
            print("Are constraints satisfied?", const(x))
        else:
            if k%50 == 0:
                print(k)
                print(obj)
        k += 1
    return(k,obj,x)

In [None]:
I = np.identity(50)
p = [1,2]
E = np.kron(I,p)

YY = np.outer(lab[0], lab[0])

Q = np.matmul(np.matmul(E.T, YY), E)

In [None]:
# taking all graver elements
t4=0
feas_sols=s
print(np.array(feas_sols).shape)
init_obj = np.zeros((len(feas_sols),1))
iters_few = np.zeros((len(feas_sols),1))
final_obj_few = np.zeros((len(feas_sols),1))
times_few = np.zeros((len(feas_sols),1))
lagrange=list()
for j in range(40):
 #lagrange.append(list())
 for i, sol in enumerate(feas_sols):
    #if not const(sol):
        #print("Infeasible")
        #pass
    start = time.process_time()
    iter, f_obj, xf = augmentation(j,grav = s, x = sol, func = f, OPTION=3,VERBOSE=False)
    times_few[i] = time.process_time() - start
    iters_few[i] = iter
    final_obj_few[i] = f_obj
    t4=t4+times_few[i]
 for k in range(len(feas_sols)):
    if (final_obj_few[k]==min(final_obj_few)):
        lagrange.append(feas_sols[k])
        break
#print('total augmentation time using all graver element',t4)

(127, 100)


In [None]:
lagrange=np.array(lagrange)

In [None]:
xa=list()
for i in range(40):
  xa.append(np.matmul(E,lagrange[i].T))

In [None]:
# calculate bias for each SVM
bias=list()
for i in range(40):
 b=0
 c=0
 for n in range(50):
   a=0
   for m in range(50):
    a=a+xa[i][m]*lab[i][m]*kern[i][m][n]
   b=b+xa[i][n]*(3-xa[i][n])*(lab[i][n]-a)
   c=c+xa[i][n]*(3-xa[i][n])
 b = b/c
 bias.append(b)

In [None]:
print(len(bias))

40


In [None]:
# testing for normal test images
yy=list()
for i in range(267):
    a=0.00
    tes=list()
    for j in range(40):
        for k in range(50):
                a=a+xa[j][k]*lab[j][k]*Kernel(no_test[i],train[j][k],50)
        a=a+bias[j]
        if a<0:
         tes.append(-1)
        else:
         tes.append(1)
        yy.append(max(set(tes), key=tes.count))


In [None]:
# count for normal images
a=0
for i in range(267):
  if yy[i]==1:
    a=a+1
print(a)

258


In [None]:
# testing for opacity test images
yy=list()
for i in range(1000):
    a=0.00
    tes=list()
    for j in range(40):
        for k in range(50):
                a=a+xa[j][k]*lab[j][k]*Kernel(op_test[i],train[j][k],50)
        a=a+bias[j]
        if a<0:
         tes.append(-1)
        else:
         tes.append(1)
        yy.append(max(set(tes), key=tes.count))

In [None]:
# count for opacity images
a=0
for i in range(1000):
  if yy[i]==-1:
    a=a+1
print(a)

875
