In [97]:
# Tested in Jupyter.

!pip install selenium
!pip install nest_asyncio
!pip install tqdm
!pip install aiohttp
!pip install bs4

# imports and global variables
import selenium as se
import csv
try:
    from tqdm import tqdm
except ImportError:  # ModuleNotFoundError is a subclass, so this covers both
    def tqdm(x=None, *_args, **_kwargs):
        """No-op stand-in for ``tqdm.tqdm`` used when tqdm is not installed.

        Extra positional and keyword options accepted by the real ``tqdm``
        (for example ``desc=`` or ``total=``) are accepted and ignored, so
        call sites behave the same whether or not the package is available.

        :param x: the iterable to wrap; may also be passed as ``iterable=``.
        :return: the iterable itself, unchanged (``None`` if none was given).
        """
        if x is None:
            return _kwargs.get('iterable')
        return x


In [98]:
# Search page of the Kookbang news site; every crawl starts by loading this URL.
SEARCH_BASE_LINK = r'https://kookbang.dema.mil.kr/newsWeb/search.do'

# Category tabs to crawl. A tab is followed only when its link text is one of
# these values; all other tabs are ignored.
CATEGORIES = {
    '국방',      # defense
    '기획연재',  # special series
    '무기백과',  # weapons encyclopedia
}

In [99]:
# using selenium to get the page source
from selenium import webdriver
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webelement import WebElement
# Launch a Chrome browser session as soon as this cell runs. This module-level
# driver is the one crawl_kookbang() later passes to KookbangPageManager.
page = webdriver.Chrome()



In [100]:
# For debugging purposes. Everything will be wrapped in functions in the future.

In [101]:
# Tip for locating elements: open DevTools (F12), select the element, then copy its XPath.
#
# Patch asyncio so that run_until_complete() can be called while the notebook's
# own event loop is already running.
import nest_asyncio
nest_asyncio.apply()

In [102]:
from selenium.common.exceptions import NoSuchElementException
class PageManager:
    """Small convenience wrapper around a Selenium ``WebDriver``.

    XPath-based helpers let the driver's lookup errors propagate, while the
    CSS-selector helper reports a missing element by returning ``None``.
    """

    def __init__(self, page: WebDriver):
        # The wrapped driver; all lookups go through it.
        self.page = page

    def get_element_by_xpath(self, xpath):
        """Return the first element matching ``xpath``."""
        found = self.page.find_element('xpath', xpath)
        return found

    def get_element_and_click(self, xpath):
        """Locate the element at ``xpath``, click it, and return it."""
        target = self.get_element_by_xpath(xpath)
        target.click()
        return target

    def get_element_and_send_keys(self, xpath, keys):
        """Locate the element at ``xpath``, type ``keys`` into it, and return it."""
        field = self.get_element_by_xpath(xpath)
        field.send_keys(keys)
        return field

    def get_element_by_selector(self, selector):
        """Return the first element matching the CSS ``selector``, or ``None`` if absent."""
        try:
            match = self.page.find_element('css selector', selector)
        except NoSuchElementException:
            match = None
        return match

