#### Categorize Data (Normal, Pneumonia, Covid) into Separate Folders

In [27]:
import os
from os import listdir
from os.path import isfile, join
import numpy as np
import matplotlib.pyplot as plt
import shutil
from tqdm import tqdm
import csv
import sys
from skimage.color import rgb2gray
import cv2
from skimage import io 
from skimage.transform import rotate, AffineTransform, warp
import random
from skimage import img_as_ubyte
from skimage.util import random_noise

In [22]:
# Folder the source X-ray images are copied from (see find_and_sort).
path_to_xray = "train"
# Folder holding the masks; a mask has the same file name as its image.
path_to_masks = "mask"
# Root folder under which per-category img/ and mask/ folders are created.
path_to_sorted = "sorted_data"
# Annotation text file passed to read_label.
label_file = "train_split_v2.txt"
# Maps each (lower-cased) label name to the integer written to labels.csv.
label_dict = {"normal":0, "pneumonia":1, "covid-19":2}

In [23]:
def tqdm_enumerate(iterator):
    """Enumerate ``iterator`` while displaying a tqdm progress bar.

    Yields ``(index, item)`` pairs with ``index`` starting at 0, exactly like
    the built-in ``enumerate``; the iterable is wrapped in ``tqdm`` so the
    bar advances as items are consumed.
    """
    yield from enumerate(tqdm(iterator))

In [1]:
def find_and_sort(img=None, category=None):
    """Copy an X-ray image and its mask into the folders of its category.

    The image is read from ``path_to_xray`` and the mask with the same file
    name from ``path_to_masks``. They are copied to
    ``<path_to_sorted>/<category>/img`` and
    ``<path_to_sorted>/<category>/mask`` respectively; missing destination
    folders are created first.

    Args:
        img: File name of the image (and of its mask).
        category: Label name, used as the destination sub-folder.

    Nothing is returned. A failed copy is reported on stdout instead of being
    raised, so that one bad file does not abort a whole sorting run.
    """
    if not (img and category):
        print("Image Path Required!")
        return

    actual_file_path = os.path.join(path_to_xray, img)
    actual_mask_path = os.path.join(path_to_masks, img)
    # Build paths with os.path.join rather than a literal "\\" so the folder
    # hierarchy is also created correctly on non-Windows systems.
    sorted_img_base = os.path.join(path_to_sorted, category, "img")
    sorted_mask_base = os.path.join(path_to_sorted, category, "mask")
    # exist_ok=True lets each folder be created independently; this avoids a
    # FileExistsError when only one of the two already exists.
    os.makedirs(sorted_img_base, exist_ok=True)
    os.makedirs(sorted_mask_base, exist_ok=True)

    try:
        # Copy the files according to category
        shutil.copy(actual_file_path, os.path.join(sorted_img_base, img))
        shutil.copy(actual_mask_path, os.path.join(sorted_mask_base, img))
    except OSError as e:
        print("Error {} occured while copying file {}".format(e, img))

def read_label(file_name=None, output_csv="labels.csv"):
    """Convert an annotation text file to CSV and sort the listed images.

    Each line of ``file_name`` is split on whitespace; the last field is the
    label (lower-cased) and the field before it is the image file name. For
    every line with a label present in ``label_dict`` a row
    ``[file name, label, label_dict[label]]`` is written to ``output_csv``
    and ``find_and_sort`` is called to copy the image and mask into the
    folder of that label. Per-label totals are printed at the end.

    Args:
        file_name: Path of the annotation text file. If falsy, nothing is
            done.
        output_csv: Path of the CSV file to write (overwritten if present).

    Lines with fewer than two fields or with an unknown label are reported
    on stdout and skipped. An error while handling one line is reported and
    processing continues with the next line.
    """
    if not file_name:
        return

    # Read everything up front so the progress bar knows the total and the
    # input file is closed before the long-running copy loop starts.
    with open(file_name, "r") as label_source:
        lines = label_source.readlines()

    counts = {"normal": 0, "pneumonia": 0, "covid-19": 0}
    with open(output_csv, "w", newline="") as csvfile:
        writer = csv.writer(csvfile, delimiter=",")
        # Write header
        writer.writerow(["slice_id", "type", "label"])
        for _, data in tqdm_enumerate(lines):
            # split() without an argument tolerates repeated spaces, tabs
            # and stray line endings.
            fields = data.split()
            if len(fields) < 2:
                print("Unknown Name Format!")
                continue
            tag, label = fields[-2], fields[-1].lower()

            if label not in label_dict:
                # Skip the row: there is no integer code to write for it.
                print("Unidentied Lable Found:", label)
                continue
            counts[label] = counts.get(label, 0) + 1

            try:
                # Write it to file
                writer.writerow([tag, label, label_dict[label]])
                # Sort files
                find_and_sort(tag, label)
            except Exception as e:
                print("Error {} occured while processing {}".format(e, data.strip()))

    print("Total Cases:\n=========")
    print("Normal:", counts.get("normal", 0))
    print("Pneumonia:", counts.get("pneumonia", 0))
    print("Covid-19:", counts.get("covid-19", 0))

In [5]:
# Write labels.csv and copy every annotated image/mask into its category folder.
read_label(label_file)

100%|███████████████████████████████████████████████████████████████████████████| 16576/16576 [01:54<00:00, 144.56it/s]

