# Proof-of-Concept notebook

In [2]:
import requests
import json
import csv

from datetime import datetime, timezone, timedelta
from bs4 import BeautifulSoup
from time import sleep
from random import randint
from collections import namedtuple
import re

from typing import Literal


# A scraped news article (listing metadata plus full article body and lead image).
Article = namedtuple(
    "Article",
    "title content url pub_time section press image",
)

# An article after LLM summarization; the name's spelling is kept because other cells reference it.
SummerizedArticle = namedtuple(
    "SummerizedArticle",
    "title content url pub_time section press problem issue keyword tag",
)

# A press/journal record as exchanged with the backend API.
Journal = namedtuple("Journal", "journalNm journalId")

# A tag record as exchanged with the backend API.
Tag = namedtuple("Tag", "tagId tagName")

# Placeholders so the "test LLM" cell can check `is not None` before its first load.
model = None
tokenizer = None


## Wrapper

### LLM

In [None]:
from llama_cpp import Llama
from transformers import AutoTokenizer

class LLM:
    """Thin wrapper around a llama.cpp chat model and its Hugging Face tokenizer.

    The model and tokenizer are cached on the class and shared by every
    instance. Instances only carry sampling settings and the system prompt.
    """

    MODEL_ID = "MLP-KTLim/llama-3-Korean-Bllossom-8B-gguf-Q4_K_M"
    MODEL_PATH = "/home/gpp/src/model/llama3-korean-bllossom-8b/llama-3-Korean-Bllossom-8B-Q4_K_M.gguf"

    # Shared across all instances; filled in lazily by _ensure_loaded().
    model: Llama | None = None
    tokenizer: AutoTokenizer | None = None

    prompt: str = ""

    def __init__(self, verbose: bool=False, temperature: float=0.3, top_p: float=0.9, max_tokens: int=2048, n_ctx: int=8192):
        """Store sampling settings and make sure the shared model is loaded.

        Args:
            verbose: Forwarded to ``Llama`` when the model has to be loaded.
            temperature: Sampling temperature used by ``generate``.
            top_p: Nucleus-sampling threshold used by ``generate``.
            max_tokens: Maximum number of tokens to generate per call.
            n_ctx: Context size forwarded to ``Llama``. Only takes effect
                when this instance is the one that actually loads the model.
        """
        self.verbose = verbose
        self.n_ctx = n_ctx
        self.temperature = temperature
        self.top_p = top_p
        self.max_tokens = max_tokens
        self.prompt: str = ""

        self._ensure_loaded()

    def _ensure_loaded(self) -> None:
        """Load the shared model and tokenizer if they are not cached."""
        if LLM.model is None:
            LLM.model = Llama(
                model_path=LLM.MODEL_PATH,
                n_ctx=self.n_ctx,
                n_gpu_layers=-1,
                verbose=self.verbose
            )

        if LLM.tokenizer is None:
            LLM.tokenizer = AutoTokenizer.from_pretrained(LLM.MODEL_ID)

    def __del__(self):
        """Drop the shared model and tokenizer when an instance is collected.

        The cache is class-level, so this releases it for every instance.
        Surviving instances reload it on their next ``generate`` call.
        """
        LLM.model = None
        LLM.tokenizer = None

    def set_prompt(self, prompt: str):
        """Set the system prompt used by subsequent ``generate`` calls."""
        self.prompt = prompt
        return

    def generate(self, instruction: str):
        """Run one chat completion and return the generated text.

        Args:
            instruction: The user message.

        Returns:
            The ``text`` of the first choice in the model response.

        Raises:
            ValueError: If the system prompt or the instruction is empty.
        """
        if not self.prompt:
            raise ValueError("prompt is not set.")

        if not instruction:
            raise ValueError("instruction is not set.")

        # Another instance's __del__ may have released the shared model.
        self._ensure_loaded()

        generation_kwargs = {
            "max_tokens": self.max_tokens,
            "stop": ["<|eot_id|>"],
            "top_p": self.top_p,
            "temperature": self.temperature,
        }

        messages = [
            {"role": "system", "content": f"{self.prompt}"},
            {"role": "user", "content": f"{instruction}"}
        ]

        # tokenize=False: the template is rendered to text and passed to llama.cpp as-is.
        p = LLM.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            truncation=True
        )

        response = LLM.model(p, **generation_kwargs)
        return response["choices"][0]["text"]

