<a href="https://colab.research.google.com/github/HanyuSun/Medical-Image-Processing/blob/main/Deep_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import tensorflow as tf
tf.test.gpu_device_name()

'/device:GPU:0'

In [None]:
!/opt/bin/nvidia-smi

Mon Jun 21 12:25:26 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   37C    P0    32W / 250W |    347MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
# example of defining a u-net encoder-decoder generator model
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU
from keras.utils.vis_utils import plot_model
from keras.optimizers import Adam
from PIL import Image
from os import listdir
from numpy import asarray
from numpy import vstack
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from numpy import savez_compressed
from numpy import load
from random import random
from numpy.random import randint
from matplotlib import pyplot
import numpy as np
from numpy import ones
from numpy import zeros
import os
from os import listdir
import math


def mkdir(path):
 
	folder = os.path.exists(path)
 
	if not folder:                   #判断是否存在文件夹如果不存在则创建为文件夹
		os.makedirs(path)            #makedirs 创建文件时如果路径不存在会创建这个路径
		print ('new folder')
		print ('OK')
	else:
		print ('There is this folder!')
        
# def psnr(img1, img2):
# 	mse = np.mean((img1 - img2) ** 2 )
# 	return 10 * np.log10(255.0**2/mse)

def psnr(img1, img2):
  Diff = np.double(img1) - np.double(img2)
  mse = np.sum(Diff**2)
  return 10 * np.log10(255.0**2/mse)

def mae(img1, img2):
    mae = np.mean( abs(img1 - img2)  )
    return mae 


# define an encoder block
def define_encoder_block(layer_in, n_filters, batchnorm=True):
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# add downsampling layer
	g = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
	# conditionally add batch normalization
	if batchnorm:
		g = BatchNormalization()(g, training=True)
	# leaky relu activation
	g = LeakyReLU(alpha=0.2)(g)
	return g

# define a decoder block
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# add upsampling layer
	g = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
	# add batch normalization
	g = BatchNormalization()(g, training=True)
	# conditionally add dropout
	if dropout:
		g = Dropout(0.5)(g, training=True)
	# merge with skip connection
	g = Concatenate()([g, skip_in])
	# relu activation
	g = Activation('relu')(g)
	return g

# define the standalone generator model
def define_generator(image_shape=(256,256,1)):
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# image input
	in_image = Input(shape=image_shape)
	# encoder model: C64-C128-C256-C512-C512-C512-C512-C512
	e1 = define_encoder_block(in_image, 64, batchnorm=False)
	e2 = define_encoder_block(e1, 128)
	e3 = define_encoder_block(e2, 256)
	e4 = define_encoder_block(e3, 512)
	e5 = define_encoder_block(e4, 512)
	e6 = define_encoder_block(e5, 512)
	e7 = define_encoder_block(e6, 512)
	# bottleneck, no batch norm and relu
	b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(e7)
	b = Activation('relu')(b)
	# decoder model: CD512-CD1024-CD1024-C1024-C1024-C512-C256-C128
	d1 = decoder_block(b, e7, 512)
	d2 = decoder_block(d1, e6, 512)
	d3 = decoder_block(d2, e5, 512)
	d4 = decoder_block(d3, e4, 512, dropout=False)
	d5 = decoder_block(d4, e3, 256, dropout=False)
	d6 = decoder_block(d5, e2, 128, dropout=False)
	d7 = decoder_block(d6, e1, 64, dropout=False)
	# output
	g = Conv2DTranspose(1, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d7)
	out_image = Activation('tanh')(g)
	# define model
	model = Model(in_image, out_image)
    # compile model
	#opt = Adam(lr=0.0002)
	# model.compile(loss='mae', optimizer=opt)
	return model

# define the discriminator model
def define_discriminator(image_shape):
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# source image input
	in_src_image = Input(shape=image_shape)
	# target image input
	in_target_image = Input(shape=image_shape)
	# concatenate images channel-wise
	merged = Concatenate()([in_src_image, in_target_image])
	# C64
	d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merged)
	d = LeakyReLU(alpha=0.2)(d)
	# C128
	d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C256
	d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C512
	d = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# second last output layer
	d = Conv2D(512, (4,4), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# patch output
	patch_out = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
# 	patch_out = Activation('sigmoid')(patch_out)
	# define model
	model = Model([in_src_image, in_target_image], patch_out)
	# compile model
	opt = Adam(lr=0.0002, beta_1=0.5)
	model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])
	return model


In [None]:
def U_NET(g_model, image_shape):
	g_model.trainable = True
	in1 = Input(shape=image_shape)
	gen_out1 = g_model(in1)
	model = Model(in1, gen_out1)
	# compile model
	opt = Adam(lr=0.002)
	model.compile(loss='mae', optimizer=opt, loss_weights=1)
	return model

def M_U_NET(g_model1, g_model2, image_shape):
	g_model1.trainable = True
	g_model2.trainable = True
	in1 = Input(shape=image_shape)
	in2 = Input(shape=image_shape)
	gen_out1 = g_model1(in1)
	gen_out2 = g_model2(in2)
	model = Model([in1, in2], [gen_out1, gen_out2])
	# compile model
	opt = Adam(lr=0.002)
	model.compile(loss=['mae', 'mae'], optimizer=opt, loss_weights=[1,1])
	return model

