In [None]:
from huggingface_hub import snapshot_download

In [None]:
# Fetch a snapshot of the "biglab/webui-7k" dataset repo via huggingface_hub.
# Later cells use the returned value as a local directory path (os.path.join / glob).
dataset_path = snapshot_download(repo_id="biglab/webui-7k", repo_type="dataset")
# Show the resolved path as the cell output
dataset_path

In [None]:
import enum
import gzip
import json
import os
import re
import shutil
import zipfile
from glob import glob

In [None]:
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt

In [None]:
# dead code
class WidType(enum.StrEnum):
    IMG = "img",
    FIGURE = "figure",
    S_TEXT = "StaticText",
    SEP = "separator",
    BR = "LineBreak", # <br>
    LINK = "link",
    LIST_ITEM = "listitem",
    HEADING = "heading",
    PARAGRAPH = "paragraph"



In [None]:
# Create output dir
d1_path = os.path.join(dataset_path, "dataset1")
# exist_ok avoids the check-then-create race and makes the cell safe to re-run;
# makedirs also creates missing parent directories.
os.makedirs(d1_path, exist_ok=True)

## Dataset decompression

In [None]:
# Merge the split zip parts into a single archive.
# Pure-Python concatenation instead of shelling out to `cat`: works on Windows,
# is safe for paths containing spaces, and the parts are sorted so they are
# always joined in a stable (lexicographic) order.
files = sorted(glob(os.path.join(dataset_path, "*.zip*")))
output = os.path.join(d1_path, "dataset.zip")

with open(output, "wb") as merged:
    for part_path in files:
        with open(part_path, "rb") as part:
            shutil.copyfileobj(part, merged)

In [None]:
from glob import glob
# List the archive parts in sorted order and show how many there are
parts = sorted(glob(os.path.join(dataset_path, "*.zip*")))
print(f"Number of parts: {len(parts)}")
print(parts[:5])

In [None]:
import shutil, pathlib

# Concatenate the sorted archive parts, byte for byte, into <d1_path>/dataset.zip
parts = sorted(glob(os.path.join(dataset_path, "*.zip*")))
target = pathlib.Path(d1_path, "dataset.zip")
target.parent.mkdir(parents=True, exist_ok=True)

with open(target, "wb") as merged:
    for part_path in parts:
        with open(part_path, "rb") as part:
            shutil.copyfileobj(part, merged)

print(f"Size: {target.stat().st_size} bytes")

In [None]:
import zipfile
# Sanity-check the merged archive: count the members and verify their CRCs
with zipfile.ZipFile(target) as z:
    print("Number of files:", len(z.namelist()))
    # testzip() returns the name of the first corrupt member, or None if all are fine
    bad_member = z.testzip()
    assert bad_member is None, f"corrupt member in {target}: {bad_member}"

In [None]:
# Resolve the merged archive path (following symlinks) and display whether
# zipfile recognises it as a zip archive.
d1_zip = os.path.realpath(output)
zipfile.is_zipfile(d1_zip)

In [None]:
# Unpack every member of the merged archive into the output directory
with zipfile.ZipFile(output) as archive:
    archive.extractall(path=d1_path)

## Loading files

In [None]:
# Directory that holds one sub-directory per page. It is derived from the
# extraction directory instead of a machine-specific absolute path so the
# notebook runs on any machine.
files = os.path.join(d1_path, "train_split_web7k")
print(files)
print("files number", len(os.listdir(files)))

In [None]:
# Load pages metadata
# Root of the checkout that contains the `webui/metadata` files. The original
# location stays the default; set the JARVIS_CORE_DIR environment variable to
# point at the checkout on another machine.
repo_root = os.environ.get("JARVIS_CORE_DIR", r"C:\Users\70133\Documents\GitHub\jarvis-core")
os.chdir(repo_root)

# JSON is read as UTF-8 explicitly rather than with the platform default encoding
with open('webui/metadata/screenclassification/class_map_enrico.json', 'r', encoding='utf-8') as f:
    # Materialise the label names as a list: they are reused below as column
    # names and as a DataFrame column selector.
    labels = list(json.load(f)["idx2Label"].values())
