# Test Pix2Struct model on Pix2Code HTML dataset

## Setup Envirnoment

In [1]:
!pip install transformers==4.33.1

Collecting transformers==4.33.1
  Downloading transformers-4.33.1-py3-none-any.whl (7.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.6/7.6 MB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.15.1 (from transformers==4.33.1)
  Downloading huggingface_hub-0.17.3-py3-none-any.whl (295 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m295.0/295.0 kB[0m [31m27.5 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers==4.33.1)
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m44.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting safetensors>=0.3.1 (from transformers==4.33.1)
  Downloading safetensors-0.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [

In [2]:
#!pip install --upgrade git+https://github.com/huggingface/transformers

## Import necessary libraries

In [3]:
from google.colab import drive
import os
import zipfile
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import re
from transformers import Pix2StructForConditionalGeneration, AutoProcessor
import torch
from torch.nn import functional as F
from pathlib import Path
from nltk import edit_distance
import numpy as np
from tqdm import tqdm
from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction
from torch.utils.data import random_split
import random

## Define variables and parameters

In [11]:
G_DRIVE_FOLDER = '/content/drive/MyDrive/Datasets/'
G_DRIVE_FOLDER_CHECKPOINTS = '/content/drive/MyDrive/Checkpoints/'
DATASET_NAME = 'pix2code_web_with_html'
ZIP_NAME = DATASET_NAME + '.zip'
DESTINATION_FOLDER= '/content/data/'
DATASET_FOLDER = DESTINATION_FOLDER + 'web_with_html/' # unzipped name
OUTPUT_FOLDER = '/content/drive/MyDrive/Testing_output/' + DATASET_NAME

EXPERIMENT_NAME = "Pix2Struct_Pix2Code_HTML_FULL_TEST"

MAX_SENTENCE_LEN = 1024

MAX_PATCHES = 1024

DEBUG = False
VERBOSE = True

BATCH_SIZE = 10

TRAIN_SET_PERCENTAGE = 0.89
VALID_SET_PERCENTAGE = 0.01
# TEST_SET_PERCENTAGE is 1 - TRAIN_SET_PERCENTAGE - VALID_SET_PERCENTAGE # Use 1000 for test

RANDOM_SEED = 100

LOAD_FROM_CHECKPOINT = True
LAST_CHECKPOINT_NAME = "Pix2Struct_Pix2Code_HTML_FULL_epoch[9].pth"

In [5]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


## Load Pix2Code Dataset

### Mount Google Drive

In [6]:
drive.mount('/content/drive')

Mounted at /content/drive


### Import zip file from Google Drive

In [7]:
os.makedirs(DESTINATION_FOLDER, exist_ok=True)

with zipfile.ZipFile(G_DRIVE_FOLDER + ZIP_NAME, "r") as zf:
    zf.extractall(DESTINATION_FOLDER)

## Load Model and Processor

In [8]:
repo_id = "google/pix2struct-base"

processor = AutoProcessor.from_pretrained(repo_id)
model = Pix2StructForConditionalGeneration.from_pretrained(repo_id, is_encoder_decoder=True)

Downloading (…)rocessor_config.json:   0%|          | 0.00/231 [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/2.61k [00:00<?, ?B/s]

Downloading spiece.model:   0%|          | 0.00/851k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/3.27M [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/2.20k [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/4.92k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/1.13G [00:00<?, ?B/s]

## Create Dataset class

In [9]:
def preprocess_html_file(html_text):
    text_without_header = re.sub(r'<header>.*?</header>', '', html_text, flags=re.DOTALL)
    text_without_footer = re.sub(r'<footer class="footer">.*?</footer>', '', text_without_header, flags=re.DOTALL)
    text_without_script = re.sub(r'<script .*?</script>', '', text_without_footer, flags=re.DOTALL)
    text_without_linebreaks = text_without_script.replace('\n', ' ')
    text_without_multiple_spaces = re.sub(r'\s+', ' ', text_without_linebreaks)
    return text_without_multiple_spaces

### Filter files with less tokens than 1024 and add new unknown tokens

In [12]:
# Get a list of all files in root_dir
files = os.listdir(DATASET_FOLDER)

# Find only html files
all_html_files = [file for file in files if file.endswith('.html')]

In [13]:
# Find max length
max_length = 0

bigger_than_1024 = 0
lower_than_1024 = 0

html_files_filtered = []

tokens_to_add = set()

for html_file_path in all_html_files:
    with open(DATASET_FOLDER + "/" + html_file_path, "r") as reader:
        preprocessed_text = preprocess_html_file(reader.read())
        splitted_text = processor.tokenizer(preprocessed_text).tokens()
        if len(splitted_text) > 1024:
            bigger_than_1024 += 1
        else:
            lower_than_1024 += 1
            html_files_filtered.append(html_file_path)
            tokens_to_add = tokens_to_add.union(set(splitted_text))

print("bigger_than_1024= ", bigger_than_1024)
print("lower_than_1024= ", lower_than_1024)

newly_added_num = processor.tokenizer.add_tokens(list(tokens_to_add))
print(f"Number of new tokens = {newly_added_num}")

# Resize the model's token embeddings if there are new tokens
if newly_added_num > 0:
    model.decoder.resize_token_embeddings(len(processor.tokenizer), pad_to_multiple_of=8)

bigger_than_1024=  9
lower_than_1024=  1733
Number of new tokens = 0


In [14]:
print(len(html_files_filtered))

1733


In [15]:
random.seed(RANDOM_SEED)

# Use the same seed, so that parts remain the same
random.shuffle(html_files_filtered)

train_len = int(TRAIN_SET_PERCENTAGE * len(html_files_filtered))
valid_len = int(VALID_SET_PERCENTAGE * len(html_files_filtered))

train_paths = html_files_filtered[:train_len]
valid_paths = html_files_filtered[train_len:train_len+valid_len]
test_paths = html_files_filtered[train_len+valid_len:]

print(f"TRAIN_SET size = {len(train_paths)}")
print(f"VALID_SET size = {len(valid_paths)}")
print(f"TEST_SET size = {len(test_paths)}")

TRAIN_SET size = 1542
VALID_SET size = 17
TEST_SET size = 174


In [16]:
class Pix2CodeDataset(Dataset):
    def __init__(self, root_dir, transform, text_files_paths):

        self.root_dir = root_dir
        self.transform = transform
        self.text_files_paths = text_files_paths

        self.max_patches = MAX_PATCHES
        self.max_length = MAX_SENTENCE_LEN
        self.ignore_id = -100

        self.encodings = []

        for text_file in tqdm(text_files_paths):
            image_file = text_file.replace('.html', '.png')

            # Directly process the text files, and save them in the ram
            # Do the same also for images, if there is enough space in memory
            text_file_path = os.path.join(root_dir, text_file)
            image_file_path = os.path.join(root_dir, image_file)

            # Load image
            image = Image.open(image_file_path).convert('RGB')

            if DEBUG:
                image.show()

            if self.transform:
                image = self.transform(image)

            encoding = processor(images=image, max_patches=self.max_patches, return_tensors="pt")
            encoding = {k:v.squeeze() for k,v in encoding.items()}

            # Load text
            with open(text_file_path, 'r') as f:
                text = f.read()
                text_cleaned = preprocess_html_file(text)

            if DEBUG:
              print("text:")
              print(text)
              print("\n\n\ntext_cleaned:")
              print(text_cleaned)

            input_ids = processor.tokenizer(
                text_cleaned,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt",
            ).input_ids

            labels = input_ids.squeeze().clone()
            labels[labels == processor.tokenizer.pad_token_id] = self.ignore_id  # model doesn't need to predict pad token

            encoding["labels"] = labels

            # For each sample save directly the encoding of both text and image
            self.encodings.append(encoding)

    def __len__(self):
        return len(self.encodings)

    def __getitem__(self, idx):
        return self.encodings[idx], self.text_files_paths[idx].replace(".html", "")

In [17]:
# Transformations for the image
transform = transforms.Compose([
    transforms.ToTensor(),  # convert PIL Image to PyTorch Tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # normalize for pretrained models
])

# Instantiate the CustomDataset
test_dataset = Pix2CodeDataset(DATASET_FOLDER, transform, test_paths)

# Use DataLoader for batching and shuffling
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

100%|██████████| 174/174 [00:14<00:00, 11.93it/s]


In [18]:
print(f"test_dataloader size = {len(test_dataloader)}")

test_dataloader size = 18


In [19]:
batch = next(iter(test_dataloader))

In [20]:
encoding, text_file_paths = batch

In [21]:
print(text_file_paths)

('A3BDB090-DFB9-4191-89B3-4ACF951532CE', '596668CC-D14E-44E9-BFF5-8624F0363AC9', '0CF69A80-60FE-4B72-9AD3-5DA1C733A492', '1A9C290F-2EAD-4DBD-93B7-6DD2CF005700', '50C81911-47A8-4A74-9E88-EC9288A10B62', 'E2BD5710-88B3-4886-A9E1-DE3D6EBB37C2', '0D864C09-F659-47FE-A0EA-7347E2963397', 'A73BE0E2-5131-4C23-B89F-0C0CF4470F6D', '4B12C02E-E854-41C3-B66E-06887B1505CB', 'D670AD8D-DD4D-4C72-9E75-8DA532391FA7')


In [22]:
encoding

{'flattened_patches': tensor([[[ 1.0000,  1.0000, -0.0120,  ..., -0.0120,  0.2906,  0.6467],
          [ 1.0000,  2.0000, -0.0120,  ..., -5.8958, -5.7246, -5.3417],
          [ 1.0000,  3.0000, -0.0120,  ..., -5.8819, -5.6682, -5.2385],
          ...,
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 1.0000,  1.0000, -0.1426,  ..., -0.1426,  0.2291,  0.6664],
          [ 1.0000,  2.0000, -0.1426,  ..., -6.6604, -4.6235, -2.1096],
          [ 1.0000,  3.0000, -0.1426,  ..., -6.5305, -4.5251, -2.0528],
          ...,
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
          [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 1.0000,  1.0000, -0.0407,  ..., -0.0407,  0.2863,  0.6712],
       

In [23]:
encoding["flattened_patches"][0]

tensor([[ 1.0000,  1.0000, -0.0120,  ..., -0.0120,  0.2906,  0.6467],
        [ 1.0000,  2.0000, -0.0120,  ..., -5.8958, -5.7246, -5.3417],
        [ 1.0000,  3.0000, -0.0120,  ..., -5.8819, -5.6682, -5.2385],
        ...,
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]])

In [24]:
encoding["labels"]

tensor([[50190, 50227,   411,  ...,  -100,  -100,  -100],
        [50190, 50227,   411,  ...,  -100,  -100,  -100],
        [50190, 50227,   411,  ...,  -100,  -100,  -100],
        ...,
        [50190, 50227,   411,  ...,  -100,  -100,  -100],
        [50190, 50227,   411,  ...,  -100,  -100,  -100],
        [50190, 50227,   411,  ...,  -100,  -100,  -100]])

In [25]:
encoding["labels"][0]

tensor([50190, 50227,   411,  ...,  -100,  -100,  -100])

In [26]:
labels_list = encoding["labels"][0].tolist()

# Filter out the -100 values
filtered_labels = [token for token in labels_list if token != -100]

# Decode the cleaned list of tokens
decoded_text_example = processor.tokenizer.batch_decode([filtered_labels], skip_special_tokens=True)[0]


In [27]:
decoded_text_example



In [28]:
for k,v in encoding.items():
    print(k,v.shape)

flattened_patches torch.Size([10, 1024, 770])
attention_mask torch.Size([10, 1024])
labels torch.Size([10, 1024])


### Main Testing function

In [29]:
START_TOKEN_ID = PAD_TOKEN_ID = processor.tokenizer.pad_token_id

In [30]:
def testing_loop(testing_dataloader, model, processor, config, description):
    model.eval()
    bleu_scores = []

    with torch.no_grad():
        test_loop = tqdm(enumerate(testing_dataloader), total=len(testing_dataloader), desc=description)
        for i, batch in test_loop:
            encoding, text_file_paths = batch
            encoding = move_to_device(encoding)
            labels, flattened_patches, attention_mask = encoding["labels"], encoding["flattened_patches"], encoding["attention_mask"]

            outputs = model.generate(flattened_patches=flattened_patches, attention_mask=attention_mask, max_new_tokens=MAX_SENTENCE_LEN)

            predictions = processor.tokenizer.batch_decode(outputs, skip_special_tokens=True)

            labels[labels == -100] = 0
            answers = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)

            for pred, answer, text_file_path in zip(predictions, answers, text_file_paths):
                with open(f"{OUTPUT_FOLDER}/{text_file_path}_pred.txt", "w") as f:
                    print(pred, file=f)

                with open(f"{OUTPUT_FOLDER}/{text_file_path}_answer.txt", "w") as f:
                    print(answer, file=f)

    return


In [31]:
config = {
          "verbose": VERBOSE,
}

In [32]:
def validate_config(config):
    # Check required keys
    required_keys = [
        "verbose"
    ]
    for key in required_keys:
        if key not in config:
            raise ValueError(f"Key '{key}' must be present in the configuration.")

    # Check that values are in expected ranges
    if not isinstance(config["verbose"], bool):
        raise ValueError("verbose must be a boolean value.")

In [33]:
validate_config(config)
print(config)

{'verbose': True}


### Utility functions

In [34]:
def move_to_device(data):
    if isinstance(data, (list,tuple)):
        return [move_to_device(x) for x in data]
    elif isinstance(data, dict):
        return {k: move_to_device(v) for k, v in data.items()}
    elif isinstance(data, torch.Tensor):
        return data.to(DEVICE)
    else:
        return data

## Test the model

In [35]:
def test_model(config, processor, model):
    print("Loading model from checkpoint: ", LAST_CHECKPOINT_NAME)
    checkpoint = torch.load(G_DRIVE_FOLDER_CHECKPOINTS + LAST_CHECKPOINT_NAME)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    testing_loop(test_dataloader, model, processor, config, f"Test loop")

In [36]:
test_model(config, processor, model)

Loading model from checkpoint:  Pix2Struct_Pix2Code_HTML_FULL_epoch[9].pth


Test loop: 100%|██████████| 18/18 [36:50<00:00, 122.83s/it]
