<a href="https://www.kaggle.com/code/ndannnop/computer-vision?scriptVersionId=238783849" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
!pip install pybcf pysam keras-layer-normalization

In [None]:
cd /

In [None]:
# try to push the dataset to hf (didn't work)
from datasets import load_dataset, Dataset, Features, Value, Image
from huggingface_hub import login
import numpy as np
from PIL import Image as PILImage
from io import BytesIO
import os

def bcf_generator(bcf_file, label_file, num_samples=None):
    """
    Stream samples from a BCF archive one at a time.

    The layout read here is: one int64 image count, then one int64 byte size
    per image, then the concatenated encoded image bytes. Only the size table
    is kept in memory; image bytes are read on demand.

    Args:
        bcf_file: Path to the BCF file.
        label_file: Path to the label file, read as a flat uint32 array.
        num_samples: Upper bound on the number of leading images to process.
            ``None`` (default) processes every image; ``0`` yields nothing.

    Yields:
        dict: ``{"image": <PIL image converted to mode 'L'>, "label": <int>}``.
        Entries that fail to decode are reported on stdout and skipped.
    """
    # Read labels
    with open(label_file, 'rb') as f:
        labels = np.frombuffer(f.read(), dtype=np.uint32)

    # Keep the BCF file open for the whole iteration so we can seek per image
    with open(bcf_file, 'rb') as f:
        # Header: number of images, followed by the per-image byte sizes
        num_images = int(np.frombuffer(f.read(8), dtype=np.int64)[0])
        image_sizes = np.frombuffer(f.read(num_images * 8), dtype=np.int64)

        # Image bytes start right after the count and the size table
        data_start_offset = 8 + num_images * 8

        # offsets[i] is the start of image i relative to data_start_offset
        offsets = np.zeros(num_images + 1, dtype=np.int64)
        np.cumsum(image_sizes, out=offsets[1:])

        # Compare against None explicitly: a plain truthiness test would turn
        # num_samples=0 into "process everything".
        if num_samples is None:
            process_count = num_images
        else:
            process_count = max(0, min(num_samples, num_images))

        for idx in range(process_count):
            f.seek(data_start_offset + int(offsets[idx]))
            image_bytes = f.read(int(image_sizes[idx]))

            # Best-effort decoding: one bad entry must not abort the stream.
            try:
                sample = {
                    "image": PILImage.open(BytesIO(image_bytes)).convert('L'),
                    "label": int(labels[idx]),
                }
            except Exception as e:
                print(f"Error processing image {idx}: {e}")
                continue

            # Yield outside the try so errors raised by the consumer are not
            # mistaken for decoding failures.
            yield sample

# Authenticate with the Hugging Face Hub before uploading
login()

# NOTE(review): the message says "streaming", but the code below builds the
# dataset with Dataset.from_generator (not an IterableDataset) — confirm
# against the `datasets` docs whether this materializes the data locally.
print("Creating streaming dataset...")

# Paths to the BCF archive and its label file
bcf_train = '/kaggle/input/adobe-visual-font-recognition/train.bcf'
label_train = '/kaggle/input/adobe-visual-font-recognition/train.label'

# Schema of the samples yielded by bcf_generator.
# NOTE(review): labels are read as uint32 and cast with int() in
# bcf_generator, while the feature is declared int32 — fine for small ids.
features = Features({
    "image": Image(),
    "label": Value("int32")
})

# Create dataset from generator; the lambda defers opening the files until
# the generator is actually consumed
dataset = Dataset.from_generator(
    generator=lambda: bcf_generator(bcf_train, label_train),
    features=features
)

# Upload the dataset to the Hub as a public repository
dataset.push_to_hub(
    "batmangiaicuuthegioi/VFRtrain", 
    private=False,
    max_shard_size="500MB"  # Important: splits data into manageable chunks
)

print("Dataset uploaded successfully!")

In [None]:
?BcfReader

In [None]:
# read bcf file
import os
import numpy as np
from PIL import Image
from io import BytesIO

