In [None]:
#import libraries
import os
import numpy as np
import random
import cv2
import math
import matplotlib.pyplot as plt
import tensorflow.keras.backend as K
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pickle
from sklearn.metrics import confusion_matrix
import seaborn as sns
from collections import Counter
import pandas as pd
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import Sequence
import shutil
from tensorflow.keras.optimizers.legacy import Adam
import time
import json


**Tokenization**

In [None]:
# DSL vocabulary. The position of each entry in this list is its integer token id
# (the t2v / v2t lookup tables are built from these positions), so the order matters:
#   * '<PAD>' sits at index 0 - masked_loss / masked_accuracy ignore labels equal to 0.
#   * '<START>' sits last - the model's output layer has vocab_size-1 units, so the
#     final id is never produced as a prediction.
tokens=[
    '<PAD>',
    '{',
    '}',
    'row',
    'header',
    'footer',
    'container',
    'text',
    'text-r',
    'text-c',
    'flex-sb',
    'flex',
    'flex-c',
    'flex-r',
    'image',
    'carousel',
    'paragraph',
    'div-3',
    'div-6',
    'div-12',
    'div-9',
    'input',
    'nav',
    'logodiv',
    'navlink',
    'table',
    'button',
    'button-c',
    'button-r',
    'card',
    '<END>',
    '<START>'
]

In [None]:
# Lookup tables between token strings and their integer ids (positions in `tokens`).
v2t = dict(enumerate(tokens))                           # id -> token string
t2v = {token: index for index, token in v2t.items()}    # token string -> id


In [None]:
#functions to convert between DSL text and token ids (both directions)
def dsltotoken(dsl):
    """Convert DSL source text into a list of integer token ids.

    The text is processed line by line. Blank lines are skipped. A line that
    ends with ``{`` produces two tokens: the element name in front of the brace
    and ``{`` itself. Every other line is a single token. The sequence is
    wrapped in ``<START>`` ... ``<END>``.

    Args:
        dsl: DSL source as one string, one element (optionally followed by
            ``{``) or one ``}`` per line.

    Returns:
        List of ids looked up in ``t2v``, starting with the id of ``<START>``
        and ending with the id of ``<END>``.

    Raises:
        KeyError: if a line is not a known token.
    """
    tks = ['<START>']
    for line in dsl.strip().split('\n'):
        tk = line.strip()
        if tk == "":
            continue
        if tk.endswith("{"):
            # Strip again so that "row {" (space before the brace) maps to "row",
            # and a brace on its own line does not produce an empty token.
            name = tk[:-1].strip()
            if name:
                tks.append(name)
            tks.append("{")
        else:
            tks.append(tk)
    tks.append('<END>')
    return [t2v[tk] for tk in tks]

def tokentodsl(tokens):
    """Render a sequence of token ids back into indented DSL text.

    The first and the last id are discarded (they are expected to be the
    ``<START>`` / ``<END>`` markers). An opening brace is glued to the end of
    the current line; every other token starts a new line indented with one
    tab per currently open brace. A closing brace is written one level
    shallower than the content it closes.

    Args:
        tokens: sequence of ids that are keys of ``v2t``.

    Returns:
        The DSL text with surrounding whitespace stripped.
    """
    names = [v2t[value] for value in tokens]
    names.pop(0)
    names.pop()

    pieces = []
    open_braces = []
    for name in names:
        if name == "{":
            pieces.append("{")
            open_braces.append(name)
        else:
            if name == "}":
                # Close the level first so the brace lines up with its opener.
                open_braces.pop()
            pieces.append('\n' + '\t' * len(open_braces) + name)
    return "".join(pieces).strip()

In [None]:
#Load Dataset
class Data:
    """One dataset sample: the path of a sketch image and the path of its DSL file."""

    def __init__(self, image, dsl):
        # Both values are stored exactly as given; nothing is opened or checked here.
        self.image, self.dsl = image, dsl


