In [None]:
# %%
from pathlib import Path
from bs4 import BeautifulSoup, SoupStrainer
from bs4.diagnose import diagnose
import cchardet
import numpy as np
import re
import pandas as pd
from os import cpu_count
# Inspiration (Ray progress-bar recipe):
# https://github.com/honnibal/spacy-ray/pull/1/files#diff-7ede881ddc3e8456b320afb958362b2aR12-R45
from asyncio import Event
from typing import Tuple
from time import sleep
from typing import Union

import ray
# For typing purposes
from ray.actor import ActorHandle
from tqdm import tqdm
import traceback

# get SEC_HEADER

In [None]:
@ray.remote
class preprocessor:
    """Ray actor that sections one raw SEC filing and cleans its body text.

    All work happens in ``__init__``: the raw filing is decoded, split into a
    header and a ``<text>`` body, the body's markup type is guessed, the body
    is cleaned accordingly and the header metadata is collected.

    Attributes set by ``__init__``:
        raw: the decoded filing.
        state: flags ``valid_header`` / ``valid_text`` / ``well_processed``.
        what_document_is: ``'sec-document'``, ``'ims-document'`` or ``'unknown'``.
        header: the matched header section, or ``None``.
        markdown: ``'html'``, ``'xml'``, ``'plain'`` or ``'unknown'``.
        text: the cleaned body, or ``None`` when no ``<text>`` section exists.
        meta: metadata dict built by :meth:`metadata`.
    """

    # NOTE(review): these patterns are greedy, so each one spans from the first
    # opening tag to the LAST closing tag in the filing - confirm that is wanted
    # for filings that contain several <text> sections.
    _SEC_HEADER = re.compile(r'<sec-header>.*</sec-header>', flags=re.I | re.S)
    _IMS_HEADER = re.compile(r'<ims-header>.*</ims-header>', flags=re.I | re.S)
    _TEXT_SECTION = re.compile(r'<text>.*</text>', flags=re.I | re.S)
    _HTML_DOCUMENT = re.compile(r'<html.*<body.*</body.*</html', flags=re.I | re.S)
    _XML_TAG = re.compile(r'<[A-Z]+>', flags=re.I)
    _WHITESPACE = re.compile(r'\s+')

    # Header keys (whitespace already stripped) and the short names they are
    # stored under in ``meta``.
    _META_KEYS = ('ACCESSIONNUMBER', 'CONFORMEDSUBMISSIONTYPE', 'FILEDASOFDATE',
                  'COMPANYCONFORMEDNAME', 'CENTRALINDEXKEY', 'IRSNUMBER')
    _META_SHORT = ('acc', 'type', 'date', 'coname', 'cik', 'irs')

    def __init__(self, raw_text: Union[str, bytes]):
        self.raw: str = self._decode_text(raw_text)
        self.state = {'valid_header': True, 'valid_text': True, 'well_processed': True}
        self.what_document_is = 'sec-document'
        self.header, self.text = self._sectioning(self.raw)
        self.markdown = self._determine_mdtype()
        # Clean the body BEFORE snapshotting the metadata: processing may clear
        # ``well_processed``, and ``metadata()`` copies the state flags.
        self.text = self.process()
        self.meta = self.metadata()

    def _decode_text(self, txt: Union[str, bytes]):
        """Return ``txt`` as ``str``.

        ``str`` input is returned unchanged. Bytes are decoded as UTF-8, then
        cp949, then euc-kr; if all fail, UTF-8 with undecodable bytes dropped.
        Every fallback that is used is reported on stdout.
        """
        if isinstance(txt, str):
            return txt
        try:
            return str(txt, encoding='utf-8')
        except UnicodeDecodeError:
            pass
        for encoding in ('cp949', 'euc-kr'):
            try:
                dec = str(txt, encoding=encoding)
            except UnicodeDecodeError:
                continue
            print(encoding)
            return dec
        print('utf-8 with ignore')
        return str(txt, encoding='utf-8', errors='ignore')

    def _sectioning(self, raw_text):
        """Split the filing into ``(header, text)``; either may be ``None``.

        Updates ``what_document_is`` and the ``valid_header`` / ``valid_text``
        flags as a side effect.
        """
        header_match = self._SEC_HEADER.search(raw_text)
        if header_match is None:
            # Early filings carry an <ims-header> instead of a <sec-header>.
            header_match = self._IMS_HEADER.search(raw_text)
            if header_match is not None:
                self.what_document_is = 'ims-document'
                print('something is wrong')
        if header_match is None:
            header = None
            self.what_document_is = 'unknown'
            self.state['valid_header'] = False
        else:
            header = header_match.group()

        text_match = self._TEXT_SECTION.search(raw_text)
        if text_match is None:
            text = None
            self.state['valid_text'] = False
        else:
            text = text_match.group()
        return header, text

    def metadata(self):
        """Collect header fields plus state flags into a dict.

        Returns an empty dict when no header was found.
        NOTE(review): in that case the state flags and document type are not
        reported either - confirm whether that is intended.
        """
        meta = {}
        if not self.state['valid_header']:
            return meta
        for line in self.header.splitlines():
            line = self._WHITESPACE.sub('', line)
            for key, short in zip(self._META_KEYS, self._META_SHORT):
                if key not in line:
                    continue
                # partition() never raises on a line without a colon and keeps
                # values that themselves contain a colon intact.
                _, colon, value = line.partition(':')
                if colon:
                    meta[short] = value
        # TODO: filings with multiple filers need handling; for now a later
        # occurrence of a key overwrites the earlier one.
        meta['markdown'] = self.markdown
        meta = {**meta, **self.state, 'document_type': self.what_document_is}
        return meta

    def _determine_mdtype(self):
        """Guess the markup type of ``self.text``.

        Returns ``'html'`` when an ``<html>...<body>...</body>...</html``
        skeleton is present, ``'xml'`` when more than four letter-only tags
        such as ``<TABLE>`` occur, ``'plain'`` otherwise, and ``'unknown'``
        when there is no text at all.
        """
        text = self.text
        if not text:
            return 'unknown'
        if self._HTML_DOCUMENT.search(text):
            return 'html'
        if len(self._XML_TAG.findall(text)) > 4:
            return 'xml'
        return 'plain'

    def _process_html(self):
        """Clean an HTML body and return its text, one tag per line.

        Tries to skip the front matter by locating where "Part I" starts, then
        drops multi-row tables, centred elements, blacklisted boilerplate and
        empty tags. Sets ``well_processed`` to ``False`` when no starting point
        could be located and the whole document had to be used.
        """
        html = self._HTML_DOCUMENT.search(self.text).group()
        html = BeautifulSoup(html, 'lxml')
        # A more robust delimiter than "item 1" is needed.
        # The code below finds the first <a href> tag (expected to sit in the
        # table of contents / index) whose text contains "Part", and uses that
        # text to locate the <div> where Part 1 starts.
        try:
            part = html.find('a', text=re.compile(r'.*PART\s*(I|1).*', re.I | re.S))
            part = part.text.strip().replace(' ', r'\s*') + '.*'
            part = html.find('div', recursive=True, text=re.compile(part, re.I | re.S))
            html = part.find_all_next()
        except Exception:
            # Fallback 1: start after the first table that contains a link.
            try:
                part, *_ = html.select('table a')[0]
                part = part.find_parent('table')
                html = part.find_all_next()
            except Exception:
                # Fallback 2: start after the only table with more than 5 rows.
                try:
                    part = html.find_all('table')
                    part, *_ = (t for t in part if len(t.find_all('tr')) > 5)
                    html = part.find_all_next()
                except Exception:
                    self.state["well_processed"] = False
                    html = html.find_all()

        # Each pattern must match a tag's whole text for the tag to be dropped.
        blacklist = [r'&nbsp;', r'\s+', r'_+', r'table\s*of\s*contents', r'.*text-align:\s*center.*']
        center = re.compile(r'.*TEXT-ALIGN\s*:\s*center.*', flags=re.I)
        # The passes are kept separate and in this order on purpose: ``html``
        # is a fixed list, and tags removed by an earlier pass stay in it.
        for i in html:
            if i.name == 'table':
                if len(i.find_all('tr')) > 1:
                    i.decompose()
        for i in html:
            if i.attrs:
                if i.has_attr('style'):
                    if center.search(i['style']):
                        i.decompose()
        for i in html:
            if i.name == 'div':
                if i.has_attr('align'):
                    if i['align'] == 'center':
                        i.decompose()
        for i in html:
            if i.text:
                if any(re.fullmatch(x, i.text, flags=re.I) for x in blacklist):
                    i.decompose()
        for i in html:
            if not i.text.strip():
                i.decompose()

        # Get all text from the remaining tags.
        text = '\n'.join([i.text for i in html])
        # Remove leftover character entities such as "&amp;".
        text = re.sub(r'&\w+;', '', text)
        return text

    def _get_xml_schema(self, xml: str):
        """Return the set of tag names used in ``xml``."""
        soup = BeautifulSoup(xml, 'xml')
        return {tag.name for tag in soup.find_all()}

    def _process_xml(self):
        """Return the text content of an XML-like body.

        Tables are NOT removed: doing so needs a schema telling which tag
        carries table data and which one closes <TABLE>. The global closing
        tag is mostly </C>, but not always.
        """
        soup = BeautifulSoup(self.text, 'xml')
        return soup.get_text()

    def _process_plain(self):
        """Return the body unchanged.

        The output may still contain tables, figures, markdown syntax, etc.
        """
        return self.text

    def process(self):
        """Clean ``self.text`` with the handler matching ``self.markdown``."""
        if self.markdown == 'html':
            return self._process_html()
        if self.markdown == 'xml':
            return self._process_xml()
        return self._process_plain()

    def result(self):
        """Return a one-row DataFrame: the metadata columns plus ``text``.

        ``text`` holds the cleaned body encoded as UTF-8 bytes; it is empty
        when the filing had no ``<text>`` section.
        """
        result = pd.DataFrame(self.meta, index=[0])
        result['text'] = (self.text or '').encode('utf-8')
        return result