def extract_images_from_bcf(bcf_file, label_file, output_dir="/kaggle/working/"):
    """
    Decode every image stored in a BCF archive into memory.

    The BCF layout read here is: one int64 image count, one int64 byte size
    per image, then the concatenated encoded image bytes.

    Args:
        bcf_file: Path to the BCF file.
        label_file: Path to the label file, read as a flat uint32 array.
        output_dir: Unused; kept so existing callers keep working.

    Returns:
        list[tuple]: ``(label, img_array)`` pairs in file order, where
        ``img_array`` is the image converted to RGB as a NumPy array.

    Raises:
        ValueError: If the size table is truncated or the number of labels
            does not match the image count in the BCF header.
    """
    # Step 1: Read label file
    with open(label_file, 'rb') as f:
        labels = np.frombuffer(f.read(), dtype=np.uint32)

    # Step 2: Read header, size table and the raw image blob
    with open(bcf_file, 'rb') as f:
        num_images = int(np.frombuffer(f.read(8), dtype=np.int64)[0])
        image_sizes = np.frombuffer(f.read(num_images * 8), dtype=np.int64)
        image_data = f.read()

    # Explicit raises instead of assert: asserts vanish under `python -O`.
    if len(image_sizes) != num_images:
        raise ValueError("BCF size table is shorter than the header count.")
    if len(labels) != num_images:
        raise ValueError("Mismatch between labels and images.")

    # Step 3: Slice each image out of the blob and decode it
    image_arrays = []
    offset = 0
    for label, size in zip(labels, image_sizes):
        size = int(size)
        image_bytes = image_data[offset:offset + size]

        img = Image.open(BytesIO(image_bytes)).convert('RGB')
        image_arrays.append((label, np.array(img)))

        offset += size
    return image_arrays

In [None]:
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset, Subset
from sklearn.model_selection import train_test_split
from PIL import Image
from io import BytesIO
import os
import random
import math # Needed for ceiling division

# Helper function to extract patches (updated for grayscale)
def extract_patches(image_array, num_patch=3, patch_size=(105, 105)):
    """
    Cut random, possibly overlapping crops out of one image array.

    Args:
        image_array (np.ndarray): Image of shape (H, W) or (H, W, C).
        num_patch (int): How many crops to draw.
        patch_size (tuple): (height, width) of each crop.

    Returns:
        list[np.ndarray]: The crops, shaped (h, w) for 2-D input and
                          (h, w, C) for 3-D input. Empty when the array is
                          neither 2-D nor 3-D, or is smaller than the crop.
    """
    # Only 2-D (grayscale) and 3-D (color) arrays are supported
    if image_array.ndim not in (2, 3):
        print(f"Warning: Unexpected image array dimension: {image_array.ndim}. Skipping patch extraction.")
        return []

    img_h, img_w = image_array.shape[:2]
    crop_h, crop_w = patch_size

    # An image smaller than the crop in either direction yields nothing
    if img_h < crop_h or img_w < crop_w:
        return []

    crops = []
    for _ in range(num_patch):
        # Draw the left edge first, then the top edge, both inside bounds
        left = np.random.randint(0, img_w - crop_w + 1)
        top = np.random.randint(0, img_h - crop_h + 1)
        # Slicing the two leading axes keeps any channel axis intact
        crops.append(image_array[top:top + crop_h, left:left + crop_w])
    return crops