dtypes = {'page_id': np.int64, 'file': object, **{x: np.float64 for x in labels}}

# Fields are separated by either a comma or a backslash
df_c = pd.read_csv('webui/metadata/screenclassification/silver_webui-multi_topic.csv', sep=r'[,\\]', names=['page_id', 'file', *labels], dtype=dtypes, engine='python')

In [None]:
# Preview the first rows of the parsed classification table
df_c.head(5)

In [None]:
# For every screenshot keep the top-scoring label and that label's score
label_scores = df_c[list(labels)]
df_c['label_max'] = label_scores.idxmax(axis=1)
df_c['certainty'] = label_scores.max(axis=1)

# Re-index by (page_id, file) and sort, so rows can be fetched with .loc[(page_id, file)]
df_cf = df_c.set_index(['page_id', 'file']).sort_index()
assert df_cf.index.is_unique

In [None]:
# Preview the first rows of the (page_id, file)-indexed table
df_cf.head(5)

In [None]:
# Number of screenshots per top-scoring label
df_cf["label_max"].value_counts()

## Dataset parsing

In [None]:
class FileType(enum.StrEnum):
    """
    Enum containing all filetypes
    """

    AXTree = "axtree.json.gz",
    BB = "bb.json.gz",
    Box = "box.json.gz",
    Class = "class.json.gz",
    HTML = "html.html",
    Links = "links.json",
    ScreenFull = "screenshot-full.webp",
    Screen = "screenshot.webp",
    Style = "style.json.gz",
    URL = "url.txt",
    Viewport = "viewport.json.gz"

# Helper functions

def ft_is_gz(ft: FileType) -> bool:
    """
    Tell whether the given FileType is stored gzip-compressed
    """
    gz_types = (
        FileType.AXTree,
        FileType.BB,
        FileType.Box,
        FileType.Class,
        FileType.Style,
        FileType.Viewport,
    )
    return ft in gz_types

def ft_is_json(ft: FileType) -> bool:
    """
    Tell whether the given FileType holds JSON content
    """
    json_types = (
        FileType.AXTree,
        FileType.BB,
        FileType.Box,
        FileType.Class,
        FileType.Links,
        FileType.Style,
        FileType.Viewport,
    )
    return ft in json_types

def ft_is_webp(ft: FileType) -> bool:
    """
    Tell whether the given FileType is a webp screenshot
    """
    webp_types = (FileType.ScreenFull, FileType.Screen)
    return ft in webp_types

In [None]:
class Page():
    """
    One capture of a page for a single screen type.

    The constructor only resolves which files exist; `load()` reads them into
    `self.files`. A capture with any missing file is flagged with `skip = True`
    and is never loaded.
    """
    def __init__(self, path, screen_type: str, debug: bool = True):
        """
        path: directory that contains the files of the page
        screen_type: file-name prefix of the capture, e.g. "default_<width>-<height>"
        debug: when True, assert that an incomplete capture contains only webp files
        """
        self.screen_type = screen_type
        self.path = path
        self.skip = False

        # Only "default_<width>-<height>" screen types carry a resolution;
        # every other screen type gets (0, 0)
        self.desktop = screen_type.startswith("default")
        self.width, self.height = 0, 0
        if self.desktop:
            resolution = screen_type.split('_')[1]
            self.width, self.height = [int(side) for side in resolution.split('-')]

        # Record the file name of every FileType that exists on disk
        self.files = dict()
        self.fnames = {}
        for ft in FileType:
            candidate = screen_type + "-" + ft.value
            if os.path.exists(os.path.join(self.path, candidate)):
                self.fnames[ft] = candidate

        if len(self.fnames) != len(FileType):
            # At least one file is missing: the capture is incomplete
            if debug:
                for ft, v in self.fnames.items():
                    assert ft_is_webp(ft), "Page::__init__() : non-webp file in partial download : {}".format(v)
            self.skip = True


    def load(self, debug: bool = True, *args):
        """
        Read the requested files into `self.files` (keyed by FileType).

        debug: when True, print the name of every file being loaded
        *args: FileType members to load; every FileType is loaded when omitted
        """
        if self.skip:
            return
        ftypes = args if args else FileType

        for ft in ftypes:
            name = self.fnames[ft]
            fname = os.path.join(self.path, name)
            if debug:
                print("Page::load() : loading file {}...".format(name))

            if ft_is_webp(ft):
                # An empty screenshot file is skipped, leaving no entry in self.files
                if os.path.getsize(fname) == 0:
                    if debug:
                        print("Page::load() : {} is empty".format(name))
                    continue
                # NOTE(review): Pillow's Image.open is documented as lazy, so the
                # file may stay open until the pixel data is read - confirm this
                # is acceptable when many pages are loaded.
                self.files[ft] = Image.open(fname)
                continue

            # gzip files are opened in binary mode, plain files as UTF-8 text
            # with undecodable bytes replaced
            if ft_is_gz(ft):
                stream = gzip.open(fname)
            else:
                stream = open(fname, encoding="utf-8", errors="replace")
            with stream as f:
                # JSON is parsed; anything else is kept as raw content
                self.files[ft] = json.load(f) if ft_is_json(ft) else f.read()
                        

