#### module imports

In [1]:
from typing import overload, Optional, Union, List, Dict

In [2]:
import asyncio
import time
import sys
import os
import json
import logging
from datetime import datetime

In [3]:
import aiohttp
from bs4 import BeautifulSoup

#### logging 

Use `A.B` logger inheritance to control logging level from one entry point

In [4]:
import logging

In [5]:
# '{'-style log record layout: logger name, timestamp plus milliseconds,
# level name, process id, thread id and the message
MAIN_FMT = '[{name}][{asctime} {msecs:0>7.3f}][{levelname}]-pid:{process} th_id:{thread}- {message!s}'
# strftime pattern passed to the formatter as `datefmt` (renders {asctime})
DATE_FMT = '%y/%m/%d %H:%M:%S'
# base name of the logger hierarchy; default `base_name` of the logger helpers
LOGGER_NAME = 'cnR_async'
# directory used by main() when no output path is supplied
OUTPUT_PATH = 'cnReuters_output'

In [6]:
def get_main_logger(base_name: str = LOGGER_NAME,
                    level: int = logging.DEBUG) -> logging.Logger:
    """Return the logger ``base_name`` set up to write formatted records to stdout.

    Child loggers created as ``<base_name>.<child>`` propagate to this logger,
    so its level and handler control the output of the whole hierarchy.

    Args:
        base_name: name of the logger to configure.
        level: logging level applied to the logger and to its handlers.

    Returns:
        The configured logger. Calling this again with the same name reuses
        the existing handler instead of adding a second one.
    """
    logger = logging.getLogger(base_name)
    logger.setLevel(level)
    # Inspect only this logger's own handlers: ``hasHandlers()`` also reports
    # handlers attached to ancestors (e.g. a configured root logger), which
    # would leave this logger without its formatted stdout handler.
    if not logger.handlers:
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setFormatter(
            logging.Formatter(fmt=MAIN_FMT, datefmt=DATE_FMT, style='{')
        )
        logger.addHandler(handler)
    # Re-apply the level on every call; otherwise a handler created by an
    # earlier call keeps its old level and filters out records the caller
    # has just asked for.
    for handler in logger.handlers:
        handler.setLevel(level)
    return logger


def get_logger(name: str, base_name: str = LOGGER_NAME) -> logging.Logger:
    """Return the child logger named ``<base_name>.<name>``."""
    child_name = f'{base_name}.{name}'
    return logging.getLogger(child_name)

In [7]:
# stand-alone logger named 'test', reused by the test cells further down
test_logger = get_main_logger('test', logging.DEBUG)

In [8]:
# smoke test: emit one DEBUG record to check the handler and the format
test_logger.debug('test')

[test][20/04/23 14:51:47 899.773][DEBUG]-pid:18068 th_id:15612- test


#### Utility functions and classes

In [9]:
def _replace_to_empty(s: str, *to_replace: str) -> str:
    """del multiple chars in a string"""
    for chars in to_replace:
        s = s.replace(chars, '')
    return s


def _replace(s: str) -> str:
    """Strip newline and tab characters from ``s``."""
    unwanted = ('\n', '\t')
    return _replace_to_empty(s, *unwanted)

In [10]:
def check_path(path: str, logger: logging.Logger) -> None:
    """Make sure the directory ``path`` exists, creating it when necessary.

    Args:
        path: directory to check; relative paths are resolved against the
            current working directory, nested and absolute paths are accepted.
        logger: logger that receives the "already exists" / "created" message.

    Raises:
        FileExistsError: ``path`` exists but is not a directory.
        OSError: the directory cannot be created.
    """
    # ``isdir`` instead of ``path in os.listdir()``: the latter only matches
    # direct children of the cwd and also accepts a plain file of that name.
    if os.path.isdir(path):
        logger.info(f'path {path} already exists')
    else:
        # makedirs also creates missing parent directories
        os.makedirs(path, exist_ok=True)
        logger.info(f"{os.path.abspath(path)} created")