# Custom Dataset for lazy-loading from BCF
class BCFImagePatchDataset(Dataset):
    """
    PyTorch Dataset for loading images from a custom BCF file format lazily
    and extracting patches on the fly. Loads images as grayscale.

    Only the labels and the per-image byte sizes/offsets are kept in memory;
    the image bytes are read from disk on every ``__getitem__`` call.
    """
    def __init__(self, bcf_file, label_file, num_patch=3, patch_size=(105, 105)):
        """
        Initializes the dataset by reading metadata but not image data.

        Args:
            bcf_file (str): Path to the BCF file.
            label_file (str): Path to the label file.
            num_patch (int): Number of patches to extract per image.
            patch_size (tuple): (height, width) of patches.

        Raises:
            FileNotFoundError: If either file does not exist.
            ValueError: If the label count differs from the BCF image count.
        """
        self.bcf_file = bcf_file
        self.label_file = label_file
        self.num_patch = num_patch
        self.patch_size = patch_size

        self.labels = None
        self.num_images = 0
        self.image_sizes = None
        self.image_offsets = None
        self.data_start_offset = 0 # Byte offset in BCF where actual image data begins

        self._read_metadata()

    def _read_metadata(self):
        """Reads labels and image size/offset information from the files."""
        try:
            # Labels are stored as a flat uint32 array
            with open(self.label_file, 'rb') as f:
                self.labels = np.frombuffer(f.read(), dtype=np.uint32)
                print(f"Read {len(self.labels)} labels.")

            with open(self.bcf_file, 'rb') as f:
                # Store a plain Python int so __len__ and arithmetic on it do
                # not leak NumPy scalar types.
                self.num_images = int(np.frombuffer(f.read(8), dtype=np.int64)[0])
                print(f"BCF header indicates {self.num_images} images.")

                # Check for consistency
                if len(self.labels) != self.num_images:
                    raise ValueError(f"Mismatch between number of labels ({len(self.labels)}) and images in BCF header ({self.num_images}).")

                # Per-image byte sizes follow the count
                sizes_bytes = f.read(self.num_images * 8)
                self.image_sizes = np.frombuffer(sizes_bytes, dtype=np.int64)
                print(f"Read {len(self.image_sizes)} image sizes.")

                # 8 bytes for num_images + 8 bytes per size entry
                self.data_start_offset = 8 + self.num_images * 8

                # image_offsets[i] is the first byte of image i, relative to
                # data_start_offset
                self.image_offsets = np.zeros(self.num_images + 1, dtype=np.int64)
                np.cumsum(self.image_sizes, out=self.image_offsets[1:])
                print("Calculated image offsets.")

        except FileNotFoundError as e:
            print(f"Error: File not found - {e}")
            raise
        except Exception as e:
            print(f"Error reading metadata: {e}")
            raise

    def __len__(self):
        """Returns the total number of images in the dataset."""
        return self.num_images

    def __getitem__(self, idx):
        """
        Loads one image as grayscale, extracts patches, and returns patches with the label.

        Args:
            idx (int): The index of the image to retrieve.

        Returns:
            tuple: (list[np.ndarray], int): A tuple containing:
                     - A list of NumPy arrays, each representing a patch (H, W).
                     - The integer label for the image.
               Returns ([], -1) if image reading or patch extraction fails.

        Raises:
            IndexError: If idx is negative or not below the image count.
        """
        if idx >= self.num_images or idx < 0:
            raise IndexError(f"Index {idx} out of bounds for {self.num_images} images.")

        # Plain ints: the label is documented as int, and -1 is the error
        # sentinel, which an unsigned NumPy scalar could not represent.
        label = int(self.labels[idx])
        offset = int(self.image_offsets[idx])
        size = int(self.image_sizes[idx])

        try:
            # Reopen per item and read only this image's bytes
            with open(self.bcf_file, 'rb') as f:
                f.seek(self.data_start_offset + offset)
                image_bytes = f.read(size)

            # 'L' converts to single-channel grayscale -> array of shape (H, W)
            img = Image.open(BytesIO(image_bytes)).convert('L')
            img_array = np.array(img)

            patches = extract_patches(img_array, self.num_patch, self.patch_size)

            return patches, label

        except FileNotFoundError:
            print(f"Error: BCF file not found during __getitem__ for index {idx}.")
            return [], -1 # Indicate error
        except Exception as e:
            # Best effort: a bad sample is reported and flagged, not fatal
            print(f"Error processing image index {idx}: {e}")
            return [], -1 # Indicate error


# Custom collate function for the DataLoader (updated for grayscale)
def patch_collate_fn(batch, patch_size_tuple):
    """
    Merge per-image patch lists into one batch of grayscale patch tensors.

    Each batch item is a ``(patches, label)`` pair as produced by
    BCFImagePatchDataset.__getitem__. Items with an empty patch list or the
    error label ``-1`` are dropped; every remaining patch gets its image's
    label.

    Args:
        batch (list): List of ``(list[np.ndarray], label)`` tuples.
        patch_size_tuple (tuple): (height, width) of patches; only used to
                                  shape the result when nothing survives.

    Returns:
        tuple: (torch.Tensor, torch.Tensor):
                 - Float patches scaled by 1/255, shape (TotalPatches, 1, H, W)
                 - Long labels, shape (TotalPatches,)
               Both have a zero-length first dimension if no patch survived.
    """
    flat_patches = []
    flat_labels = []

    for patch_list, target in batch:
        # Skip failed reads and images that produced no patches
        if not patch_list or target == -1:
            continue
        flat_patches.extend(patch_list)
        flat_labels.extend([target] * len(patch_list))

    if not flat_patches:
        # Nothing usable: hand back correctly typed, empty tensors
        height, width = patch_size_tuple
        empty_images = torch.empty((0, 1, height, width), dtype=torch.float)
        empty_targets = torch.empty((0,), dtype=torch.long)
        return empty_images, empty_targets

    # (TotalPatches, H, W) array -> float tensor in [0, 1] -> add channel axis
    stacked = np.array(flat_patches)
    images = (torch.tensor(stacked).float() / 255.0).unsqueeze(1)

    targets = torch.tensor(flat_labels, dtype=torch.long)
    return images, targets


# --- Main Execution ---

# Example usage:
# BCF archives for the train / validation / test splits
bcf_train = '/kaggle/input/deepfont-unlab/VFR_syn_train/train.bcf'
bcf_val = '/kaggle/input/deepfont-unlab/VFR_syn_val/val.bcf'
bcf_test = '/kaggle/input/deepfont-unlab/VFR_real_test/vfr_large.bcf'

# Matching label files
label_train = '/kaggle/input/deepfont-unlab/VFR_syn_train/train.label'
label_val = '/kaggle/input/deepfont-unlab/VFR_syn_val/val.label'
label_test = '/kaggle/input/deepfont-unlab/VFR_real_test/vfr_large.label'