class PageLoader():
    """
    Loads every complete capture (one `Page` per screen type) found in a page
    directory and selects the "best" one: the widest capture whose full-page
    screenshot was loaded.

    Attributes:
        path: the page directory; its base name is the numeric page id
        page_id: page id parsed from the directory name (np.int64)
        pages: dict mapping screen type -> loaded Page (incomplete captures are left out)
        best: the selected Page, or None when no capture has a full-page screenshot
        screen_type: screen type of `best`, or '' when there is none
        label, certainty: 'label_max' / 'certainty' of the best capture's
            "<screen_type>-screenshot.webp" row in the module-level `df_cf`
            frame; '' and 0.0 when there is no best capture or no such row
    """
    def __init__(self, path, debug: bool = True, *args):
        """
        path: page directory named after the page id
        debug: forwarded to Page; when True, progress is printed
        *args: FileType members forwarded to Page.load(); all files when omitted
        """
        self.path = path
        # normpath drops a trailing separator, which would make basename() return ''
        self.page_id = np.int64(os.path.basename(os.path.normpath(path)))

        self.skip = False

        if debug:
            print("PageLoader()::__init__() : opening", self.page_id)

        # Extract pages: the screen type is the file name without its last
        # '-'-separated chunk; prefixes containing "screenshot" come from the
        # "...-screenshot-full.webp" files and are not screen types.
        # Sorting makes the iteration order (and therefore tie-breaking between
        # captures of equal width) deterministic instead of set-order dependent.
        prefixes = {'-'.join(name.split('-')[:-1]) for name in os.listdir(self.path)}
        screen_types = sorted(prefix for prefix in prefixes if "screenshot" not in prefix)

        # Load pages
        self.pages = dict()
        for s in screen_types:
            page = Page(self.path, s, debug)
            if not page.skip:
                self.pages[s] = page
                self.pages[s].load(debug, *args) # load all

        # Find best page (with biggest width) among those with a full-page screenshot;
        # max() keeps the first of several equally wide candidates
        candidates = [p for p in self.pages.values() if p.files.get(FileType.ScreenFull)]
        self.best = max(candidates, key=lambda p: p.width, default=None)

        # set screen_type, label and certainty for best Page
        self.screen_type = self.best.screen_type if self.best is not None else ''
        self.label = self._classification_value('label_max', '')
        self.certainty = self._classification_value('certainty', 0.0)


    def _classification_value(self, column: str, default):
        """
        Return `column` of the best capture's row in `df_cf`, or `default` when
        there is no best capture or the row/column is missing.
        """
        if self.best is None:
            return default
        key = (self.page_id, self.best.screen_type + '-screenshot.webp')
        try:
            return df_cf.loc[key, column]
        except KeyError:
            return default


    def image(self) -> Image.Image | None:
        """
        Return the full-page screenshot of the best capture, or None when there is none
        """
        if self.best is None:
            return None
        return self.best.files.get(FileType.ScreenFull)

