# Загрузка библиотек

In [31]:
import os
import json
import torch

import numpy as np

from glob import glob
from transformers import MarkupLMFeatureExtractor, MarkupLMProcessor, MarkupLMForTokenClassification
from bs4 import BeautifulSoup
from torch.utils.data import Dataset, random_split, DataLoader
from tqdm import tqdm

# Configuration

In [32]:
classification_model_path = "title_date_tag.pth"
segmentation_model_path = "segmentation_model.pth"

allowed_labels = ["title", "short_text", "date", "time", "tag", "short_title", "author"]


class_label2id = {"OTHER" : 0,
            "title" : 1, 
            "short_text" : 0, 
            "date" : 2, 
            "time" : 2, 
            "tag" : 3, 
            "short_title" : 0, 
            "author" : 0}

class_id2label = {0: "OTHER",
            1 : "title",
            2 : "date",
            3 : "tag"}

colors = {1 : "blue",
          2 : "purple",
          3 : "brown"}


block_label2id = {"BEGIN": 1, "OTHER": 0}

block_id2label = {1: "BEGIN", 0: "OTHER"}

# Загрузка данных

# Инициалиация датасета

In [33]:
classification_model = MarkupLMForTokenClassification.from_pretrained("microsoft/markuplm-base", id2label=class_id2label, label2id=class_label2id)
segmentation_model = MarkupLMForTokenClassification.from_pretrained("microsoft/markuplm-base", id2label=block_id2label, label2id=block_label2id)

if os.path.exists(classification_model_path):
    classification_model.load_state_dict(torch.load(classification_model_path))
    print("Classification Model Loaded")
else:
    raise Exception("No model found")


if os.path.exists(segmentation_model_path):
    segmentation_model.load_state_dict(torch.load(segmentation_model_path))
    print("Segmentation Model Loaded")
else:
    raise Exception("No model found")

Some weights of MarkupLMForTokenClassification were not initialized from the model checkpoint at microsoft/markuplm-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of MarkupLMForTokenClassification were not initialized from the model checkpoint at microsoft/markuplm-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Classification Model Loaded
Segmentation Model Loaded


# TEST

In [34]:
torch.cuda.is_available()

True

In [35]:
import torch
import lxml
from torch.optim import AdamW
from tqdm.auto import tqdm
from metrics import generate_segmentation_str, path_contains
from collections import defaultdict
from lxml import etree
from urllib.request import Request, urlopen
from lxml.html.clean import Cleaner
import os
import webbrowser

optimizer = AdamW(classification_model.parameters(), lr=2e-5)

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cuda")
classification_model.to(device)
segmentation_model.to(device)
print(device)

cuda


In [36]:
import argostranslate.package
import argostranslate.translate
from argostranslate.translate import get_installed_languages
from collections.abc import Iterable

import random
import asyncio
import json
import re
import unicodedata
import lxml
import os
import sys
import time 

from lxml import etree
from lxml.html.clean import Cleaner
from tqdm import tqdm
from glob import glob

In [37]:
def clean_spaces(text):
    return " ".join(re.split(r"\s+", text.strip()))


def clean_format_str(text):
    text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
    text = clean_spaces(text)
    return text


def get_dom_tree(html, need_clean):
    if need_clean:
        cleaner = Cleaner()
        cleaner.scripts = True
        cleaner.javascript = True
        cleaner.comments = True
        cleaner.style = True
        cleaner.inline_style = False
        cleaner.links = False
        cleaner.meta = False
        cleaner.page_structure = False
        cleaner.processing_instructions = True
        cleaner.embedded = False
        cleaner.frames = False
        cleaner.forms = False
        cleaner.annoying_tags = True
        cleaner.remove_unknown_tags = False
        cleaner.safe_attrs_only = False
        cleaner.add_nofollow = False
        
        html = html.replace("\0", "")  # Delete NULL bytes
        html = clean_format_str(html)
        x = lxml.html.fromstring(html)
        etree_root = cleaner.clean_html(x)
        dom_tree = etree.ElementTree(etree_root)
    else:
        dom_tree = lxml.html.fromstring(html).getroottree()
    return dom_tree