BATCH_SIZE = 1024 # Images per batch; the patch count per batch is up to BATCH_SIZE * NUM_PATCHES_PER_IMAGE
NUM_PATCHES_PER_IMAGE = 1
PATCH_SIZE = (105, 105) # (height, width) of each patch
NUM_WORKERS = 4 # DataLoader worker processes

# 1. Create one dataset per split (only metadata is read here)
try:
    train_dataset = BCFImagePatchDataset(
        bcf_file=bcf_train,
        label_file=label_train,
        num_patch=NUM_PATCHES_PER_IMAGE,
        patch_size=PATCH_SIZE # Pass patch_size to dataset
    )

    val_dataset = BCFImagePatchDataset(
        bcf_file=bcf_val,
        label_file=label_val,
        num_patch=NUM_PATCHES_PER_IMAGE,
        patch_size=PATCH_SIZE # Pass patch_size to dataset
    )

    test_dataset = BCFImagePatchDataset(
        bcf_file=bcf_test,
        label_file=label_test,
        num_patch=NUM_PATCHES_PER_IMAGE,
        patch_size=PATCH_SIZE # Pass patch_size to dataset
    )

    # 2. Sanity-check that the training labels were loaded.
    # NOTE(review): no split is performed below (the splits come from
    # separate files), so the "stratify" wording in the message looks stale.
    if train_dataset.labels is None:
         raise ValueError("Labels could not be loaded. Cannot stratify split.")

    # 3. Create DataLoaders using the custom collate function.
    # patch_collate_fn needs the patch size as a second argument, so bind it
    # with functools.partial.
    from functools import partial
    collate_wrapper = partial(patch_collate_fn, patch_size_tuple=PATCH_SIZE)

    train_loader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=NUM_WORKERS,
        collate_fn=collate_wrapper, # Use the wrapper
        pin_memory=True # Set to True if using GPU for faster data transfer
    )

    # NOTE(review): the validation loader is shuffled as well; confirm that
    # this is intended.
    val_loader = DataLoader(
        val_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=NUM_WORKERS,
        collate_fn=collate_wrapper, # Use the wrapper
        pin_memory=True # Set to True if using GPU for faster data transfer
    )

    test_loader = DataLoader(
        test_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
        num_workers=NUM_WORKERS,
        collate_fn=collate_wrapper, # Use the wrapper
        pin_memory=True
    )

    # 4. Smoke test: pull a few batches from the train loader
    print("\nTesting DataLoader...")
    num_batches_to_test = 5
    for i, (batch_patches, batch_labels) in enumerate(train_loader):
        if batch_patches.numel() == 0: # The collate function returns empty tensors when no item was usable
             print(f"Batch {i+1}: Skipped (likely due to all images being too small or read errors)")
             continue

        print(f"Batch {i+1}: Patches shape: {batch_patches.shape}, Labels shape: {batch_labels.shape}")
        # The collate function adds a single channel axis at position 1
        if batch_patches.shape[1] != 1:
             print(f"Error: Unexpected channel dimension: {batch_patches.shape[1]}")
        # print(f"Batch {i+1}: Labels: {batch_labels}") # Optional: print labels

        # --- Your training code would go here ---
        # model(batch_patches) # Ensure your model expects input shape (B, 1, H, W)
        # loss = criterion(outputs, batch_labels)
        # ...
        # ----------------------------------------

        # Stop after num_batches_to_test batches (skipped batches count too)
        if i >= num_batches_to_test - 1:
            break

    print("\nDataLoader setup complete and test loop finished.")

except Exception as e:
    # Errors are reported but not re-raised, so names such as train_loader
    # may be undefined afterwards.
    print(f"\nAn error occurred during dataset/dataloader setup: {e}")
    import traceback
    traceback.print_exc() # Print detailed traceback
    # Depending on the error, you might want to investigate file paths,
    # file formats, or permissions.




In [None]:
# Each batch is a (patches, labels) tuple, so unpack it before asking for
# shapes; calling .shape on the tuple itself raises AttributeError.
a = next(iter(train_loader))
sample_patches, sample_labels = a
sample_patches.shape, sample_labels.shape
# len(train_loader)

In [None]:
# push to hf
from huggingface_hub import notebook_login
notebook_login()

In [None]:
# model
import torch
import torch.nn as nn
"""
First we train the SCAE model on the patches.
It contains two conv and 2 deconv layers.
- conv1: 60x60, 64 filters, stride 1, padding 1 (on all four directions up down left right)
- pool1: stride 2, kernel size 2, padding 0 (on all four directions up down left right)
- conv2: 3x3, 128 filters, stride 1, padding 1 (on all four directions up down left right)
- deconv1: (from 24x24x128 to 24x24x64), 64 filters, stride 1, padding 1, kernel size 3 (on all four directions up down left right)
- unpool1: (from 24x24x64 to 48x48x64), 64 filters, stride 2, kernel size 2, padding 0 (on all four directions up down left right)
- deconv2: (from 48x48x64 to 105x105x1), 1 filter, kernel size 60, stride 1, padding 1, dilation 1, output_padding 0
"""