In [None]:
if __name__ == '__main__':
    # Smoke test: run every document of the first parquet file through the
    # preprocessor and flag suspiciously long or short outputs.
    my_path = Path.cwd() / 'S&P 500'
    first_file = next((x for x in my_path.iterdir() if x.is_file()), None)
    if first_file is not None:
        frame = pd.read_parquet(first_file, engine='pyarrow')
        for raw in frame['txt'].values:
            # ``preprocessor`` is a Ray actor class: it cannot be instantiated
            # directly and its attributes are not readable through the handle,
            # so the data is fetched via the ``result`` method instead.
            actor = preprocessor.remote(raw)
            row = ray.get(actor.result.remote()).iloc[0]
            meta = row.drop('text').to_dict()
            length = len(row['text'].decode('utf-8'))
            print(meta, f'length is {length}' if length > 300000 or length < 3000 else '')

In [None]:
import psutil, gc

def auto_garbage_collect(pct=80.0):
    """Run ``gc.collect()`` when system memory usage is at or above ``pct`` percent."""
    memory_in_use = psutil.virtual_memory().percent
    if memory_in_use < pct:
        return
    gc.collect()

In [None]:

if __name__ == '__main__':
    # Clean every parquet file in the folder and write one
    # ``<source name>_cleared.parquet`` per input file.
    my_path = Path.cwd() / 'S&P 500' / '301-497'
    # One concrete, ordered list of files, so that each output is named after
    # the file it was actually produced from.
    parquet_files = sorted(x for x in my_path.iterdir() if x.is_file())

    if parquet_files:
        # Preview only; reading it here does not remove it from the work list.
        preview = pd.read_parquet(parquet_files[0], engine='pyarrow')
        print(preview, type(preview))
        del preview

    try:
        ray.init()
    except RuntimeError:
        # Ray was already running: restart it.
        ray.shutdown()
        ray.init()

    for parquet_file in parquet_files:
        frame = pd.read_parquet(parquet_file, engine='pyarrow')
        # One actor per document; each actor does its work on construction.
        actors = [preprocessor.remote(x) for x in frame['txt'].values]
        result_refs = [actor.result.remote() for actor in actors]
        results = ray.get(result_refs)
        if results:  # pd.concat raises on an empty list
            pd.concat(results).to_parquet(
                f'{parquet_file.name.split(".")[0]}_cleared.parquet', engine='pyarrow')
        # Drop the references before checking memory pressure.
        del frame, actors, result_refs, results
        auto_garbage_collect()


