In [23]:
import re
from typing import List, Tuple
from bs4 import BeautifulSoup


class HTMLCleanser:
    def __init__(self, valid_tags: List[str] = None, valid_attrs: List[str] = None):
        self.__valid_tags = DEFAULT_VALID_TAGS if valid_tags is None else valid_tags
        self.__valid_attrs = DEFAULT_VALID_ATTRS if valid_attrs is None else valid_attrs
        print(f"""valid_tags: {self.__valid_tags}\nvalid_attrs: {self.__valid_attrs}""")

    @property
    def valid_tags(self):
        return self.__valid_tags

    @property
    def valid_attrs(self):
        return self.__valid_attrs

    @valid_tags.setter
    def valid_tags(self, valid_tags):
        self.__valid_tags = valid_tags

    @valid_attrs.setter
    def valid_attrs(self, valid_attrs):
        self.__valid_attrs = valid_attrs

    def add_valid_tags(self, tags: List[str]):
        tag_sets = set(self.valid_tags)
        tag_sets.update(tags)
        self.valid_tags = list(tag_sets)
        return self.valid_tags

    def add_valid_attrs(self, attrs: List[str]):
        attr_sets = set(self.valid_attrs)
        attr_sets.update(attrs)
        self.valid_attrs = list(attr_sets)
        return self.valid_attrs

    def remove_valid_tags(self, tags: List[str]):
        self.valid_tags = [tag for tag in self.valid_tags if tag not in tags]
        return self.valid_tags

    def remove_valid_attrs(self, attrs: List[str]):
        self.valid_attrs = [attr for attr in self.valid_attrs if attr not in attrs]
        return self.valid_attrs

    def _get_invalid_tags_attrs(self, soup: BeautifulSoup) -> Tuple[list, list]:
        tags, attrs = set(), set()
        for tag in soup.find_all():
            tags.add(tag.name)
            attrs.update(list(tag.attrs.keys()))

        invalid_tags = [tag for tag in tags if tag not in self.valid_tags]
        invalid_attrs = [attr for attr in attrs if attr not in self.valid_attrs]
        return (invalid_tags, invalid_attrs)

    def _cleanse_soup_tags(self, doc: str) -> str:
        pat = re.compile(r"<.*html>|<*.body>")
        doc = re.sub(pat, "", doc)
        return doc.strip()

    def cleanse_html(self, soup: BeautifulSoup) -> str:
        """Cleanse out the invalid_tags and invalid_attrs inside the html (BeautifulSoup object)."""

        invalid_tags, invalid_attrs = self._get_invalid_tags_attrs(soup)

        # remove invalid_tags
        for tag in invalid_tags:
            for match in soup.findAll(tag):
                match.unwrap()

        # remove invalid_attrs
        for tag in soup():
            for attr in invalid_attrs:
                del tag[attr]

        # remove empty tags
        for tag in soup.find_all():
            # As img tag generally has no content in it
            if tag.name != "img" and len(tag.get_text(strip=True)) == 0:
                tag.extract()

        doc = soup.prettify()
        doc = self._cleanse_soup_tags(doc)

        return doc


DEFAULT_VALID_TAGS = ["table", "tr", "td", "th"]
DEFAULT_VALID_ATTRS = ["rowspan", "colspan"]

