In [1]:
import shutil
import sys
import os
import json
import numpy as np
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, confusion_matrix
from ultralytics import YOLO
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from ultralytics import YOLO
import ultralytics
import matplotlib.pyplot as plt
from collections import Counter, defaultdict
import matplotlib.ticker as ticker
import warnings
import random
import seaborn as sns
import csv
import datetime

In [13]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
import torchvision
from torchvision.datasets import MNIST
from torchvision import transforms
from IPython.display import display

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import cifar10
from keras.preprocessing import image
import keras.backend as K
from tensorflow.keras.layers import *


import time
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [3]:
def create_num_to_label():
    with open("IP102/ip102_v1.1/num_label_reduced.txt") as f:
        num_to_label = {}
        for text in f.read().split("\n"):
            n,l = text.split(' ',1);
            num_to_label[n] = l.strip()

    return num_to_label

num_to_label = create_num_to_label()

def create_name_to_num():
    with open("IP102/ip102_v1.1/name_num_table.txt") as f:
        name_to_num = [] # class data
        for l in f.read().split("\n"):
            v = tuple(l.split())
            if v:
                if str(int(v[1])+1) in num_to_label.keys():
                    v = [v[0],str(int(v[1])+1)] # to fit the format in num_to_label.txt
                    name_to_num.append(v)
    
    return np.array(name_to_num)

name_to_num = create_name_to_num()

In [4]:
original_DA_num_to_label = {23:"corn borer", 52:"blister beetle"} # original num_to_label

DA_label_to_num = {"corn borer":0, "blister beetle":1} # num_to_label for CGAN

def assign_augment_img(name_to_num):
    DA_path_to_num = []
    src_directory = "IP102/ip102_v1.1/images/"
    for name, num in zip(name_to_num[:, 0], name_to_num[:, 1]):
        num = int(num)
        if num in np.array(list(original_DA_num_to_label.keys())):
            DA_path_to_num.append([src_directory + name, DA_label_to_num[original_DA_num_to_label[num]]])

    return np.array(DA_path_to_num)


DA_path_to_num = assign_augment_img(name_to_num)
print(len(DA_path_to_num))

3595


In [17]:
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.preprocessing import LabelEncoder

batch_size = 64

img_size = 640
epoch_count = 50
noise_dim = 100 
n_class = 2

tags = list(DA_label_to_num.keys())

# Sample data
image_paths = DA_path_to_num[:,0]
labels = DA_path_to_num[:,1]

# Load images and labels
X_train = []
y_train = []

for path, label in zip(image_paths, labels):
    img = load_img(path, target_size=(img_size,img_size))
    img_array = img_to_array(img)
    X_train.append(img_array)
    y_train.append(label)

X_train = np.array(X_train).astype("float32") / 255.0

# Encode string labels into integers
le = LabelEncoder()
y_train = le.fit_transform(y_train)
y_train = np.array(y_train)

X_train = (X_train - 127.5) / 127.5
dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
dataset = dataset.shuffle(buffer_size=1000).batch(batch_size, drop_remainder=True)

KeyboardInterrupt: 

In [None]:
print(dataset)
print("-------images---------")
print(f"Data Amount: {len(X_train)}")
print(f"Image Size : {len(X_train[0])} x {len(X_train[0,0])}")
print(f"Channel    : {len(X_train[0,0,0])}")
print(f"Shape      : {X_train.shape}")

print("-------labels---------")
print(f"Data Amount: {len(y_train)}")
print(f"Shape      : {y_train.shape}")  

### CGAN with tensorflow

In [None]:
bce_loss = tf.keras.losses.BinaryCrossentropy()

def discriminator_loss(real, fake): # real, contains labels predicted for real images. Similary fake.
    real_loss = bce_loss(tf.ones_like(real), real) # loss for real image; small is better
    fake_loss = bce_loss(tf.zeros_like(fake), fake)# loss for fake image; small is better
    total_loss = real_loss + fake_loss # sum
    return total_loss
  
def generator_loss(preds):
    return bce_loss(tf.ones_like(preds), preds) # loss for fake image; big is better
  
d_optimizer=Adam(learning_rate=0.0002, beta_1 = 0.5) # beta_1 is decay rate for momentum, set to 3
g_optimizer=Adam(learning_rate=0.0002, beta_1 = 0.5)

In [None]:
def build_generator():
    global img_size
    global n_class

    emb_d = 50
    srtd = 2
    convT_krn_s = 4
    qtr_s = int(img_size / (srtd**2))
    h1 = 128
    lrelu_alpha = 0.2
    conv_krn_s = qtr_s
    

    in_label = Input(shape=(1,)) # 1 because of only one label for each image
    li = Embedding(n_class, emb_d)(in_label)

    n_nodes = qtr_s * qtr_s
    li = Dense(n_nodes)(li)
    li = Reshape((qtr_s, qtr_s, 1))(li)
    
    in_lat = Input(shape=(noise_dim,))
    n_nodes =  qtr_s * qtr_s * h1
    gen = Dense(n_nodes, activation=LeakyReLU(alpha=lrelu_alpha))(in_lat)
    gen = Reshape((qtr_s, qtr_s, h1))(gen)
    
    merge = Concatenate()([gen, li])

    gen = Conv2DTranspose(
        h1, (convT_krn_s,convT_krn_s), strides=(srtd, srtd), padding='same')(merge)  
    gen = LeakyReLU(alpha=lrelu_alpha)(gen)

    gen = Conv2DTranspose(
        h1, (convT_krn_s, convT_krn_s), strides=(srtd, srtd), padding='same')(gen)  
    gen = LeakyReLU(alpha=lrelu_alpha)(gen)

    out_layer = Conv2D(
        3, (conv_krn_s, conv_krn_s), activation='tanh', padding='same')(gen) # 3 because of RGB

    model = Model([in_lat, in_label], out_layer)
    return model