# Module-level instance; constructing it loads the model and tokenizer if they are not cached yet.
llm: LLM = LLM()

### API

In [None]:
class ApiWrapper:
    """Minimal client for the backend REST API used by this notebook.

    Base URL, credentials, bearer token and the tag cache are stored on the
    class, so all instances share them.
    """

    TOKEN: str = ""
    URL: str = ""
    AUTH: dict = {}
    TAG_TABLE: dict = {}

    def __init__(self, url=URL_):
        """Record the base URL and credentials; log in when credentials exist.

        ``URL_`` and ``AUTH`` are module-level names that are not defined in
        the visible part of this notebook.
        """
        ApiWrapper.URL = url
        ApiWrapper.AUTH = AUTH

        if AUTH:
            self.login()
            self.refresh_tag_table()


    def register(self):
        """Sign up a test user that agrees to every EULA, then log in.

        Returns:
            The access token from ``login()`` when sign-up returns HTTP 200,
            otherwise the sign-up status code.
        """
        response = self.send("/auth/eula", method="GET", auth=False)
        eulas = [eula["eulaId"] for eula in response.json()["data"]]

        payload = {
            "userId": ApiWrapper.AUTH["userId"],
            "password": ApiWrapper.AUTH["password"],
            "userNm": "LLM_TEST",
            "gender": "MALE",
            "birthDate": "2001-08-16",
            "interestTagIds": [
                "test_tag"
            ],
            "agreedEulaIds": eulas
        }

        response = self.send("/auth/sign-up", method="POST", auth=False, data=payload)

        if response.status_code == 200:
            return self.login()

        return response.status_code


    def login(self):
        """Request an access token and cache it on the class.

        Returns:
            The cached token. It is left unchanged (possibly empty) when the
            request does not return HTTP 200.
        """
        response = self.send("/auth/token", method="POST", auth=False, data=ApiWrapper.AUTH)

        if response.status_code == 200:
            ApiWrapper.TOKEN = response.json()["data"][0]["accessToken"]

        return ApiWrapper.TOKEN


    def upload_journal(self, journal_name: str) -> str:
        """Return the id of ``journal_name``, creating the journal if needed.

        A new journal gets a zero-padded id one past the current journal
        count. An existing journal's id is returned as the server reported it.

        Raises:
            RuntimeError: If listing or creating the journal fails.
        """
        uploaded_journals = self.get_uploaded_journals()

        existing = self.get_journal_id(uploaded_journals, journal_name)
        if existing is not None:
            # Already registered: reuse its id instead of creating a duplicate.
            return existing.journalId

        journal_id = "%04d" % (len(uploaded_journals) + 1)
        response = self.send("/journal", method="POST", auth=True, data={"journalId": journal_id, "journalNm": journal_name})

        if response.status_code != 200:
            raise RuntimeError("failed to upload journal: %s" % response.text)

        return journal_id


    def get_uploaded_journals(self) -> list[Journal]:
        """Fetch every journal known to the server.

        Raises:
            RuntimeError: If the request does not return HTTP 200.
        """
        response = self.send("/journal", method="GET", auth=True)
        if response.status_code != 200:
            raise RuntimeError("unable to fetch uploaded journal: %s" % response.text)

        # NOTE(review): the auth endpoints wrap results in a "data" key, while
        # this iterates the JSON body directly - confirm the response shape.
        return [
            Journal(journalNm=journal["journalNm"], journalId=journal["journalId"])
            for journal in response.json()
        ]


    def get_uploaded_tags(self):
        """Fetch every tag known to the server as a list of ``Tag``.

        Raises:
            RuntimeError: If the request does not return HTTP 200.
        """
        response = self.send("/tag", method="GET", auth=True)
        if response.status_code != 200:
            raise RuntimeError("unable to fetch uploaded tag(s): %s" % response.text)

        return [Tag(tagId=tag["tagId"], tagName=tag["tagName"]) for tag in response.json()]


    def refresh_tag_table(self):
        """Rebuild the class-level ``tagId -> tagName`` cache from the server."""
        ApiWrapper.TAG_TABLE = {tag.tagId: tag.tagName for tag in self.get_uploaded_tags()}

        return ApiWrapper.TAG_TABLE


    def upload_tag(self, tag_name: str, tag_id: str):
        """Return the id of the tag named ``tag_name``, creating it if needed.

        Raises:
            RuntimeError: If listing or creating the tag fails.
        """
        self.refresh_tag_table()

        for known_id, known_name in ApiWrapper.TAG_TABLE.items():
            if tag_name == known_name:
                return known_id

        response = self.send("/tag", method="POST", auth=True, data={"tagName": tag_name, "tagId": tag_id})

        if response.status_code != 200:
            raise RuntimeError("failed to upload tag: %s" % response.text)

        ApiWrapper.TAG_TABLE[tag_id] = tag_name

        return tag_id


    def upload_news(self, news: list[Article], tag_name: str | None = None):
        """Upload each article, registering its press and section tag first.

        Args:
            news: Articles to upload.
            tag_name: Unused; accepted only for backward compatibility. The
                tag is taken from each article's ``section``.

        Raises:
            RuntimeError: If any request does not return HTTP 200.
        """
        for n in news:
            journal_id = self.upload_journal(n.press)
            tag_id = self.upload_tag(n.section, n.section)

            data = {
                "title": n.title,
                "link": n.url,
                "journalId": journal_id,
                "publicationDate": n.pub_time,
                "tagIds": [tag_id]
            }

            # Creation must be a POST; send() defaults to GET.
            response = self.send("/news", method="POST", auth=True, data=data)

            if response.status_code != 200:
                raise RuntimeError("failed to upload news: %s" % response.text)


    def send(self, endpoint: str, method: Literal["GET", "POST"]="GET", auth: bool=True, data: dict | None=None) -> requests.Response:
        """Send a JSON request to ``URL + endpoint``.

        Args:
            endpoint: Path appended to the base URL.
            method: ``"POST"`` sends a POST; anything else sends a GET.
            auth: When true, attach the bearer token, logging in first if no
                token is cached.
            data: JSON body; an empty object is sent when omitted.
        """
        # A fresh dict per call avoids sharing one mutable default between calls.
        payload = {} if data is None else data

        headers = {
            "Content-Type": "application/json",
        }

        url = ApiWrapper.URL + endpoint

        if auth:
            if not ApiWrapper.TOKEN:
                self.login()

            headers["Authorization"] = "Bearer " + ApiWrapper.TOKEN

        if method == "POST":
            return requests.post(url=url, headers=headers, json=payload)

        # default: GET
        return requests.get(url=url, headers=headers, json=payload)


    def get_journal_name(self, journals: list[Journal], id: int) -> Journal | None:
        """Return the ``Journal`` whose ``journalId`` equals ``id``, or None.

        Despite the name, the whole record is returned, not just the name.
        """
        for journal in journals:
            if journal.journalId == id:
                return journal

        return None


    def get_journal_id(self, journals: list[Journal], name: str) -> Journal | None:
        """Return the ``Journal`` whose ``journalNm`` equals ``name``, or None.

        Despite the name, the whole record is returned, not just the id.
        """
        for journal in journals:
            if journal.journalNm == name:
                return journal

        return None


