In [1]:
# Install/upgrade dependencies if necessary (Pillow provides the PIL module imported below).
!pip install --upgrade pip
!pip install pillow



In [5]:
import os
import tarfile
import urllib.request
import pickle
import numpy as np
from PIL import Image, ImageDraw
from IPython.display import display
from ipywidgets import widgets
import tqdm
from ipywidgets import Layout

# Location of the CIFAR-10 (python version) archive fetched by download_and_extract().
url = "https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
# Directory that receives the downloaded archive and its extracted contents.
download_dir = "cifar10_dataset"
# Output directories: one PNG per variant, or pickled batches, for the
# black-box and Gaussian-noise variants respectively.
processed_images_box_dir = "processed_images_boxes"
processed_images_box_dir_in_batches = "processed_images_boxes_in_batches"
processed_batches_noise_dir = "processed_images_gaussian_noise"
processed_batches_noise_dir_in_batches = "processed_images_gaussian_noise_in_batches"

# Each image is divided into a grid_size x grid_size grid of cells.
grid_size = 3
# Extent in pixels of each grid band; the values sum to 32, the image side
# length used by the reshape in load_cifar10_data().
box_sizes = [10, 11, 11]

def download_and_extract():
    """Download the CIFAR-10 archive (if not already present) and extract it.

    The archive is stored as ``cifar-10-python.tar.gz`` inside ``download_dir``
    and unpacked into that same directory. Extraction runs on every call, even
    when the archive was already downloaded.

    Raises:
        urllib.error.URLError: if the download fails.
        tarfile.TarError: if the archive cannot be read or a member is
            rejected by the extraction filter.
    """
    file_path = os.path.join(download_dir, "cifar-10-python.tar.gz")
    os.makedirs(download_dir, exist_ok=True)
    if not os.path.isfile(file_path):
        print("Downloading CIFAR-10 dataset...")
        # Download to a temporary name and rename only on success, so an
        # interrupted transfer never leaves a truncated archive that the
        # isfile() check above would accept on the next run.
        partial_path = file_path + ".part"
        try:
            urllib.request.urlretrieve(url, partial_path)
            os.replace(partial_path, file_path)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)
    with tarfile.open(file_path) as tar:
        # The archive comes from the network: where the interpreter supports
        # extraction filters, use the "data" filter to refuse members that
        # would escape download_dir (absolute paths, "..", unsafe links).
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=download_dir, filter="data")
        else:
            tar.extractall(path=download_dir)
    print("Download and extraction completed.")

def load_cifar10_data():
    """Load the five CIFAR-10 training batches from ``download_dir``.

    Returns:
        A ``(data, labels)`` tuple. ``data`` is the concatenation of every
        batch's ``b'data'`` entry, reshaped to ``(-1, 3, 32, 32)`` and then
        transposed to ``(N, 32, 32, 3)``; ``labels`` is the concatenation of
        every batch's ``b'labels'`` entry.
    """
    # NOTE: pickle.load executes whatever the file contains; only use this on
    # an archive obtained from a trusted source.
    batch_files = (
        'cifar-10-batches-py/data_batch_1',
        'cifar-10-batches-py/data_batch_2',
        'cifar-10-batches-py/data_batch_3',
        'cifar-10-batches-py/data_batch_4',
        'cifar-10-batches-py/data_batch_5',
        # 'cifar-10-batches-py/test_batch' is currently left out.
    )
    image_chunks = []
    label_chunks = []
    for relative_path in batch_files:
        batch_path = os.path.join(download_dir, relative_path)
        with open(batch_path, 'rb') as handle:
            batch = pickle.load(handle, encoding='bytes')
            image_chunks.append(batch[b'data'])
            label_chunks.append(batch[b'labels'])
    images = np.concatenate(image_chunks)
    targets = np.concatenate(label_chunks)
    # Flat rows -> (N, channel, row, col) -> (N, row, col, channel).
    images = images.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
    print("CIFAR-10 data loaded.")
    return images, targets