class SCAE(nn.Module):
    """
    Convolutional auto-encoder over single-channel image patches.

    Encoder: Conv(1->64, k=60, p=1) -> norm -> act -> dropout -> MaxPool(2)
             -> Conv(64->128, k=3, p=1) -> norm -> act -> dropout -> MaxPool(2)
    Decoder: Upsample(x2) -> ConvT(128->64, k=3, p=1) -> norm -> act -> dropout
             -> Upsample(x2) -> ConvT(64->1, k=60, p=1) -> act

    For an (N, 1, 105, 105) input the encoder output is (N, 128, 12, 12) and
    the decoder output is (N, 1, 105, 105).

    Args:
        normalization_type (str): "batch_norm", "group_norm" (8 groups) or
            "layer_norm"; any other value disables normalization.
        use_dropout (bool): Insert Dropout2d after each activation that is
            followed by a dropout slot.
        dropout_prob (float): Drop probability used when use_dropout is True.
        activation (str): "leaky_relu" selects LeakyReLU; anything else ReLU.
    """
    def __init__(self, normalization_type="batch_norm", use_dropout=False, dropout_prob=0.3, activation="leaky_relu"):
        super().__init__()

        def norm_layer(num_features):
            if normalization_type == "batch_norm":
                return nn.BatchNorm2d(num_features)
            if normalization_type == "group_norm":
                return nn.GroupNorm(num_groups=8, num_channels=num_features)
            if normalization_type == "layer_norm":
                # A single group normalizes over (C, H, W) like layer norm but
                # needs no fixed spatial size. The previous hard-coded
                # LayerNorm([C, 26, 26]) could not match both encoder stages,
                # whose feature maps differ in size.
                return nn.GroupNorm(num_groups=1, num_channels=num_features)
            return nn.Identity()

        def activation_layer():
            return nn.LeakyReLU(inplace=True) if activation == "leaky_relu" else nn.ReLU(inplace=True)

        def dropout_layer():
            return nn.Dropout2d(dropout_prob) if use_dropout else nn.Identity()

        # The position of every module inside the Sequentials is part of the
        # state_dict key names; keep the order stable so checkpoints load.
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=60, stride=1, padding=1),
            norm_layer(64),
            activation_layer(),
            dropout_layer(),

            nn.MaxPool2d(2, 2),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            norm_layer(128),
            activation_layer(),
            dropout_layer(),

            nn.MaxPool2d(2, 2)
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='nearest'),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=1, padding=1),
            norm_layer(64),
            activation_layer(),
            dropout_layer(),

            nn.Upsample(scale_factor=2, mode='nearest'),
            nn.ConvTranspose2d(64, 1, kernel_size=60, stride=1, padding=1),
            activation_layer(),
        )

    def forward(self, x):
        """Encode then decode x, returning the reconstruction."""
        return self.decoder(self.encoder(x))
        
# test with the first patch
model = SCAE()
model.eval()

In [None]:
len(test_loader)

In [None]:
torch.cuda.empty_cache()
del model, optimizer, criterion

In [None]:
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, f1_score
from tqdm import tqdm

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# The batches are moved to `device` below, so the model has to live there too.
model = model.to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    batches_seen = 0  # non-empty batches actually used for an update
    # The loader yields (patches, labels); labels are unused by the autoencoder
    for patches, labels in tqdm(train_loader):
        # The collate function returns empty tensors when no item was usable
        if patches.numel() == 0:
            continue

        patches = patches.to(device)

        optimizer.zero_grad()
        outputs = model(patches)
        loss = criterion(outputs, patches)  # reconstruction target is the input
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        batches_seen += 1

    # Name the checkpoint after the current epoch. The previous code used `i`,
    # a leftover variable from another cell, so the name did not track epochs.
    torch.save(model.state_dict(), f"/kaggle/working/checkpoint{epoch+3}")

    # Average over the batches that contributed, not over len(train_loader),
    # so skipped empty batches do not deflate the reported loss.
    if batches_seen > 0:
         avg_loss = running_loss / batches_seen
         print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
    else:
         print(f"Epoch [{epoch+1}/{num_epochs}], No batches processed.")

In [None]:
torch.save(model.state_dict(), f"/kaggle/working/checkpoint{12+3}")

In [None]:
!pip install transformers datasets


In [None]:
from huggingface_hub import notebook_login
notebook_login()