data=[]
folder="/kaggle/input/sketch2htmldata"

# Locate the 'dsl' and 'sketch' sub-directories (names are matched case-insensitively).
dsl=None
sketch=None
for fol in os.listdir(folder):
    if fol.lower()=='dsl':
        dsl=folder+"/"+fol
    if fol.lower()=='sketch':
        sketch=folder+"/"+fol
if dsl is None or sketch is None:
    raise FileNotFoundError(f"expected 'dsl' and 'sketch' sub-directories inside {folder}")

# Pair every sketch with the DSL file that shares its base name. The listing is
# sorted because os.listdir() returns entries in arbitrary order, which would
# make the seeded shuffle of `data` differ between machines.
for sk in sorted(os.listdir(sketch)):
    data.append(Data(sketch+"/"+sk,dsl+"/"+sk.split('.')[0]+'.dsl'))
print(len(data))

In [None]:
#splitting data for training and testing
# Shuffle in place with a fixed seed, then keep the first 70% of the samples
# for training and the remainder for testing.
np.random.seed(10)
np.random.shuffle(data)
n = int(0.7*len(data))
train_data, test_data = data[:n], data[n:]
# Re-seed without a fixed value so later random draws are not tied to seed 10.
np.random.seed(None)

**ConvolutionalTokenizer**

In [None]:
class ConvolutionalTokenizer(layers.Layer):
    """
    Turns a batch of images into a sequence of convolutional feature tokens
    that can be fed to the Transformer encoder.
    """
    def __init__(self,kernel_size=3,stride=1,padding=1,pooling_kernel_size=3,pooling_stride=2,conv_layers=2,num_output_channels=[32, 64],**kwargs,):
        # NOTE: the tuning arguments in the signature are accepted but not read;
        # the convolution stack below is fixed.
        super().__init__(**kwargs)

        # One row per stage: (filters, kernel size, add Dropout(0.1) after pooling?)
        stages = (
            (32, 7, False),
            (64, 5, True),
            (64, 3, False),
            (128, 3, True),
            (128, 3, False),
        )

        # Sequential Keras model that tokenizes the images
        self.conv_model = keras.Sequential()
        for filters, kernel, with_dropout in stages:
            self.conv_model.add(layers.Conv2D(filters,kernel,1,padding="valid",use_bias=False,activation="relu"))
            self.conv_model.add(layers.ZeroPadding2D(1))
            self.conv_model.add(layers.MaxPool2D(3, 2, "same"))
            if with_dropout:
                self.conv_model.add(layers.Dropout(0.1))

    def call(self, images):
        """Run the conv stack and merge the two spatial axes into one token axis."""
        feature_map = self.conv_model(images)
        dims = tf.shape(feature_map)
        # (batch, height, width, channels) -> (batch, height*width, channels)
        return tf.reshape(feature_map, (-1, dims[1] * dims[2], dims[3]))

    # Adding Learnable Positional Embeddings
    def pos_embeddings(self, image_size):
        """Create an Embedding layer with one row per token position.

        A single-channel all-ones image of size ``image_size[0]`` x
        ``image_size[1]`` is pushed through ``call`` to find out how many
        tokens the tokenizer emits and how wide each token is.

        Returns:
            (embedding layer, number of tokens)
        """
        probe = tf.ones((1, image_size[0], image_size[1],1))
        token_shape = tf.shape(self.call(probe))
        seq_len = token_shape[1]
        projection_dim = token_shape[-1]
        embed_layer = layers.Embedding(
            input_dim=seq_len, output_dim=projection_dim
        )
        return embed_layer, seq_len


**Feed Forward Network**

In [None]:

def mlp(x, hidden_units, dropout):
    """
    Creates a feed-forward network: one GELU Dense layer followed by Dropout
    for every entry of `hidden_units`, applied in order.

    Args:
        x: input tensor
        hidden_units: iterable with the width of each Dense layer
        dropout: dropout rate applied after every Dense layer

    Returns:
        Output of the last Dropout layer, or `x` itself when `hidden_units` is empty.
    """
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.gelu)
        hidden = dense(x)
        x = layers.Dropout(dropout)(hidden)
    return x