In [None]:
def label_certainty_thresh(c: np.float64, threshold: float = 0.6) -> bool:
    """
    Tell whether a label certainty is high enough to be trusted.

    c: certainty score of the label
    threshold: minimum accepted certainty (inclusive); defaults to 0.6
    """
    return c >= threshold

In [None]:
def preview(img: Image.Image, width: int) -> Image.Image:
    """
    Return `img` resized to `width` pixels wide, keeping the aspect ratio.

    The height is truncated to an integer and clamped to at least 1 pixel, so
    that a very wide and short image does not end up with a height of 0.
    """
    height = max(1, int(img.height / img.width * width))
    return img.resize((width, height))

In [None]:
# Render a grid of full-page screenshots whose top label is confident enough
plt.figure(figsize=(36, 20)) # 1920/1080 scaled to 360p
(nrows, ncols) = (5, 5) # Set the grid size
start = nrows*ncols # Start index
skip = 0 # Number of candidate pages rejected so far

files_l = sorted(os.listdir(files))
for index in range(0, nrows*ncols):
    image = None
    # Walk the sorted page directories until one qualifies; the bounds check
    # stops the search cleanly instead of raising IndexError.
    while index + start + skip < len(files_l):
        fpath = files_l[index + start + skip]
        # Parse the page
        loader = PageLoader(os.path.join(files, fpath), debug=False)
        if label_certainty_thresh(loader.certainty):
            image = loader.image()
        if image is not None:
            break # Found
        skip += 1 # Load the next sample

    if image is None:
        # Ran out of pages before the grid was full
        break

    # Create the subplot only once a page has been accepted
    ax = plt.subplot(nrows, ncols, index+1)
    ax.set_title(loader.label)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.imshow(preview(image, 360))
    print(loader.page_id, end=' ')
    # Line break after the last column of each row
    if index % ncols == ncols - 1:
        print('')
plt.show()

## Bounding Boxes

In [None]:
# Take 1655885631145 since it is in huggingface dataset
# The page directory is resolved relative to `files` (the directory of page
# folders) rather than through a machine-specific absolute path.
path = os.path.join(files, "1655885631145")
loader = PageLoader(path, debug=False)

# Show
print(loader.pages.keys())
print('screen_type:', loader.screen_type, 'page_id:', loader.page_id, 'label:', loader.label)
plt.imshow(loader.image())
plt.axis("off")
plt.show()

In [None]:
# AXTree: pretty-print the "nodes" entry of the best capture's AXTree file
# (indent=2, non-ASCII characters kept as-is)
nodes = loader.best.files[FileType.AXTree]["nodes"]
print(json.dumps(nodes, indent=2, ensure_ascii=False))

In [None]:
# BB: print every (id, box) pair of the best capture's bounding-box file
bb = loader.best.files.get(FileType.BB)
for b_id, b in bb.items():
    print(b_id, b)

In [None]:
import cv2
from pathlib import Path

bb = loader.best.files.get(FileType.BB) # Boxes
pil_img = loader.image()                                    # WebPImageFile
# Normalise to 3-channel RGB first so the colour conversion below does not
# depend on the mode of the decoded screenshot
img_rgb = np.array(pil_img.convert("RGB"))                  # RGB
img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)          # BGR for OpenCV

# Draw boxes
for b_id, b in bb.items():
    # Skip missing boxes and boxes without an area
    if b is None or b["width"] == 0 or b["height"] == 0:
        continue

    # presumably x/y/width/height are pixel coordinates in the full-page screenshot - verify
    x1, y1 = int(b["x"]), int(b["y"])
    x2 = int(b["x"] + b["width"])
    y2 = int(b["y"] + b["height"])
    cv2.rectangle(img_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2) # Green, width = 2px

img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) # Back to RGB

# Show
plt.imshow(img_rgb)
plt.axis("off")
plt.show()

# Save next to the extracted dataset instead of a machine-specific absolute path
out_dir = Path(d1_path) / "bb"
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / "boxed.png"

# cv2.imwrite reports failure through its return value rather than an exception
if not cv2.imwrite(str(out_path), img_bgr):
    raise RuntimeError(f"could not write image to {out_path}")
print("Image saved in:", out_path)