In [19]:
from pathlib import Path
from os import cpu_count
from asyncio import Event
from time import sleep
from typing import Union, Tuple
from traceback import format_exc

import numpy as np
import pandas as pd
import re
import cchardet
import ray
from ray.actor import ActorHandle
from tqdm import tqdm
from bs4 import BeautifulSoup, SoupStrainer
from bs4.diagnose import diagnose
# Inspiration: https://github.com/honnibal/spacy-ray/pull/1/files#diff-7ede881ddc3e8456b320afb958362b2aR12-R45

In [26]:
def decode_text(txt: Union[str, bytes]) -> Union[str, None]:
    """Decode raw filing content into text.

    ``str`` input is returned unchanged. ``bytes`` input is decoded with the
    encoding guessed by ``cchardet``. When there is no guess, the guessed
    codec is unknown to Python, or decoding with it fails, the bytes are
    decoded as UTF-8 with undecodable bytes dropped.

    Args:
        txt: Already-decoded text or the raw bytes of a document.

    Returns:
        The decoded text, or ``None`` when the UTF-8 fallback produces an
        empty string.
    """
    if isinstance(txt, str):
        return txt

    encoding = cchardet.detect(txt)['encoding']
    # The detector may report no encoding at all; decode(None) would raise
    # TypeError, so only attempt the guessed codec when there is one.
    if encoding:
        try:
            return txt.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            # Wrong guess, or a codec name Python does not provide.
            pass

    # errors='ignore' cannot raise, so no further exception handling is needed.
    fallback = txt.decode('utf-8', errors='ignore')
    return fallback or None

def get_header(txt: str) -> Union[str, None]:
    """Return the header section of a filing, tags included.

    Looks for ``<sec-header>...</sec-header>`` first and falls back to
    ``<ims-header>...</ims-header>``. Matching is case-insensitive, spans
    newlines and is greedy (it runs to the last closing tag in ``txt``).

    Args:
        txt: Full decoded text of a filing.

    Returns:
        The matched header span, or ``None`` when neither tag pair is present
        or ``txt`` is not a string (e.g. decoding failed upstream).
    """
    if not isinstance(txt, str):
        return None
    for tag in ('sec-header', 'ims-header'):
        match = re.search(rf'<{tag}>.*</{tag}>', txt, flags=re.I | re.S)
        if match:
            return match.group()
    return None

def get_text(txt: str) -> Union[str, None]:
    """Return the ``<text>...</text>`` span of a filing, tags included.

    Matching is case-insensitive, spans newlines and is greedy, so the result
    runs from the first ``<text>`` to the last ``</text>`` in ``txt``.

    Args:
        txt: Full decoded text of a filing.

    Returns:
        The matched span, or ``None`` when there is no such span or ``txt`` is
        not a string.
    """
    if not isinstance(txt, str):
        return None
    match = re.search(r'<text>.*</text>', txt, flags=re.I | re.S)
    return match.group() if match else None

def get_meta_from_header(header: str) -> Union[dict, None]:
    """Extract selected ``KEY: value`` fields from header text.

    Every line has all of its whitespace removed and is then checked for each
    known field name. For a matching line, the text between the first and the
    second colon is stored under the field's short name. Because whitespace is
    removed before splitting, multi-word values come back without spaces
    (e.g. ``ACME CORP`` becomes ``ACMECORP``).

    Args:
        header: Header text, one ``KEY: value`` pair per line.

    Returns:
        A dict with any of the keys ``acc``, ``type``, ``date``, ``coname``,
        ``cik`` and ``irs`` that were found, or ``None`` when ``header`` is
        empty or ``None``.
    """
    if not header:
        return None

    # Whitespace-stripped field name -> concise output key.
    short_names = {
        'ACCESSIONNUMBER': 'acc',
        'CONFORMEDSUBMISSIONTYPE': 'type',
        'FILEDASOFDATE': 'date',
        'COMPANYCONFORMEDNAME': 'coname',
        'CENTRALINDEXKEY': 'cik',
        'IRSNUMBER': 'irs',
    }
    whitespace = re.compile(r'\s+')

    meta = {}
    for line in header.splitlines():
        line = whitespace.sub('', line)
        for key, short in short_names.items():
            if key not in line:
                continue
            parts = line.split(':')
            # A line can mention the field name without a colon; skip it
            # instead of failing with IndexError.
            if len(parts) > 1:
                meta[short] = parts[1]
    # TODO: handle filings with multiple filers; a repeated field currently
    # overwrites the value found earlier.
    return meta