In [11]:
class SetLogger:
    """Base class that gives each subclass its own class-level logger."""

    # assigned by set_logger(); not available before that call
    logger: logging.Logger

    @classmethod
    def set_logger(cls, name: str) -> None:
        """Attach the child logger called ``name`` to the class it is called on."""
        class_logger = get_logger(name)
        cls.logger = class_logger

#### async_request with aiohttp

In [12]:
# headers constants
# request headers passed to every session.get() call (browser-like values)
HEADERS = {
    'sec-fetch-dest': 'document',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-site': 'cross-site',
    'sec-fetch-user': '?1',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36'
}
# listing-page URL template; the `{}` placeholder is filled with the page number
BASE_URL = 'https://cn.reuters.com/news/archive/topic-cn-top-news?view=page&page={}&pageSize=10'

In [13]:
import aiohttp

Initiate an async request
* provide async session as param
* catch exceptions related to connection and request
* return text   

Delay should be realised in the worker function with async queue

In [14]:
@overload
async def async_get(url: str,
                    msg: str,
                    headers: dict,
                    session: aiohttp.ClientSession,
                    logger: logging.Logger) -> Optional[bytes]: ...


@overload
async def async_get(url: str,
                    msg: str,
                    headers: dict,
                    session: aiohttp.ClientSession,
                    logger: logging.Logger,
                    *,
                    text: bool) -> Optional[str]: ...


async def async_get(url: str,
                    msg: str,
                    headers: dict,
                    session: aiohttp.ClientSession,
                    logger: logging.Logger,
                    *,
                    text: Optional[bool] = None) -> Optional[Union[str, bytes]]:
    """Send a GET request through ``session`` and return the response body.

    Args:
        url: address to request.
        msg: prefix of the log line written for this request.
        headers: request headers.
        session: open aiohttp client session used to send the request.
        logger: receives one record per request; WARNING for a status of 400
            or above, INFO otherwise.
        text: when truthy the body is decoded as UTF-8 and returned as ``str``;
            otherwise the raw ``bytes`` are returned.

    Returns:
        The body (also for error statuses), or ``None`` when the request
        failed with a client error or timed out.
    """
    t1 = time.time()
    returned: Union[bytes, str]
    try:
        async with session.get(url, headers=headers) as resp:
            if text:
                returned = await resp.text(encoding='utf-8')
            else:
                # binary mode
                returned = await resp.read()
            log = logger.warning if resp.status >= 400 else logger.info
            log(f'{msg} [{resp.status} {resp.reason} {time.time()-t1:.5f}s] ')
            return returned
    except (aiohttp.ClientError, asyncio.TimeoutError) as err:
        # Request timeouts surface as asyncio.TimeoutError, which is not an
        # aiohttp.ClientError; treat them like any other failed request so
        # the caller gets None instead of an unexpected exception.
        logger.exception(err)
        return None

In [15]:
# test
# request an endpoint that answers with status 404; async_get logs statuses
# of 400 and above at WARNING level and still returns the body
_test_url = 'https://httpbin.org/status/404'
async with aiohttp.ClientSession() as s:
    _test_return = await async_get(_test_url, 'test p1', HEADERS, s, test_logger, text=True)



In [16]:
# show the body returned for the 404 test request
_test_return

''

#### Page/Entry object and Url construct

##### Entry

`Entry` represents each title in a page, includes the following:
* should have these attributes:
 * date, time (not necessary, only for latest news), title, content, tag    
 
Rules for keys:   
* if *time* exists, use today's date
* tag:
 * configurable classification processor
 * one and only one tag for each `Entry`
 * if cannot be classified, fill tag with `'untagged'`

Structure of a json configuration file:   
* tag_name: \[specific words to mark a tag\]
```
{
    tag_conf: [
        {tag_name_1: [word1, word2, ...]},
        ...
        {tag_name_n: [word1, word2, ...]}
    ]
}
```