## fetch news

In [None]:
# Current time in KST (UTC+9), passed as the `next` query parameter of the listing endpoint.
NOW: str = datetime.now(tz=timezone(timedelta(hours=9))).strftime("%Y%m%d%H%M")
# Fallback image used when an article body has no usable <img data-src=...>.
DEFAULT_IMAGE_URL: str = "https://i.namu.wiki/i/aemZBGJQLVu6ePeapyhYqE6OCJQId6CbI0WnQ6CqzTUJpHCO4EzLhRR4HZqy01pjxIA4AywnLqm_Ysw5A-9TJsbqpOKjEnK6rA5VjJf0phRNIhSIu7RINe2JsOzfiZ0pD5ySVhrKAixdSUX0a4xuEQ.webp"

header: dict = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"}

# Section name -> section id used in the listing URL; uncomment entries to scrape them.
sections = {
    # "정치" : 100,
    # "경제": 101,
    # "사회": 102,
    "생활": 103,
    # "세계": 104,
    # "과학": 105 
}

news = {
    section : []
    for section in sections
}

# Minutes per unit for relative labels such as "N분전", "N시간전" and "N일전".
_RELATIVE_UNIT_MINUTES = {"시간": 60, "일": 60 * 24, "분": 1}


def _normalize_pub_time(raw: str) -> str:
    """Turn a relative "N<unit>전" label into a naive KST ISO timestamp.

    Labels that do not end in "전", or that contain no digits, are returned
    unchanged. An unrecognized unit is treated as minutes.
    """
    if not raw.endswith("전"):
        return raw

    digits = "".join(re.findall(r"\d+", raw))
    if not digits:
        return raw

    unit_minutes = next(
        (minutes for unit, minutes in _RELATIVE_UNIT_MINUTES.items() if unit in raw),
        1,
    )
    published = datetime.now(timezone(timedelta(hours=9))) - timedelta(minutes=int(digits) * unit_minutes)
    # Drop the "+09:00" offset so the value is a plain local timestamp.
    return published.isoformat().split("+")[0]


