In [4]:
# Import necessary libraries
import requests
from lxml import html
import re
from tqdm import tqdm
import time
import random
import os
import pandas as pd

# For database connection
from sqlalchemy import create_engine, text

In [9]:
# Pool of desktop Chrome User-Agent strings (macOS, Windows and Linux builds).
# One entry is drawn with random.choice when the shared `headers` dict is built.
user_agents = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
]

In [10]:
# Browser-like headers passed to the HTTP requests made below.
# NOTE: random.choice is evaluated once, when this cell runs, so every later
# request reuses the same User-Agent; re-run this cell to draw a different one.
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": "https://sillok.history.go.kr",
    "Pragma": "no-cache",
    "Referer": "https://sillok.history.go.kr/mc/inspectionMonthList.do",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": random.choice(user_agents),
    # NOTE(review): these client hints are fixed to Chrome 125 on macOS and may
    # disagree with the User-Agent drawn above — confirm whether that matters.
    "sec-ch-ua": '"Google Chrome";v="125", "Chromium";v="125", "Not.A/Brand";v="24"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"macOS"',
}

### Function: fetch_month_ids

This function takes a `king_id` as input and sends a POST request to the appropriate URL (depending on the type of `king_id`).

It parses the HTML response to extract month IDs and month names using XPath and regular expressions. A random delay is added after the request for safety.

**Parameters:**

- `king_id`: A string identifier (e.g., `msilok_001` or `qsilok_001`).

**Returns:**

- A list of tuples, each containing a month ID and its corresponding month name.


In [4]:
def fetch_month_ids(king_id, timeout=30):
    """Fetch the month listing of one king and return its month IDs and names.

    Sends a POST request with ``{"id": king_id}`` to the month-list endpoint
    (the plain URL when ``king_id`` starts with "m", the ``?treeType=C``
    variant otherwise) and extracts the ID and display name of each month
    from the ``href`` attributes of the month links in the response.

    Parameters:
        king_id: King identifier string, e.g. ``msilok_001`` or ``qsilok_001``.
        timeout: Seconds to wait for the server before the request is aborted.

    Returns:
        A list of ``(month_id, month_name)`` tuples in page order. Links whose
        ``href`` does not contain both parts are reported and skipped.

    Raises:
        requests.RequestException: On connection failure, timeout, or an HTTP
            error status.
    """
    # Prepare form data for the POST request
    data = {"id": king_id}

    # Choose URL based on king_id type
    url = (
        "https://sillok.history.go.kr/mc/inspectionMonthList.do"
        if king_id.startswith("m")
        else "https://sillok.history.go.kr/mc/inspectionMonthList.do?treeType=C"
    )

    # A timeout keeps one stalled connection from hanging the whole crawl
    response = requests.post(url, headers=headers, data=data, timeout=timeout)

    # Safety pause to avoid overwhelming the server
    time.sleep(random.uniform(1, 3))

    # Fail loudly on HTTP errors instead of parsing an error page as "no months"
    response.raise_for_status()

    month_url = html.fromstring(response.text).xpath(
        '//*[@id="cont_area"]/div/div[2]/ul[2]/li/ul/li/a/@href'
    )

    # Match ID and name per link so the two always stay paired
    months = []
    for href in month_url:
        id_match = re.search(r"([mq]silok_.*?)'", href)
        name_match = re.search(r"(\d{4}년 .*월?)'", href)
        if id_match and name_match:
            months.append((id_match.group(1), name_match.group(1)))
        else:
            print(f"Skipping unrecognized month link for {king_id}: {href}")

    return months

### Function: fetch_day_ids

This function accepts a tuple containing a month ID and its name, and sends a POST request to fetch day IDs and associated date information.

It parses the HTML response using XPath and regular expressions, and returns a list of tuples with day IDs and date info. A safety pause is included after the request.

**Parameters:**

- `month_id`: A tuple (`month_id`, `month_name`).

**Returns:**

- A list of tuples, each containing a day ID and the corresponding date information.