txt = '안녕하세요\n<table>\n   <tr>\n    <td>\n     [공동운항편 예약시 대고객 안내 문구]\n√ XXXX - 운항사\n     <table>\n      <tr>\n       <td>\n        1) 대한항공 항공기가 아닌 XXXX로 운항하는 공동운항편 입니다. (아래 가. 항 참조) 2) 공동운항편의 운임은 XXXX에서 구입 시의 운임과 다를 수 있습니다. 3) 공동운항편의 탑승수속은 실제 운항하는 항공사의 터미널과 탑승수속 카운터를 이용하셔야 합니다. 탑승수속 마감시간은\n운항사 규정에 따라 다를 수 있으니 반드시 운항사로 확인하여 주시기 바랍니다. (아래 나. 항 참조) 4) 공동운항편의 제반 서비스는 항공사별로 상이하며 유/무료 사전좌석배정, 아기바구니, 특별기내식, 좌석승급, 스카이패스\n 우수회원 혜택, 웹/모바일/키오스크 체크인, 휠체어, 반려동물 동반 등의 서비스가 제공되지 않을 수 있습니다.\n필요한 부가서비스가 있으신 고객께서는 예약 및 발권을 진행하시기 전에 제공 가능 여부를 확인하여 주시기 바랍니다. (아래 다.항 참조) 5) 운항사의 규정에 따라 만15세 미만의 승객이 성인 보호자 없이 여행하거나, 동반하는 보호자가 만 18세 미만인 경우는\n 탑승이 제한될 수 있으니 운항사로 확인하여 주시기 바랍니다. (아래 다.항 참조) 6) 수하물은 공동운항 협정에 따라 운항사 또는 판매사의 규정이 적용되므로 사전에 확인하여 주시기 바랍니다. (아래 다.항 참조)\n       </td>\n      </tr>\n     </table>\n    </td>\n   </tr>\n   <tr>\n    <td>\n     실제 운항사 고지\n    </td>\n   </tr>\n   <tr>\n    <td>\n     ○ 고객이 탑승하게 될 실제 항공사명 안내\n○ 미국 교통부 (U.S DOT) 규정에 따라 공동운항편 예약시 운항사 고지 의무가 있으며, 운항사의 관계사(DBA:Doing Business As)가 실제 운항편인 경우\n이에 대한 안내도 해야하므로, 편명 하단에 "DBA" 가 표시된 구간 예약시 실제 운항편으로 안내 (관련 화면 보기 ☞ Click Here)\n     <table>\n      <tr>\n       <td>\n        운항사\n       </td>\n       <td>\n        대상편 표준안내문구\n       </td>\n      </tr>\n      <tr>\n       <td>\n        AS\n       </td>\n       <td>\n        AS(ALSKA AIRLINES) 관계사인 QX(Horizon Air) 항공기로 운항하는 공동운항편입니다.\n       </td>\n      </tr>\n      <tr>\n       <td>\n        EY\n       </td>\n       <td>\n        EY(ETIHAD AIRWAYS) 관계사인 9W(Jet Airways) 외장을 가진 항공기로 운항하는 공동운항편입니다.\n       </td>\n      </tr>\n      <tr>\n       <td>\n        KL\n       </td>\n       <td>\n        KL(KLM-ROYAL DUTCH AIRLINES) 관계사인 WA(KLM Cityhopper) 항공기로 운항하는 공동운항편입니다.\n       </td>\n      </tr>\n      <tr>\n       <td>\n        LA\n       </td>\n       <td>\n        LA(LAN AIRLINES) 관계사인 LP(Lan Peru) 항공기로 운항하는 공동운항편입니다.\n       </td>\n      </tr>\n      <tr>\n       <td>\n        AR\n       </td>\n       <td>\n        AR(AEROLINEAS ARGENTINAS) 관계사인 AU (Austral Lineas Aereas) 항공기로 운항하는 공동운항편입니다.\n       </td>\n      </tr>\n     </table>\n     ○ 실제 운항 기재, 승무원 등 운송 및 운항 관련 서비스는 실제 운항사 기준으로 적용됨을 안내\n    </td>\n   </tr>\n   <tr>\n    <td>\n     탑승터미널 위치, 탑승수속 마감시간 확인\n    </td>\n   </tr>\n   <tr>\n    <td>\n     ○ 탑승 절차 관련하여 탑승 터미널 위치, 탑승 수속 마감시간 및 운항사 카운터 이용 안내\n - 탑승 터미널 위치\n: 스케줄 조회화면 또는 편명 더블클릭하여 Flight Schedule 팝업 내 Description 에서 확인 가능\n- 탑승 수속 마감시간 : 편명 더블클릭하여 Flight Schedule 팝업 내 Description에서 확인 또는 좌석 확보시 팝업에서\n " CHKIN CLOSE TIME CHECK WITH OPERATING CARR " 문구 확인됨\n[스케줄 조회 화면]\n [좌석 확보시 팝업]\n [Flight Schedule 팝업]\n    </td>\n   </tr>\n   <tr>\n    <td>\n     Special Service 관련\n    </td>\n   </tr>\n   <tr>\n    <td>\n     ○ KE Marketing 승객에게 제공되는 Special Meal 및 Special Service는 기본적으로 운항사의 제공 기준에 따르므로,\n파트너사별 서비스 제공 여부 및 신청 가능 시점이 상이함\n따라서, Special Meal 및 Special Service 신청시, 서비스 제공 가능 여부 및 신청 가능시점 등은\n반드시 Checklist - Checklist Information - Codeshare INFO 확인 또는 [Tools] - [Help&amp;Information] - Topic에서 CODESHARE INFO 선택 - 항공사 조회하여 안내\n√ FMLY CARE SVC, FRES 등 KE 고유 서비스 신청 불가\n√ CHML 신청시 메뉴 선택 불가\n√ 공동운항편은 UM 신청 불가, 운항사의 UM 적용 연령은 상이할수 있으므로 미성년자 예약 시 반드시 확인 요망\n - 공동운항편에 미성년자 예약시 유의사항 ☞ Click Here\n☞ 상기와 같은 사유로 고객의 요청 서비스를 제공할수 없는 경우, 정중히 안내 후 대체 가능한 운항편으로 유도\n○ 공동운항 수하물은 협정에 따라 운항사 또는 판매사 규정이 적용 됨으로 Codeshare Info 내 항공사를 조회하여 안내\n(단, 수하물 협약사항에는 FRA 및 EBC 규정만 있으므로 협약사항에 없는 스포츠 수하물/악기 등과 같은 특수 수하물은 운항사의 규정을 따름)\n    </td>\n   </tr>\n   <tr>\n    <td>\n     Excess Baggage Auth 적용 관련\n    </td>\n   </tr>\n   <tr>\n    <td>\n     ○ KE ON-LINE FLT에서 KE Marketing 공동운항편으로 변경 시 Excess Baggage Auth가 있는 경우 혜택 제공 불가\n단, KE/OK 공동운항편의 경우 Excess Baggage Auth 적용 가능하므로 PNR 내 해당 Auth 확인 요망(Click)\n    </td>\n   </tr>\n  </table>\n  <table><tr>안녕하세요</tr><tr><img src="favicon144.png" alt="visit the MDN site" />Is it True?</tr></table>'
soup = BeautifulSoup(txt)