for section, section_id in sections.items():
    articles: list[Article] = []

    for page in range(1):
        list_url = f"https://news.naver.com/section/template/SECTION_ARTICLE_LIST?sid={section_id}&sid2=&cluid=&pageNo={page}&date=&next={NOW}"
        response = requests.get(list_url, headers=header)
        # The endpoint returns JSON whose SECTION_ARTICLE_LIST entry is an HTML fragment.
        bs = BeautifulSoup(json.loads(response.text)["renderedComponent"]["SECTION_ARTICLE_LIST"], "html.parser")
        sleep(0.5 + randint(0, 100) * 0.1)

        for element in bs.find_all("li"):
            url: str = element.select("a")[0]["href"].strip()
            response = requests.get(url, headers=header)

            body_nodes = BeautifulSoup(response.text, "html.parser").select("#newsct_article")
            if not body_nodes:
                # Pages without the expected article container cannot be scraped; skip them.
                continue
            content_bs = body_nodes[0]

            first_image = content_bs.select_one("img")
            image_src = first_image.get("data-src") if first_image is not None else None

            articles.append(
                Article(
                    title=element.select("strong")[0].text.strip(),
                    content=content_bs.text.strip(),
                    image=image_src.strip() if image_src else DEFAULT_IMAGE_URL,
                    url=url,
                    pub_time=_normalize_pub_time(element.select(".sa_text_datetime")[0].text.strip()),
                    section=section,
                    press=element.select(".sa_text_press")[0].text.strip(),
                )
            )

        # Random pause between listing pages.
        sleep(0.5 + randint(0, 30))

    news[section] = articles[:]
    print(section)


## save news data to csv file

In [82]:
for section in sections:
    # newline="" lets the csv module control line endings, as its documentation requires.
    with open(f"./articles-{section}.csv", 'w', encoding="utf8", newline="") as f:
        writer = csv.writer(f)
        # Write the header directly rather than inserting it into news[section],
        # so the in-memory list stays pure data and re-running adds no extra header.
        writer.writerow(Article._fields)
        writer.writerows(news[section])

## load news data from csv file

In [None]:
news_life: list[Article] = []

# newline="" keeps line breaks embedded in quoted fields exactly as written.
with open("./articles-생활.csv", 'r', encoding="utf8", newline="") as f:
    reader = csv.reader(f)
    # The first row is the header written by the save cell; skip it if present.
    next(reader, None)
    # Columns follow Article._fields order (title, content, url, pub_time, section, press, image).
    news_life = [
        Article(*row[:len(Article._fields)])
        for row in reader
        if row  # ignore blank lines
    ]

news_life[:5]

## 뉴스 업로드

In [None]:
# Pick one loaded article as the sample used by the manual upload cells below.
sample_news = news_life[2]
sample_news

In [None]:
# NOTE(review): `api` is first assigned in the following cell (`api = ApiWrapper()`); run that cell before this one.
api.get_uploaded_journals()

In [None]:
api = ApiWrapper()