def determine_mdtype(document: str) -> str:
    """Classify a document body as ``'html'``, ``'xml'`` or ``'plain'``.

    A body containing ``<html ... <body ... </body ... </html`` (any case,
    across newlines) is ``'html'``. Otherwise it is ``'xml'`` when it holds
    more than four attribute-less tags such as ``<TABLE>``, and ``'plain'``
    in every other case. An empty or ``None`` body yields ``None``.
    """
    if not document:
        return None
    if re.search(r'<html.*<body.*</body.*</html', document, flags=re.I | re.S):
        return 'html'
    simple_tags = re.findall(r'<[a-z]+>', document, flags=re.I)
    return 'xml' if len(simple_tags) > 4 else 'plain'

def process_html(text: str) -> Union[str, None]:
        """Extract body text from an HTML document embedded in ``text``.

        Steps, in order:
          1. Cut out the ``<html ... </html`` span with a regex and parse it
             with BeautifulSoup using the ``'lxml'`` parser.
          2. Pick the list of elements to keep, trying in turn: everything
             after a ``div`` whose text matches the first "PART I/1" link;
             everything after the table enclosing the first ``table a`` match;
             everything after the first table with more than five rows; and
             finally every element of the document.
          3. Decompose multi-row tables, elements styled or aligned as
             centered, elements whose whole text matches a blacklist pattern,
             and elements with no visible text.
          4. Join the ``.text`` of the elements with newlines and remove
             ``&name;`` entities.

        Returns the resulting string, or ``None`` if any step raises.
        """
        try:
            html_pattern = re.compile(r'<html.*<body.*</body.*</html', flags=re.I | re.S)
            html = html_pattern.search(text).group()
            html = BeautifulSoup(html, 'lxml')
            # A more robust delimiter than "Item 1" seems to be needed.
            # The code below finds the first <a href> tag in the Table of Contents (index)
            # that contains "Part" and uses its text to locate Part 1...
            try:
                part = html.find('a', text=re.compile('.*PART\s*(I|1).*', re.I|re.S))
                # Turn the link text into a whitespace-tolerant pattern.
                # NOTE(review): the link text is not regex-escaped before being compiled.
                part = part.text.strip().replace(' ', '\s*') + '.*'
                part = html.find('div', recursive=True, text=re.compile(part, re.I |re.S))
                html = part.find_all_next()
            except:
                # Any failure above (e.g. no matching link, so `part` is None) is
                # printed and the table-based fallbacks are tried instead.
                print(format_exc())
                try:
                    # NOTE(review): unpacking the first match presumably iterates the
                    # tag's children, so `part` would be the link's first child rather
                    # than the link itself - confirm this is intended.
                    part, *_ = html.select('table a')[0]
                    part = part.find_parent('table')
                    # print('table found', part)
                    html = part.find_all_next()
                except:
                    try:
                        part = html.find_all('table')
                        # First table with more than five rows.
                        part, *_ = (t for t in part if len(t.find_all('tr')) > 5)
                        # print('table found', part)
                        html = part.find_all_next()
                    except:
                        # Last resort: keep every element of the document.
                        html = html.find_all()
                        pass

            # def check_and_decompose(tag: BeautifulSoup, attr, check_function: function):
            #     for i in tag.find_all():
            #         if getattr(i, attr):
            #             if i.has_attr(attr):
            #                 if check_function(i[attr]):
            #                     tag.decompose()
            # Patterns that must match an element's entire text (see re.fullmatch below).
            # NOTE(review): '\&npsp\;' looks like a typo for the &nbsp; entity - confirm.
            blacklist = ['\&npsp\;', '\s+', '\_+', 'table\s*of\s*contents', '.*text-align:\s*center.*']
            center = re.compile(r'.*TEXT-ALIGN\s*:\s*center.*', flags=re.I)
            # NOTE(review): the loops below decompose elements of `html` while later
            # loops and the final join still iterate the same list, so already
            # decomposed elements are visited again - verify that this is safe.
            # Drop tables that have more than one row.
            for i in html:
                i: BeautifulSoup
                if i.name == 'table':
                    if len(i.find_all('tr')) > 1:
                        i.decompose()
            # Drop elements whose inline style centers their text.
            for i in html:
                if i.attrs:
                    if i.has_attr('style'):
                        if center.search(i['style']):
                            i.decompose()
            # Drop <div align="center"> elements.
            for i in html:
                if i.name == 'div':
                    if i.has_attr('align'):
                        if i['align'] == 'center':
                            i.decompose()
            # Drop elements whose whole text matches a blacklist pattern.
            for i in html:
                if i.text:
                    if any(re.fullmatch(x, i.text, flags=re.I) for x in blacklist):
                        i.decompose()
            # Drop elements with no visible text.
            for i in html:
                if not i.text.strip():
                    i.decompose()

            # get all text from html
            # NOTE(review): a parent's .text presumably includes its children's text,
            # so nested elements may contribute the same text more than once - confirm.
            text = '\n'.join([i.text for i in html])
            # remove all js special characters
            text = re.sub('\&\w+\;', '', text)
            return text
        except:
            # Any failure (including no <html> span in `text`) yields None.
            return None
        