**Transformer Encoder**

In [None]:
def Transformer_Encoder(L,embedded_patches,num_heads,projection_dim,transformer_units):
    """
    Stack of pre-norm Transformer encoder blocks.

    Each block is: LayerNorm -> multi-head self-attention -> residual add ->
    LayerNorm -> MLP -> residual add.

    Args:
        L: number of encoder blocks

        embedded_patches: token sequence coming from the tokenizer / patch embedding

        num_heads: number of attention heads

        projection_dim: passed to MultiHeadAttention as `key_dim`

        transformer_units: hidden units of the MLP in every block

    Returns:
        The token sequence after the last block (the input itself when L is 0).
    """
    stream = embedded_patches
    for _ in range(L):
        # Self-attention sub-block
        normed = layers.LayerNormalization(epsilon=1e-5)(stream)
        attended = layers.MultiHeadAttention(num_heads=num_heads, key_dim=projection_dim, dropout=0.1)(normed,normed)
        after_attention = layers.Add()([attended, stream])

        # Feed-forward sub-block
        normed_again = layers.LayerNormalization(epsilon=1e-5)(after_attention)
        ffn_out = mlp(normed_again, hidden_units=transformer_units, dropout=0.1)
        stream = layers.Add()([ffn_out, after_attention])

    return stream

In [None]:
def create_causal_mask(seq_length):
    """
    Creates a causal (look-ahead) mask for a sequence of length `seq_length`.

    The mask is the lower triangle (diagonal included) of a square matrix of
    ones: entry [i, j] is 1.0 when j <= i and 0.0 otherwise, so each position
    may attend to itself and to earlier positions only.

    Args:
        seq_length: The length of the sequence.

    Returns:
        A float32 tensor of shape (1, 1, seq_length, seq_length) holding 1.0 / 0.0.
    """
    all_ones = tf.ones((seq_length, seq_length))
    # band_part(-1, 0): keep everything below the diagonal, nothing above it
    lower_triangle = tf.linalg.band_part(all_ones, -1, 0)
    lower_triangle = tf.cast(lower_triangle, dtype=tf.float32)

    with_batch_axis = tf.expand_dims(lower_triangle, 0)
    with_head_axis = tf.expand_dims(with_batch_axis, 0)
    return with_head_axis








In [None]:
def positional_encoding(length, depth):
    """Build a fixed sinusoidal position table.

    Args:
        length: number of positions (rows).
        depth: width of the table; half of the columns are sines and the
            other half cosines (concatenated, not interleaved).

    Returns:
        float32 tensor with one row per position.
    """
    half = depth/2
    pos = tf.range(length, dtype=tf.float32)[:, tf.newaxis]               # (seq, 1)
    exponent = tf.range(half, dtype=tf.float32)[tf.newaxis, :]/half       # (1, depth/2)
    angles = pos * (1 / (10000**exponent))                                # (seq, depth/2)
    table = tf.concat([tf.sin(angles), tf.cos(angles)], axis=-1)
    return tf.cast(table, dtype=tf.float32)