In [None]:
@ray.remote
class ProgressBarActor:
    """Ray actor that accumulates progress counts reported via ``update``."""

    counter: int  # total number of items reported so far
    delta: int    # items reported since the last ``wait_for_update`` returned
    event: Event  # set by ``update``, cleared by ``wait_for_update``

    def __init__(self) -> None:
        self.event = Event()
        self.counter = self.delta = 0

    def update(self, num_items_completed: int) -> None:
        """Record ``num_items_completed`` newly finished items and signal
        anyone waiting in ``wait_for_update``.
        """
        self.delta = self.delta + num_items_completed
        self.counter = self.counter + num_items_completed
        self.event.set()

    async def wait_for_update(self) -> Tuple[int, int]:
        """Blocking call.

        Waits until ``update`` has been called, then returns a tuple of the
        number of items reported since the previous ``wait_for_update`` and
        the total number of completed items.
        """
        await self.event.wait()
        self.event.clear()
        pending, self.delta = self.delta, 0
        return pending, self.counter

    def get_counter(self) -> int:
        """Return the total number of completed items."""
        return self.counter

In [None]:
# Back on the local node, once you launch your remote Ray tasks, call
# `print_until_done`, which will feed everything back into a `tqdm` counter.


class ProgressBar:
    """Local-side ``tqdm`` front end for a remote ``ProgressBarActor``."""

    progress_actor: ActorHandle
    total: int
    description: str
    pbar: tqdm

    def __init__(self, total: int, description: str = ""):
        self.total = total
        self.description = description
        # Ray actors don't seem to play nice with mypy, which reports a
        # spurious warning on the next line; it is suppressed on purpose.
        self.progress_actor = ProgressBarActor.remote()  # type: ignore

    @property
    def actor(self) -> ActorHandle:
        """Handle of the remote `ProgressBarActor`.

        Call `update` on it whenever tasks complete.
        """
        return self.progress_actor

    def print_until_done(self) -> None:
        """Blocking call.

        Use after launching remote Ray tasks that were given the actor handle
        and call `update` on it. Feeds every reported increment into a `tqdm`
        bar and returns once the actor's counter has reached `total`.
        """
        bar = tqdm(desc=self.description, total=self.total)
        finished = False
        while not finished:
            delta, counter = ray.get(self.actor.wait_for_update.remote())
            bar.update(delta)
            finished = counter >= self.total
        bar.close()