def process_xml(text: str):
    """Parse ``text`` with BeautifulSoup's ``'xml'`` parser and return its text.

    Table markup is not removed yet. Returning the text without ``<TABLE>``
    content needs a schema that tells which tag carries table data and which
    tag closes ``<TABLE>``; the global closing tag is mostly ``</C>``, but not
    always.
    """
    def _get_xml_schema(xml: str):
        # Set of the distinct tag names used in the XML (helper is not called yet).
        parsed = BeautifulSoup(xml, 'xml')
        return {node.name for node in parsed.find_all()}

    document = BeautifulSoup(text, 'xml')
    return document.get_text()

        
def process_plain(text: str):
    """Return a plain-text document unchanged.

    No cleanup is implemented yet (the original note suggests OCR as a next
    step), so the output may still contain unremoved tables, figures,
    markdown syntax, etc.
    """
    untouched = text
    return untouched
    
def process(text: str, mdtype: str):
    """Run the cleaner that matches ``mdtype`` on ``text``.

    ``'html'`` goes to ``process_html``, ``'xml'`` to ``process_xml`` and
    ``'plain'`` to ``process_plain``; the chosen function's result is
    returned. Any other ``mdtype`` raises ``ValueError``.
    """
    if mdtype == 'html':
        handler = process_html
    elif mdtype == 'xml':
        handler = process_xml
    elif mdtype == 'plain':
        handler = process_plain
    else:
        raise ValueError('mdtype must be html, xml, or plain')
    return handler(text)

def run(parquet: Path) -> None:
    """Clean one parquet file of raw filings and write the processed copy.

    The input must have exactly one column (renamed to ``raw_text``). Each
    row is decoded, its metadata fields are extracted, rows whose filing date
    is not greater than 20100000 (or is missing/unparseable) are dropped, and
    the ``<text>`` body of the remaining rows is classified and cleaned with
    newlines replaced by spaces. Rows that end up without text get
    ``preprocessed = False``.

    The result is printed and written to
    ``<parquet.parent.parent>/processed/<stem>_processed.parquet``.

    NOTE: metadata fields are searched in the whole decoded text of each row,
    not only in the span returned by ``get_header``.

    Args:
        parquet: Path of the input parquet file.
    """
    new_path = parquet.parent.parent / 'processed'
    new_path.mkdir(exist_ok=True)
    processed = new_path / f'{parquet.stem}_processed.parquet'

    df = pd.read_parquet(parquet)
    df.columns = ['raw_text']
    df['raw_text'] = df['raw_text'].apply(decode_text)

    # Build each metadata column by name and row position. Going through
    # pd.DataFrame(list_of_dicts) breaks on None entries, maps columns by
    # position, and aligns on a fresh RangeIndex instead of df's index.
    meta_cols = ['acc', 'type', 'date', 'coname', 'cik', 'irs']
    metas = [get_meta_from_header(raw) or {} for raw in df['raw_text']]
    for col in meta_cols:
        df[col] = [meta.get(col) for meta in metas]

    # Missing or non-numeric dates become NaN and compare as False instead of
    # crashing int().
    filed = pd.to_numeric(df['date'], errors='coerce')
    df['preprocessed'] = filed > 20100000
    # Work on an independent copy so the column assignments below do not
    # target a slice of the unfiltered frame.
    df = df[df['preprocessed']].copy()

    df['text'] = df['raw_text'].apply(get_text)
    df['mdtype'] = None

    has_text = df['text'].notnull()
    df.loc[has_text, 'mdtype'] = df.loc[has_text, 'text'].apply(determine_mdtype)

    typed = df['mdtype'].notnull()
    if typed.any():
        # Row-wise apply on an empty selection would not return a Series.
        df.loc[typed, 'text'] = df.loc[typed].apply(
            lambda row: process(row['text'], row['mdtype']), axis=1
        )

    cleaned = df['text'].notnull()
    df.loc[cleaned, 'text'] = df.loc[cleaned, 'text'].apply(
        lambda body: body.replace('\n', ' ').replace('\r', ' ')
    )
    df.loc[~cleaned, 'preprocessed'] = False

    print(df)
    df.to_parquet(processed)
    return