def ru2en(text, translator):

    translated_text = translator.translate(text)
    return translated_text

def translate_html(html_str, translator, need_clean=False, from_code='auto', to_code="en"):
    tree = get_dom_tree(html_str, need_clean)
    tasks = []
    for e in tree.iter():
        if e.text:
            node = unicodedata.normalize('NFKD', e.text)
            e.text = ru2en(node, translator)
        if e.tail:
            node = unicodedata.normalize('NFKD', e.tail)
            e.tail = ru2en(node, translator)
            
    return lxml.html.tostring(tree, doctype="<!DOCTYPE html>", encoding='unicode')

from_code = "ru"
to_code = "en"
argostranslate.package.update_package_index()
available_packages = argostranslate.package.get_available_packages()
package_to_install = next(
    filter(
        lambda x: x.from_code == from_code and x.to_code == to_code, available_packages
    )
)
argostranslate.package.install_from_path(package_to_install.download())

ru, en = get_installed_languages()
translator = en.get_translation(ru)

In [38]:
classification_model.eval()
segmentation_model.eval()

extractor = MarkupLMFeatureExtractor()
valid_processor = MarkupLMProcessor.from_pretrained("microsoft/markuplm-base")
valid_processor.parse_html = False


experiment_results = defaultdict(int)

all_true_results = 0
all_extracted_results = 0
ious = []
while(True):
    "Insert your url:"
    url = input()
    print(url)
    req = Request(
        url=url, 
        headers={'User-Agent': 'Mozilla/5.0'}
    )
    html_response = urlopen(req).read().decode()
    
    tree = lxml.html.fromstring(html_response)
    html_response = translate_html(html_response, translator)
    item = extractor(html_response)
    nodes, xpaths = item['nodes'], item['xpaths']
    

    block_encoding = valid_processor(nodes=nodes, xpaths=xpaths, stride=0,
                               padding="max_length", truncation=True, return_tensors="pt", 
                               return_overflowing_tokens=True, return_offsets_mapping=True)
    
    inputs = {k:v.to(device) for k,v in block_encoding.items()}
    
    inputs.pop("overflow_to_sample_mapping")
    offset_mapping = inputs.pop("offset_mapping")
    
    with torch.no_grad():
        segmentation_output = segmentation_model(**inputs)
    segmentation_predictions = segmentation_output.logits.argmax(dim=-1)
    
    
    pred_block_xpaths = []
    
    pred_block_xpaths = []
    
    statistic = {k: defaultdict(int) for k in block_id2label.keys()}
    
    for idx in range(len(segmentation_predictions)):
        for pred_id, word_id, offset in zip(segmentation_predictions[idx].tolist(), block_encoding.word_ids(idx), offset_mapping[idx].tolist()):
            if word_id is not None and offset[0] == 0:
                if pred_id == 1:
                    suffix = xpaths[0][word_id]
                    suffix = re.sub(r"\[\d*\]", "", suffix)
                    statistic[pred_id][suffix] += 1
                    
    allowed_suffix = dict()
    
    for label in statistic.keys():
        suffix = max(statistic[label], key=statistic[label].get, default="")
        allowed_suffix[suffix] = label
              
    for idx in range(len(segmentation_predictions)):
        for pred_id, word_id, offset in zip(segmentation_predictions[idx].tolist(), block_encoding.word_ids(idx), offset_mapping[idx].tolist()):
            if word_id is not None and offset[0] == 0:
                suffix = xpaths[0][word_id]
                suffix = re.sub(r"\[\d*\]", "", suffix)             
                
                if (suffix in allowed_suffix):
                    pred_block_xpaths += [xpaths[0][word_id]]

    pred_block_prefix = generate_segmentation_str(pred_block_xpaths)
    for xpath in pred_block_prefix:
        # print(xpath)
        try:
            element = tree.xpath(xpath)[0]
            element.set("style", "border:dashed; border-color: green")
        except Exception:
            pass
    print(pred_block_prefix)
    print("Segmentation Done")
    
    # ^^^^^ Main result of segmentation
    
    #CLASSIFICATION
    nodes, xpaths = item['nodes'], item['xpaths']
    

    class_encoding = block_encoding
    
    inputs = {k:v.to(device) for k,v in class_encoding.items()}
    
    inputs.pop("overflow_to_sample_mapping")
    offset_mapping = inputs.pop("offset_mapping")
    
    with torch.no_grad():
        classification_output = classification_model(**inputs)
    classification_predictions = classification_output.logits.argmax(dim=-1)
    print("Classification Done")
    
    predicted_entities = defaultdict(list)

    out_xpaths = defaultdict(dict)
    
    statistic = {k: defaultdict(int) for k in class_id2label.keys()}
    
    for idx in range(len(classification_predictions)):
        for pred_id, word_id, offset in zip(classification_predictions[idx].tolist(), class_encoding.word_ids(idx), offset_mapping[idx].tolist()):
            if word_id is not None and offset[0] == 0:
                
                in_predicted_blocks = [path_contains(block_xpath.split('/'), xpaths[0][word_id].split('/')) for block_xpath in pred_block_prefix]

                if pred_id != 0 and any(in_predicted_blocks):
                    
                    suffix = xpaths[0][word_id]
                    suffix = re.sub(r"\[\d*\]", "", suffix)
                    statistic[pred_id][suffix] += 1
                    
    allowed_suffix = dict()
    
    for label in statistic.keys():
        suffix = max(statistic[label], key=statistic[label].get, default="")
        allowed_suffix[suffix] = label
    
    
    for idx in range(len(classification_predictions)):
        for pred_id, word_id, offset, probability in zip(classification_predictions[idx].tolist(), class_encoding.word_ids(idx), offset_mapping[idx].tolist(), classification_output.logits[idx]):
            if word_id is not None and offset[0] == 0:
                
                in_predicted_blocks = [path_contains(block_xpath.split('/'), xpaths[0][word_id].split('/')) for block_xpath in pred_block_prefix]

                suffix = re.sub(r"\[\d*\]", "", xpaths[0][word_id])

                if (suffix in allowed_suffix) and any(in_predicted_blocks):
                    try:
                        element = tree.xpath(xpaths[0][word_id])[0]
                        element.set("style", f"border:dashed; border-color: {colors[pred_id]}")
                        element.set("probs", f"{probability.tolist()}")
                    except Exception:
                        pass
    
    # for idx in range(len(classification_predictions)):
    #     for pred_id, word_id, offset in zip(classification_predictions[idx].tolist(), class_encoding.word_ids(idx), offset_mapping[idx].tolist()):
    #         if word_id is not None and offset[0] == 0:
                
    #             in_predicted_blocks = [path_contains(block_xpath.split('/'), xpaths[0][word_id].split('/')) for block_xpath in pred_block_prefix]

    #             if pred_id != 0 and any(in_predicted_blocks):
    #                 try:
    #                     element = tree.xpath(xpaths[0][word_id])[0]
    #                     element.set("style", f"border:dashed; border-color: {colors[pred_id]}")
                        
    #                     statistic[pred_id].append(xpaths[0][word_id].split('/')[-4:])
    #                 except Exception:
    #                     pass
    
    
    
    
    
    path = os.path.abspath('temp.html')
    url = 'file://' + path


    with open(path, 'w') as f:
        # print(etree.tostring(tree, pretty_print=True).decode("utf-8"))
        f.write(lxml.html.tostring(tree, pretty_print=True, encoding='unicode', doctype="<!DOCTYPE html>"))
        # f.write(html)
    webbrowser.open(url)



https://life.ru/s/novosti
['/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[1]/div[2]/div[1]/ul/li[1]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[1]/div[2]/div[1]/ul/li[2]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[2]/div[2]/div[1]/ul/li[1]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[2]/div[2]/div[1]/ul/li[2]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[2]/div[2]/div[1]/ul/li[3]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[3]/div[2]/div[1]/ul/li[1]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[3]/div[2]/div[1]/ul/li[2]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[3]/div[2]/div[1]/ul/li[3]', '/html/body/div[2]/div/div[3]/div[2]/div[2]/div/div[1]/div/div[4]/div/div/a[3]/div[2]/div[1]/ul/li[4]', '/html/body/div[2]/div/div[3]/div[2]/

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Opening in existing browser session.



ValueError: unknown url type: ''

: 