In [None]:
if __name__ == '__main__':
    # Clean the first parquet files of the folder, showing a progress bar per
    # file, and write the results to ``result_<idx>.parquet``.
    my_path = Path.cwd() / 'S&P 500'
    parquet_files = sorted(x for x in my_path.iterdir() if x.is_file())
    max_files = 11  # the original loop stopped after index 10

    if parquet_files:
        # Preview only; reading it here does not remove it from the work list.
        preview = pd.read_parquet(parquet_files[0], engine='pyarrow')
        print(preview, type(preview))
        del preview

    try:
        ray.init()
    except RuntimeError:
        # Ray was already running: restart it.
        ray.shutdown()
        ray.init()

    for idx, parquet_file in enumerate(parquet_files[:max_files]):
        frame = pd.read_parquet(parquet_file, engine='pyarrow')
        actors = [preprocessor.remote(x) for x in frame['txt'].values]
        result_refs = [actor.result.remote() for actor in actors]

        # The preprocessor actors never report to a ProgressBarActor, so a
        # ProgressBar.print_until_done() call here would wait forever. Progress
        # is tracked locally instead, as the result references complete.
        pending = list(result_refs)
        with tqdm(total=len(result_refs), desc=parquet_file.name) as pbar:
            while pending:
                done, pending = ray.wait(pending, num_returns=1)
                pbar.update(len(done))

        tasks = ray.get(result_refs)
        if not tasks:  # pd.concat raises on an empty list
            continue
        result = pd.concat(tasks)
        result.to_parquet(f'result_{idx}.parquet', engine='pyarrow')