if __name__ == '__main__':
    # Process only the first .parquet file found in the data directory.
    source_dir = Path(r'C:\Users\wonhyeong\workings\data\S&P 500\301-497')
    parquet_list = (entry for entry in source_dir.iterdir() if entry.suffix == '.parquet')
    parquet = next(parquet_list, None)
    if parquet is not None:
        run(parquet)

Traceback (most recent call last):
  File "C:\Users\wonhyeong\AppData\Local\Temp\ipykernel_7460\3240999587.py", line 72, in process_html
    part = part.text.strip().replace(' ', '\s*') + '.*'
AttributeError: 'NoneType' object has no attribute 'text'

Traceback (most recent call last):
  File "C:\Users\wonhyeong\AppData\Local\Temp\ipykernel_7460\3240999587.py", line 72, in process_html
    part = part.text.strip().replace(' ', '\s*') + '.*'
AttributeError: 'NoneType' object has no attribute 'text'

Traceback (most recent call last):
  File "C:\Users\wonhyeong\AppData\Local\Temp\ipykernel_7460\3240999587.py", line 72, in process_html
    part = part.text.strip().replace(' ', '\s*') + '.*'
AttributeError: 'NoneType' object has no attribute 'text'

Traceback (most recent call last):
  File "C:\Users\wonhyeong\AppData\Local\Temp\ipykernel_7460\3240999587.py", line 72, in process_html
    part = part.text.strip().replace(' ', '\s*') + '.*'
AttributeError: 'NoneType' object has no attribute 'text'

In [None]:


# Scratch cell: experiments for stripping an already-parsed document down to
# plain structural tags.
# NOTE(review): assumes `text` and `html` are BeautifulSoup trees left over
# from earlier cells - neither is defined in this cell. TODO confirm.

# Selectors and marker characters collected for cleanup; not used below.
# NOTE(review): "//@style='text-align:center'" does not look like a valid
# element selector - verify before using it.
del_elements = ['.//table', '//*[@id="DSPFPageNumber"]', '//*[@id="PGBRK"]', "//@style='text-align:center'", './/head', "//div[@style='display:none']"]
stop_words = ['☐','☒']


# delete all attributes
for i in text.find_all():
    i.attrs = {}

whitelist = ['html', 'head', 'body', 'p', 'div', 'span', 'a', 'img', 'nav', 'ul', 'li', 'ol']
# delete all tags not Regular HTML
for i in text.find_all():
    if i.name not in whitelist:
        i.decompose()

# delete all meaningless tags (no text, or whitespace only)
for i in text.find_all():
    # An empty string is also blank after strip(), so one check covers both
    # cases and avoids decomposing the same tag twice.
    if not i.text.strip():
        i.decompose()

# delete tags with text == 'table of contents'
for i in text.find_all():
    if i.text.lower() == 'Table of Contents'.lower():
        i.decompose()

# delete overlapping tags
# NOTE(review): this decomposes every descendant of any tag that has
# children, which presumably empties the tree below the first tag visited -
# confirm this is the intended "overlap" removal.
for i in text.find_all():
    if i.find_all():
        for j in i.find_all():
            j.decompose()

# find all unique tag name
tags = [x.name for x in html.find_all()]
tags = set(tags)
tags

# find regular html tags in name
whitelist = ['html', 'head', 'body', 'p', 'div', 'span', 'a', 'b', 'img', 'nav', 'ul', 'li', 'ol']