In [5]:
def fetch_day_ids(month_id, timeout=30):
    """Fetch the day listing of one month and return its day IDs and dates.

    Sends a POST request with the month ID and month name as form data to the
    day-list endpoint (``treeType=M`` when the month ID starts with "m",
    ``treeType=C`` otherwise) and reads the day links from the response.

    Parameters:
        month_id: A ``(month_id, month_name)`` tuple.
        timeout: Seconds to wait for the server before the request is aborted.

    Returns:
        A list of ``(day_id, date_info)`` tuples in page order, where
        ``date_info`` is the last single-quoted argument of the link's
        ``href``. Links that lack either part are reported and skipped.

    Raises:
        requests.RequestException: On connection failure, timeout, or an HTTP
            error status.
    """
    data = {"id": month_id[0], "dateInfo": month_id[1]}
    url = (
        "https://sillok.history.go.kr/mc/inspectionDayList.do?treeType=M"
        if month_id[0].startswith("m")
        else "https://sillok.history.go.kr/mc/inspectionDayList.do?treeType=C"
    )

    # A timeout keeps one stalled connection from hanging the whole crawl
    response = requests.post(url, headers=headers, data=data, timeout=timeout)

    # Safety pause to avoid overwhelming the server
    time.sleep(random.uniform(1, 3))

    # Fail loudly on HTTP errors instead of parsing an error page as "no days"
    response.raise_for_status()

    days = html.fromstring(response.text).xpath(
        '//*[@id="cont_area"]/div/div[1]/div/span[2]/ul/li/a/@href'
    )

    # Match ID and date per link so the two always stay paired
    results = []
    for day in days:
        id_match = re.search(r"([mq]silok_.*?)'", day)
        quoted_args = re.findall(r"'([^']*)'", day)
        if id_match and quoted_args:
            results.append((id_match.group(1), quoted_args[-1]))
        else:
            print(f"Skipping unrecognized day link for {month_id[0]}: {day}")

    return results

### Function: fetch_article_ids

This function takes a tuple with a day ID and its associated date information and sends a POST request to obtain article IDs for that day.

It parses the HTML response using XPath to extract the article identifiers. A random delay is added after the request.

**Parameters:**

- `day_id`: A tuple (`day_id`, `date_info`).

**Returns:**

- A list of article IDs as strings.


In [6]:
def fetch_article_ids(day_id, timeout=30):
    """Fetch the article IDs listed for a single day (synchronous version).

    Sends a POST request with the day ID and its date information to the
    day-list endpoint and collects the ``id`` attribute of each article link.

    NOTE: a later cell defines an ``async`` function with this same name and a
    different signature; once that cell has run, this synchronous version is
    no longer reachable under the name ``fetch_article_ids``.

    Parameters:
        day_id: A ``(day_id, date_info)`` tuple.
        timeout: Seconds to wait for the server before the request is aborted.

    Returns:
        A list of article IDs as strings (empty when the page lists none).

    Raises:
        requests.RequestException: On connection failure, timeout, or an HTTP
            error status.
    """
    data = {"id": day_id[0], "dateInfo": day_id[1]}

    # A timeout keeps one stalled connection from hanging the whole crawl
    response = requests.post(
        "https://sillok.history.go.kr/mc/inspectionDayList.do",
        headers=headers,
        data=data,
        timeout=timeout,
    )

    # Safety pause to avoid overwhelming the server
    time.sleep(random.uniform(1, 3))

    # Fail loudly on HTTP errors instead of parsing an error page as "no articles"
    response.raise_for_status()

    articles = html.fromstring(response.text).xpath(
        "//*[@id='cont_area']/div/div[3]/div/div[1]/ul/li/a/@id"
    )
    # lxml returns "smart" string objects; convert them to plain str
    return [str(a) for a in articles]

### Process king IDs

The cells below orchestrate the entire scraping process as top-level notebook code (there is no single `process_king_ids` function):

1. Create a list of king IDs (both `msilok` and `qsilok` types).
2. Iterate over these IDs to fetch month IDs, then day IDs, and finally article IDs.
3. Aggregate all article IDs into a single list.

**Result:**

- A list of article IDs collected from the website, stored in the `article_ids` variable.


In [None]:
# King identifiers: msilok_001..msilok_015 followed by qsilok_001..qsilok_013
mking_ids = [f"msilok_{i:03d}" for i in range(1, 16)]
qking_ids = [f"qsilok_{i:03d}" for i in range(1, 14)]
king_ids = [*mking_ids, *qking_ids]

# Accumulates the (month_id, month_name) tuples of every king
month_ids = []

king_progress = tqdm(
    king_ids, desc="Fetching month IDs from each king", total=len(king_ids)
)
for king_id in king_progress:
    try:
        month_ids += fetch_month_ids(king_id)
    except Exception as e:
        # Best effort: report the failing king and move on to the next one
        print(f"Failed to fetch month IDs for {king_id}: {e}")