In [17]:
class Entry(SetLogger):
    """One headline of a listing page together with its tag.

    The tag rules live in the class attribute ``configure`` and are shared by
    all instances; load them with ``get_tag_configure()`` before tagging.
    """

    page: int       # number of the page this entry was found on
    index: int      # position of the entry within that page
    time: str       # time string as passed to the constructor
    date: str       # date string as passed to the constructor
    title: str
    content: str
    tag: str        # '' until get_tag() assigns a tag
    # parsed tag configuration; empty dict means "no tagging"
    configure: dict = dict()

    def __init__(self, page: int, index: int,
                 time_: str, date: str, title: str, content: str):
        self.page = page
        self.index = index
        self.time = time_
        self.date = date
        self.title = title
        self.content = content
        # default empty tag
        self.tag = ''

    def get_tag(self) -> None:
        """Assign ``self.tag`` from the tag configuration.

        The first tag in ``configure['tag_conf']`` that lists a word occurring
        in the title wins; if none matches the tag becomes ``'untagged'``.
        Nothing happens when no configuration is loaded. Errors are logged
        and never raised.
        """
        tag_dict: Dict[str, List[str]]
        try:
            if self.configure:
                for tag_dict in self.configure['tag_conf']:
                    tag_name: str
                    tag_words: List[str]
                    # each configuration item is a one-item dict {tag: [words]}
                    tag_name, tag_words = list(tag_dict.items())[0]
                    for word in tag_words:
                        if word in self.title:
                            self.tag = tag_name
                            break
                    if self.tag:
                        break
                else:
                    # if tag cannot be found, mark it as `untagged`
                    self.tag = 'untagged'
                self.logger.debug(f'Page {self.page} Entry {self.index} ' \
                                  f'{self.date} tagged "{self.tag}"')
        except Exception as e:
            # exception here will not terminate the program
            # (must go through self.logger: no module-level `logger` exists)
            self.logger.exception(e)

    @classmethod
    def get_tag_configure(cls, conf_file: Optional[str]) -> None:
        """Load the tag configuration shared by all entries.

        Args:
            conf_file: path of a UTF-8 JSON file with a ``tag_conf`` list, or
                a falsy value to disable tagging.

        Raises:
            OSError: ``conf_file`` cannot be opened. A file that cannot be
                parsed is logged and the previous configuration is kept.
        """
        if conf_file:
            with open(conf_file, 'r', encoding='utf-8') as conf:
                try:
                    cls.configure = json.loads(conf.read())
                    cls.logger.info(f'{conf_file} read for tag configure')
                except Exception as e:
                    cls.logger.exception(e)
        else:
            # Drop a configuration left over from an earlier run in the same
            # kernel, otherwise entries would still be tagged although the
            # message below announces that no tagging takes place.
            cls.configure = dict()
            cls.logger.warning(f'no tag configure file provided, no tag process')

    def __str__(self):
        s = f'Page {self.page} Entry {self.index}\n' \
            f'Tag: {self.tag}\n' \
            f'Date: {self.date} {self.time}\n' \
            f'{self.title}\n' \
            f'{self.content}\n'
        return s

    __repr__ = __str__

##### Page

`Page` acts like a task. Each `Page` represents one page and contains the following:
* use a page index to construct
* url for request the page
* method to request the page use `async_get()`
* attribute to store both parsed and unparsed returns
* attribute to contain `Entry` objects