class KookbangPageManager(PageManager):
    """Drives the Kookbang search page: runs a keyword search and walks the results.

    The public entry point is :meth:`get_news_genexpr`, which yields the
    ``href`` of every result link found under the allowed category tabs.
    """

    # Seconds handed to WebDriver.implicitly_wait(). Note that implicitly_wait()
    # does not pause: it sets how long later find_element() calls keep polling
    # for an element that is not there yet. A lookup that misses therefore
    # blocks for up to this long.
    WAIT_TIME = 1
    # Highest <li> position probed when looking for category tabs (1..8).
    MAX_CATEGORY_TABS = 8
    # A single result page may list up to 15 news items (positions 1..15).
    MAX_NEWS_PER_PAGE = 15
    # Upper bound on the number of result pages visited per category.
    MAX_PAGES = 100

    def __init__(self, page: WebDriver):
        super().__init__(page)
        self.reset()

    def reset(self):
        """Load the search page, discarding whatever the browser was showing."""
        self.page.get(SEARCH_BASE_LINK)

    def search(self, keyword: str):
        """Type ``keyword`` into the ``#kwd`` input and press the search button."""
        self.get_element_and_send_keys('//*[@id="kwd"]', keyword)
        self.get_element_and_click('//*[@id="container"]/div[2]/div/form/div/div[2]/button')
        self.page.implicitly_wait(self.WAIT_TIME)

    def find_category_elems(self) -> [WebElement]:
        """Yield the category tab links whose text is listed in ``CATEGORIES``.

        This is a generator: each tab is looked up lazily, on the page that is
        current when the generator is resumed.
        """
        # CSS :nth-child() positions are 1-based, so position 0 can never match;
        # starting at 1 avoids a guaranteed miss (and its implicit-wait delay).
        for position in range(1, self.MAX_CATEGORY_TABS + 1):
            selector = f'#container > div.full_search_box > div > ul > li:nth-child({position}) > a'
            elem = self.get_element_by_selector(selector)
            if elem is not None and elem.text in CATEGORIES:
                yield elem

    def get_list_of_news_category(self):
        """Yield every news link element of the current category, page by page.

        Pagination links are addressed by position: child 3 is page 1, child 4
        is page 2, and so on, e.g.
        ``#container > div.full_search_box > div > div.pagination > a:nth-child(4)``.
        Iteration stops when the link is missing or is the "next page" button
        (title ``다음페이지``).
        """
        for page_number in range(self.MAX_PAGES):
            if page_number != 0:
                # The first page is already displayed; from the second page on,
                # click the matching pagination link.
                page_selector = f"#container > div.full_search_box > div > div.pagination > a:nth-child({page_number + 3})"
                page_elem = self.get_element_by_selector(page_selector)
                if page_elem is None or page_elem.get_attribute('title') == '다음페이지':
                    break
                page_elem.click()
                self.page.implicitly_wait(self.WAIT_TIME)
            # Positions are 1-based (see find_category_elems); 0 never matches.
            for position in range(1, self.MAX_NEWS_PER_PAGE + 1):
                selector = f'#container > div.full_search_box > div > div.box > ul > li:nth-child({position}) > a'
                elem = self.get_element_by_selector(selector)
                if elem is None:
                    # A missing position does not end the page; keep probing.
                    continue
                yield elem

    def get_news_genexpr(self, keyword : str):
        """Search for ``keyword`` and yield the ``href`` of each result link.

        Only results under the category tabs listed in ``CATEGORIES`` are
        visited. The ``href`` is read immediately after the element is found,
        before any further navigation happens.
        """
        self.reset()
        self.search(keyword)
        for category_tab in self.find_category_elems():
            category_tab.click()
            self.page.implicitly_wait(self.WAIT_TIME)
            for news in self.get_list_of_news_category():
                yield news.get_attribute('href')



In [103]:
# searchExample = KookbangPageManager(page)
# for i in searchExample.get_news_genexpr('USV'):
#     print(i)