print(f"Total {len(month_ids)} months fetched")

In [None]:
import asyncio
import aiohttp
from tqdm.asyncio import tqdm_asyncio
import nest_asyncio


async def fetch_day_ids_async(session, semaphore, month_id):
    """Asynchronously fetch the day listing of one month.

    Sends a POST request with the month ID and month name as form data to the
    day-list endpoint (``treeType=M`` when the month ID starts with "m",
    ``treeType=C`` otherwise) and reads the day links from the response. The
    whole operation, including the pause, runs while holding ``semaphore``.

    Parameters:
        session: An open ``aiohttp.ClientSession`` used to send the request.
        semaphore: An ``asyncio.Semaphore`` limiting concurrent requests.
        month_id: A ``(month_id, month_name)`` tuple.

    Returns:
        A list of ``(day_id, date_info)`` tuples in page order, where
        ``date_info`` is the last single-quoted argument of the link's
        ``href``. Links that lack either part are reported and skipped.

    Raises:
        aiohttp.ClientError: On connection failure or an HTTP error status.
    """
    async with semaphore:
        data = {"id": month_id[0], "dateInfo": month_id[1]}
        url = (
            "https://sillok.history.go.kr/mc/inspectionDayList.do?treeType=M"
            if month_id[0].startswith("m")
            else "https://sillok.history.go.kr/mc/inspectionDayList.do?treeType=C"
        )

        try:
            # ssl=False is the current spelling of the deprecated
            # verify_ssl=False; it turns off TLS certificate verification.
            # NOTE(review): confirm that skipping verification is still needed.
            async with session.post(
                url, headers=headers, data=data, ssl=False
            ) as response:
                # Fail loudly on HTTP errors instead of parsing an error page
                response.raise_for_status()
                # Read the body right away so the connection is not kept
                # open (and idle) for the duration of the pause below
                page_text = await response.text()
        finally:
            # Safety pause, taken on failure too so errors do not cause a burst
            await asyncio.sleep(random.uniform(1, 3))

        days = html.fromstring(page_text).xpath(
            '//*[@id="cont_area"]/div/div[1]/div/span[2]/ul/li/a/@href'
        )

        # Match ID and date per link so the two always stay paired
        results = []
        for day in days:
            id_match = re.search(r"([mq]silok_.*?)'", day)
            quoted_args = re.findall(r"'([^']*)'", day)
            if id_match and quoted_args:
                results.append((id_match.group(1), quoted_args[-1]))
            else:
                print(f"Skipping unrecognized day link for {month_id[0]}: {day}")
        return results


# Asynchronous replacement for calling fetch_day_ids once per month in a loop
async def fetch_all_day_ids(month_ids, max_concurrency=20):
    """Fetch the day IDs of every month concurrently.

    Schedules one ``fetch_day_ids_async`` task per month on a shared
    ``aiohttp.ClientSession`` and gathers the results as the tasks finish,
    so the output order follows completion order, not input order.

    Parameters:
        month_ids: Iterable of ``(month_id, month_name)`` tuples.
        max_concurrency: Maximum number of requests in flight at once.

    Returns:
        A flat list of ``(day_id, date_info)`` tuples from all months that
        were fetched successfully. Months that fail are reported and skipped.
    """
    day_ids = []
    semaphore = asyncio.Semaphore(max_concurrency)

    # Create ClientSession for connection pooling
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_day_ids_async(session, semaphore, month))
            for month in month_ids
        ]

        # Process tasks with a progress bar
        for task in tqdm_asyncio(
            asyncio.as_completed(tasks),
            desc="Fetching day IDs from each month",
            total=len(tasks),
        ):
            try:
                day_ids.extend(await task)
            except Exception as e:
                # Best effort: one failing month must not abort the crawl
                print(f"Failed to fetch day IDs: {e}")

    return day_ids


# Start from an empty result list before launching the crawl
day_ids = []

# nest_asyncio lets asyncio.run() work inside the Jupyter notebook
nest_asyncio.apply()

# Crawl every month and keep the collected (day_id, date_info) tuples
day_crawl = fetch_all_day_ids(month_ids)
day_ids = asyncio.run(day_crawl)

