In [6]:
from google.colab import files
import os
from PIL import Image, ImageOps
import numpy as np
import imgaug.augmenters as iaa
import shutil

# Step 1: Upload the dataset
uploaded = files.upload()

# Step 2: Remove existing directory if it exists to avoid conflicts
if os.path.exists('/content/dataset-loz2'):
    shutil.rmtree('/content/dataset-loz2')

# Step 3: Unzip the uploaded dataset (assuming the dataset is a zip file) with overwrite option
!unzip -o dataset-loz2.zip -d dataset-loz2

# Step 4: Define paths for input and output folders
input_folder = '/content/dataset-loz2'
output_folder = '/content/processed_dataset-loz2'

# Ensure output folder exists
os.makedirs(output_folder, exist_ok=True)

# Augmentation sequence (excluding scaling as per instructions)
augmentation_sequence = iaa.Sequential([
    iaa.Fliplr(0.5), # horizontal flips
    iaa.Affine(rotate=(-45, 45)), # rotation
    iaa.AdditiveGaussianNoise(scale=(0.1*255, 0.2*255)), # medium noise
    iaa.ElasticTransformation(alpha=50, sigma=5), # distortions
    iaa.Multiply((0.8, 1.2)), # change brightness
    iaa.LinearContrast((0.75, 1.5)) # change contrast
])

def convert_and_autocontrast(image_path):
    """Convert image to grayscale and apply auto contrast."""
    image = Image.open(image_path).convert('L') # Convert to grayscale
    image = ImageOps.autocontrast(image) # Apply auto contrast
    return image

def cut_image_into_fragments(image, fragment_size=(128, 128)):
    """Cut image into smaller fragments."""
    fragments = []
    image_width, image_height = image.size
    for i in range(0, image_width, fragment_size[0]):
        for j in range(0, image_height, fragment_size[1]):
            box = (i, j, i + fragment_size[0], j + fragment_size[1])
            fragment = image.crop(box)
            fragments.append(fragment)
    return fragments

def augment_and_save_fragments(fragments, output_path, augmentations):
    """Apply augmentations to image fragments and save them."""
    for idx, fragment in enumerate(fragments):
        # Convert to numpy array
        fragment_array = np.array(fragment)

        # Apply augmentations
        images_aug = augmentations(images=[fragment_array])

        # Save augmented images with 5-6 rotations
        for i, img_aug in enumerate(images_aug):
            for rot in range(5, 7): # 5-6 rotations
                img_rot = iaa.Affine(rotate=(rot * 60)).augment_image(img_aug) # rotate each by 60 degrees
                img_rot = Image.fromarray(img_rot)
                fragment_filename = f"fragment_{idx}_aug_{i}_rot_{rot}.jpg"
                img_rot.save(os.path.join(output_path, fragment_filename))

# Process all images in the input folder
for subdir, _, files in os.walk(input_folder):
    for file in files:
        file_path = os.path.join(subdir, file)
        output_subdir = os.path.join(output_folder, os.path.relpath(subdir, input_folder))
        os.makedirs(output_subdir, exist_ok=True)

        # Convert and apply auto contrast
        processed_image = convert_and_autocontrast(file_path)

        # Cut into fragments
        fragments = cut_image_into_fragments(processed_image)

        # Apply augmentations and save
        augment_and_save_fragments(fragments, output_subdir, augmentation_sequence)

print("Image processing and augmentation completed.")

# Create a zip archive of the processed dataset
shutil.make_archive('processed_dataset-loz2', 'zip', '/content/processed_dataset-loz2')

# Ensure correct import of the files module and download the zip file
import google.colab.files as gfiles
gfiles.download('processed_dataset-loz2.zip')


KeyboardInterrupt



Код сверху запускался, файл загружается. Он в последний момент слетел, а заново долго грузить, не стал перезапускать

In [1]:
import os

# List the contents of the current working directory
print(os.listdir('/content'))

# Verify the file path
zip_file_path = '/content/processed_dataset-loz2.zip'
if os.path.exists(zip_file_path):
    print(f"File found: {zip_file_path}")
else:
    print(f"File not found: {zip_file_path}")


['.config', 'dataset-loz2.zip', '.ipynb_checkpoints', 'processed_dataset-loz2.zip', 'sample_data']
File found: /content/processed_dataset-loz2.zip


In [3]:
from google.colab import files

# Upload the zip file
uploaded = files.upload()

# Check the uploaded file
zip_file_path = list(uploaded.keys())[0]
print(f"Uploaded file: {zip_file_path}")


Saving processed_dataset-loz2.zip to processed_dataset-loz2 (1).zip
Uploaded file: processed_dataset-loz2 (1).zip


In [4]:
import os
import zipfile
import shutil

# Define the extraction folder
extracted_folder_path = '/content/processed_dataset-loz2'

# Check if the uploaded file is a valid zip file
try:
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        print("File is a valid zip file.")

        # Remove existing directory if it exists to avoid conflicts
        if os.path.exists(extracted_folder_path):
            shutil.rmtree(extracted_folder_path)

        # Extract the uploaded zip file
        zip_ref.extractall(extracted_folder_path)

        # List the contents of the extracted folder
        print(os.listdir(extracted_folder_path))
except zipfile.BadZipFile:
    print("Error: The file is not a valid zip file.")


File is a valid zip file.
['dataset2']


In [7]:
from google.colab import files
import os
import zipfile
import shutil


# Get the uploaded file name
zip_file_path = list(uploaded.keys())[0]
print(f"Uploaded file: {zip_file_path}")