In [18]:
class Page(SetLogger):
    """One listing page: its URL, the downloaded HTML and the parsed entries."""

    page: int                       # page number used to build the URL
    url: str
    unparsed_text: Optional[str]    # raw HTML; None until/unless a request succeeds
    entries: List[Entry]

    def __init__(self, page: int):
        self.page = page
        self.url = BASE_URL.format(page)
        self.unparsed_text = None
        self.entries: List[Entry] = []

    async def request(self,
                      session: aiohttp.ClientSession,
                      logger: Optional[logging.Logger] = None) -> None:
        """Download the page and keep the body in ``unparsed_text``.

        Args:
            session: open aiohttp client session.
            logger: logger for the request line; the class logger is used
                when omitted.
        """
        if not logger:
            logger = self.logger
        self.unparsed_text = await async_get(
            self.url, f'Page({self.page})', HEADERS, session, logger, text=True)

    def extract(self) -> None:
        """fill self.entries with extracted Entry objects

        Parsing problems are logged and never raised, so one broken page or
        article does not stop the others from being processed.
        """
        if not self.unparsed_text:
            # failed or empty download: nothing to parse, skip without a traceback
            self.logger.warning(f'Page {self.page} has no content to extract')
            return
        try:
            html = BeautifulSoup(self.unparsed_text, 'lxml')
            articles = html.find('section', class_='module-content') \
                           .find_all('article', class_='story')
            for i, atcl in enumerate(articles):
                try:
                    title = atcl.find('h3', class_='story-title').text
                    content = atcl.find('p').text
                    time_ = atcl.find('span', class_='timestamp').text
                    entry = self._create_Entry(i, time_, title, content)
                    entry.get_tag()
                    self.entries.append(entry)
                except Exception as err:
                    # prevent terminating
                    self.logger.exception(err)
        except Exception as err:
            # page scale prevent terminating other tasks
            self.logger.exception(err)

    def _create_Entry(self, index: int,
                      date_or_time: str, title: str, content: str) -> Entry:
        """Build an Entry from the raw strings of one article.

        A timestamp containing ':' is taken as a time of day and today's date
        is filled in; any other timestamp is taken as the date, with an empty
        time.
        """
        title = _replace(title)
        content = _replace(content)
        if ':' in date_or_time:
            time_ = date_or_time
            # add today as date if time is present
            td = datetime.today()
            date = f'{td.year}年 {td.month}月 {td.day}日'
        else:
            time_ = ''
            date = date_or_time
        return Entry(self.page, index, time_, date, title, content)

    def __str__(self):
        return ''.join(str(entry) + '\n' for entry in self.entries)

In [20]:
# test for Page object
_p1 = Page(1)
# set the logger on this instance only; Page.set_logger() has not been called yet
_p1.logger = test_logger
async with aiohttp.ClientSession() as s:
    # fills _p1.unparsed_text with the body of page 1
    await _p1.request(s)

[test][20/04/23 14:52:56 554.825][INFO]-pid:18068 th_id:15612- Page(1) [200 OK 0.27139s] 


In [None]:
#_p1.unparsed_text

In [None]:
# check whether the session `s` of the previous test cell has been closed
s.closed

##### Document object

`Document`   
* store and sort `Page` objects
 * `Page.page` (the page number) is used as the sort key
* output to file
 * one file for all content, ordered by `Page.page`
 * one file per tag, if tags exist

In [21]:
class Document(SetLogger):
    """Collects extracted pages and writes them to text files under ``path``."""

    pages: List[Page]
    path: str
    _timestamp: str     # set by output(); prefix of every file name

    # strftime pattern of the file-name timestamp
    fmt = '%Y-%m-%d_%H-%M-%S'

    def __init__(self, path: str):
        self.pages: List[Page] = []
        self.path = path
        self._timestamp = ''

    def add(self, page: Page) -> None:
        """Append ``page`` to the document."""
        self.pages.append(page)

    def output(self, tag: bool = True) -> None:
        """Write all pages to one file and, if ``tag`` is true, one file per tag.

        Pages are sorted in place by page number first; all files of one call
        share the same timestamp prefix.
        """
        self.pages.sort(key=lambda item: item.page)
        self._timestamp = datetime.now().strftime(self.fmt)

        self._output_total()
        if tag:
            self._output_tag()

    def _output_total(self) -> None:
        """Write every page, in stored order, to ``<timestamp>.txt``."""
        target = os.path.join(self.path, self._timestamp+'.txt')
        with open(target, 'w', encoding='utf-8') as output:
            output.writelines(str(page) for page in self.pages)
        self.logger.info(f'total content output to {target}')

    def _output_tag(self) -> None:
        """Write the entries of each tag to ``<timestamp>_<tag>.txt``."""
        # tag -> text chunks of its entries, in page/entry order
        chunks_by_tag: Dict[str, List[str]] = {}
        for page in self.pages:
            for entry in page.entries:
                chunks_by_tag.setdefault(entry.tag, []).append(str(entry) + '\n')
        # one file for each tag
        for tag_name, chunks in chunks_by_tag.items():
            target = os.path.join(self.path, f'{self._timestamp}_{tag_name}.txt')
            with open(target, 'a+', encoding='utf-8') as tag_output:
                tag_output.write(''.join(chunks))
            self.logger.info(f'Tag {tag_name} output to {target}')
        self.logger.info(f'All tags output finished')