In [None]:
# Imported here so this cell runs on its own; it was previously only imported
# in a later cell.
from huggingface_hub import hf_hub_download

model_repo = "batmangiaicuuthegioi/SCAE"  # Hub repository holding the checkpoints
filename = "2_epoch"  # Name of the checkpoint file inside the repository

# Download the file (returns the local path of the downloaded copy)
model_path = hf_hub_download(repo_id=model_repo, filename=filename)

# Check the path of the downloaded model
print(f"Model downloaded to: {model_path}")

In [None]:
model = SCAE()
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU also loads in a CPU-only session.
model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
model.to(device)

In [None]:
from huggingface_hub import hf_hub_download
import torch

model_name = "batmangiaicuuthegioi/SCAE"  # Replace with your repository name

# Download the model checkpoint (this will download the .lfs model)
model_path = hf_hub_download(repo_id=model_name, filename="2_epoch.bin")  # or other checkpoint name

# Every checkpoint in this notebook is written with
# torch.save(model.state_dict(), ...), so the file holds a state dict, not a
# module. Rebuild the network and load the weights into it.
state_dict = torch.load(model_path, map_location="cpu", weights_only=True)
model = SCAE()
model.load_state_dict(state_dict)

In [None]:
model_path = '/kaggle/working/SCAE_model'
torch.save(model.state_dict(), model_path)


In [None]:
loaded_model = SCAE()
loaded_model.load_state_dict(torch.load("/kaggle/working/SCAE_model", weights_only=True))
loaded_model.eval()

In [None]:
from transformers import AutoModelForImageClassification, AutoTokenizer, pipeline
from huggingface_hub import HfApi, HfFolder
from pathlib import Path

model_name = "batmangiaicuuthegioi/SCAE"  # Target repository on the Hugging Face Hub
# NOTE(review): elsewhere in this notebook the weights are written with
# torch.save to '/kaggle/working/SCAE_model' (a single file), while this is a
# relative path handed to upload_folder below — confirm the path and whether
# upload_file is the call that was meant.
model_path = './SCAE_model'

# Create the repository (exist_ok=True makes this a no-op on later runs)
api = HfApi()
api.create_repo(model_name, exist_ok=True)

# Push the contents of model_path to the root of the repository
api.upload_folder(
    folder_path=model_path,
    path_in_repo=".",
    repo_id=model_name,
)

# NOTE(review): this pushes the "bert-base-uncased" tokenizer into the SCAE
# repository; nothing else in this notebook uses a tokenizer — confirm this
# step is wanted.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")  # Replace with your own tokenizer if applicable
tokenizer.push_to_hub(model_name)


In [None]:
# delete all subfolders
from pathlib import Path
import shutil

# Directory whose immediate sub-directories are removed; plain files are kept
working_directory = Path('/kaggle/working/')

for subfolder in working_directory.iterdir():
    # Skip anything that is not a directory
    if not subfolder.is_dir():
        continue
    shutil.rmtree(subfolder)
    print(f"Deleted: {subfolder}")


In [None]:
# !mkdir -p ~/.kaggle
! mv /kaggle.json ~/.kaggle/
# chmod 600 ~/.kaggle/kaggle.json

In [None]:
from matplotlib.pyplot import imshow
import matplotlib.cm as cm
import matplotlib.pylab as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np
import PIL
from PIL import ImageFilter
import cv2
import itertools
import random
import keras
import imutils
from imutils import paths
import os
from keras import optimizers
from keras.preprocessing.image import img_to_array
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical
from keras import callbacks
from keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D , UpSampling2D ,Conv2DTranspose
from keras import backend as K

%matplotlib inline

In [None]:
def pil_image(img_path):
    """Open the image at img_path as grayscale ('L') and resize it to 105x105."""
    grayscale = PIL.Image.open(img_path).convert('L')
    return grayscale.resize((105, 105))

# Augmentation Steps
1) Noise
2) Blur
3) Perspective Rotation
4) Shading
5) Variable Character Spacing
6) Variable Aspect Ratio

In [None]:
def noise_image(pil_im):
    """
    Return a 105x105 PIL image with Gaussian noise (mean 0, std 5) added.

    The noisy values are clipped to [0, 255] and cast to uint8 before the
    image is rebuilt.
    """
    pixels = np.asarray(pil_im)
    gaussian = np.random.normal(0.0, 5, pixels.shape)
    clipped = np.clip(pixels + gaussian, 0, 255)
    return PIL.Image.fromarray(np.uint8(clipped)).resize((105, 105))

In [None]:
def blur_image(pil_im):
    """Return pil_im Gaussian-blurred with radius 3 and resized to 105x105."""
    gaussian = ImageFilter.GaussianBlur(radius=3)
    return pil_im.filter(gaussian).resize((105, 105))