# usage of cleanser
cleanser = HTMLCleanser()
print(cleanser.add_valid_tags(["p", "img", "a", "span"]))
print(cleanser.remove_valid_tags(["span"]))
print(cleanser.add_valid_attrs(["href", "src", "alt", "font"]))
print(cleanser.remove_valid_attrs(["font"]))

txt = cleanser.cleanse_html(soup)

valid_tags: ['table', 'tr', 'td', 'th']
valid_attrs: ['rowspan', 'colspan']
['a', 'table', 'span', 'tr', 'p', 'td', 'th', 'img']
['a', 'table', 'tr', 'p', 'td', 'th', 'img']
['colspan', 'rowspan', 'font', 'alt', 'href', 'src']
['colspan', 'rowspan', 'alt', 'href', 'src']


In [25]:
from __future__ import annotations
import re
from collections import defaultdict
from typing import Tuple, Literal, Optional, List, Any
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class Tag:
    indice: Tuple[int, int]
    """Indice tuple of start_idx and end_idx.
    start_idx is the first index of the target tag ('<' of '<table>'),
    end_idx is the last index of the target tag ('>' of '<table>')."""
    name: str
    """Matched part. (e.g., <table>, </p>)"""
    metadata: dict = field(default_factory=dict)
    "Arbitrary metadata about the match (e.g., source, attributes, relationships to other tags, etc.)"


@dataclass
class Node:
    indice: Tuple[int, int]
    """Indice tuple of start_idx and end_idx.
    start_idx is the first index of the target tag ('<' of '<table>'),
    end_idx is the last index of the closed target tag ('<' of '</table>')."""
    name: str
    """Name of tag. e.g., <table>, </p>"""
    child: List[Tag] = field(default_factory=list)
    """List of child tags of the tag."""
    parent: List[Tag] = field(default_factory=list)
    """List of parent tags of the tag."""
    metadata: dict = field(default_factory=dict)
    "Arbitrary metadata about the tag (e.g., source, attributes, relationships to other tag, etc.)"


@dataclass
class Chunk:
    indice: Tuple[int, int]
    """Indice tuple of chunk, which is one of the divided parts of the document."""
    content: str
    """String content between the indice."""
    metadata: dict = field(default_factory=dict)
    """Arbitrary metadata about the chunk (e.g., source, attributes, relationships to other chunk, etc.)"""


def find_nodes(txt, tag):
    tag_counter = defaultdict(int)
    nodes = []

    pat = re.compile(rf"<.*{tag}>")
    match = pat.search(txt)

    tags = []
    while match:
        # get matched_tag. e.g. <table>, </table>
        matched_tag = match.group(0)
        # count depth of matched_tag to do pairing later
        tag_counter[matched_tag] += 1
        depth = tag_counter[matched_tag]
        tags += [Tag(indice=match.span(), name=matched_tag, metadata={"depth": depth})]
        # get pair_tag. e.g, <table> -> </table>
        pair_matched_tag = (
            rf"</{tag}>" if r"/" not in matched_tag else matched_tag.replace(r"/", "")
        )

        # recognize as Node if depth tags with each other
        if depth == tag_counter[pair_matched_tag]:
            # pairing tags according to the matching depth
            tags = sorted(tags, key=lambda x: x.metadata["depth"])
            start, end = tags[0], tags[-1]
            indice = start.indice + end.indice
            start_idx, end_idx = min(indice), max(indice)
            tag_name = start.name
            id = str(uuid4())
            nodes += [
                Node(indice=(start_idx, end_idx), name=tag_name, metadata={"id": id})
            ]
            # reset after chunking done
            tags = []
            tag_counter[matched_tag], tag_counter[pair_matched_tag] = 0, 0
        match = pat.search(txt, match.start() + 1)

    return nodes