In [None]:
class PatchEmbedding(layers.Layer):
    """Cuts images into square, non-overlapping patches and projects each patch with a Dense layer."""

    def __init__(self, patch_size, projection_dim):
        super().__init__()
        self.patch_size = patch_size
        self.projection_dim = projection_dim

        self.projection = layers.Dense(projection_dim)
        # Created here but not used by call().
        self.flatten = layers.Reshape((-1, projection_dim))

    def call(self, images):
        """Return the projected patches as a (batch, patches, projection_dim) sequence."""
        # Window and stride are identical, so patches do not overlap.
        window = [1, self.patch_size, self.patch_size, 1]
        patches = tf.image.extract_patches(
            images=images,
            sizes=window,
            strides=window,
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # Merge the patch grid into a single sequence axis, then project.
        shape = tf.shape(patches)
        sequence = tf.reshape(patches, (shape[0], -1, shape[-1]))
        return self.projection(sequence)

In [None]:
def Model(model=0,image_size=32,num_classes=29,input_shape=(32, 32, 3),projection_dim=128,num_heads=2,L=2,transformer_units=[128,128],vocab_size=29,max_seq_length=100):
    """Build the image-to-DSL encoder/decoder Keras model.

    Args:
        model: encoder variant.
            1 -> patch embedding (16x16 patches) + Transformer encoder,
            2 -> convolutional tokenizer only (no Transformer encoder),
            anything else -> convolutional tokenizer + Transformer encoder.
        image_size: indexable (height, width, ...) passed to
            ConvolutionalTokenizer.pos_embeddings; only read by the
            convolutional variants.
        num_classes: accepted but not used.
        input_shape: shape of one input image, (height, width, channels).
        projection_dim: embedding width of the decoder and `key_dim` of every
            attention layer.
        num_heads: attention heads per attention layer.
        L: number of encoder blocks and number of decoder blocks.
        transformer_units: hidden units of the MLP inside every block
            (read only, never modified).
        vocab_size: size of the decoder input vocabulary. The output layer has
            ``vocab_size - 1`` units, so the last id is never predicted.
        max_seq_length: length of the decoder input sequence.

    Returns:
        keras.Model taking ``[image_batch, decoder_token_ids]`` and returning
        logits of shape (batch, max_seq_length, vocab_size - 1).
    """
    encoder_inputs = layers.Input(input_shape)
    if model==1:
        # VIT + decoder
        patch_size = 16
        patch_embedding = PatchEmbedding(patch_size=patch_size, projection_dim=projection_dim)
        embedded_patches = patch_embedding(encoder_inputs)
        # PatchEmbedding uses non-overlapping VALID patches, so the grid is
        # floor(height / patch) x floor(width / patch). For the 848x608 inputs
        # used in training this is the previously hard-coded 53*38.
        num_patches = (input_shape[0] // patch_size) * (input_shape[1] // patch_size)
        pos_encoding = positional_encoding(num_patches, projection_dim)
        embedded_patches += pos_encoding

    else:
        # CCT or Convolution + decoder
        conv_tokenizer = ConvolutionalTokenizer()
        embedded_patches = conv_tokenizer(encoder_inputs)
        pos_embed, seq_length = conv_tokenizer.pos_embeddings(image_size)
        positions = tf.range(start=0, limit=seq_length, delta=1)
        position_embeddings = pos_embed(positions)
        embedded_patches+=position_embeddings

    if model==2:#for convolution only
        encoder_output=embedded_patches
    else:
        encoder_output=Transformer_Encoder(L,embedded_patches,num_heads=num_heads,projection_dim=projection_dim,transformer_units=transformer_units)

    #decoder input
    decoder_inputs = layers.Input(shape=(max_seq_length,))
    decoder_embedding = layers.Embedding(vocab_size, projection_dim)(decoder_inputs)
    pos_encoding2 = positional_encoding(max_seq_length, projection_dim)
    decoder_embedding += pos_encoding2
    causal_mask = create_causal_mask(tf.shape(decoder_inputs)[1]) #causal masking
    x=decoder_embedding

    #decoder layers
    for _ in range(L):
        # masked self-attention over the tokens generated so far
        x1 = layers.LayerNormalization(epsilon=1e-6)(x)
        attn_output1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=projection_dim)(x1, x1,attention_mask=causal_mask)
        x = layers.Add()([x, attn_output1])

        # cross-attention: decoder queries, encoder output as keys/values
        x2 = layers.LayerNormalization(epsilon=1e-6)(x)
        attn_output2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=projection_dim)(x2, encoder_output)
        x = layers.Add()([x, attn_output2])

        # feed-forward
        x3 = layers.LayerNormalization(epsilon=1e-6)(x)
        ffn = mlp(x3, hidden_units=transformer_units, dropout=0.1)
        x = layers.Add()([x, ffn])

    # Raw logits (no softmax); the loss is configured with from_logits=True.
    decoder_outputs = layers.Dense(vocab_size-1)(x)

    # Named differently from the `model` argument to avoid shadowing it.
    full_model = keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)
    return full_model