In [None]:
def affine_rotation(img):
    """
    Warp a 2-D image array with a fixed affine transform.

    The transform maps three fixed source points onto three fixed destination
    points. The warped array keeps the input's size, is cast to uint8 and
    returned as a PIL image resized to 105x105.
    """
    height, width = img.shape

    src_pts = np.float32([[10, 10], [30, 10], [10, 30]])
    dst_pts = np.float32([[20, 15], [40, 10], [20, 40]])
    warp = cv2.getAffineTransform(src_pts, dst_pts)

    # warpAffine takes the output size as (width, height)
    warped = cv2.warpAffine(img, warp, (width, height))
    return PIL.Image.fromarray(np.uint8(warped)).resize((105, 105))
   

In [None]:
def gradient_fill(image):
    """
    Return the Laplacian of image, resized to 105x105.

    Unlike the other augmentation helpers this returns the array produced by
    OpenCV (computed with CV_64F depth), not a PIL image.
    """
    edges = cv2.Laplacian(image, cv2.CV_64F)
    return cv2.resize(edges, (105, 105))

## Preparing Dataset

In [None]:
cd /

In [None]:
data_path = "/kaggle/input/font-patch/font_patch/"
data=[]
labels=[]
imagePaths = sorted(list(paths.list_images(data_path)))
random.seed(42)
random.shuffle(imagePaths)

In [None]:
imagePaths

In [None]:
def conv_label(label):
    """
    Map a font name to its integer class index.

    Known names, in index order 0..4: Lato, Raleway, Roboto, Sansation,
    Walkway. Any other value returns None.
    """
    font_names = ('Lato', 'Raleway', 'Roboto', 'Sansation', 'Walkway')
    for class_index, font_name in enumerate(font_names):
        if label == font_name:
            return class_index
    return None

In [None]:
augument=["blur","noise","affine","gradient"]
a=itertools.combinations(augument, 4)

for i in list(a): 
    print(list(i))

In [None]:
counter=0
# Augmentations, listed in the order they are applied within a combination
augument=["noise","blur","affine","gradient"]
for imagePath in imagePaths:
    # The parent folder name is the font name
    label = imagePath.split(os.path.sep)[-2]
    label = conv_label(label)
    pil_img = pil_image(imagePath)

    # Adding original image
    org_img = img_to_array(pil_img)
    data.append(org_img)
    labels.append(label)

    # One extra sample per non-empty combination of augmentations
    for l in range(0,len(augument)):
        for i in itertools.combinations(augument, l+1):
            combinations=list(i)
            temp_img = pil_img
            for j in combinations:
                if j == 'noise':
                    temp_img = noise_image(temp_img)

                elif j == 'blur':
                    temp_img = blur_image(temp_img)

                elif j == 'affine':
                    # Work on the running result, not the pristine image, so
                    # earlier steps of this combination are kept.
                    open_cv_affine = np.array(temp_img)
                    temp_img = affine_rotation(open_cv_affine)

                elif j == 'gradient':
                    # Same here: previously this restarted from pil_img, which
                    # made every combination containing 'gradient' identical.
                    open_cv_gradient = np.array(temp_img)
                    temp_img = gradient_fill(open_cv_gradient)

            temp_img = img_to_array(temp_img)
            data.append(temp_img)
            labels.append(label)

In [None]:
len(labels)

In [None]:
data = np.asarray(data, dtype="float") / 255.0
labels = np.array(labels)
print("Success")
# partition the data into training and testing splits using 75% of
# the data for training and the remaining 25% for testing
(trainX, testX, trainY, testY) = train_test_split(data,
	labels, test_size=0.25, random_state=42)

In [None]:
# convert the labels from integers to vectors
trainY = to_categorical(trainY, num_classes=5)
testY = to_categorical(testY, num_classes=5)

In [None]:
aug = ImageDataGenerator(rotation_range=30, width_shift_range=0.1,height_shift_range=0.1, shear_range=0.2, zoom_range=0.2,horizontal_flip=True)

In [None]:
K.set_image_data_format('channels_last')