# define the combined generator and discriminator model, for updating the generator
def define_PIX2PIX(g_model, d_model, image_shape):
	# make weights in the discriminator not trainable
	d_model.trainable = False
	# define the source image
	in_src = Input(shape=image_shape)
	# connect the source image to the generator input
	gen_out = g_model(in_src)
	# connect the source input and generator output to the discriminator input
	dis_out = d_model([in_src, gen_out])
	# src image as input, generated image and classification output
	model = Model(in_src, [dis_out, gen_out])
	# compile model
	opt = Adam(lr=0.002, beta_1=0.5)
	model.compile(loss=['binary_crossentropy', 'mae'], optimizer=opt, loss_weights=[0.01,1])
	return model

def define_M_PIX2PIX(g_model1, g_model2, d_model1, d_model2, image_shape):
	# make weights in the discriminator not trainable
	d_model1.trainable = False
	d_model2.trainable = False
	g_model1.trainable = True
	g_model2.trainable = True
	in1 = Input(shape=image_shape)
	in2 = Input(shape=image_shape)
	gen_out1 = g_model1(in1)
	gen_out2 = g_model2(in2)
	# connect the source input and generator output to the discriminator input
	dis_out1 = d_model1([in1, gen_out1])
	dis_out2 = d_model2([in1, gen_out1])
	model = Model([in1, in2], [gen_out1, gen_out2, dis_out1, dis_out2])
	# compile mode
	opt = Adam(lr=0.002, beta_1=0.5)
	model.compile(loss=['mae', 'mae', 'binary_crossentropy', 'binary_crossentropy'], optimizer=opt, loss_weights=[1, 1, 0.01, 0.01])
	return model

In [None]:
def bytescale(data, cmin=None, cmax=None, high=255, low=0):
    """
    Byte scales an array (image).
    Byte scaling means converting the input image to uint8 dtype and scaling
    the range to ``(low, high)`` (default 0-255).
    If the input image already has dtype uint8, no scaling is done.
    This function is only available if Python Imaging Library (PIL) is installed.
    Parameters
    ----------
    data : ndarray
        PIL image data array.
    cmin : scalar, optional
        Bias scaling of small values. Default is ``data.min()``.
    cmax : scalar, optional
        Bias scaling of large values. Default is ``data.max()``.
    high : scalar, optional
        Scale max value to `high`.  Default is 255.
    low : scalar, optional
        Scale min value to `low`.  Default is 0.
    Returns
    -------
    img_array : uint8 ndarray
        The byte-scaled array.
    Examples
    --------
    >>> from scipy.misc import bytescale
    >>> img = np.array([[ 91.06794177,   3.39058326,  84.4221549 ],
    ...                 [ 73.88003259,  80.91433048,   4.88878881],
    ...                 [ 51.53875334,  34.45808177,  27.5873488 ]])
    >>> bytescale(img)
    array([[255,   0, 236],
           [205, 225,   4],
           [140,  90,  70]], dtype=uint8)
    >>> bytescale(img, high=200, low=100)
    array([[200, 100, 192],
           [180, 188, 102],
           [155, 135, 128]], dtype=uint8)
    >>> bytescale(img, cmin=0, cmax=255)
    array([[91,  3, 84],
           [74, 81,  5],
           [52, 34, 28]], dtype=uint8)
    """
    if data.dtype == np.uint8:
        return data

    if high > 255:
        raise ValueError("`high` should be less than or equal to 255.")
    if low < 0:
        raise ValueError("`low` should be greater than or equal to 0.")
    if high < low:
        raise ValueError("`high` should be greater than or equal to `low`.")

    if cmin is None:
        cmin = data.min()
    if cmax is None:
        cmax = data.max()

    cscale = cmax - cmin
    if cscale < 0:
        raise ValueError("`cmax` should be larger than `cmin`.")
    elif cscale == 0:
        cscale = 1

    scale = float(high - low) / cscale
    bytedata = (data - cmin) * scale + low
    return (bytedata.clip(low, high) + 0.5).astype(np.uint8)


# load all images in a directory into memory
def load_images(path, size=(256,256)):
	data_list = list()
	# enumerate filenames in directory, assume all are images
	for filename in listdir(path):
		# load and resize the image
		pixels = load_img(path + filename, target_size=size, color_mode='grayscale')
		# convert to numpy array
		pixels = img_to_array(pixels)
		# store
		data_list.append(pixels)
	return asarray(data_list)

# load and prepare training images
def load_real_samples(filename):
	# load the dataset
	data = load(filename)
	# unpack arrays
	X1, X2, X3 = data['arr_0'], data['arr_1'], data['arr_2']
	# scale from [0,255] to [-1,1]
	X1 = (X1 - 127.5) / 127.5
	X2 = (X2 - 127.5) / 127.5
	X3 = (X3 - 127.5) / 127.5
	return [X1, X2, X3]

In [None]:
# select a batch of random samples, returns images and target
def generate_real_samples(dataset, n_samples, patch_shape):
	# unpack dataset
	trainA1,trainA2, trainB = dataset
	# choose random instances
	ix = randint(0, trainA1.shape[0], n_samples)
	# retrieve selected images
	X1, X2, Y = trainA1[ix], trainA2[ix], trainB[ix]
	# generate 'real' class labels (1)
	y = ones((n_samples, patch_shape, patch_shape, 1))
	return [X1, X2, Y], y

