In [None]:
import numpy as np
import tensorflow as tf
from sklearn.utils import shuffle
import os
import keras
import cv2
from tqdm import tqdm

This function takes 4 inputs:
* `data`: the training images that will be augmented.
* `label`: the labels of those training images.
* `augmentation_num`: the number of augmented copies to generate for each image.
* `batch_size`: the number of images per batch. The original and augmented images are shuffled together and grouped into batches of this size, e.g. with `batch_size` = 32 every returned batch holds 32 images.

In [None]:
# it takes pics and its labels and divide it into batches and augment each batch
def pic_augmentation(data , label , augmentation_num , batch_size):
    """Augment images, shuffle them with the originals and group them into batches.

    For every (image, label) pair, ``augmentation_num`` augmented copies are
    produced. Each copy is derived from the *original* image (random
    brightness with ``max_delta=0.3``, random contrast with a factor in
    ``[0.1, 0.9]``, random left/right flip, random up/down flip), so the
    distortions do not pile up from one copy to the next. The seed of the
    stateless ops is ``(copy_index, 0)``, i.e. it depends only on the copy
    index and not on the image.

    The caller's ``data`` and ``label`` sequences are left untouched.

    Args:
        data: sequence of images.
        label: sequence of labels, aligned with ``data``.
        augmentation_num: number of augmented copies generated per image.
        batch_size: number of samples in every returned batch.

    Returns:
        A list of ``(x, y)`` tuples where ``x`` and ``y`` are NumPy arrays
        holding exactly ``batch_size`` samples each. Samples left over after
        the last full batch are not returned.
    """
    # Work on copies so the caller's lists do not grow as a side effect.
    images = list(data)
    labels = list(label)

    for original, y in zip(data, label):
        for num in range(augmentation_num):
            seed = (num, 0)
            # Always start from the untouched original image.
            augmented = tf.image.stateless_random_brightness(original, max_delta=0.3, seed=seed)
            augmented = tf.image.stateless_random_contrast(augmented, lower=0.1, upper=0.9, seed=seed)
            augmented = tf.image.stateless_random_flip_left_right(augmented, seed=seed)
            augmented = tf.image.stateless_random_flip_up_down(augmented, seed=seed)
            # Other stateless ops that could be added later: jpeg_quality,
            # crop, saturation, hue, sample_distorted_bounding_box.
            images.append(augmented)
            labels.append(y)

    if not images:
        # Nothing to shuffle or batch.
        return []

    shuffled_images, shuffled_labels = shuffle(images, labels)

    batches = []
    batch_x = []
    batch_y = []
    for image, y in zip(shuffled_images, shuffled_labels):
        batch_x.append(image)
        batch_y.append(y)
        if len(batch_x) == batch_size:
            batches.append((np.array(batch_x), np.array(batch_y)))
            batch_x = []
            batch_y = []
    return batches

This function returns the total number of images across all class folders (one folder per class).

In [None]:
# get num of all pics
def get_len(the_path):
    """Return the total number of entries found inside the class folders.

    Every sub-directory of ``the_path`` is treated as one class folder and
    the number of entries directly inside it is added to the total.
    Entries of ``the_path`` that are not directories (stray files such as
    ``.DS_Store``) are skipped instead of raising ``NotADirectoryError``.

    Args:
        the_path: directory that contains one sub-directory per class.

    Returns:
        The summed entry count of all class folders (0 if there are none).
    """
    total = 0
    with os.scandir(the_path) as entries:
        for entry in entries:
            if entry.is_dir():
                total += len(os.listdir(entry.path))
    return total

This function takes these 4 inputs:
* `Batch`: the index of the current batch within the epoch. E.g. if we have 200 batches in every epoch, `Batch` will be in the range 0-199.
* `Big_Batch_size`: the number of image **paths** collected from each class folder. E.g. if a class folder holds 1000 images and this variable is 250, the **paths** of 250 of them are loaded into the list.
* `the_path`: the directory that contains our class folders with the images.
* `Batch_size`: the batch size we chose for the training process.

After collecting the paths, the list is shuffled and returned.