Total Cases:
Normal: 7966
Pneumonia: 8521
Covid-19: 89





#### Randomized Augmentation of Positive Samples

In [24]:
# Source folder whose image files are sampled by augment_images.
# NOTE(review): absolute, machine-specific Windows path; adjust per environment.
image_path = "C:\\Users\\AnilYadav\\Desktop\\Projects\\covid-19\\sorted_data\\covid-19\\segmented"
# Destination folder that receives the generated aug_image_<n>.<ext> files.
aug_path = "C:\\Users\\AnilYadav\\Desktop\\Projects\\covid-19\\sorted_data\\covid-19\\aug_mask"

In [5]:
#Transformation Functions
def anticlockwise_rotation(image):
    """Rotate ``image`` by a random whole number of degrees in [0, 180].

    The angle is passed to ``rotate`` with a positive sign.
    """
    return rotate(image, random.randint(0, 180))

def clockwise_rotation(image):
    """Rotate ``image`` by a random whole number of degrees in [0, 180].

    The angle is passed to ``rotate`` with a negative sign.
    """
    degrees = random.randint(0, 180)
    return rotate(image, -degrees)

def h_flip(image):
    """Return ``image`` mirrored left-to-right (column order reversed)."""
    return np.flip(image, axis=1)

def v_flip(image):
    """Return ``image`` mirrored top-to-bottom (row order reversed)."""
    return np.flip(image, axis=0)

def add_noise(image):
    """Return ``image`` with noise added by ``random_noise`` (default settings)."""
    noisy_image = random_noise(image)
    return noisy_image

def blur_image(image):
    """Blur ``image`` with a 9x9 Gaussian kernel.

    The sigma argument is 0, which leaves OpenCV to derive it from the
    kernel size.
    """
    kernel_size = (9, 9)
    return cv2.GaussianBlur(image, kernel_size, 0)

def warp_shift(image):
    """Warp ``image`` with a fixed (0, 40) translation, wrapping at the edges."""
    # Choose the x, y translation values according to your convenience.
    shift = AffineTransform(translation=(0, 40))
    return warp(image, shift, mode="wrap")

In [6]:
# Registry of augmentation operations: human-readable name -> function taking
# an image and returning the transformed image. augment_images picks entries
# from this dict at random. warp_shift is defined above but is not registered.
transformations = {'rotate anticlockwise': anticlockwise_rotation,
                   'rotate clockwise': clockwise_rotation,
                   'horizontal flip': h_flip, 
                   'vertical flip': v_flip,
                   'adding noise': add_noise,
                   'blurring image':blur_image
                 } 

def augment_images(image_path = None, augmented_path = None, images_to_generate=3000):
    """Generate randomly augmented copies of the images in a folder.

    For each of ``images_to_generate`` outputs, one file is picked at random
    from ``image_path`` and loaded as grayscale. A random number of
    operations (between 1 and ``len(transformations)``) is then drawn from
    ``transformations`` and applied one after another, each to the result of
    the previous one. The result is converted to 8-bit and written to
    ``augmented_path`` as ``aug_image_<index>.<ext>``.

    Args:
        image_path: Folder containing the source images.
        augmented_path: Folder that receives the generated images; created
            if it does not exist.
        images_to_generate: Number of augmented images to write.

    Nothing is done if either path is falsy or ``images_to_generate`` is not
    positive. An empty source folder is reported on stdout.
    """
    if not (image_path and augmented_path) or images_to_generate <= 0:
        return

    source_images = [os.path.join(image_path, f) for f in listdir(image_path)
                     if isfile(join(image_path, f))]
    if not source_images:
        # random.choice would raise IndexError on an empty list.
        print("No images found in {}".format(image_path))
        return

    # cv2.imwrite does not create missing folders; it just reports failure.
    os.makedirs(augmented_path, exist_ok=True)
    operations = list(transformations.values())
    known_extensions = ("png", "jpg", "jpeg")

    for i in tqdm(range(images_to_generate)):
        source = random.choice(source_images)
        image = io.imread(source)
        # Only colour images need converting; 2-D input is already grayscale.
        if image.ndim == 3:
            image = rgb2gray(image)

        # choose random no. of transformation to apply
        transformation_count = random.randint(1, len(operations))
        for _ in range(transformation_count):
            operation = random.choice(operations)  # randomly choosing method to call
            # Feed each result into the next operation so they accumulate.
            # The flips return negative-stride views, so hand the next step
            # (and the final write) a plain contiguous buffer.
            image = np.ascontiguousarray(operation(image))

        # Take the extension from the file name itself, not from a substring
        # match on the whole path; fall back to png for anything unexpected.
        extension = os.path.splitext(source)[1].lstrip(".").lower()
        if extension not in known_extensions:
            extension = "png"
        new_image_path = os.path.join(augmented_path, "aug_image_%s.%s" % (i, extension))

        # Convert an image to unsigned byte format, with values in [0, 255].
        if not cv2.imwrite(new_image_path, img_as_ubyte(image)):
            print("Could not write {}".format(new_image_path))
    print("All Images Augmented!")

In [7]:
# Generate the default 3000 augmented images from image_path into aug_path.
augment_images(image_path, aug_path)

  .format(dtypeobj_in, dtypeobj_out))
100%|██████████████████████████████████████████████████████████████████████████████| 3000/3000 [11:45<00:00,  4.25it/s]

All Images Augmented!