In [None]:
async def fetch_article_ids(session, day_id, semaphore):
    """Asynchronously fetch the article IDs listed for a single day.

    Sends a POST request with the day ID and its date information to the
    day-list endpoint and collects the ``id`` attribute of each article link.
    The request and the pause both run while holding ``semaphore``.

    NOTE: this definition replaces the synchronous ``fetch_article_ids``
    defined in an earlier cell, and its parameter order (``day_id`` before
    ``semaphore``) differs from ``fetch_day_ids_async``.

    Parameters:
        session: An open ``aiohttp.ClientSession`` used to send the request.
        day_id: A ``(day_id, date_info)`` tuple.
        semaphore: An ``asyncio.Semaphore`` limiting concurrent requests.

    Returns:
        A list of article IDs as strings (empty when the page lists none).

    Raises:
        aiohttp.ClientError: On connection failure or an HTTP error status.
    """
    async with semaphore:
        data = {"id": day_id[0], "dateInfo": day_id[1]}
        try:
            # ssl=False is the current spelling of the deprecated
            # verify_ssl=False; it turns off TLS certificate verification.
            # NOTE(review): confirm that skipping verification is still needed.
            async with session.post(
                "https://sillok.history.go.kr/mc/inspectionDayList.do",
                headers=headers,
                data=data,
                ssl=False,
            ) as response:
                # Fail loudly on HTTP errors instead of parsing an error page
                response.raise_for_status()
                page_text = await response.text()
        finally:
            # Random pause of 0 to 1 second, taken on failure as well
            await asyncio.sleep(random.uniform(0, 1))

        tree = html.fromstring(page_text)
        articles = tree.xpath("//*[@id='cont_area']/div/div[3]/div/div[1]/ul/li/a/@id")
        # lxml returns "smart" string objects; convert them to plain str
        return [str(a) for a in articles]


async def main(day_ids, max_concurrency=30):
    """Fetch the article IDs of every day concurrently.

    Runs one ``fetch_article_ids`` coroutine per day on a shared
    ``aiohttp.ClientSession`` and gathers the results as they finish, so the
    output order follows completion order, not input order.

    Parameters:
        day_ids: Iterable of ``(day_id, date_info)`` tuples.
        max_concurrency: Maximum number of requests in flight at once.

    Returns:
        A flat list of article ID strings from all days that were fetched
        successfully. Days that fail are reported and skipped.
    """
    article_ids = []
    semaphore = asyncio.Semaphore(max_concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_article_ids(session, day, semaphore) for day in day_ids]
        for task in tqdm(
            asyncio.as_completed(tasks),
            total=len(tasks),
            desc="Fetching article IDs from each day",
        ):
            try:
                article_ids.extend(await task)
            except Exception as e:
                # Best effort: one failing day must not abort the crawl
                print(f"Failed to fetch article IDs: {e}")
    return article_ids


# nest_asyncio lets asyncio.run() work inside the Jupyter notebook
nest_asyncio.apply()

# Crawl every day page and keep the collected article IDs
article_crawl = main(day_ids)
article_ids = asyncio.run(article_crawl)

### Download HTML Pages

This block downloads the HTML content of each article page and saves it to the `data/mqsillok/raw/html` directory. A safety pause is added between requests.


In [None]:
# Directory that receives one "<article_id>.html" file per article
save_dir = "data/mqsillok/raw/html"
os.makedirs(save_dir, exist_ok=True)

for article_id in tqdm(
    article_ids, desc="Downloading HTML pages", total=len(article_ids)
):
    target_path = os.path.join(save_dir, f"{article_id}.html")

    # Resume support: a non-empty file means this article was saved by an
    # earlier run, so skip the request (delete the file to force a re-download)
    if os.path.exists(target_path) and os.path.getsize(target_path) > 0:
        continue

    try:
        url = f"https://sillok.history.go.kr/mc/id/{article_id}"
        # A timeout keeps one stalled connection from hanging the whole loop
        response = requests.get(url, headers=headers, timeout=30)

        # Safety pause
        time.sleep(random.uniform(1, 3))

        # Do not store an HTTP error page as if it were the article
        response.raise_for_status()

        # Write to a temporary file first and rename it afterwards, so an
        # interrupted write never leaves a truncated file that would be
        # mistaken for a finished download on the next run
        partial_path = target_path + ".part"
        with open(partial_path, "w", encoding="utf-8") as f:
            f.write(response.text)
        os.replace(partial_path, target_path)
    except Exception as e:
        print(f"Failed to download HTML for {article_id}: {e}")