# Upload a single sample article by hand (same steps as ApiWrapper.upload_news).
n = sample_news

# Make sure the article's press and section tag are registered first.
journal_id = api.upload_journal(n.press)
tag_id = api.upload_tag(n.section, n.section)

data = {
    "title": n.title,
    "link": n.url,
    "journalId": journal_id,
    "publicationDate": n.pub_time,
    "tagIds": [tag_id]
}

# Creating a news entry needs POST; send() falls back to GET when `method` is omitted.
response = api.send("/news", method="POST", auth=True, data=data)

print(response.status_code)
print(response.text)

In [None]:
# Inspect the bearer token currently cached on the wrapper.
api.TOKEN

In [None]:
# Authenticated request to /news with the default method; show the raw response body.
api.send("/news", auth=True).text

In [31]:
# Distinct press names, sorted so every press gets the same numeric id on each run.
# Iteration order of a set of strings is not stable across interpreter sessions.
press = sorted({article.press for article in news_life})

# Numeric id -> press name.
presses = dict(enumerate(press))

In [None]:
# Show every (id, press name) pair in the lookup table.
for press_id, press_name in presses.items():
    print(press_id, press_name)

In [72]:
def get_press_name_from_press_id(id: int):
    """Return the press name stored under ``id`` in ``presses``, or None if there is none."""
    return next(
        (press_name for press_id, press_name in presses.items() if press_id == id),
        None,
    )


def get_press_id_from_press_name(name: str):
    """Return the first id in ``presses`` whose press name equals ``name``, or None."""
    return next(
        (press_id for press_id, press_name in presses.items() if press_name == name),
        None,
    )

In [27]:
# Build the wrapper once: each ApiWrapper() construction may itself log in
# and refresh the tag table, so two constructions repeat those requests.
_api_for_raw_requests = ApiWrapper()
TOKEN = _api_for_raw_requests.login()
URL = _api_for_raw_requests.URL

In [None]:
# Register every press from the local lookup table as a journal.
for press_id, press_name in presses.items():
    response = requests.post(
        URL + "/journal",

        headers={
            "Authorization": f"Bearer {TOKEN}"
        },

        # Sent as JSON, as ApiWrapper.send does for the same endpoint.
        # NOTE(review): the original passed a form-encoded `data=` body; confirm the API expects JSON here.
        json={
            "journalNm": press_name,
            "journalId": press_id,
        }
    )

    print(response.status_code, response.text)

In [None]:
# Raw POST of the sample article, bypassing ApiWrapper.
news_headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {TOKEN}",
}

# NOTE(review): journalId carries the press *name* here; the id lookup
# get_press_id_from_press_name(sample_news.press) was left disabled.
news_payload = {
    "title": sample_news.title,
    "link": sample_news.url,
    "journalId": sample_news.press,
    "publicationDate": sample_news.pub_time.split('+')[0],
    "tagIds": [6],
}

response = requests.post(URL + "/news", headers=news_headers, json=news_payload)

print(response.status_code, response.text)

## test LLM

In [None]:
from llama_cpp import Llama
from transformers import AutoTokenizer

# Drop any previously loaded model/tokenizer before loading again, so the old
# objects are released before the new ones are allocated.
if model is not None:
    del model
if tokenizer is not None:
    del tokenizer

# Tokenizer comes from the Hugging Face repo id; weights come from a local GGUF file.
model_id = 'MLP-KTLim/llama-3-Korean-Bllossom-8B-gguf-Q4_K_M'
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = Llama(
    model_path='/home/gpp/src/model/llama3-korean-bllossom-8b/llama-3-Korean-Bllossom-8B-Q4_K_M.gguf',
    n_ctx=8192,
    n_gpu_layers=-1,
    verbose=True
)