In [None]:

def get_rect(image):
    """Return (x, y, w, h) of the bounding box around all non-zero pixels of `image`."""
    nonzero_points = cv2.findNonZero(image)
    return cv2.boundingRect(nonzero_points)
#Data Augmentation
def get_augment(image):
    """Randomly rotate and/or translate the drawing inside `image`.

    A single random number in [0, 1) decides what happens:
      * below 0.1 the image is returned untouched;
      * when int(choice*100) is even and the drawing is narrower than
        input_shape[1]-80, the image is rotated by a random angle in [-3, 3)
        degrees around its centre;
      * above 0.5 the bounding box of the drawing is pasted at a random
        offset onto an all-zero canvas of the same shape.

    The drawing is located by thresholding at intensity 5 and taking the
    bounding box of the non-zero pixels. Reads the module-level `input_shape`.

    Args:
        image: image array as accepted by cv2.threshold / cv2.warpAffine.

    Returns:
        The augmented image, or the (possibly rotated) input when the
        translation cannot be applied.
    """
    choice=np.random.random()
    if choice<0.1:
        return image
    _,thresh=cv2.threshold(image,5,255,cv2.THRESH_BINARY)
    x,y,w,h=get_rect(thresh)

    if int(choice*100)%2==0 and w<input_shape[1]-80:
        angle=np.random.uniform(low=-3,high=3)
        (rows, cols) = image.shape[:2]
        center = (cols // 2, rows // 2)
        M = cv2.getRotationMatrix2D(center, angle, 1)
        image = cv2.warpAffine(image, M, (cols, rows))
        # The drawing moved, so its bounding box has to be measured again.
        _,thresh=cv2.threshold(image,5,255,cv2.THRESH_BINARY)
        x,y,w,h=get_rect(thresh)

    if choice>0.5:
        #translate
        try:
            newimage=np.zeros_like(image)
            transx=x
            transy=y
            # Builtin max() on purpose: np.max(value, 1) treats the 1 as an
            # axis and raises for a scalar, which used to abort every translation.
            if h<input_shape[0]-200:
                transy+=np.random.randint(0,max(input_shape[0]-(y+h)-150,1))
            if w<input_shape[1]-100:
                transx+=np.random.randint(-x+20,max(input_shape[1]-(x+w)-20,1))
            newimage[transy:transy+h,transx:transx+w]=image[y:y+h,x:x+w]
            image=newimage
        except ValueError:
            # Best effort: an empty random range or a target window that does
            # not fit the canvas leaves the image untranslated.
            pass
    return image

def load_image(img,augment=False):
    """Read an image file as a grayscale array scaled by 1/255.

    The image is resized to input_shape[1] (width) x input_shape[0] (height)
    using the module-level `input_shape`.

    Args:
        img: path of the image file.
        augment: accepted for call compatibility; currently ignored, no
            augmentation is applied here.

    Returns:
        float32 array of shape (input_shape[0], input_shape[1], 1).

    Raises:
        FileNotFoundError: if the file is missing or cannot be decoded.
    """
    with tf.device(tf.test.gpu_device_name()):
        im=cv2.imread(img,cv2.IMREAD_GRAYSCALE)
        if im is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise FileNotFoundError(f"could not read image (missing or unreadable): {img}")
        resized=cv2.resize(im,(input_shape[1],input_shape[0]),interpolation=cv2.INTER_AREA)

        pixels = resized.astype(np.float32)
        pixels /= 255
        # Append the channel axis.
        return np.expand_dims(pixels, axis=-1)
def get_tokens(name):
    """Read the DSL file at path `name` and return what `dsltotoken` produces for its text."""
    # Context manager so the file handle is closed even if reading fails.
    with open(name,'r') as f:
        return dsltotoken(f.read())



In [None]:
#Dataset Generator
class TrainDataGenerator(Sequence):
    """Keras Sequence that yields ((images, decoder_inputs), labels) batches.

    For every sample the DSL file is tokenized once up front. The decoder
    input is the token sequence without its last element, the label is the
    same sequence without its first element, and both are right-padded with
    0 up to the module-level MAX_SEQ_LEN. Images are loaded from disk when a
    batch is requested.
    """
    def __init__(self, data,batch_size):
        """
        Args:
            data: iterable of objects with `.image` and `.dsl` path attributes.
            batch_size: number of samples per batch.

        Raises:
            ValueError: if a token sequence does not fit into MAX_SEQ_LEN.
        """
        self.datas =[]
        self.batch_size=batch_size
        for obj in data:
            tkns=get_tokens(obj.dsl)
            decoder_in=tkns[:-1]
            target=tkns[1:]
            if len(decoder_in)>MAX_SEQ_LEN:
                raise ValueError(
                    f"{obj.dsl}: {len(tkns)} tokens do not fit MAX_SEQ_LEN={MAX_SEQ_LEN}"
                )
            # Both lists have the same length, so they share the padding.
            padding=[0]*(MAX_SEQ_LEN-len(decoder_in))
            self.datas.append([obj.image,decoder_in+padding,target+padding])

        self.on_epoch_end()

    def __len__(self):
        # Whole batches only; a trailing partial batch is dropped.
        return len(self.datas) // self.batch_size

    def __getitem__(self, index):
        """Return batch number `index` as ((images, decoder_inputs), labels) numpy arrays."""
        indices = range(index * self.batch_size, (index + 1) * self.batch_size)
        image_data=[]
        token_data=[]
        label=[]
        for i in indices:
            data=self.datas[i]
            # load_image currently ignores the augment flag.
            image_data.append(load_image(data[0],augment=True))

            token_data.append(np.array(data[1]))
            label.append(np.array(data[2]))

        return (np.array(image_data),np.array(token_data)),np.array(label)

    def on_epoch_end(self):
        # Reshuffle the samples in place.
        np.random.shuffle(self.datas)


In [None]:
#checkpoints
class CustomCheckpoint(tf.keras.callbacks.Callback):
    """Appends metrics to CSV-style log files and saves the model after every epoch.

    * /kaggle/working/training.log gets one line per epoch:
      loss,masked_accuracy,val_loss,val_masked_accuracy
    * /kaggle/working/batch_training.log gets one line per batch:
      loss,masked_accuracy
    * the model is saved as model<epoch>.h5 in the working directory.

    Files are opened in append mode, which creates them when missing, so no
    fallback to write mode (which would truncate earlier lines) is needed.
    """
    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        # Build the line first so a missing metric raises before the file is touched.
        line = f"{logs['loss']},{logs['masked_accuracy']},{logs['val_loss']},{logs['val_masked_accuracy']}\n"
        with open('/kaggle/working/training.log','a') as f:
            f.write(line)

        self.model.save(f"model{epoch}.h5")

    def on_batch_end(self,batch,logs=None):
        logs = logs or {}
        line = f"{logs['loss']},{logs['masked_accuracy']}\n"
        with open('/kaggle/working/batch_training.log','a') as f:
            f.write(line)


**Model Training**

In [None]:
def masked_accuracy(label, pred):
  """
  Fraction of non-padding positions whose predicted id equals the label.

  Positions whose label is 0 are excluded from both the numerator and the
  denominator.

  Args:
      label: A tensor of shape (batch_size, seq_length) containing the true labels.
      pred: A tensor of shape (batch_size, seq_length, target_vocab_size) with per-class scores.

  Returns:
      A scalar tensor representing the masked accuracy value.

  """
  predicted_ids = tf.argmax(pred, axis=2)
  label = tf.cast(label, predicted_ids.dtype)

  not_padding = tf.math.not_equal(label, 0)
  correct = tf.math.logical_and(tf.math.equal(label, predicted_ids), not_padding)

  n_correct = tf.reduce_sum(tf.cast(correct, dtype=tf.float32))
  n_scored = tf.reduce_sum(tf.cast(not_padding, dtype=tf.float32))
  return n_correct/n_scored
def masked_loss(label, pred):
  """
  Sparse categorical cross-entropy (on logits) averaged over non-padding positions.

  Positions whose label is 0 contribute nothing to the sum and are not
  counted in the denominator.

  Args:
      label: A tensor of shape (batch_size, seq_length) containing the true labels.
      pred: A tensor of shape (batch_size, seq_length, target_vocab_size) containing the logits.

  Returns:
      A scalar tensor representing the masked loss value.

  """
  per_token = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')(label, pred)

  keep = tf.cast(tf.math.not_equal(label, 0), dtype=per_token.dtype)
  return tf.reduce_sum(per_token * keep)/tf.reduce_sum(keep)



In [None]:
# Training configuration. These module-level names are also read by the data
# helpers defined above (load_image, get_augment, TrainDataGenerator, predict).
batch_size=32
epochs=20
input_shape=(848,608,1)   # (height, width, channels)
L=3                       # encoder blocks and decoder blocks
num_heads=7
MAX_SEQ_LEN=120           # padded length of every token sequence

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():


    model = Model(model=0,image_size=input_shape,input_shape=input_shape,L=L,num_heads=num_heads,vocab_size=len(t2v),max_seq_length=MAX_SEQ_LEN)
    model.summary()
    model.compile(
            optimizer=tf.optimizers.AdamW(learning_rate=0.001,weight_decay=0.0001),
            loss=masked_loss,
            metrics=[masked_accuracy])
    # The generators already yield complete batches, so `batch_size` is not
    # passed to fit(); Keras documents it as not applicable to Sequence inputs.
    model.fit(
        TrainDataGenerator(train_data,batch_size),
        validation_data=TrainDataGenerator(test_data,batch_size),
        epochs=epochs,
        callbacks=[CustomCheckpoint()],
    )


In [None]:
#function to predict from trained model
def predict(names,model):
    """Greedily decode a DSL token sequence for every sample in `names`.

    Decoding starts from ``<START>`` (the same first token the decoder sees
    during training) and appends the arg-max token one position at a time
    until every sample has produced ``<END>``. A sample that has not finished
    when position MAX_SEQ_LEN-2 is reached gets ``<END>`` forced.

    Args:
        names: list of objects with `.image` and `.dsl` path attributes.
        model: object whose ``predict((images, decoder_tokens), verbose=0)``
            returns scores of shape (batch, MAX_SEQ_LEN, classes).

    Returns:
        (predicted, references): `predicted` holds, per sample, the decoded
        ids between ``<START>`` and the first ``<END>`` (slices of a float
        array); `references` holds the ground-truth ids read from the DSL
        files with their first and last token removed.
    """
    images=[]
    references=[]
    for obj in names:
        images.append(load_image(obj.image))
        references.append(get_tokens(obj.dsl))
    batch_size=len(names)
    images=np.array(images)  # built once; it does not change between steps

    st=np.zeros((batch_size,MAX_SEQ_LEN))
    # Position 0 must hold <START>; leaving it at 0 would feed <PAD> instead.
    st[:,0]=t2v['<START>']
    ends=np.zeros(batch_size)
    loc=0
    while ends.sum()!=batch_size:
        cur=np.argmax(model.predict((images,st),verbose=0),axis=2)[:,loc]
        for i in range(len(cur)):
            if ends[i]==0:
                if cur[i]==t2v['<END>']:
                    ends[i]=1
                elif loc==MAX_SEQ_LEN-2:
                    # Out of room: close the sequence in the last slot.
                    ends[i]=1
                    cur[i]=t2v['<END>']

                st[i][loc+1]=cur[i]

        loc+=1
    return [st[i][1:list(st[i]).index(t2v['<END>'])] for i in range(batch_size)],[token[1:-1] for token in references]


In [None]:
#Loading Test Dataset
datas=[sketch.split(".")[0] for sketch in  os.listdir(f'/kaggle/input/mymodels/handmade/sketch')]
test_data=[Data(f'/kaggle/input/mymodels/handmade/sketch/{i}.jpg',f'/kaggle/input/mymodels/handmade/dsl/{i}.dsl') for i in datas]
print([data.image for data in test_data])
# ref  : ground-truth token ids read from the DSL files
# hypo : token ids predicted by the model
ref=[]
hypo=[]
batch_size=32
for i in range(0,len(test_data),batch_size):
    print(i)
    # Slicing past the end is safe, so the last (shorter) batch needs no special case.
    # `expected` rather than `tokens`, so the global vocabulary list is not overwritten.
    predicted,expected=predict(test_data[i:i+batch_size],model)
    ref+=[list(seq) for seq in expected]
    # predict() returns slices of a float array; store plain ints.
    hypo+=[[int(t) for t in seq] for seq in predicted]

#Dumping Result
with open("result.json",'w') as f:
    json.dump({"ref":ref,"predicted":hypo},f)






In [None]:
#BLEU Metrics
from nltk.translate.bleu_score import sentence_bleu
# Per-sentence scores of every calculate_bleu_scores() call, in call order.
total=[]
def calculate_bleu_scores(references, hypotheses, n_gram):
    """Average sentence-level BLEU with uniform weights over the first `n_gram` orders.

    Each hypothesis is scored against its single reference. The list of
    per-sentence scores is also appended to the module-level `total`.

    Args:
        references: list of reference token sequences.
        hypotheses: list of hypothesis token sequences, aligned with `references`.
        n_gram: highest n-gram order; each order 1..n_gram gets weight 1/n_gram,
            padded with zeros up to four entries when n_gram is below 4.

    Returns:
        np.average of the per-sentence scores.
    """
    uniform = [1.0 / n_gram] * n_gram
    zero_padding = [0] * (4 - n_gram)
    weights = tuple(uniform + zero_padding)

    scores = [
        sentence_bleu([reference], hypothesis, weights=weights)
        for reference, hypothesis in zip(references, hypotheses)
    ]
    total.append(scores)

    return np.average(scores)
# Print the average BLEU score for n-gram orders 1 through 10.
for n in range(1,11):
    print(calculate_bleu_scores([list(sequence) for sequence in ref],hypo,n))

In [None]:
#Rouge Metric
from rouge_score import rouge_scorer
sc=['rouge1', 'rouge2','rouge5','rougeL']
scorer = rouge_scorer.RougeScorer(sc, use_stemmer=True)
# One list of per-sample values for every ROUGE variant and measure.
rouges={x:{'precision':[],'recall':[],'f1':[]} for x in sc}

# Score every (ref, hypo) pair on the space-joined token strings.
for rf, cand in zip(ref, hypo):
    rf_text=' '.join(v2t[e] for e in rf)
    cand_text=' '.join(v2t[e] for e in cand)
    for key, value in scorer.score(rf_text, cand_text).items():
        bucket=rouges[key]
        bucket['precision'].append(value.precision)
        bucket['recall'].append(value.recall)
        bucket['f1'].append(value.fmeasure)

for rg, per_measure in rouges.items():
    # Sample indices with the best and the worst F1 (taken before sorting).
    print(np.argmax(per_measure['f1']),np.argmin(per_measure['f1']))

    for m in ['precision','recall','f1']:
        # The stored lists are replaced by their sorted copies.
        per_measure[m]=sorted(per_measure[m])
        print('scores'+f"({m}):",np.average(per_measure[m]))