def add_relationship_to_node(node, nodes) -> None:
    start, end = node.indice

    parent_cands = []
    _nodes = [_node for _node in nodes if _node != node and _node.name != node.name]
    for _node in _nodes:
        _start, _end = _node.indice
        if _start < start and _end > end:
            parent_cands += [_node]

    # get neareast parent
    if len(parent_cands) > 0:
        parent = parent_cands[0]
        min_gap = abs(parent.indice[0] - start)
        for cand in parent_cands:
            _gap = abs(cand.indice[0] - start)
            if _gap < min_gap:
                parent = cand
                min_gap = _gap

        parent.child += [node.metadata["id"]]
        node.parent += [parent.metadata["id"]]


def get_node_from_id(id, nodes):
    for node in nodes:
        if id == node.metadata["id"]:
            return node


def update_tags(nodes):
    for node in nodes:
        childs = [get_node_from_id(id, nodes) for id in node.child]
        parents = [get_node_from_id(id, nodes) for id in node.parent]
        childs = sorted(childs, key=lambda x: x.indice[0], reverse=False)
        parents = sorted(parents, key=lambda x: x.indice[0], reverse=False)

        node.child = childs
        node.parent = parents


def get_no_parent_nodes(tags):
    _tags = [_tag for _tag in tags if len(_tag.parent) == 0]
    _tags = sorted(_tags, key=lambda x: x.indice[0], reverse=False)
    return _tags


def get_chunks(txt, nodes):
    start = 0
    chunks = []
    for node in nodes:
        _start, _end = node.indice
        if start != _start:
            chunks += [Chunk(indice=(start, _start), content=txt[start:_start])]
        chunks += [Chunk(indice=(_start, _end), content=txt[_start:_end])]
        start = _end
    return chunks


nodes = []
for html_tag in ["table", "tr", "td", "p", "img"]:
    nodes.extend(find_nodes(txt, html_tag))

for node in nodes:
    add_relationship_to_node(node, nodes)
update_tags(nodes)
nodes = get_no_parent_nodes(nodes)
chunks = get_chunks(txt, nodes)

In [26]:
chunks

[Chunk(indice=(0, 15), content='<p>\n 안녕하세요\n</p>', metadata={}),
 Chunk(indice=(15, 16), content='\n', metadata={}),
 Chunk(indice=(16, 3342), content='<table>\n <tr>\n  <td>\n   [공동운항편 예약시 대고객 안내 문구]\n√ XXXX - 운항사\n   <table>\n    <tr>\n     <td>\n      1) 대한항공 항공기가 아닌 XXXX로 운항하는 공동운항편 입니다. (아래 가. 항 참조) 2) 공동운항편의 운임은 XXXX에서 구입 시의 운임과 다를 수 있습니다. 3) 공동운항편의 탑승수속은 실제 운항하는 항공사의 터미널과 탑승수속 카운터를 이용하셔야 합니다. 탑승수속 마감시간은\n운항사 규정에 따라 다를 수 있으니 반드시 운항사로 확인하여 주시기 바랍니다. (아래 나. 항 참조) 4) 공동운항편의 제반 서비스는 항공사별로 상이하며 유/무료 사전좌석배정, 아기바구니, 특별기내식, 좌석승급, 스카이패스\n 우수회원 혜택, 웹/모바일/키오스크 체크인, 휠체어, 반려동물 동반 등의 서비스가 제공되지 않을 수 있습니다.\n필요한 부가서비스가 있으신 고객께서는 예약 및 발권을 진행하시기 전에 제공 가능 여부를 확인하여 주시기 바랍니다. (아래 다.항 참조) 5) 운항사의 규정에 따라 만15세 미만의 승객이 성인 보호자 없이 여행하거나, 동반하는 보호자가 만 18세 미만인 경우는\n 탑승이 제한될 수 있으니 운항사로 확인하여 주시기 바랍니다. (아래 다.항 참조) 6) 수하물은 공동운항 협정에 따라 운항사 또는 판매사의 규정이 적용되므로 사전에 확인하여 주시기 바랍니다. (아래 다.항 참조)\n     </td>\n    </tr>\n   </table>\n  </td>\n </tr>\n <tr>\n  <td>\n   실제 운항사 고지\n  </td>\n </tr>\n <tr>\n  <td>\n   ○ 고객이 탑승하게

In [27]:
assert "".join([chunk.content for chunk in chunks]) == txt