def apply_prompt(instruction: str) -> str:
    """Run one chat completion with the fixed summarization system prompt.

    Uses the module-level ``tokenizer`` and ``model``. Returns the ``text``
    of the first choice in the model response.
    """
    # System prompt (Korean): describes the expected problem/summerized/keyword fields and gives one example.
    system_prompt = """
    다음은 [뉴스 제목]과 [뉴스 본문]입니다.
    요약에는 "주제"와 "주요 내용"이 포함되어야 합니다.
    - "problem": 뉴스의 주요 문제를 설명하는 핵심 문장
    - "summerized": 뉴스의 요약, 50자 이상의 세 문장
    - "keyword": 뉴스의 핵심 키워드
    다음은 예시입니다. keyword는 반드시 명사형으로 된 한 문장으로 작성해야 합니다.

    {
        "problem": "기후 변화로 부산 지역 해수면 상승",
        "summerized": "최근 부산 지역 해수면이 0.5cm 상승했다. 부산대학교 해양과학과 조교수는 이를 기후 변화로 인한 결과로 보았다. 환경부와 국토교통부는 내달 합동 조사단을 꾸려 해수면 상승 원인을 찾기로 했다.",
        "keyword": "부산 지역 해수면 상승"
    }
    """

    chat = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"{instruction}"},
    ]

    # Render the chat template to text; the model call below takes a plain string.
    rendered = tokenizer.apply_chat_template(
        chat,
        tokenize=False,
        add_generation_prompt=True,
        truncation=True,
    )

    completion = model(
        rendered,
        max_tokens=4096,
        stop=["<|eot_id|>"],
        top_p=0.9,
        temperature=0.3,
    )
    return completion["choices"][0]["text"]

# Smoke test: call the model with an empty user message.
apply_prompt("")

In [None]:
from tqdm import tqdm

news_summarized: list[SummerizedArticle] = []

section = "생활 분야"

# System prompt (Korean): asks for JSON with the keys parsed in the loop below.
# It has to be defined in this cell; otherwise the loop depends on whatever
# PROMPT another cell happened to leave behind.
PROMPT = \
f"""
다음은 {section} 뉴스의 [뉴스 제목]과 [뉴스 본문]입니다.
요약에는 "주제"와 "주요 내용"이 포함되어야 합니다.
출력은 JSON 형식으로 해주세요. 각 키는 "problem", "issue", "summerized", "keyword"입니다.
- "problem": 뉴스의 주요 문제를 설명하는 문장
- "summerized": 뉴스의 요약, 50자 이상의 세 문장
- "issue": 뉴스의 핵심 문장을 포함하는 완결된 한 문장
- "keyword": 뉴스의 핵심을 집약하는 한 단어, 7어절 이하 명사형 문장

출력은 JSON 형식으로 요청 드린 내용만 담아 주세요.
"""

# Upper bound on attempts per article so malformed output cannot loop forever.
MAX_ATTEMPTS = 5
# Appended to the user message after a failed attempt ("output must be JSON, try again").
RETRY_HINT = "\n출력은 JSON 형식이어야 합니다. 다시 시도하세요."

generation_kwargs = {
    "max_tokens":4096,
    "stop":["<|eot_id|>"],
    "top_p":0.9,
    "temperature":0.3,

}

for article in tqdm(news_life):
    # The instruction uses [..] as field markers, so square brackets in the
    # article text are swapped for angle brackets.
    title = article.title.replace("[", "<").replace("]", ">")
    content = article.content.replace("[", "<").replace("]", ">")

    instruction = f'''
    [뉴스 제목]\n
    {title}\n
    [뉴스 본문]\n
    {content}
    '''

    user_content = instruction

    for attempt in range(MAX_ATTEMPTS):
        messages = [
            {"role": "system", "content": f"{PROMPT}"},
            {"role": "user", "content": f"{user_content}"}
        ]

        # Rebuilt on every attempt so the retry hint actually reaches the model.
        prompt = tokenizer.apply_chat_template(
            messages, 
            tokenize = False,
            add_generation_prompt=True,
            truncation=True
        )

        response = model(prompt, **generation_kwargs)
        generated = response["choices"][0]["text"]
        print(generated)

        try:
            response_json = json.loads(generated)

            summarized_article = SummerizedArticle(
                title=title,
                # The summary text is under "summerized"; "problem" has its own field.
                content=response_json['summerized'],
                url=article.url,
                pub_time=article.pub_time,
                section=article.section,
                press=article.press,
                problem=response_json['problem'],
                issue=response_json['issue'],
                keyword=response_json['keyword'],
                tag=article.section
            )
        except (json.JSONDecodeError, KeyError, TypeError):
            # Invalid JSON, a missing key, or a non-object result: ask again.
            print("error!\r", end="")
            user_content = instruction + RETRY_HINT
            continue

        news_summarized.append(summarized_article)
        break
    else:
        print(f"giving up after {MAX_ATTEMPTS} attempts: {article.url}")

    # Terminology: a summarized item is an "article"; the original text is "news".
    # keyword: the issue as classified by the LLM; stored on the summarized article.
    # tag: the category the original news belongs to.