def generate_val_samples(dataset, n_samples):
	# unpack dataset
	valA, valB, real = dataset
	# choose random instances
	ix = randint(0, valA.shape[0], n_samples)
	# retrieve selected images
	X1, X2, Y = valA[ix], valB[ix], real[ix]
	# generate 'real' class labels (1)
	return [X1, X2, Y]  

In [None]:
# generate a batch of images, returns images and targets
def generate_fake_sample1(g_model1, sample1, patch_shape):
	# generate fake instance
	Y1 = g_model1.predict(sample1)
	# create 'fake' class labels (0)
	y = zeros((len(Y1), patch_shape, patch_shape, 1))
	return Y1, y

# generate a batch of images, returns images and targets
def generate_fake_sample2(g_model1, g_model2, sample1, sample2, patch_shape):
	# generate fake instance
	Y1 = g_model1.predict(sample1)
	Y2 = g_model2.predict(sample2)
	# create 'fake' class labels (0)
	y = zeros((len(Y1), patch_shape, patch_shape, 1))
	return [Y1, Y2], y

In [None]:
def summarize_performance1(step, X_in, Y_in, X_out1, path):


	# save plot to file
	filename1 = '%06d_image_plot.png' % (step+1)
	X = bytescale(X_in[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = path + filename1
	img.save(file1)


    
	# save plot to file
	filename2 = '%06d_generate1_plot.png' % (step+1)
	X = bytescale(X_out1[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = path + filename2
	img.save(file1)


    
	# save plot to file
	filename3 = '%06d_label_plot.png' % (step+1)
	X = bytescale(Y_in[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = path + filename3
	img.save(file1)

In [None]:
def summarize_performance2(step, X_in, Y_in, X_out1, X_out2, path):


	# save plot to file
	filename1 = '%06d_image_plot.png' % (step+1)
	X = bytescale(X_in[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = path + filename1
	img.save(file1)


    
	# save plot to file
	filename2 = '%06d_generate1_plot.png' % (step+1)
	X = bytescale(X_out1[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = path + filename2
	img.save(file1)
    
	# save plot to file
	filename4 = '%06d_generate2_plot.png' % (step+1)
	X = bytescale(X_out2[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')    
	file1 = path + filename4
	img.save(file1)

    
	# save plot to file
	filename3 = '%06d_label_plot.png' % (step+1)
	X = bytescale(Y_in[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = path + filename3
	img.save(file1)


In [None]:
# save the generator models to file
def save_model1(step, g_model_AtoB1, path):
	# save the first generator model
	filename1 = '%06d_g_model_AtoB1.h5' % (step+1)
	g_model_AtoB1.save(path+filename1)
	print('>Saved: %s' % (filename1))
# save the generator models to file
def save_model2(step, g_model_AtoB1, g_model_AtoB2, path):
	# save the first generator model
	filename1 = '%06d_g_model_AtoB1.h5' % (step+1)
	filename2 = '%06d_g_model_AtoB2.h5' % (step+1)
	g_model_AtoB1.save(path+filename1)
	g_model_AtoB2.save(path+filename2)
	print('>Saved: %s,%s' % (filename1,filename2))

In [None]:
def generate_real(dataset, ix):
	# unpack dataset
	trainA1, trainA2, trainB = dataset
	# retrieve selected images
	X1, X2, Y = trainA1[[ix]], trainA2[[ix]], trainB[[ix]]
	# generate 'real' class labels (1)
	return [X1, X2, Y]

def test_all(valset, M_model1, M_model2):
	testA1, testA2, testB = valset
	ts = testA1.shape
	n_steps = ts[0]
	p0 = zeros(n_steps)
	p1 = zeros(n_steps)
	p2 = zeros(n_steps)
	for i in range(n_steps):
		# select a batch of real samples
		[X_realA1, X_realA2, X_realB] = generate_real(valset, i)
		# generate a batch of fake samples
		X_fakeBM1 = M_model1.predict(X_realA1)
		X_fakeBM2 = M_model2.predict(X_realA2)
		p0[i] = psnr(X_realA1, X_realB)
		p1[i] = psnr(X_fakeBM1, X_realB)
		p2[i] = psnr(X_fakeBM2, X_realB)
	mp0 = np.mean(p0)
	sp0 = np.var(p0)
	mp1 = np.mean(p1)
	sp1 = np.var(p1)
	mp2 = np.mean(p2)
	sp2 = np.var(p2)
	return mp0,sp0,mp1,sp1,mp2,sp2

In [None]:
# train pix2pix models
def train(u_modelLP, u_model1, u_modelMR, u_model2, m_model1, m_model2, m_model, p_model, p_model1, d_model, M_moel, M_model1, M_model2, d_model1, d_model2, trainset, valset, path_u1, path_u2, path_m, path_p, path_M, n_epochs=50, n_batch=1):
	path_u1v = path_u1+'val/'
	path_u2v = path_u2+'val/'
	path_mv = path_m+'val/'
	path_pv = path_p+'val/'
	path_Mv = path_M+'val/'
	mkdir(path_u1v)
	mkdir(path_u2v)
	mkdir(path_mv)
	mkdir(path_pv)
	mkdir(path_Mv)
	mp_old_u = 0
	mp_old_um = 0
	mp_old_mu = 0
	mp_old_p = 0
	mp_old_mp1 = 0
	mp_old_mp2 = 0
    # unpack dataset
	trainA1, trainA2, trainB = trainset
	valA1, valA2, valB = valset
	# calculate the number of batches per training epoch
	n_patch = d_model.output_shape[1]
	bat_per_epo = int(len(trainA1) / n_batch)
	# calculate the number of training iterations
	n_steps = bat_per_epo * n_epochs
	print('epo: %d' % (n_steps))
	# manually enumerate epochs
	for i in range(n_steps):
		# select a batch of real samples
		[X_realA, X_realB, Y_realA], y_real = generate_real_samples(trainset, n_batch, n_patch)
		[X_realAv, X_realBv, Y_realAv] = generate_val_samples(valset, n_batch)
		# generate a batch of fake samples
		X_fakeAu1, _ = generate_fake_sample1(u_model1, X_realA, n_patch)
		X_fakeAu2, _ = generate_fake_sample1(u_model2, X_realB, n_patch)
		X_fakeAp, y_fakep = generate_fake_sample1(p_model1, X_realA, n_patch)
		[X_fakeAm,X_fakeBm], _ = generate_fake_sample2(m_model1, m_model2, X_realA, X_realB, n_patch)
		[X_fakeAM,X_fakeBM], y_fakeM = generate_fake_sample2(M_model1, M_model2, X_realA, X_realB, n_patch)


		# generate a batch of val samples
		X_fakeAuv1, _ = generate_fake_sample1(u_model1, X_realAv, n_patch)
		X_fakeAuv2, _ = generate_fake_sample1(u_model2, X_realBv, n_patch)
		X_fakeApv, _ = generate_fake_sample1(p_model1, X_realAv, n_patch)
		[X_fakeAmv,X_fakeBmv], _ = generate_fake_sample2(m_model1, m_model2, X_realAv, X_realBv, n_patch)
		[X_fakeAMv,X_fakeBMv], _ = generate_fake_sample2(M_model1, M_model2, X_realAv, X_realBv, n_patch)
    
        
		psnr_t0 = psnr(X_realA[0], Y_realA[0])    
		psnr_tu1 = psnr(X_fakeAu1[0], Y_realA[0])
		psnr_tu2 = psnr(X_fakeAu2[0], Y_realA[0]) 
		psnr_tm1 = psnr(X_fakeAm[0], Y_realA[0])
		psnr_tm2 = psnr(X_fakeBm[0], Y_realA[0])
		psnr_tp = psnr(X_fakeAp[0], Y_realA[0])
		psnr_tM1 = psnr(X_fakeAM[0], Y_realA[0])
		psnr_tM2 = psnr(X_fakeBM[0], Y_realA[0])
        
		psnr_v0 = psnr(X_realAv[0], Y_realAv[0])    
		psnr_vu1 = psnr(X_fakeAuv1[0], Y_realAv[0]) 
		psnr_vu2 = psnr(X_fakeAuv2[0], Y_realAv[0])
		psnr_vm1 = psnr(X_fakeAmv[0], Y_realAv[0])
		psnr_vm2 = psnr(X_fakeBmv[0], Y_realAv[0])
		psnr_vp = psnr(X_fakeApv[0], Y_realAv[0])
		psnr_vM1 = psnr(X_fakeAMv[0], Y_realAv[0])
		psnr_vM2 = psnr(X_fakeBMv[0], Y_realAv[0])

		result2txtt1=str(psnr_t0)
		file_handlet1.write(result2txtt1)
		file_handlet1.write('\n')
        
		result2txtt2=str(psnr_tu1)
		file_handlet2.write(result2txtt2)
		file_handlet2.write('\n')
 
		result2txtt3=str(psnr_tm1)
		file_handlet3.write(result2txtt3)
		file_handlet3.write('\n')
         
		result2txtt4=str(psnr_tm2)
		file_handlet4.write(result2txtt4)
		file_handlet4.write('\n')

		result2txtt5=str(psnr_tp)
		file_handlet5.write(result2txtt5)
		file_handlet5.write('\n')
  
		result2txtt6=str(psnr_tM1)
		file_handlet6.write(result2txtt6)
		file_handlet6.write('\n')

		result2txtt7=str(psnr_tM2)
		file_handlet7.write(result2txtt7)
		file_handlet7.write('\n')
        
 
        ###############

		result2txtv1=str(psnr_v0)
		file_handlev1.write(result2txtv1)
		file_handlev1.write('\n')
        
		result2txtv2=str(psnr_vu1)
		file_handlev2.write(result2txtv2)
		file_handlev2.write('\n')
 
		result2txtv3=str(psnr_vm1)
		file_handlev3.write(result2txtv3)
		file_handlev3.write('\n')
         
		result2txtv4=str(psnr_vm2)
		file_handlev4.write(result2txtv4)
		file_handlev4.write('\n')

		result2txtv5=str(psnr_vp)
		file_handlev5.write(result2txtv5)
		file_handlev5.write('\n')
  
		result2txtv6=str(psnr_vM1)
		file_handlev6.write(result2txtv6)
		file_handlev6.write('\n')

		result2txtv7=str(psnr_vM2)
		file_handlev7.write(result2txtv7)
		file_handlev7.write('\n')

         
		# update the U-NET
		u_loss1 = u_modelLP.train_on_batch(X_realA, Y_realA)
		u_loss2 = u_modelMR.train_on_batch(X_realB, Y_realA)
 		# update the M-U-NET       
		m_loss = m_model.train_on_batch([X_realA, X_realB], [Y_realA, Y_realA])
        
		# update discriminator for real samples
		d_loss1 = d_model.train_on_batch([X_realA, Y_realA], y_real)
		# update discriminator for generated samples
		d_loss2 = d_model.train_on_batch([X_realA, X_fakeAp], y_fakep)
		# update the generator
		p_loss = p_model.train_on_batch(X_realA, [y_real, Y_realA])
              
 		# update the M-PIX2PIX         
		# update discriminator for real samples
		d_loss11 = d_model1.train_on_batch([X_realA, Y_realA], y_real)
		# update discriminator for generated samples
		d_loss12 = d_model1.train_on_batch([X_realA, X_fakeAM], y_fakeM)       
		# update discriminator for real samples
		d_loss21 = d_model2.train_on_batch([X_realB, Y_realA], y_real)
		# update discriminator for generated samples
		d_loss22 = d_model2.train_on_batch([X_realB, X_fakeBM], y_fakeM)
		# update the generator
		M_loss = M_moel.train_on_batch([X_realA, X_realB], [Y_realA, Y_realA, y_real, y_real]) 
        
        
		# result2txt1=str(u_loss1)
		# file_handle1.write(result2txt1)
		# file_handle1.write('\n')
        
		# result2txt2=str(m_loss[0])
		# file_handle2.write(result2txt2)
		# file_handle2.write('\n')
		# result2txt3=str(m_loss[1])
		# file_handle3.write(result2txt3)
		# file_handle3.write('\n')
		# result2txt12=str(m_loss[2])
		# file_handle12.write(result2txt12)
		# file_handle12.write('\n')
        
        
		# result2txt9=str(p_loss[0])
		# file_handle9.write(result2txt9)
		# file_handle9.write('\n')
		# result2txt10=str(p_loss[1])
		# file_handle10.write(result2txt10)
		# file_handle10.write('\n')
		# result2txt11=str(p_loss[2])
		# file_handle11.write(result2txt11)
		# file_handle11.write('\n')
        
		# summarize performance
		print('>%d,psnr_v0[%.3f],psnr_vu1[%.3f],psnr_vu2[%.3f],psnr_vm1[%.3f],psnr_vm2[%.3f],psnr_vp[%.3f],psnr_vM1[%.3f],psnr_vM2[%.3f]'% (i+1,psnr_v0,psnr_vu1,psnr_vu2,psnr_vm1,psnr_vm2,psnr_vp,psnr_vM1,psnr_vM2) )        
        # evaluate the model performance every so often
		if mp_old_mp1<psnr_vM1:
			summarize_performance2(i, X_realAv, Y_realAv, X_fakeAMv, X_fakeBMv, path_Mv)
			mp_old_mp1=psnr_vM1
			print('>%d' % (i+1))
		if mp_old_u<psnr_vu1:
			summarize_performance1(i, X_realAv, Y_realAv, X_fakeAuv1, path_u1v)
			mp_old_u=psnr_vu1
		if mp_old_um<psnr_vu2:
			summarize_performance1(i, X_realBv, Y_realAv, X_fakeAuv2, path_u2v)
			mp_old_um=psnr_vu2
		if mp_old_mu<psnr_vm1:
			summarize_performance2(i, X_realAv, Y_realAv, X_fakeAmv, X_fakeBmv, path_mv)
			mp_old_mu=psnr_vm1
		if mp_old_p<psnr_vp:
			summarize_performance1(i, X_realAv, Y_realAv, X_fakeApv, path_pv)
			mp_old_p=psnr_vp
		if mp_old_mp2<psnr_vM2:
			summarize_performance2(i, X_realAv, Y_realAv, X_fakeAMv, X_fakeBMv, path_Mv)
			mp_old_mp2=psnr_vM2
		if (i+1) % (bat_per_epo*1) == 0:
			# save the models
			# mp0,sp0,mp1,sp1,mp2,sp2 = test_all(valset, u_model1, u_model2)
			# aa1 = ">"+str(i)+",p0:"+str(mp0)+"+"+str(sp0)+",p1:"+str(mp1)+"+"+str(sp1)+",p2:"+str(mp2)+"+"+str(sp2)
			# file_handleMSU.write(aa1)
			# file_handleMSU.write('\n')
			# mp0,sp0,mp1,sp1,mp2,sp2 = test_all(valset, m_model1, m_model2)
			# aa2 = ">"+str(i)+",p0:"+str(mp0)+"+"+str(sp0)+",p1:"+str(mp1)+"+"+str(sp1)+",p2:"+str(mp2)+"+"+str(sp2)
			# file_handleMSmU.write(aa2)
			# file_handleMSmU.write('\n')
			# mp0,sp0,mp1,sp1,mp2,sp2 = test_all(valset, p_model1, p_model1)
			# aa3 = ">"+str(i)+",p0:"+str(mp0)+"+"+str(sp0)+",p1:"+str(mp1)+"+"+str(sp1)+",p2:"+str(mp2)+"+"+str(sp2)
			# file_handleMSp.write(aa3)
			# file_handleMSp.write('\n')
			# summarize_performance1(i, X_realA, Y_realA, X_fakeAu1, path_u1)
			# summarize_performance1(i, X_realB, Y_realA, X_fakeAu2, path_u2)
			# summarize_performance1(i, X_realA, Y_realA, X_fakeAp, path_p)
			# summarize_performance2(i, X_realA, Y_realA, X_fakeAm, X_fakeBm, path_m)
			print('MAX,psnr_vu1[%.3f],psnr_vu2[%.3f],psnr_vm1[%.3f],psnr_vp[%.3f],psnr_vM1[%.3f],psnr_vM2[%.3f]'% (mp_old_u,mp_old_um,mp_old_mu,mp_old_p,mp_old_mp1,mp_old_mp2) )                





            

In [None]:
# load test image data

dataset = load_real_samples('/content/drive/MyDrive/forpaper/all/L2H.npz')
print(dataset[0].shape,dataset[1].shape,dataset[2].shape)

valset = load_real_samples('/content/drive/MyDrive/forpaper/all/vL2H.npz')
print(valset[0].shape,valset[1].shape,valset[2].shape)

(33, 256, 256, 1) (33, 256, 256, 1) (33, 256, 256, 1)
(1, 256, 256, 1) (1, 256, 256, 1) (1, 256, 256, 1)


In [1]:
# define image shape
image_shape = (256,256,1)

u_model1 = define_generator(image_shape)
u_modelLP = U_NET(u_model1, image_shape)

u_model2 = define_generator(image_shape)
u_modelMR = U_NET(u_model2, image_shape)

m_model1 = define_generator(image_shape)
m_model2 = define_generator(image_shape)
m_model = M_U_NET(m_model1, m_model2, image_shape)

d_model = define_discriminator(image_shape)
p_model1 = define_generator(image_shape)
p_model = define_PIX2PIX(p_model1, d_model, image_shape)

M_model1 = define_generator(image_shape)
M_model2 = define_generator(image_shape)
d_model1 = define_discriminator(image_shape)
d_model2 = define_discriminator(image_shape)
M_moel = define_M_PIX2PIX(M_model1,M_model2, d_model1, d_model2, image_shape)

PATH = '/content/drive/MyDrive/forpaper/all/'

# file_handleMSU=open(PATH+'psnr_MS_U.txt',mode='w')
# file_handleMSmU=open(PATH+'psnr_MS_mU.txt',mode='w')
# file_handleMSp=open(PATH+'psnr_MS_p.txt',mode='w')

# file_handle1=open(PATH+'u_lossu.txt',mode='w')

# file_handle2=open(PATH+'mu_lossall.txt',mode='w')
# file_handle3=open(PATH+'mu_loss1.txt',mode='w')
# file_handle12=open(PATH+'mu_loss2.txt',mode='w')

# file_handle9=open(PATH+'p_lossall.txt',mode='w')
# file_handle10=open(PATH+'p_loss.txt',mode='w')
# file_handle11=open(PATH+'pd_loss.txt',mode='w')

file_handlet1=open(PATH+'psnr_t0.txt',mode='w')
file_handlet2=open(PATH+'psnr_tu.txt',mode='w')
file_handlet3=open(PATH+'psnr_tm1.txt',mode='w')
file_handlet4=open(PATH+'psnr_tm2.txt',mode='w')
file_handlet5=open(PATH+'psnr_tp.txt',mode='w')
file_handlet6=open(PATH+'psnr_tM1.txt',mode='w')
file_handlet7=open(PATH+'psnr_tM2.txt',mode='w')

file_handlev1=open(PATH+'psnr_v0.txt',mode='w')
file_handlev2=open(PATH+'psnr_vu.txt',mode='w')
file_handlev3=open(PATH+'psnr_vm1.txt',mode='w')
file_handlev4=open(PATH+'psnr_vm2.txt',mode='w')
file_handlev5=open(PATH+'psnr_vp.txt',mode='w')
file_handlev6=open(PATH+'psnr_vM1.txt',mode='w')
file_handlev7=open(PATH+'psnr_vM2.txt',mode='w')

path_u1 = '/content/drive/MyDrive/forpaper/all/UNET/'
path_u2 = '/content/drive/MyDrive/forpaper/all/UNETm/'
path_m = '/content/drive/MyDrive/forpaper/all/MUNET/'
path_p = '/content/drive/MyDrive/forpaper/all/PIX2PIX/'
path_M = '/content/drive/MyDrive/forpaper/all/MPIX2PIX/'

mkdir(path_u1)
mkdir(path_u2)
mkdir(path_m)
mkdir(path_p)
mkdir(path_M)

# train model
train(u_modelLP, u_model1, u_modelMR, u_model2, m_model1, m_model2, m_model, p_model, p_model1, d_model, M_moel, M_model1, M_model2, d_model1, d_model2,dataset, valset, path_u1,path_u2, path_m, path_p,path_M)

# file_handleMSU.close()
# file_handleMSmU.close()
# file_handleMSp.close()

# file_handle1.close()
# file_handle2.close()
# file_handle3.close()
# file_handle9.close()
# file_handle10.close()
# file_handle11.close()
# file_handle12.close()

file_handlet1.close()
file_handlet2.close()
file_handlet3.close()
file_handlet4.close()
file_handlet5.close()
file_handlet6.close()
file_handlet7.close()

file_handlev1.close()
file_handlev2.close()
file_handlev3.close()
file_handlev4.close()
file_handlev5.close()
file_handlev6.close()
file_handlev7.close()



NameError: ignored

In [None]:
#  save test result
def save_test_result1(step, X_in1, Y_in, X_out1, filepath):

	# save plot to file
	filename1 = '%06d_1_image_plot.png' % (step+1)
	X = bytescale(X_in1[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = filepath + filename1
	img.save(file1)

    
	# save plot to file
	filename2 = '%06d_2_generate1_plot.png' % (step+1)
	X = bytescale(X_out1[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file2 = filepath + filename2    
	img.save(file2)
 
    
	# save plot to file
	filename3 = '%06d_4_label_plot.png' % (step+1)
	X = bytescale(Y_in[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file3 = filepath + filename3
	img.save(file3)


def save_test_result2(step, X_in1, Y_in, X_out1, X_out2, filepath):

	# save plot to file
	filename1 = '%06d_1_image_plot.png' % (step+1)
	X = bytescale(X_in1[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file1 = filepath + filename1
	img.save(file1)

    
	# save plot to file
	filename2 = '%06d_2_generate1_plot.png' % (step+1)
	X = bytescale(X_out1[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file2 = filepath + filename2    
	img.save(file2)
    
 	# save plot to file
	filename3 = '%06d_3_generate2_plot.png' % (step+1)
	X = bytescale(X_out2[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file3 = filepath + filename3    
	img.save(file3)   

    
	# save plot to file
	filename3 = '%06d_4_label_plot.png' % (step+1)
	X = bytescale(Y_in[0], cmin=-1, cmax=1, high=255, low=0)
	X = np.squeeze(X)
	img = Image.fromarray(X, 'L')
	file3 = filepath + filename3
	img.save(file3)

def generate_real_samples(dataset, n_samples):
	# unpack dataset
	trainA, trainB = dataset
	# choose random instances
	ix = randint(0, trainA.shape[0], n_samples)
	# retrieve selected images
	X1, X2 = trainA[ix], trainB[ix]
	# generate 'real' class labels (1)
	return [X1, X2]    
    
def generate_real(dataset, ix):
	# unpack dataset
	trainA1, trainA2, trainB = dataset
	# retrieve selected images
	X1, X2, Y = trainA1[[ix]], trainA2[[ix]], trainB[[ix]]
	# generate 'real' class labels (1)
	return [X1, X2, Y]
    
def test(u_model1, u_model2, m_model1, m_model2, p_model, dataset, savepath_u1,savepath_u2, savepath_m, savepath_p):
	# unpack dataset
	testA1, testA2, testB = dataset
	print(testA1.shape, testA2.shape, testB.shape)
	ts = testA1.shape
	n_steps = ts[0]
	print('num: %d' % (n_steps))
	# manually enumerate epochs
	for i in range(n_steps):
		# select a batch of real samples
		[X_realA1, X_realA2, X_realB] = generate_real(dataset, i)
		# generate a batch of fake samples
		X_fakeBu1 = u_model1.predict(X_realA1)
		X_fakeBu2 = u_model2.predict(X_realA2)
		X_fakeBm1 = m_model1.predict(X_realA1)
		X_fakeBm2 = m_model2.predict(X_realA2)
		X_fakeBp = p_model.predict(X_realA1)
		save_test_result1(i , X_realA1, X_realB, X_fakeBu1, savepath_u1)
		save_test_result1(i , X_realA2, X_realB, X_fakeBu2, savepath_u2)
		save_test_result1(i , X_realA1, X_realB, X_fakeBp, savepath_p)
		save_test_result2(i , X_realA1, X_realB, X_fakeBm1, X_fakeBm2, savepath_m)
		print('>num: %d done' % (i))

In [None]:
# load test image data
testset = load_real_samples('/content/drive/MyDrive/forpaper/3/vL2H.npz')
print(testset[0].shape,testset[1].shape, testset[2].shape)

(170, 256, 256, 1) (170, 256, 256, 1) (170, 256, 256, 1)


In [2]:
# example of using saved cyclegan models for image translation  078375_g_model_AtoB2
from keras.models import load_model

u_model1 = load_model('/content/drive/MyDrive/forpaper/3/UNET/001632_g_model_AtoB1.h5')
u_model2 = load_model('/content/drive/MyDrive/forpaper/3/UNETm/001632_g_model_AtoB1.h5')

m_model1 = load_model('/content/drive/MyDrive/forpaper/3/MUNET/001632_g_model_AtoB1.h5')
m_model2 = load_model('/content/drive/MyDrive/forpaper/3/MUNET/001632_g_model_AtoB2.h5')

p_model = load_model('/content/drive/MyDrive/forpaper/3/PIX2PIX/001632_g_model_AtoB1.h5')

path_u1 = '/content/drive/MyDrive/forpaper/3/UNET/val/'
path_u2 = '/content/drive/MyDrive/forpaper/3/UNETm/val/'
path_m = '/content/drive/MyDrive/forpaper/3/MUNET/val/'
path_p = '/content/drive/MyDrive/forpaper/3/PIX2PIX/val/'

mkdir(path_u1)
mkdir(path_u2)
mkdir(path_m)
mkdir(path_p)
test(u_model1, u_model2, m_model1, m_model2, p_model, testset, path_u1,path_u2, path_m, path_p)

OSError: ignored

In [3]:
import os
import cv2
import numpy
from os import listdir
from numpy import asarray
from numpy import vstack
import shutil

def mkdir(path):
 
	folder = os.path.exists(path)
 
	if not folder:                   #判断是否存在文件夹如果不存在则创建为文件夹
		os.makedirs(path)            #makedirs 创建文件时如果路径不存在会创建这个路径
		print ('new folder')
		print ('OK')
	else:
		print ('There is this folder!')

path_p = 'D:/for_paper/U&MU&P/e2/PIX2PIX/val/'
mkdir(path_p+'image')
mkdir(path_p+'gen1')
mkdir(path_p+'gen2')
mkdir(path_p+'label')
#D:\TEST\pytorch-CycleGAN-and-pix2pix-master\full_data\1-pytorch-CycleGAN-100%-640-256-neck-7-1-r - ta\pytorch-CycleGAN-and-pix2pix-master\results\L2H\test_latest\images
file =os.listdir(path_p)
for file_ in file:     #循环读取每个文件名
    if file_.find("image") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_p+file_,path_p+'image/'+filename)

for file_ in file:     #循环读取每个文件名
    if file_.find("generate1") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_p+file_,path_p+'gen1/'+filename)

# for file_ in file:     #循环读取每个文件名
#     if file_.find("generate2") > 0:
#         filename = file_[0:6] + ".png"
#         print(filename)
#         shutil.copy(path_p+file_,path_p+'gen2/'+filename)

# for file_ in file:     #循环读取每个文件名
#     if file_.find("generate3") > 0:
#         filename = file_[0:6] + ".png"
#         print(filename)
#         shutil.copy(path+file_,path+'gen3/'+filename)
    
        
for file_ in file:     #循环读取每个文件名
    if file_.find("label") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_p+file_,path_p+'label/'+filename)
    
path_m = 'D:/for_paper/U&MU&P/e2/MUNET/val/'
mkdir(path_m+'image')
mkdir(path_m+'gen1')
mkdir(path_m+'gen2')
mkdir(path_m+'label')
#D:\TEST\pytorch-CycleGAN-and-pix2pix-master\full_data\1-pytorch-CycleGAN-100%-640-256-neck-7-1-r - ta\pytorch-CycleGAN-and-pix2pix-master\results\L2H\test_latest\images
file =os.listdir(path_m)
for file_ in file:     #循环读取每个文件名
    if file_.find("image") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_m+file_,path_m+'image/'+filename)

for file_ in file:     #循环读取每个文件名
    if file_.find("generate1") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_m+file_,path_m+'gen1/'+filename)

for file_ in file:     #循环读取每个文件名
    if file_.find("generate2") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_m+file_,path_m+'gen2/'+filename)

# for file_ in file:     #循环读取每个文件名
#     if file_.find("generate3") > 0:
#         filename = file_[0:6] + ".png"
#         print(filename)
#         shutil.copy(path+file_,path+'gen3/'+filename)
    
        
for file_ in file:     #循环读取每个文件名
    if file_.find("label") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_m+file_,path_m+'label/'+filename)


path_u1 = 'D:/for_paper/U&MU&P/e2/UNET1/val/'
mkdir(path_u1+'image')
mkdir(path_u1+'gen1')
mkdir(path_u1+'gen2')
mkdir(path_u1+'label')
#D:\TEST\pytorch-CycleGAN-and-pix2pix-master\full_data\1-pytorch-CycleGAN-100%-640-256-neck-7-1-r - ta\pytorch-CycleGAN-and-pix2pix-master\results\L2H\test_latest\images
file =os.listdir(path_u1)
for file_ in file:     #循环读取每个文件名
    if file_.find("image") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_u1+file_,path_u1+'image/'+filename)

for file_ in file:     #循环读取每个文件名
    if file_.find("generate1") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_u1+file_,path_u1+'gen1/'+filename)

# for file_ in file:     #循环读取每个文件名
#     if file_.find("generate2") > 0:
#         filename = file_[0:6] + ".png"
#         print(filename)
#         shutil.copy(path_p+file_,path_p+'gen2/'+filename)

# for file_ in file:     #循环读取每个文件名
#     if file_.find("generate3") > 0:
#         filename = file_[0:6] + ".png"
#         print(filename)
#         shutil.copy(path+file_,path+'gen3/'+filename)
    
        
for file_ in file:     #循环读取每个文件名
    if file_.find("label") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_u1+file_,path_u1+'label/'+filename)
        
        
path_u2 = 'D:/for_paper/U&MU&P/e2/UNET2/val/'
mkdir(path_u2+'image')
mkdir(path_u2+'gen1')
mkdir(path_u2+'gen2')
mkdir(path_u2+'label')
#D:\TEST\pytorch-CycleGAN-and-pix2pix-master\full_data\1-pytorch-CycleGAN-100%-640-256-neck-7-1-r - ta\pytorch-CycleGAN-and-pix2pix-master\results\L2H\test_latest\images
file =os.listdir(path_u2)
for file_ in file:     #循环读取每个文件名
    if file_.find("image") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_u2+file_,path_u2+'image/'+filename)

for file_ in file:     #循环读取每个文件名
    if file_.find("generate1") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_u2+file_,path_u2+'gen1/'+filename)

# for file_ in file:     #循环读取每个文件名
#     if file_.find("generate2") > 0:
#         filename = file_[0:6] + ".png"
#         print(filename)
#         shutil.copy(path_p+file_,path_p+'gen2/'+filename)

# for file_ in file:     #循环读取每个文件名
#     if file_.find("generate3") > 0:
#         filename = file_[0:6] + ".png"
#         print(filename)
#         shutil.copy(path+file_,path+'gen3/'+filename)
    
        
for file_ in file:     #循环读取每个文件名
    if file_.find("label") > 0:
        filename = file_[0:6] + ".png"
        print(filename)
        shutil.copy(path_u2+file_,path_u2+'label/'+filename)
    
    


new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
new folder
OK