#### worker functions

##### dispatch task

Every Page works as a task, created by `dispatch()` and distributed to async queue

In [22]:
def dispatch(page_count: int,
             task_q: asyncio.Queue,
             logger: logging.Logger) -> None:
    """Create ``page_count`` Page tasks and put them on ``task_q``.

    Pages are numbered ``0 .. page_count - 1``.
    """
    # NOTE(review): numbering starts at 0 while the Page test cell requests
    # page 1 -- confirm whether the site's page parameter is 1-based.
    page_number = 0
    while page_number < page_count:
        task_q.put_nowait(Page(page_number))
        page_number += 1
    logger.info(f'{page_count} Page created')

`downloader` function:
* coroutine
* get a Page from async queue
 * implement `asyncio.wait_for()`, prevent deadlock
* implement `q.task_done()` in `try: ... except: ...`

In [23]:
async def downloader(index: int,
                     task_q: asyncio.Queue,
                     output_q: asyncio.Queue,
                     session: aiohttp.ClientSession,
                     delay: float) -> None:
    """Worker coroutine: take pages from ``task_q``, download them, pass them on.

    Args:
        index: worker number, used in the logger name and log messages.
        task_q: queue of Page objects waiting to be requested.
        output_q: queue receiving every page whose request was made.
        session: shared aiohttp client session.
        delay: seconds to sleep after each successful request.

    Every page taken from ``task_q`` is marked done exactly once, whether the
    request succeeds or fails, so ``task_q.join()`` cannot wait forever.
    """
    t1 = time.time()
    logger = get_logger(f'downloader_{index}')
    logger.info(f'Worker{index} starts')
    while task_q.qsize() > 0:
        try:
            page: Page = await asyncio.wait_for(task_q.get(), timeout=0.5)
        except asyncio.TimeoutError:
            # another worker took the last page between qsize() and get();
            # nothing was fetched, so there is nothing to mark as done
            continue
        # From here on a page has been taken. Keep the request in its own
        # try block so that a timeout raised by the request is not mistaken
        # for an empty queue and dropped without task_done().
        try:
            await page.request(session, logger)
            output_q.put_nowait(page)
        except Exception as e:
            logger.exception(e)
            continue
        finally:
            task_q.task_done()
        await asyncio.sleep(delay)
    logger.info(f'Worker{index} finished, {time.time()-t1:.5f}s')

`processor` function:
* get `Page` object from output queue
* `Page.extract()`
* store extracted pages to a `Document` object
 * as a single-threaded async program, no race condition exists
* should use `await asyncio.sleep()` to cede control back to the eventloop
* *determine the exit condition before doing anything else*

In [24]:
async def processor(index: int,
                    output_q: asyncio.Queue,
                    downloader_list: List[asyncio.Task],
                    document: Document) -> None:
    """Consumer coroutine: extract downloaded pages and add them to ``document``.

    Runs until every task in ``downloader_list`` is done and ``output_q`` is
    empty. Errors are logged and the loop carries on.
    """
    started_at = time.time()
    logger = get_logger(f'processor_{index}')
    logger.info(f'Processor{index} starts')
    while True:
        try:
            # exit check comes first: all downloaders done and nothing left to process
            downloads_finished = all(worker.done() for worker in downloader_list)
            if downloads_finished and output_q.empty():
                break
            page: Page = await asyncio.wait_for(output_q.get(), 0.5)
            page.extract()
            document.add(page)
            # hand control back to the event loop between pages
            await asyncio.sleep(0.1)
        except asyncio.TimeoutError:
            # nothing arrived within the timeout; re-check the exit condition
            pass
        except Exception as err:
            logger.exception(err)
    logger.info(f'Processor{index} finished, {time.time()-started_at:.5f}s')

##### main()

`main()` coroutine:
* check path
* set main logger with base name first
 * then set logger for Entry, Page and Document
 * must set logger before having any operations on classes or their instances
* reset Entry tag configure
* create async queue objects
* dispatch `Page`s as tasks
* create downloader and processor `Task` objects
* `aiohttp.ClientSession`
 * `try: ... finally: ...`