In [None]:
# get path of a big batch of images that we will load portion of it into ram using the paths in big batch
def get_image_big_batch(Batch , Big_Batch_size , the_path , Batch_size):
    """Collect a shuffled list of ``(image_path, label)`` pairs (a "big batch").

    From every class folder under ``the_path`` a window of at most
    ``Big_Batch_size`` file names is taken. The window start is derived from
    the batch index, so consecutive big batches walk through each folder.
    The label of a class is its position in ``os.listdir(the_path)``.

    Args:
        Batch: index of the current training batch within the epoch.
        Big_Batch_size: maximum number of paths taken from each class folder.
        the_path: directory that contains one sub-directory per class.
        Batch_size: training batch size; used to work out how many training
            batches one big batch covers.

    Returns:
        The shuffled list of ``(path, label)`` tuples.

    Raises:
        ZeroDivisionError: if ``Big_Batch_size * number_of_classes`` is
            smaller than ``Batch_size``.
    """
    # List the root once; the same listing defines the label of every class.
    categories = os.listdir(the_path)
    batches_per_big_batch = (Big_Batch_size * len(categories)) // Batch_size
    start = Big_Batch_size * (Batch // batches_per_big_batch)

    dataset = []
    for label, category in enumerate(categories):
        folder = os.path.join(the_path, category)
        # One listing per folder; slicing clamps at the end of the list, so a
        # short tail needs no special case.
        pic_names = os.listdir(folder)
        for pic_name in pic_names[start:start + Big_Batch_size]:
            dataset.append((os.path.join(folder, pic_name), label))
    return shuffle(dataset)


This function loads a part of the Big Batch into RAM. The whole process is: first we build a list of image paths (the Big Batch), then we load a portion of it into RAM for training. This function takes 4 inputs:
* `batch`: the index of the current batch within the epoch. E.g. if we have 200 batches in every epoch, `batch` will be in the range 0-199.
* `portion_size`: the size of the portion we want to load, e.g. with `portion_size` = 10 we load 10 images from our Big Batch.
* `the_big_batch`: the Big Batch to load from.
* `pic_size`: the size every image is resized to. It must be a tuple.


In [None]:
# loading a portion of pics into ram
def get_portion_from_big_batch(batch , portion_size , the_big_batch , pic_size: tuple[int , int]):
    """Load one portion of a big batch into memory as resized images.

    The big batch is viewed as consecutive windows of ``portion_size``
    entries; ``batch`` selects the window, wrapping around when it exceeds
    the number of full windows. Images that cannot be read or resized are
    skipped, so fewer than ``portion_size`` samples may be returned.

    Args:
        batch: index of the current training batch.
        portion_size: number of ``(path, label)`` entries to load.
        the_big_batch: list of ``(path, label)`` tuples.
        pic_size: target size handed to ``cv2.resize`` as ``dsize``.

    Returns:
        ``(images, labels)``: two aligned lists with the resized images and
        their labels.

    Raises:
        ZeroDivisionError: if ``portion_size`` is 0.
    """
    # An empty big batch, or one shorter than a single portion, still counts
    # as one window; otherwise the modulo below would divide by zero.
    n_portions = max(1, len(the_big_batch) // portion_size)
    start = portion_size * (batch % n_portions)

    images = []
    labels = []
    for path, y in the_big_batch[start:start + portion_size]:
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals an unreadable or missing file with None.
            continue
        try:
            resized = cv2.resize(image, pic_size)
        except cv2.error:
            # Best effort: skip images OpenCV cannot resize.
            continue
        images.append(resized)
        labels.append(y)

    return images, labels

In [None]:
def train_on_batch(the_path , batch_size , portion_size , Big_Batch_size , n_epoch , pic_size: tuple[int , int] , model , augmentation_num = 0):
    """Train ``model`` batch by batch while streaming images from disk.

    For every batch index of an epoch a fresh "big batch" of image paths is
    collected whenever the previous one has been used up, a portion of it is
    loaded into memory, optionally augmented, and passed to
    ``model.train_on_batch``.

    Args:
        the_path: directory that contains one sub-directory per class.
        batch_size: training batch size; also sets the number of steps per
            epoch (``get_len(the_path) // batch_size``).
        portion_size: number of images loaded into memory per step.
        Big_Batch_size: number of image paths taken per class folder when a
            big batch is collected.
        n_epoch: number of epochs.
        pic_size: size every image is resized to.
        model: object whose ``train_on_batch(x, y)`` returns a
            ``(loss, accuracy)`` pair (presumably a Keras model compiled
            with a single accuracy metric; confirm against the caller).
        augmentation_num: augmented copies generated per image; ``0``
            trains on the loaded portion as a single batch.

    Returns:
        ``{'loss_history': [...], 'acc_history': [...]}`` with one entry per
        trained batch.

    Raises:
        ZeroDivisionError: if ``Big_Batch_size * number_of_classes`` is
            smaller than ``batch_size`` and at least one step is run.
    """
    loss_history = []
    acc_history = []
    categories = os.listdir(the_path)
    # Number of training steps served by one big batch (loop-invariant).
    batches_per_big_batch = (Big_Batch_size * len(categories)) // batch_size
    for epoch in range(n_epoch):
        # Reset so the epoch summary never reports stale or unbound values.
        loss = acc = None
        for batch in tqdm(range(get_len(the_path) // batch_size)):
            if batch % batches_per_big_batch == 0:
                Big_Batch = get_image_big_batch(batch , Big_Batch_size , the_path , batch_size)
            portion_x , portion_y = get_portion_from_big_batch(batch , portion_size , Big_Batch , pic_size)
            if not portion_x:
                # Every image of this portion was unreadable; nothing to train on.
                continue
            if augmentation_num > 0:
                batches = pic_augmentation(portion_x , portion_y , augmentation_num , batch_size)
            else:
                # Train on the whole portion as ONE batch. Iterating over
                # zip(portion_x, portion_y) would feed single images that
                # lack the batch dimension.
                batches = [(portion_x , portion_y)]
            for batch_x , batch_y in batches:
                loss , acc = model.train_on_batch(np.array(batch_x) , np.array(batch_y))
                loss_history.append(loss)
                acc_history.append(acc)
        if loss is None:
            print('Epoch: %d ,     no batches were trained' % (epoch+1))
        else:
            print('Epoch: %d ,     Train Loss %.4f,    Train Acc. %.4f' % (epoch+1 , loss , acc))
    return {'loss_history' : loss_history , 'acc_history' : acc_history}