In [None]:
# Inspect the article used by the single-article prompt experiment below.
news_life[17]

In [None]:
from tqdm import tqdm

section = "생활 분야"

# System prompt (Korean): asks which age group and gender would care most about the article.
PROMPT = f"""
다음은 {section} 뉴스의 [뉴스 제목]과 [뉴스 본문]입니다.
이 뉴스는 어떤 연령대, 어떤 성별의 사람들이 가장 관심을 가질까요?
"""

generation_kwargs = dict(
    max_tokens=4096,
    stop=["<|eot_id|>"],
    top_p=0.9,
    temperature=0.3,
)

# "[" -> "<" and "]" -> ">", applied to the article text before it is embedded in the instruction.
_BRACKETS_TO_ANGLES = str.maketrans("[]", "<>")

# Single-article run; the one-element list keeps the tqdm progress output.
for article in tqdm([news_life[17]]):
    title = article.title.translate(_BRACKETS_TO_ANGLES)
    content = article.content.translate(_BRACKETS_TO_ANGLES)

    instruction = f'''
    [뉴스 제목]\n
    {title}\n
    [뉴스 본문]\n
    {content}
    '''

    messages = [
        {"role": role, "content": text}
        for role, text in (("system", PROMPT), ("user", instruction))
    ]

    prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
        truncation=True,
    )

    response = model(prompt, **generation_kwargs)
    print(response["choices"][0]["text"])


In [None]:
# Display the summaries collected so far.
news_summarized

In [None]:
# Inspect one loaded article by index.
news_life[25]

In [None]:
# Print the generated text of the most recent model response.
print(response["choices"][0]["text"])

In [None]:
# Display the full most recent response object.
response

# 모듈 테스트

## 뉴스 가져오기

In [1]:
from wrapper.news_fetcher import NewsFetcher

# NOTE: rebinds `news`, which earlier cells used for the per-section dict of scraped articles.
news = NewsFetcher()

In [None]:
# Fetch with n_pages=5, then call the fetcher's save_csv().
news_list = news.fetch_news(n_pages=5)
news.save_csv()

## 뉴스 업로드

In [None]:
from wrapper.api_wrapper import ApiWrapper

# Uses the ApiWrapper imported from the package above, which shadows the notebook-local class.
api = ApiWrapper()

In [None]:
# Upload the politics and society sections.
# The [1:] slice drops the first element of each list (presumably a header row - TODO confirm).
for section_name in ("정치", "사회"):
    api.upload_news(news.news[section_name][1:])

# 뉴스 요약

In [None]:
from wrapper.llm_wrapper import LLM
from tqdm import tqdm

from wrapper.news_fetcher import NewsFetcher
from entity.entity import *


# Society-section news loaded from CSV (rebinds `news`).
news = NewsFetcher().load_csv()["사회"]

llm = LLM(n_ctx=8192, max_tokens=512)
summerized: list[SummerizedNews] = []

# Ids are assigned sequentially starting at 1288.
for news_id, target in tqdm(enumerate(news, start=1288), total=len(news)):
    # System prompt (Korean): three-sentence, keyword-focused summary; mentions the article's tag.
    system_prompt = f"""
        다음 형식에 맞추어 핵심 키워드 중심으로의 세 문장으로 요약해 주세요.
        이 뉴스는 무작위로 선택된 {target.tag} 분야의 뉴스입니다.
        요약하신 자료는 텍스트 임베딩을 거쳐 클러스터링 작업에 사용될 겁니다.

        1. 이 기사에서 다루는 핵심 사건
        2. 사건의 배경과 관련된 맥락
        3. 사건이 가지는 의미나 시사점, 중요성
        """
    llm.set_prompt(system_prompt)

    user_message = f"""
        title: {target.title}\n
        content: {target.content}
        """
    content = llm.generate(user_message)

    summerized.append(
        SummerizedNews(
            title=target.title,
            content=content,
            topics="",
            id=news_id,
        )
    )