* await task queue
* gather all tasks
 * to ensure all tasks are done before the main thread exits
* generate `Document` and output

In [27]:
async def main(page_count: int,
               delay: float,
               downloader_count: int,
               output_path: Optional[str],
               tag: bool,
               conf_file: Optional[str],
               level: int) -> None:
    """Download, parse and write ``page_count`` listing pages.

    Args:
        page_count: number of Page tasks to create.
        delay: seconds each downloader sleeps after a successful request.
        downloader_count: number of concurrent downloader tasks.
        output_path: output directory; ``OUTPUT_PATH`` is used when falsy.
        tag: whether to write one file per tag in addition to the total file.
        conf_file: path of the tag configuration JSON, or a falsy value.
        level: logging level of the main logger.
    """
    logger = get_main_logger(base_name=LOGGER_NAME, level=level)
    t1 = time.time()

    # check output path
    if not output_path:
        output_path = OUTPUT_PATH
    check_path(output_path, logger)

    # set class logger
    Entry.set_logger('Entry')
    Page.set_logger('Page')
    Document.set_logger('Document')

    # reset Entry tag configure
    Entry.get_tag_configure(conf_file)

    # generate queues
    task_q: asyncio.Queue = asyncio.Queue()
    output_q: asyncio.Queue = asyncio.Queue()

    # generate Pages
    dispatch(page_count, task_q, logger)

    # one client session shared by all downloaders; closed in `finally`
    s = aiohttp.ClientSession()

    # create downloader task
    downloader_list: List[asyncio.Task] = []
    try:
        for i in range(downloader_count):
            downloader_list.append(
                asyncio.create_task(
                    downloader(i, task_q, output_q, s, delay)
                )
            )
        # create processor task and global Document object
        doc = Document(output_path)
        processor_task = asyncio.create_task(
            processor(1, output_q, downloader_list, doc)
        )
        # wait for task queue to be emptied, all created tasks will be
        # scheduled and awaited implicitly
        await task_q.join()

        # await until all tasks are done
        task_list = downloader_list + [processor_task]
        await asyncio.gather(*task_list)

        # output extracted pages and entries to file; pass `tag` through so
        # that tag=False really skips the per-tag files
        doc.output(tag)

    finally:
        await s.close()
        logger.info('ClientSession closed.')
        logger.info(f'all done, total time {time.time()-t1:.5f}s')
        

In [28]:
# full run: 30 pages, 2 s delay, 3 downloaders, default output path,
# tag flag on, tag rules from tag_configure.json, INFO-level logging
await main(30, 2, 3, None, True, 'tag_configure.json', logging.INFO)

[cnR_async][20/04/23 14:55:34 943.424][INFO]-pid:18068 th_id:15612- path cnReuters_output already exists
[cnR_async.Entry][20/04/23 14:55:34 945.420][INFO]-pid:18068 th_id:15612- tag_configure.json read for tag configure
[cnR_async][20/04/23 14:55:34 946.416][INFO]-pid:18068 th_id:15612- 30 Page created
[cnR_async.downloader_0][20/04/23 14:55:34 947.414][INFO]-pid:18068 th_id:15612- Worker0 starts
[cnR_async.downloader_1][20/04/23 14:55:34 947.414][INFO]-pid:18068 th_id:15612- Worker1 starts
[cnR_async.downloader_2][20/04/23 14:55:34 948.411][INFO]-pid:18068 th_id:15612- Worker2 starts
[cnR_async.processor_1][20/04/23 14:55:34 948.411][INFO]-pid:18068 th_id:15612- Processor1 starts
[cnR_async.downloader_1][20/04/23 14:55:35 003.865][INFO]-pid:18068 th_id:15612- Page(1) [200 OK 0.05446s] 
[cnR_async.downloader_2][20/04/23 14:55:35 393.180][INFO]-pid:18068 th_id:15612- Page(2) [200 OK 0.44275s] 
[cnR_async.downloader_0][20/04/23 14:55:35 393.180][INFO]-pid:18068 th_id:15612- Page(0) [200