In [104]:
import asyncio
from bs4 import BeautifulSoup
import aiohttp
class AsyncJupyterChecker:
    """Proxy around an asyncio event loop with a notebook-aware ``run_until_complete``.

    Any attribute not defined here is forwarded to the wrapped loop, so an
    instance can be used wherever the loop's own methods are expected.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self.loop = loop

    def run_until_complete(self, coroutine):
        """Run ``coroutine`` to completion on the wrapped loop and return its result.

        If the wrapped loop has been closed, it is replaced with a fresh one
        first. Inside an IPython session the coroutine is scheduled as a task
        and the task is driven to completion; otherwise it is passed to the
        loop directly.
        """
        if self.loop.is_closed():
            self.loop = asyncio.new_event_loop()
        if AsyncJupyterChecker.is_jupyter():
            task = self.loop.create_task(coroutine)
            self.loop.run_until_complete(task)
            return task.result()
        return self.loop.run_until_complete(coroutine)

    @staticmethod
    def is_jupyter():
        """Return True when running inside an interactive IPython session.

        Jupyter kernels are IPython sessions. Merely having IPython installed
        is not enough: ``get_ipython()`` returns ``None`` outside a session.
        """
        try:
            from IPython import get_ipython
        except ImportError:
            return False
        return get_ipython() is not None

    def __getattr__(self, item):
        # __getattr__ only runs for attributes that normal lookup did not find.
        # If 'loop' itself is missing (instance created without __init__, e.g.
        # during copy or unpickling), forwarding would recurse forever.
        if item == 'loop':
            raise AttributeError(item)
        return getattr(self.loop, item)

class NewsData:
    """One crawled news article, stored as (date, title, content, url)."""

    def __init__(self, date: str, title: str, content: str, url: str):
        self.date, self.title = date, title
        self.content, self.url = content, url

    def write_to_csv(self, csv_writer: csv.writer):
        """Append this article to ``csv_writer`` as a single row.

        The column order is date, title, content, url.
        """
        row = [self.date, self.title, self.content, self.url]
        csv_writer.writerow(row)

class PageParser:
    """Downloads one news article URL and turns its HTML into a ``NewsData``.

    Article URLs look like
    ``https://kookbang.dema.mil.kr/newsWeb/20230411/4/ATCE_CTGR_0010010000/view.do``;
    the date (``20230411``) and index (``4``) are taken from the path.
    """

    # Shared loop wrapper used by the synchronous entry points.
    ASYNC_LOOP = AsyncJupyterChecker(asyncio.get_event_loop())
    # Shared by all instances: at most 10 downloads are in flight at once.
    # NOTE(review): the semaphore is created at class-definition time; confirm
    # it is still usable if ASYNC_LOOP is later replaced with a new loop.
    semaphores = asyncio.Semaphore(10)

    def __init__(self, url):
        """Remember ``url`` and extract its date and index path segments.

        :raises IndexError: if ``url`` has fewer than six ``/``-separated parts.
        """
        if PageParser.ASYNC_LOOP.is_closed():
            PageParser.ASYNC_LOOP = AsyncJupyterChecker(asyncio.new_event_loop())
        self.url = url
        segments = url.split('/')
        self.date = segments[4]
        self.index = segments[5]
        self.content: NewsData|None = None

    def parse(self, text: str) -> NewsData | None:
        """Build a ``NewsData`` from the article HTML in ``text``.

        The title comes from ``<meta property="og:title" content="...">`` and
        the body from every ``<meta property="og:description" content="...">``,
        concatenated in document order. The date is always the one taken from
        the URL, even if the article was edited later.

        :return: the parsed article, or ``None`` (after printing a message)
            when the page has no usable ``og:title``.
        """
        soup = BeautifulSoup(text, 'html.parser')
        title_tag = soup.find('meta', property='og:title')
        # A tag without a content attribute is treated like a missing tag.
        title = title_tag.get('content') if title_tag is not None else None
        if title is None:
            print(f'Error parsing {self.url}')
            return None
        # find_all() returns an empty result (never None) when nothing matches,
        # so joining directly yields '' for pages without a description.
        merged_string = ''.join(
            tag.get('content', '')
            for tag in soup.find_all('meta', property='og:description')
        )
        return NewsData(self.date, title, merged_string, self.url)

    async def get_html(self) -> str:
        """Download ``self.url`` and return the response body as text.

        :raises ConnectionError: if the server answers with a status other than 200.
        """
        async with PageParser.semaphores:
            async with aiohttp.ClientSession() as session:
                async with session.get(self.url) as response:
                    if response.status != 200:
                        raise ConnectionError(
                            f'HTTP {response.status} while fetching {self.url}'
                        )
                    return await response.text()

    async def get_content(self) -> NewsData | None:
        """Download and parse the article; ``None`` if it could not be parsed."""
        text = await self.get_html()
        return self.parse(text)

    def get_content_sync(self) -> NewsData | None:
        """Blocking variant of :meth:`get_content`, run on ``ASYNC_LOOP``."""
        return PageParser.ASYNC_LOOP.run_until_complete(self.get_content())

    @staticmethod
    def get_content_sync_from_url(url: str) -> NewsData | None:
        """Download and parse a single ``url``, blocking until done."""
        return PageParser(url).get_content_sync()

    @staticmethod
    def get_content_sync_from_urls(urls: [str]) -> [NewsData]:
        """Lazily yield the parsed article for each URL, one request at a time."""
        for url in urls:
            yield PageParser.get_content_sync_from_url(url)

    @staticmethod
    async def get_content_async_from_urls(urls: [str]) -> [NewsData]:
        """Download and parse all ``urls`` concurrently.

        Results are collected in completion order, which may differ from the
        order of ``urls``. Entries are ``None`` for pages that failed to parse.
        """
        pending = [PageParser(url).get_content() for url in urls]
        result = []
        for finished in asyncio.as_completed(pending):
            result.append(await finished)
        return result

    @staticmethod
    def get_contents_from_urls(urls: [str]) -> [NewsData]:
        """Blocking wrapper around :meth:`get_content_async_from_urls`."""
        batch = PageParser.get_content_async_from_urls(urls)
        return PageParser.ASYNC_LOOP.run_until_complete(batch)


In [105]:
import csv
import os



In [106]:
# Example article URL, kept for trying PageParser by hand.
example_url = 'https://kookbang.dema.mil.kr/newsWeb/20230417/2/ATCE_CTGR_0010040000/view.do'
# Usage: PageParser(url).get_content_sync()
# test_parser = PageParser(example_url)
# content = test_parser.get_content_sync()
# print(content.content)
def crawl_kookbang(kwd):
    """Search the Kookbang site for ``kwd`` and save the articles to ``{kwd}.csv``.

    The file is written to the current working directory with a header row
    (date, title, content, url) followed by one row per article. Articles that
    could not be parsed (``None``) are skipped.

    :param kwd: the search keyword; also used as the CSV file's base name.
    """
    # newline='' is what the csv module expects of the file it writes to
    # (otherwise extra blank lines appear on Windows). An explicit UTF-8
    # encoding keeps the Korean text writable whatever the system locale is.
    with open(f'{kwd}.csv', 'w', newline='', encoding='utf-8') as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow(['date', 'title', 'content', 'url'])
        pageManager = KookbangPageManager(page)
        for content in PageParser.get_contents_from_urls(pageManager.get_news_genexpr(kwd)):
            if content is not None:
                content.write_to_csv(csv_writer)



Error parsing https://kookbang.dema.mil.kr/newsWeb/20191023/2/ATCE_CTGR_0010020002/view.do


In [106]:
# Run the crawler: search for "USV" and save the results to USV.csv in the working directory.
crawl_kookbang('USV')