def process_images_with_box(train_images):
    """Create one blacked-out variant of every image per grid cell.

    For each image, ``grid_size * grid_size`` copies are produced; in copy
    ``(j, k)`` the cell at horizontal band ``j`` and vertical band ``k`` is set
    to 0 (black). Variants are appended image by image, with ``j`` as the
    outer and ``k`` as the inner index.

    Args:
        train_images: array indexed as ``train_images[i]`` -> one image whose
            first two axes are (row, column).

    Returns:
        A list of arrays, ``grid_size * grid_size`` per input image.
    """
    # Start offset of every band, computed once instead of per variant.
    offsets = [sum(box_sizes[:index]) for index in range(grid_size)]
    processed_images = []
    for i in tqdm.tqdm(range(train_images.shape[0])):
        for j in range(grid_size):
            for k in range(grid_size):
                image_copy = train_images[i].copy()
                # j is the x (column) band and k the y (row) band, matching
                # the (x, y) corner convention the boxes were defined with.
                left, top = offsets[j], offsets[k]
                # Slicing is half-open, so the box is exactly
                # box_sizes[j] x box_sizes[k] pixels and never spills into the
                # neighbouring cell (ImageDraw.rectangle treats its second
                # corner as inclusive, which blanks one extra row and column).
                image_copy[top:top + box_sizes[k], left:left + box_sizes[j]] = 0
                processed_images.append(image_copy)
    print("Images processed in memory.")
    return processed_images

def save_box_images_to_disk(processed_images):
    """Write every boxed variant to ``processed_images_box_dir`` as a PNG.

    Files are named ``image_{i}_{j}_{k}.png`` where ``i`` is the source image
    index and ``(j, k)`` the grid cell, following the order of
    ``processed_images``. Trailing entries that do not form a complete group
    of ``grid_size * grid_size`` variants are not written.
    """
    os.makedirs(processed_images_box_dir, exist_ok=True)
    variants_per_image = grid_size * grid_size
    complete_images = len(processed_images) // variants_per_image
    for flat_index in range(complete_images * variants_per_image):
        # Recover (image, j, k) from the flat position in the list.
        i, cell = divmod(flat_index, variants_per_image)
        j, k = divmod(cell, grid_size)
        target = os.path.join(processed_images_box_dir, f"image_{i}_{j}_{k}.png")
        Image.fromarray(processed_images[flat_index]).save(target)
    print("Images saved to disk.")

def save_box_images_to_disk_in_batches(processed_images, train_labels, num_batches=5):
    """Pickle the boxed images and their labels as ``num_batches`` batch files.

    Each file ``processed_batch_{n}.pickle`` in
    ``processed_images_box_dir_in_batches`` holds a dict with the keys
    ``'batch_label'``, ``'data'`` and ``'labels'``.

    ``'labels'`` keeps one entry per label passed in; it is NOT repeated per
    variant. When ``processed_images`` holds a whole number of entries per
    label (assumed to be consecutive variants of the same source image, as
    produced by process_images_with_box), the data is split at the matching
    boundaries, so within a batch ``data[m]`` belongs to
    ``labels[m // variants_per_label]``.

    Args:
        processed_images: sequence of equally shaped image arrays.
        train_labels: sequence of labels for the source images.
        num_batches: number of batch files to write (default 5).
    """
    os.makedirs(processed_images_box_dir_in_batches, exist_ok=True)
    images = np.array(processed_images)
    labels = np.asarray(train_labels)
    label_batches = np.array_split(labels, num_batches)
    if len(labels) > 0 and len(images) % len(labels) == 0:
        # Cut the images exactly where the labels are cut. Splitting both
        # arrays independently only lines up when the label count is
        # divisible by num_batches.
        variants_per_label = len(images) // len(labels)
        label_counts = [len(batch) for batch in label_batches]
        split_points = np.cumsum(label_counts)[:-1] * variants_per_label
        processed_batches = np.split(images, split_points)
    else:
        # No fixed images-per-label ratio: fall back to an even split.
        processed_batches = np.array_split(images, num_batches)
    for i, (data_batch, label_batch) in enumerate(zip(processed_batches, label_batches)):
        output_dict = {
            'batch_label': f'processed_batch_{i+1}',
            'data': data_batch,
            'labels': label_batch
        }
        with open(os.path.join(processed_images_box_dir_in_batches, f'processed_batch_{i+1}.pickle'), 'wb') as f:
            pickle.dump(output_dict, f)
    print("Processed images saved.")
    