# Step 2: Define the extraction folder
extracted_folder_path = '/content/processed_dataset-loz2'

# Remove existing directory if it exists to avoid conflicts
if os.path.exists(extracted_folder_path):
    shutil.rmtree(extracted_folder_path)

# Check if the uploaded file is a valid zip file and extract it
try:
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        print("File is a valid zip file.")

        # Extract the uploaded zip file
        zip_ref.extractall(extracted_folder_path)

        # List the contents of the extracted folder
        print("Contents of the extracted folder:")
        for root, dirs, files in os.walk(extracted_folder_path):
            for name in dirs:
                print(f"Directory: {os.path.join(root, name)}")
            for name in files:
                print(f"File: {os.path.join(root, name)}")
except zipfile.BadZipFile:
    print("Error: The file is not a valid zip file.")

# Step 3: Function to count the number of images in a directory
def count_images_in_directory(directory, extensions=['.jpg', '.jpeg', '.png']):
    count = 0
    for root, dirs, files in os.walk(directory):
        for file in files:
            if any(file.lower().endswith(ext) for ext in extensions):
                count += 1
    return count

# Count the number of images in the processed_dataset-loz2 directory
num_images = count_images_in_directory(extracted_folder_path)
print(f'Total number of images in processed_dataset-loz2: {num_images}')


[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_114_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_130_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_249_aug_0_rot_6.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_161_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_25_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_119_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_51_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_150_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_134_aug_0_rot_5.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_55_aug_0_rot_6.jpg
File: /content/processed_dataset-loz2/dataset2/-35/M/fragment_197_aug_0_rot_6.jpg
File: /content/proc

In [8]:
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Define the classes and the nested structure
classes = ['G', 'GP', 'T', 'M', 'clear']
sub_folders = ['-20', '-25', '-30', '-35']

# Initialize lists to hold images and labels
images = []
labels = []

# Load images and labels
for sub_folder in sub_folders:
    for label, class_name in enumerate(classes):
        class_folder = os.path.join(extracted_folder_path, 'dataset2', sub_folder, class_name)
        for filename in os.listdir(class_folder):
            if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                img_path = os.path.join(class_folder, filename)
                img = Image.open(img_path).convert('L')  # Convert to grayscale
                img = img.resize((128, 128))  # Resize images to a consistent size
                img_array = np.array(img)
                images.append(img_array)
                labels.append(label)

# Convert lists to numpy arrays
images = np.array(images)
labels = np.array(labels)

# Normalize images
images = images / 255.0

# Reshape images for training
images = np.expand_dims(images, -1)

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# Convert labels to categorical
y_train_cat = to_categorical(y_train, num_classes=len(classes))
y_test_cat = to_categorical(y_test, num_classes=len(classes))


Define the VAE architecture:

In [10]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Define the encoder
latent_dim = 64

class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

encoder_inputs = layers.Input(shape=(128, 128, 1))
x = layers.Conv2D(32, (3, 3), activation='relu', strides=2, padding='same')(encoder_inputs)
x = layers.Conv2D(64, (3, 3), activation='relu', strides=2, padding='same')(x)
x = layers.Flatten()(x)
x = layers.Dense(128, activation='relu')(x)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = Sampling()([z_mean, z_log_var])
encoder = models.Model(encoder_inputs, [z_mean, z_log_var, z], name='encoder')

# Define the decoder
latent_inputs = layers.Input(shape=(latent_dim,))
x = layers.Dense(32 * 32 * 64, activation='relu')(latent_inputs)
x = layers.Reshape((32, 32, 64))(x)
x = layers.Conv2DTranspose(64, (3, 3), activation='relu', strides=2, padding='same')(x)
x = layers.Conv2DTranspose(32, (3, 3), activation='relu', strides=2, padding='same')(x)
decoder_outputs = layers.Conv2DTranspose(1, (3, 3), activation='sigmoid', padding='same')(x)
decoder = models.Model(latent_inputs, decoder_outputs, name='decoder')

# Define the VAE
vae_outputs = decoder(encoder(encoder_inputs)[2])
vae = models.Model(encoder_inputs, vae_outputs, name='vae')

# Define the VAE loss
reconstruction_loss = tf.keras.losses.binary_crossentropy(encoder_inputs, vae_outputs)
reconstruction_loss *= 128 * 128
kl_loss = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
kl_loss = tf.reduce_mean(kl_loss) * -0.5
vae_loss = tf.reduce_mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')


Train the VAE:

In [11]:
# Train the VAE
vae.fit(X_train, epochs=20, batch_size=64, validation_data=(X_test, None))

# Save the trained VAE model
vae.save('vae_model.h5')

# Generate synthetic images
num_synthetic_images = 10000
latent_samples = np.random.normal(size=(num_synthetic_images, latent_dim))
synthetic_images = decoder.predict(latent_samples)


Epoch 1/20
Epoch 2/20

KeyboardInterrupt: 

Classify the synthetic images:

In [None]:
# Prepare labels for synthetic images (randomly for this example)
synthetic_labels = np.random.randint(0, len(classes), num_synthetic_images)

# Define a simple CNN classifier
classifier = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(128, 128, 1)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dense(len(classes), activation='softmax')
])

classifier.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train the classifier
classifier.fit(X_train, y_train, epochs=20, batch_size=64, validation_data=(X_test, y_test))

# Save the classifier model
classifier.save('classifier_model.h5')

# Predict the classes of synthetic images
synthetic_images = np.expand_dims(synthetic_images, -1)
predictions = classifier.predict(synthetic_images)

# Convert predictions to class labels
predicted_labels = np.argmax(predictions, axis=1)