In [None]:
# Spot-check the title of one summarized item.
summerized[1].title

In [None]:
# Spot-check the generated summary of one item.
summerized[5].content

## 클러스터링

### 요약문 임베딩

In [None]:
import numpy as np
from llama_cpp import Llama


# Drop the chat wrapper before loading the same weights again in embedding mode.
del llm

MODEL_PATH = "/home/gpp/src/model/llama3-korean-bllossom-8b/llama-3-Korean-Bllossom-8B-Q4_K_M.gguf"
MODEL_ID = "MLP-KTLim/llama-3-Korean-Bllossom-8B-gguf-Q4_K_M"

model = Llama(model_path=MODEL_PATH, embedding=True, verbose=False)

# One {id: embedding} mapping per summary, in the same order as `summerized`.
embeddings = [{item.id: model.embed(item.content)} for item in tqdm(summerized)]

X = [entry[item.id] for entry, item in zip(embeddings, summerized)]
max_length = max(len(x) for x in X)

# Zero-pad every embedding along its first axis to the longest one so they stack into one array.
# The two-axis pad spec implies each embedding is 2-D (presumably tokens x dimensions - TODO confirm).
padded_X = np.array(
    [
        np.pad(x, ((0, max_length - len(x)), (0, 0)), 'constant')
        for x in X
    ]
)


In [None]:
from sklearn.metrics.pairwise import cosine_similarity

# Flatten each padded embedding to one row and compute pairwise cosine similarity.
similarity_matrix = cosine_similarity(padded_X.reshape(len(padded_X), -1))
similarity_matrix


In [None]:
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# The input may be TF-IDF vectors or any other embedding vectors.
similarity_matrix = cosine_similarity(padded_X.reshape(len(padded_X), -1))
# Cosine distance. Floating-point error can push 1 - similarity slightly below
# zero, and a precomputed distance matrix must be non-negative, so clip at 0.
distance_matrix = np.clip(1 - similarity_matrix, 0, None)

# DBSCAN clustering on the precomputed distances.
dbscan = DBSCAN(eps=0.735, min_samples=2, metric='precomputed')
clusters = dbscan.fit_predict(distance_matrix)

# Print the members of every cluster in ascending label order.
for cluster_id in sorted(set(clusters)):
    if cluster_id == -1:  # -1 marks noise points
        continue

    print(f"Cluster {cluster_id}:")
    for i in np.where(clusters == cluster_id)[0]:
        print(f" - {news[i].title}")


In [None]:
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler


n_clusters = 30  # number of clusters to fit
kmeans = KMeans(n_clusters=n_clusters, random_state=42)

# padded_X is 3-D: (samples, time steps, features).
n_samples, n_timesteps, n_features = padded_X.shape

# Flatten each sample to one row: (n_samples, n_timesteps * n_features).
reshaped_X = padded_X.reshape(n_samples, n_timesteps * n_features)

# Standardize the features, then cluster.
scaler = StandardScaler()
scaled = scaler.fit_transform(reshaped_X)

labels = kmeans.fit_predict(scaled)
labels

In [None]:
# List the titles grouped by KMeans label, one "label title" line per item.
for cluster in range(n_clusters):
    for label, item in zip(labels, summerized):
        if label == cluster:
            print(cluster, item.title)

In [None]:
# Spot-check the first generated summary.
summerized[0].content

## Article 업로드

In [None]:
from wrapper.api_wrapper import ApiWrapper


# Query /article with send()'s defaults and decode the JSON body.
# Presumably a GET with auth, as in the notebook-local ApiWrapper.send - confirm against the packaged wrapper.
api = ApiWrapper()
api.send("/article").json()

In [None]:
# Placeholder payload used to probe the /article creation endpoint.
article_payload = {
    "title": "string",
    "content": "string",
    "publicationDate": "2024-10-20T08:42:38.012Z",
    "newsIdxes": [0],
}

api.send("/article", method="POST", auth=True, data=article_payload)