def generate_gaussian_noise(shape, seed=None):
    """Return standard-normal noise (mean 0, standard deviation 1).

    Args:
        shape: shape of the array to generate.
        seed: optional seed. When given, the result is reproducible and equal
            to what ``np.random.seed(seed); np.random.normal(0, 1, shape)``
            yields, but the global NumPy random state is left untouched.

    Returns:
        A float array of the requested shape.
    """
    if seed is None:
        return np.random.normal(0, 1, shape)
    # A private RandomState gives the same stream as seeding the global
    # generator, without silently resetting randomness for every other caller.
    return np.random.RandomState(seed).normal(0, 1, shape)

def process_images_with_gaussian_noise(train_images):
    """Create one noise-perturbed variant of every image per grid cell.

    For each image, ``grid_size * grid_size`` copies are produced; in copy
    ``(j, k)`` Gaussian noise (standard normal scaled by 30 and truncated to
    integers) is added to the cell spanning band ``j`` along the first axis
    and band ``k`` along the second axis. The result is clipped to 0..255 and
    stored as uint8.

    The noise is generated with the fixed seed 42, so every image receives
    the same noise pattern for a given cell.

    Args:
        train_images: array indexed as ``train_images[i]`` -> one image with
            three channels on its last axis.

    Returns:
        A list of uint8 arrays, ``grid_size * grid_size`` per input image.
    """
    # Uses the module-level grid_size / box_sizes so the grid stays in sync
    # with the functions that save these variants.
    offsets = [sum(box_sizes[:index]) for index in range(grid_size)]

    # The seeded noise depends only on the cell, not on the image, so build
    # the nine patches once instead of regenerating them for every image.
    noise_patches = {}
    for j in range(grid_size):
        for k in range(grid_size):
            gaussian_noise = generate_gaussian_noise((box_sizes[j], box_sizes[k], 3), 42)
            noise_patches[j, k] = (gaussian_noise * 30).astype(np.int32)

    processed_images = []
    for i in tqdm.tqdm(range(train_images.shape[0])):
        # Widen to int32 so adding noise cannot wrap around before clipping.
        base_image = train_images[i].astype(np.int32)
        for j in range(grid_size):
            first_axis = slice(offsets[j], offsets[j] + box_sizes[j])
            for k in range(grid_size):
                second_axis = slice(offsets[k], offsets[k] + box_sizes[k])
                image_copy = base_image.copy()
                image_copy[first_axis, second_axis] += noise_patches[j, k]
                image_copy = np.clip(image_copy, 0, 255)
                processed_images.append(image_copy.astype('uint8'))
    print("Images processed with Gaussian noise in memory.")
    return processed_images