g_model = build_generator()
g_model.summary()

In [None]:
def build_discriminator():

    global img_size

    h2 = 128
    krn_s = 3
    strd = 2
    drp_r = 0.4
    lrelu_alpha = 0.2
    
    
    in_label = tf.keras.layers.Input(shape=(1,))
    
    li = tf.keras.layers.Embedding(n_class, 50)(in_label)
    
    n_nodes = img_size * img_size 
    li = tf.keras.layers.Dense(n_nodes)(li) 
    
    li = tf.keras.layers.Reshape((img_size, img_size, 1))(li) 
    
    in_image = tf.keras.layers.Input(shape=(img_size, img_size, 3)) 
    
    merge = tf.keras.layers.Concatenate()([in_image, li]) 
    
    fe = tf.keras.layers.Conv2D(h2, (krn_s,krn_s), strides=(strd,strd), padding='same')(merge) 
    fe = tf.keras.layers.LeakyReLU(alpha=lrelu_alpha)(fe)
    
    fe = tf.keras.layers.Conv2D(h2, (krn_s,krn_s), strides=(strd,strd), padding='same')(fe) 
    fe = tf.keras.layers.LeakyReLU(alpha=lrelu_alpha)(fe)
    
    fe = tf.keras.layers.Flatten()(fe) 
    
    fe = tf.keras.layers.Dropout(drp_r)(fe)
    
    out_layer = tf.keras.layers.Dense(1, activation='sigmoid')(fe)
    
    model = Model([in_image, in_label], out_layer)
    
    return model


d_model = build_discriminator()
d_model.summary()

In [10]:
@tf.function
def train_step(dataset):
   
    real_images, real_labels = dataset
 
    random_latent_vectors = tf.random.normal(shape=(batch_size, noise_dim))
    generated_images = g_model(
        [random_latent_vectors, real_labels]
    )

    with tf.GradientTape() as tape:
        pred_fake = d_model([generated_images, real_labels])
        pred_real = d_model([real_images, real_labels])
        
        d_loss = discriminator_loss(pred_real, pred_fake)
      
    grads = tape.gradient(d_loss, d_model.trainable_variables)
   
    d_optimizer.apply_gradients(zip(grads, d_model.trainable_variables))

   
    random_latent_vectors = tf.random.normal(shape=(batch_size, noise_dim))
   

    with tf.GradientTape() as tape:
        fake_images = g_model({"noise_input": random_latent_vectors, "label_input": real_labels})
        predictions = d_model([fake_images, real_labels])
        g_loss = generator_loss(predictions)
    
    grads = tape.gradient(g_loss, g_model.trainable_variables)
    g_optimizer.apply_gradients(zip(grads, g_model.trainable_variables))
    
    return d_loss, g_loss

In [11]:
def show_samples(num_samples, n_class, g_model):
    fig, axes = plt.subplots(n_class,num_samples, figsize=(10,5)) 
    fig.tight_layout()

    for l in np.arange(n_class):
      random_noise = tf.random.normal(shape=(num_samples, noise_dim))
      label = tf.ones(num_samples)*l
      label = tf.constant([[l]] * num_samples, dtype=tf.int32) # make values integer to be fed into Embedding
        
      gen_imgs = g_model.predict([random_noise, label])
      for j in range(gen_imgs.shape[0]):
        img = image.array_to_img(gen_imgs[j], scale=True)
        axes[l,j].imshow(img)
        axes[l,j].yaxis.set_ticks([])
        axes[l,j].xaxis.set_ticks([])

        if j ==0:
          axes[l,j].set_ylabel(tags[l])
    plt.show()

In [11]:
def train(dataset, epochs=epoch_count):

    for epoch in range(epochs):
        print('Epoch: ', epochs)
        d_loss_list = []
        g_loss_list = []
        q_loss_list = []
        start = time.time()
        
        itern = 0
        for image_batch in tqdm(dataset):
            real_images, real_labels = image_batch
            d_loss, g_loss = train_step(image_batch)
            d_loss_list.append(d_loss)
            g_loss_list.append(g_loss)
            itern=itern+1
        if epoch % 10 == 0:
            show_samples(3, n_class, g_model)
            
        print (f'Epoch: {epoch} -- Generator Loss: {np.mean(g_loss_list)}, Discriminator Loss: {np.mean(d_loss_list)}\n')
        print (f'Took {time.time()-start} seconds. \n\n')
        

model = train(dataset, epochs=epoch_count)
model.save(f"GAN_result/{str(datetime.datetime.today())}")

Epoch:  5000


  4%|█▌                                          | 1/28 [00:06<03:07,  6.95s/it]

KeyboardInterrupt



KeyboardInterrupt: 