In [None]:
def create_model():
  """
  Build the Keras Sequential font classifier (uncompiled).

  Input is a (105, 105, 1) image. The network is a convolutional
  encoder/decoder stack (Cu layers), three 12x12 convolutions (Cs layers),
  and a dense head ending in a 5-way softmax.
  """
  # Cu layers
  cu_layers = [
      Conv2D(64, kernel_size=(48, 48), activation='relu', input_shape=(105,105,1)),
      BatchNormalization(),
      MaxPooling2D(pool_size=(2, 2)),
      Conv2D(128, kernel_size=(24, 24), activation='relu'),
      BatchNormalization(),
      MaxPooling2D(pool_size=(2, 2)),
      Conv2DTranspose(128, (24,24), strides = (2,2), activation = 'relu', padding='same', kernel_initializer='uniform'),
      UpSampling2D(size=(2, 2)),
      Conv2DTranspose(64, (12,12), strides = (2,2), activation = 'relu', padding='same', kernel_initializer='uniform'),
      UpSampling2D(size=(2, 2)),
  ]

  # Cs layers
  cs_layers = [
      Conv2D(256, kernel_size=(12, 12), activation='relu'),
      Conv2D(256, kernel_size=(12, 12), activation='relu'),
      Conv2D(256, kernel_size=(12, 12), activation='relu'),
  ]

  # Dense classification head
  head_layers = [
      Flatten(),
      Dense(4096, activation='relu'),
      Dropout(0.5),
      Dense(4096,activation='relu'),
      Dropout(0.5),
      Dense(2383,activation='relu'),
      Dense(5, activation='softmax'),
  ]

  model=Sequential()
  for layer in cu_layers + cs_layers + head_layers:
    model.add(layer)
  return model

In [None]:
from tensorflow.keras.optimizers.schedules import ExponentialDecay
# Learning-rate schedule handed to the optimizer below
lr_schedule = ExponentialDecay(
    initial_learning_rate=0.01,
    decay_steps=10000,  # Number of steps before applying decay
    decay_rate=0.9,     # Decay rate
    staircase=True)

batch_size = 128
epochs = 50
model= create_model()
# SGD with Nesterov momentum, driven by the schedule above
sgd = optimizers.SGD(learning_rate=lr_schedule,
    momentum=0.9,
    nesterov=True)
# NOTE(review): the model ends in a 5-way softmax and the targets are one-hot
# (to_categorical), yet the loss is mean squared error — confirm this is
# intended rather than categorical cross-entropy.
model.compile(loss='mean_squared_error', optimizer=sgd, metrics=['accuracy'])

In [None]:
early_stopping=callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='min')

filepath="top_model.keras"

checkpoint = callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

callbacks_list = [early_stopping,checkpoint]

In [None]:
model.fit(trainX, trainY,shuffle=True,
          batch_size=batch_size,
          epochs=epochs,
          verbose=1,
          validation_data=(testX, testY),callbacks=callbacks_list)

In [None]:
score = model.evaluate(testX, testY, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:
from keras.models import load_model
# The ModelCheckpoint callback in this notebook writes "top_model.keras";
# load that file rather than a "top_model.h5" that is never written here.
model = load_model('top_model.keras')

In [None]:
score = model.evaluate(testX, testY, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:
def predict(img_path):
    """
    Classify the font of one image file and display it with its label.

    The image is loaded as grayscale, passed through blur_image (which also
    resizes it to 105x105), scaled to [0, 1] and fed to the global `model`.
    The preprocessed image is shown with the predicted font name drawn on it.

    Args:
        img_path: Path to the image file.
    """
    pil_im = PIL.Image.open(img_path).convert('L')
    pil_im = blur_image(pil_im)
    org_img = img_to_array(pil_im)

    # Batch of one sample, scaled the same way as the training data
    data = np.asarray([org_img], dtype="float") / 255.0

    predictions = model.predict(data)  # Get the predicted probabilities
    predicted_classes = np.argmax(predictions, axis=-1)

    label = rev_conv_label(int(predicted_classes[0]))
    fig, ax = plt.subplots(1)
    # Show the image that was classified; this previously drew the unrelated
    # global `pil_img` left over from the dataset-building loop.
    ax.imshow(pil_im, interpolation='nearest', cmap=cm.gray)
    ax.text(5, 5, label , bbox={'facecolor': 'white', 'pad': 10})
    plt.show()

In [None]:
predict("/kaggle/input/font-patch/font_patch/Roboto/0k_530.jpg")

In [None]:
def rev_conv_label(label):
    """
    Map an integer class index back to its font name.

    Indices 0..4 map to Lato, Raleway, Roboto, Sansation, Walkway; any other
    value returns None.
    """
    font_names = ('Lato', 'Raleway', 'Roboto', 'Sansation', 'Walkway')
    for class_index, font_name in enumerate(font_names):
        if label == class_index:
            return font_name
    return None

In [None]:
data=[]
data.append(org_img)
data = np.asarray(data, dtype="float") / 255.0

In [None]:
predictions = model.predict(data)  # Get the predicted probabilities
predicted_classes = np.argmax(predictions, axis=-1)

In [None]:
predicted_classes

In [None]:
label = rev_conv_label(int(predicted_classes[0]))
fig, ax = plt.subplots(1)
ax.imshow(pil_img, interpolation='nearest', cmap=cm.gray)
ax.text(5, 5, label , bbox={'facecolor': 'white', 'pad': 10})
plt.show()