def save_gaussian_noise_images_to_disk(processed_images):
    """Write every noisy variant to ``processed_batches_noise_dir`` as a PNG.

    Files are named ``image_{i}_{j}_{k}.png`` where ``i`` is the source image
    index and ``(j, k)`` the grid cell, following the order of
    ``processed_images``. Trailing entries that do not form a complete group
    of ``grid_size * grid_size`` variants are not written.
    """
    os.makedirs(processed_batches_noise_dir, exist_ok=True)
    per_image = grid_size * grid_size
    for i in range(len(processed_images) // per_image):
        first = i * per_image
        for offset in range(per_image):
            # offset walks the cells of image i with j outer and k inner.
            j, k = divmod(offset, grid_size)
            picture = Image.fromarray(processed_images[first + offset])
            file_name = f"image_{i}_{j}_{k}.png"
            picture.save(os.path.join(processed_batches_noise_dir, file_name))
    print("Gaussian noise images saved to disk.")

def save_gaussian_noise_images_to_batches(processed_images, train_labels, num_batches=5):
    """Pickle the noisy images and their labels as ``num_batches`` batch files.

    Each file ``processed_batch_{n}.pickle`` in
    ``processed_batches_noise_dir_in_batches`` holds a dict with the keys
    ``'batch_label'``, ``'data'`` and ``'labels'``.

    ``'labels'`` keeps one entry per label passed in; it is NOT repeated per
    variant. When ``processed_images`` holds a whole number of entries per
    label (assumed to be consecutive variants of the same source image, as
    produced by process_images_with_gaussian_noise), the data is split at the
    matching boundaries, so within a batch ``data[m]`` belongs to
    ``labels[m // variants_per_label]``.

    Args:
        processed_images: sequence of equally shaped image arrays.
        train_labels: sequence of labels for the source images.
        num_batches: number of batch files to write (default 5).
    """
    os.makedirs(processed_batches_noise_dir_in_batches, exist_ok=True)
    images = np.array(processed_images)
    labels = np.asarray(train_labels)
    label_batches = np.array_split(labels, num_batches)
    if len(labels) > 0 and len(images) % len(labels) == 0:
        # Cut the images exactly where the labels are cut. Splitting both
        # arrays independently only lines up when the label count is
        # divisible by num_batches.
        variants_per_label = len(images) // len(labels)
        label_counts = [len(batch) for batch in label_batches]
        split_points = np.cumsum(label_counts)[:-1] * variants_per_label
        processed_batches = np.split(images, split_points)
    else:
        # No fixed images-per-label ratio: fall back to an even split.
        processed_batches = np.array_split(images, num_batches)
    for i, (data_batch, label_batch) in enumerate(zip(processed_batches, label_batches)):
        output_dict = {
            'batch_label': f'processed_batch_{i+1}',
            'data': data_batch,
            'labels': label_batch
        }
        with open(os.path.join(processed_batches_noise_dir_in_batches, f'processed_batch_{i+1}.pickle'), 'wb') as f:
            pickle.dump(output_dict, f)
    print("Processed images with Gaussian noise saved in batches.")



In [8]:
# Shared layout so every button in this cell gets the same width.
button_layout = Layout(width='300px')

# Button 1: download the archive and unpack it. The click handler receives
# the button instance, which is ignored.
button1 = widgets.Button(
    description="Download and extract CIFAR-10 to disk",
    layout=button_layout,
)
button1.on_click(lambda _clicked: download_and_extract())
display(button1)

# load cifar-10 data to memory

# Module-level holders for the dataset; they stay None until the load button
# has been clicked.
data = None
labels = None


def load_data_wrapper(x):
    """Button callback: load CIFAR-10 and publish it in ``data`` / ``labels``.

    ``x`` is the clicked button passed in by ``on_click``; it is not used.
    """
    global data, labels
    loaded_images, loaded_labels = load_cifar10_data()
    data = loaded_images
    labels = loaded_labels


button2 = widgets.Button(
    description="Load CIFAR-10 data to memory",
    layout=button_layout,
)
button2.on_click(load_data_wrapper)
display(button2)

# process images by adding boxes

# Result of the box-processing step; stays None until button3 has run.
processed_images = None

def process_images_wrapper(x):
    """Button callback: build the boxed variants of the loaded images.

    ``x`` is the clicked button (unused). Does nothing except print a hint
    when the dataset has not been loaded yet.
    """
    global processed_images
    if data is None:
        # Buttons can be clicked in any order; explain the missing step
        # instead of failing on None inside the processing function.
        print("CIFAR-10 data is not loaded yet; click 'Load CIFAR-10 data to memory' first.")
        return
    processed_images = process_images_with_box(data)

button3 = widgets.Button(description="Process box images in memory", layout=button_layout)
button3.on_click(process_images_wrapper)
display(button3)

def save_images_wrapper(x):
    """Button callback: save the boxed variants as individual PNG files."""
    if processed_images is None:
        print("No box images in memory; click 'Process box images in memory' first.")
        return
    save_box_images_to_disk(processed_images)

button4 = widgets.Button(description="Save box images to disk", layout=button_layout)
button4.on_click(save_images_wrapper)
display(button4)

def save_processed_images_wrapper(x):
    """Button callback: save the boxed variants and labels as pickled batches."""
    if processed_images is None or labels is None:
        print("Box images or labels are missing; load the data and process the box images first.")
        return
    save_box_images_to_disk_in_batches(processed_images, labels)

button5 = widgets.Button(description="Save box images to disk in batches", layout=button_layout)
button5.on_click(save_processed_images_wrapper)
display(button5)

# process images by adding gaussian noise

# Result of the noise-processing step; stays None until button6 has run.
gaussian_noise_images = None

def process_gaussian_noise_images_wrapper(x):
    """Button callback: build the Gaussian-noise variants of the loaded images.

    ``x`` is the clicked button (unused). Does nothing except print a hint
    when the dataset has not been loaded yet.
    """
    global gaussian_noise_images
    if data is None:
        # Buttons can be clicked in any order; explain the missing step
        # instead of failing on None inside the processing function.
        print("CIFAR-10 data is not loaded yet; click 'Load CIFAR-10 data to memory' first.")
        return
    gaussian_noise_images = process_images_with_gaussian_noise(data)

button6 = widgets.Button(description="Process images with Gaussian noise in memory", layout=button_layout)
button6.on_click(process_gaussian_noise_images_wrapper)
display(button6)

def save_gaussian_noise_images_to_disk_wrapper(x):
    """Button callback: save the noisy variants as individual PNG files."""
    if gaussian_noise_images is None:
        print("No Gaussian noise images in memory; click 'Process images with Gaussian noise in memory' first.")
        return
    save_gaussian_noise_images_to_disk(gaussian_noise_images)

button7 = widgets.Button(description="Save Gaussian noise images to disk", layout=button_layout)
button7.on_click(save_gaussian_noise_images_to_disk_wrapper)
display(button7)

def save_gaussian_noise_images_to_batches_wrapper(x):
    """Button callback: save the noisy variants and labels as pickled batches."""
    if gaussian_noise_images is None or labels is None:
        print("Gaussian noise images or labels are missing; load the data and process the noise images first.")
        return
    save_gaussian_noise_images_to_batches(gaussian_noise_images, labels)

button8 = widgets.Button(description="Save Gaussian noise images to disk in batches", layout=button_layout)
button8.on_click(save_gaussian_noise_images_to_batches_wrapper)
display(button8)

Button(description='Download and extract CIFAR-10 to disk', layout=Layout(width='300px'), style=ButtonStyle())

Button(description='Load CIFAR-10 data to memory', layout=Layout(width='300px'), style=ButtonStyle())

Button(description='Process box images in memory', layout=Layout(width='300px'), style=ButtonStyle())

Button(description='Save box images to disk', layout=Layout(width='300px'), style=ButtonStyle())

Button(description='Save box images to disk in batches', layout=Layout(width='300px'), style=ButtonStyle())

Button(description='Process images with Gaussian noise in memory', layout=Layout(width='300px'), style=ButtonS…

Button(description='Save Gaussian noise images to disk', layout=Layout(width='300px'), style=ButtonStyle())

Button(description='Save Gaussian noise images to disk in batches', layout=Layout(width='300px'), style=Button…

CIFAR-10 data loaded.


100%|██████████| 60000/60000 [00:43<00:00, 1387.00it/s]

Images processed in memory.





Processed images saved.