### Extract Data from HTML Pages

This block reads the saved HTML files, extracts information (title, year, date, content, and king) from each page using XPath, and compiles the data into a pandas DataFrame.


In [None]:
def _first_stripped(tree, path):
    """Return the first result of an XPath query with outer whitespace removed."""
    return tree.xpath(path)[0].strip()


# One dict per successfully parsed article
data = []
for article_id in tqdm(
    article_ids, desc="Extracting data from HTML pages", total=len(article_ids)
):
    try:
        # Load the page saved by the download step
        with open(
            os.path.join(save_dir, f"{article_id}.html"), "r", encoding="utf-8"
        ) as f:
            page = html.fromstring(f.read())

        # Title: first text node of the header span
        title = _first_stripped(
            page, "/html/body/div[2]/div[2]/div[2]/div/div[1]/div/span/text()"
        )

        # Year: text of the nested span with the '년' character removed
        year_text = _first_stripped(
            page, "/html/body/div[2]/div[2]/div[2]/div/div[1]/div/span/span/text()"
        )
        year = int(year_text.replace("년", ""))

        # Content: every paragraph of the body, joined by single spaces
        paragraphs = page.xpath(
            "/html/body/div[2]/div[2]/div[2]/div/div[3]/div/div//p"
        )
        content = " ".join([p.text_content().strip() for p in paragraphs])

        # Date: link text of the sixth item of the first list
        date = _first_stripped(
            page, "/html/body/div[2]/div[2]/div[2]/div/ul[1]/li[6]/a/text()"
        )

        # King: fourth ", "-separated piece of the third item's href, with
        # the trailing punctuation and the "實錄" suffix removed
        king_href = _first_stripped(
            page, "/html/body/div[2]/div[2]/div[2]/div/ul[1]/li[3]/a/@href"
        )
        king = king_href.split(", ")[3].strip(";").strip("')").replace("實錄", "")

        data.append(
            {
                "title": title,
                "year": year,
                "date": date,
                "content": content,
                "king": king,
            }
        )
    except Exception as e:
        # Best effort: report the failing article and continue with the rest
        print(f"Failed to extract data for {article_id}: {e}")

df = pd.DataFrame(data)
print(f"Extracted data for {len(df)} articles")

In [None]:
# Show the extracted DataFrame as the cell output for a visual check
df

### Save Data to PostgreSQL Database

This block creates a PostgreSQL table (dropping it if it exists) and inserts the data from the DataFrame into the table using SQLAlchemy. Make sure to update the connection string with your credentials.


In [31]:
# PostgreSQL connection string. Set the MQSILLOK_DB_URL environment variable to
# supply your own credentials without editing the notebook; when it is unset,
# the local default below is used.
db_connection_str = os.environ.get(
    "MQSILLOK_DB_URL",
    "postgresql://dhg503:dhg503dhg503@localhost:54503/dhg503",
)
engine = create_engine(db_connection_str)

# SQL to drop the table if it exists and create a new table.
# WARNING: running it discards any rows already stored in mqshillu.
create_table_sql = """
DROP TABLE IF EXISTS mqshillu;
CREATE TABLE mqshillu (
    title TEXT,
    year INTEGER,
    date TEXT,
    content TEXT,
    king TEXT
);
"""

In [None]:
# Parameterized INSERT; the values are bound by the driver, never interpolated
insert_sql = text(
    """
    INSERT INTO mqshillu (title, year, date, content, king)
    VALUES (:title, :year, :date, :content, :king)
    """
)

# One parameter dict per DataFrame row. `year` is converted to a built-in int
# because itertuples yields NumPy integers, which the driver may not adapt.
records = [
    {
        "title": row.title,
        "year": int(row.year),
        "date": row.date,
        "content": row.content,
        "king": row.king,
    }
    for row in df.itertuples(index=False)
]

# engine.begin() opens a transaction that is committed when the block exits
# normally and rolled back if an exception is raised, so the table is never
# left half-populated and no manual BEGIN/COMMIT statements are needed.
with engine.begin() as conn:
    conn.execute(text(create_table_sql))

    # Passing the whole list issues a single batched (executemany) insert;
    # an empty list is skipped because it would be treated as "no parameters".
    if records:
        conn.execute(insert_sql, records)

print("Data successfully saved to the PostgreSQL database.")