# get Inner Text


In [None]:
# Exploratory cell: strip boilerplate from one HTML document and print the
# text starting at the first "item 1".
# NOTE(review): `tt` is not defined in this cell - presumably the raw HTML of
# one filing left over from an earlier cell; confirm before running.

# Each pattern must match a tag's whole text for the tag to be dropped.
blacklist = [r'&nbsp;', r'\s+', r'_+', r'table\s*of\s*contents', r'.*text-align:\s*center.*']
text = BeautifulSoup(tt, 'lxml')
center = re.compile(r'.*TEXT-ALIGN\s*:\s*center.*', flags=re.I)

# Drop multi-row tables; unwrap single-row tables into their plain text.
for i in text.find_all():
    if i.name == 'table':
        if len(i.find_all('tr')) > 1:
            i.decompose()
        else:
            # Use a separate name: rebinding `text` here would replace the
            # soup with a string and break every later `text.find_all()`.
            table_text = i.text
            i.replace_with(table_text)

# Drop tags whose inline style centres them.
for i in text.find_all():
    if i.attrs:
        if i.has_attr('style'):
            if center.search(i['style']):
                i.decompose()

# Drop <div align="center"><font size="2">...</font></div> blocks.
for i in text.find_all():
    if i.name == 'div':
        if i.has_attr('align'):
            if i['align'] == 'center':
                i.decompose()

# Drop tags whose whole text matches a blacklist pattern.
for i in text.find_all():
    if i.text:
        if any(re.fullmatch(x, i.text, flags=re.I) for x in blacklist):
            i.decompose()

# Drop empty top-level elements. Iterate over a snapshot, because
# decompose() removes the element from the list being walked.
for i in list(text.contents):
    if not i.text.strip():
        i.decompose()

aa = text.get_text()
item1 = re.compile(r'item\s*1', flags=re.I)
item1_match = item1.search(aa)
if item1_match is not None:
    # Keep everything from the first "item 1" onwards; when there is no such
    # marker the full text is printed.
    aa = aa[item1_match.start():]
print(aa)


In [None]:


# Exploratory cell: reduce the parsed document `text` to bare, whitelisted tags.
# NOTE(review): `text` is expected to be the BeautifulSoup object built in an
# earlier cell; `del_elements` and `stop_words` are not used in this cell.
del_elements = ['.//table', '//*[@id="DSPFPageNumber"]', '//*[@id="PGBRK"]', "//@style='text-align:center'", './/head', "//div[@style='display:none']"]
stop_words = ['☐','☒']


# delete all attributes
for i in text.find_all():
    i.attrs = {}

whitelist = ['html', 'head', 'body', 'p', 'div', 'span', 'a', 'img', 'nav', 'ul', 'li', 'ol']
# delete all tags that are not regular HTML
for i in text.find_all():
    if i.name not in whitelist:
        i.decompose()

# delete all meaningless tags (no text, or whitespace only)
for i in text.find_all():
    if not i.text.strip():
        i.decompose()

# delete tags whose text is exactly 'table of contents'
for i in text.find_all():
    if i.text.lower() == 'Table of Contents'.lower():
        i.decompose()

# delete overlapping tags
# NOTE(review): this removes every descendant tag of every tag that has any,
# which leaves only childless tags - confirm that this is the intent.
for i in text.find_all():
    if i.find_all():
        for j in i.find_all():
            j.decompose()

# find all unique tag names
# NOTE(review): `html` is not defined in this cell - presumably the unfiltered
# soup from an earlier cell; confirm, or use `text` instead.
tags = [x.name for x in html.find_all()]
tags = set(tags)
tags

# regular html tags to keep (adds 'b' to the list used above)
whitelist = ['html', 'head', 'body', 'p', 'div', 'span', 'a', 'b', 'img', 'nav', 'ul', 'li', 'ol']
