In [30]:
import os

import httpx
from celery import Celery
from dotenv import load_dotenv
from langchain_community.chat_models.ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from ollama import Client

In [31]:
# Create a markdown file
def save_md_to_file(markdown_str : str, directory : str, file_name : str) -> dict:
    """Write ``markdown_str`` to ``directory/file_name`` and describe the result.

    Args:
        markdown_str: Markdown text to write (encoded as UTF-8).
        directory: Target directory; created (including parents) when missing.
            An empty string means the current working directory.
        file_name: Name of the file to create inside ``directory``.

    Returns:
        A dict with:
            ``file_path`` - the joined path that was written,
            ``file_name`` - the file name that was passed in,
            ``file_size`` - size of the written file in bytes.

        Earlier versions stored the byte size under ``file_name`` by mistake;
        the size now lives under ``file_size``.

    Raises:
        OSError: If the directory cannot be created or the file cannot be written.
    """
    # os.makedirs("") raises, so only create a directory when one was given.
    # exist_ok=True replaces the racy "check, then create" sequence.
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Build the file path
    file_path = os.path.join(directory, file_name)

    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(markdown_str)

    # Describe the written file
    file_info = {
        'file_path' : file_path,
        'file_name' : file_name,
        'file_size' : os.path.getsize(file_path),
    }

    return file_info

In [32]:
# Build one prompt per file.
# The prompt is the formatting instruction followed by the source file under review.
# Two things are deliberately different from the raw file that was pasted in:
#   * API keys in the sample source are replaced by placeholders so that no
#     credential is committed with (or sent through) this notebook; the
#     hard-coded-key pattern itself is kept so the reviewer model can still flag it.
#   * Backslashes in the sample source are doubled, because this is a normal
#     (non-raw) string: a bare "\n" would become a real line break and the model
#     would be shown broken code.
user_prompt = '''
You are an experienced technical writer skilled in documenting and formatting code reviews. 
Your task is to take the results from a code review and format them into a markdown file. 
The markdown file must be written in English and should include the following sections: Analysis Summary, Key Features, Pre-condition Check, Runtime Error Check, Optimization, Security Issue, and Evaluation. 
Each section should be formatted appropriately:

- **Analysis Summary:** Summarize the code review findings in one or two lines.
- **Key Features:** Analyze and describe the key features of the added or modified files.
- **Precondition checks:** Checks that a function or method has the necessary variable states or ranges of values to function correctly.
- **Runtime error checking:** Examines code for possible runtime errors and identifies other potential risks.
- **Optimization:** Scan your code and recommend optimized code. When recommending code, be sure to include the full source of the file. Please write your code using code blocks to conform to the markdown format - this is a must. 
- **Security issues:** Scans your code to see if it uses modules with serious security flaws or contains security vulnerabilities.
- **Evaluation:** Comprehensively evaluates your work. Consider the quality, functionality, and maintainability of the code.
            
Ensure the markdown document is clear, well-structured, and easy to read.
```python
# main.py
import requests
from bs4 import BeautifulSoup
from langchain.agents import initialize_agent, AgentType
import json
import random
import re
from fastapi.requests import Request
from fastapi import FastAPI, HTTPException, Query, Header, Depends
from dotenv import load_dotenv
import os
from langchain.schema import StrOutputParser
import time
from urllib.parse import urlparse
from langchain_openai.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import SystemMessage

llm = ChatOpenAI(
    temperature=0.1,
    model="gpt-4o",  
    openai_api_key="sk-REDACTED"
)

# .env 파일에서 환경 변수 로드
load_dotenv()

# .env 파일에서 EXPECTED_VALUE 변수 가져오기
EXPECTED_VALUE = os.getenv("EXPECTED_VALUE")

app = FastAPI()

# Dictionary to keep track of successfully parsed URLs
parsed_urls = {}


def extract_data_from_script(soup, script_name):
    # Find the script tag containing the desired variable
    script_tag = soup.find('script', string=re.compile(f"{script_name}"))

    if script_tag:
        # Extract the variable's value using regex
        match = re.search(f"{script_name} = (.*);", script_tag.string)
        if match:
            data_json = match.group(1)
            return data_json

    return None


def extract_specific_values(data):
    specific_values = {}

    def find_values(obj):
        if isinstance(obj, dict):
            for key, value in obj.items():
                if "Menu" in key or "PlaceDetailBase" in key:
                    specific_values[key] = value
                find_values(value)
        elif isinstance(obj, list):
            for item in obj:
                find_values(item)

    find_values(data)
    return specific_values


def extract_specific_values_v2(data):
    specific_values = {}

    def find_values(obj):
        if isinstance(obj, dict):
            for key, value in obj.items():
                if "TripSummary" in key:
                    specific_values[key] = value
                find_values(value)
        elif isinstance(obj, list):
            for item in obj:
                find_values(item)

    find_values(data)
    return specific_values

# Define a custom dependency to check the header value


def check_header_value(x_custom_header: str = Header(None, convert_underscores=True)):
    if x_custom_header != "expected_value":
        raise HTTPException(status_code=400, detail="Invalid header value")
    return x_custom_header

# Middleware to check the header value for all requests


@app.middleware("http")
async def check_header_middleware(request: Request, call_next):
    x_custom_header = request.headers.get("x-custom-header")
    if x_custom_header != "expected_value":
        raise HTTPException(status_code=400, detail="Invalid header value")
    response = await call_next(request)
    return response


@app.middleware("https")
async def check_header_middleware(request: Request, call_next):
    x_custom_header = request.headers.get("x-custom-header")
    if x_custom_header != "expected_value":
        raise HTTPException(status_code=400, detail="Invalid header value")
    response = await call_next(request)
    return response

# 랜덤 프록시
def random_us_proxy() :
    proxy_url = "https://www.us-proxy.org/"

    res = requests.get(proxy_url)
    soup = BeautifulSoup(res.text,'lxml')

    table = soup.find('tbody')
    rows = table.find_all('tr')
    proxy_server_list = []

    for row in rows:
        https = row.find('td', attrs = {'class':'hx'})
        if https.text == 'yes':
            ip = row.find_all('td')[0].text
            port = row.find_all('td')[1].text
            server = f"{ip}:{port}"
            proxy_server_list.append(server)

    proxy_server = random.choices(proxy_server_list)[0]
    return proxy_server


@app.get("/bot/place")
async def get_place_info(keyword: str = Query(..., title="keyword"), x_custom_header: str = Depends(check_header_value)):
    if not keyword:
        raise HTTPException(status_code=403, detail="No keyword provided")

    # URL-encode the keyword
    # encoded_keyword = keyword.encode('utf-8').decode('latin1')

    # 딜레이
    time.sleep(1.3)
    url = f"https://m.search.naver.com/search.naver?sm=mtp_sly.hst&where=m&query={keyword}&acr=1"    
    print(f"url : {url}")

    # Skip parsing if URL has been parsed successfully or failed before
    if url in parsed_urls:
        if parsed_urls[url] == "success":
            return parsed_urls[url]
        else:            
            raise HTTPException(
                status_code=404, detail="Parsing previously failed")

    # 프록시 추가        
    # proxy_server = random_us_proxy()
    # proxies = {"http": 'http://' + proxy_server, 'https': 'http://' + proxy_server}

    response = requests.get(url)
    print(f"response status_code  : {response.status_code}")
    if response.status_code != 200:
        parsed_urls[url] = "fail"
        print(response.content)
        raise HTTPException(status_code=404, detail="Parsing failed")

    soup = BeautifulSoup(response.content, 'html.parser')

    target_tags = soup.find_all('a', href=lambda href: href and href.startswith(
        "https://m.place.naver.com/place"))
    
    results = []

    # target_tags에 중복된 값 제거
    target_tags = list(set(target_tags))

    for tag in target_tags:
        href = tag['href']  # Get the href attribute of the tag as a string
        # Remove "/photo?entry=pll" from href if present
        if "/photo" in href:
            href = href.replace("/photo", "")
        if "/home" in href:
            href = href.replace("/home", "")

        # Skip parsing if URL has been parsed successfully or failed before
        # if href in parsed_urls:
        #    print("already parsed")
        #    continue
            

        # 딜레이
        time.sleep(1.5)

        payload = { 'api_key': 'REDACTED_SCRAPERAPI_KEY', 'url': href }
        response = requests.get('https://api.scraperapi.com/', params=payload)    

        # proxy_server = random_us_proxy()
        # proxies = {"http": 'http://' + proxy_server, 'https': 'http://' + proxy_server}
        # response = requests.get(href, proxies=proxies)
        print(f"href : {href}")
        # print(f"response : {response.content}")
        print(f"response status_code  : {response.status_code}")
  
        if response.status_code != 200:
            parsed_urls[href] = "fail"
            continue

        soup = BeautifulSoup(response.content, 'html.parser')

        # Extract the JSON data from window.__APOLLO_STATE__
        data_json = extract_data_from_script(soup, "window.__APOLLO_STATE__")
        # print(f"data_json : {data_json}")        

        if data_json:
            # Parse the JSON data
            data = json.loads(data_json)

            # Extract specific values as key-value pairs
            specific_values = extract_specific_values(data)
            # print(f"data : {data}")
            print(f"specific_values : {specific_values}")

            results.append(specific_values)
            parsed_urls[href] = "success"

    if not results:
        parsed_urls[url] = "fail"
        print("fail to find")
        raise HTTPException(status_code=404, detail="No relevant data found")

    return results


@app.get("/bot/trend")
async def get_place_info(keyword: str = Query(..., title="keyword"), x_custom_header: str = Depends(check_header_value)):
    if not keyword:
        raise HTTPException(status_code=403, detail="No keyword provided")

    results = []
    url = f"https://trip.place.naver.com/list?query={keyword}&theme=6&x=126.9783882&y=37.5666103&level=top"
    response = requests.get(url)
    # 크롤링 실패 시
    if response.status_code != 200:
        print("fail cant find")

    soup = BeautifulSoup(response.content, 'html.parser')

    # Extract the JSON data from window.__APOLLO_STATE__
    data_json = extract_data_from_script(soup, "window.__APOLLO_STATE__")

    if data_json:
        # Parse the JSON data
        data = json.loads(data_json)

        # Extract specific values as key-value pairs
        specific_values = extract_specific_values_v2(data)

        results.append(specific_values)

    if not results:
        parsed_urls[url] = "fail"
        print("fail to find")
        raise HTTPException(status_code=404, detail="No relevant data found")

    return results


@app.get("/bot/blog")
async def get_blog_info(keyword: str = Query(..., title="keyword"), x_custom_header: str = Depends(check_header_value)):
    if not keyword:
        raise HTTPException(status_code=403, detail="No keyword provided")
    # naver crawling
    result, url_result = crawling_naver_blog(keyword=keyword)

    if result == []:
        raise HTTPException(status_code=404, detail="No Crawling result")

    # llm 랭체인 생성
    # create prompt
    summary_prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """
                당신은 사용자들에게 여행지 정보를 요약해주는 도우미입니다.
                아래는 {keyword}에 대한 최근 글을 모아놓은 데이터입니다.                                         
                
                제공된 데이터를 사용자가 이해하기 쉽도록 최소 3개에서 최대 10개까지로 요약해주세요.
                중복된 내용은 모두 제거해주세요.
                제공된 데이터에 없는 내용은 지어내지 말아주세요.   
                {keyword}에 관한 내용만 요약해주세요.
                
                각 특징은 구분할 수 있게, 개행문자로 나눠주세요.                
                문장의 끝은 해요체로 끝나야 합니다.
                                
                ------------
                {context}
                ------------
                """,
            ),
        ]
    )

    # langchain
    # create chain
    summary_chain = summary_prompt | llm | StrOutputParser()
    summary_result = summary_chain.invoke({
        "keyword": keyword,
        "context": format_docs(result),
    })
    return {"result": summary_result, "urls": url_result}


def crawling_naver_blog(keyword):
    blog_result = []
    url_result = []
    # 1. 검색어로 블로그 목록 조회
    url = f"https://m.search.naver.com/search.naver?ssc=tab.m_blog.all&sm=mtb_jum&query={keyword}"
    response = requests.get(url)

    if response.status_code != 200:
        print("fail cant find")
        return []

    soup = BeautifulSoup(response.content, 'html.parser')

    target_tags = soup.find_all('ul', class_='lst_view')
    if not target_tags:
        print("fail cant find2")
        return []

    results = []

    # 각 ul 태그에 대해
    for ul in target_tags:
        # li 태그를 모두 찾습니다.
        li_tags = ul.find_all('li')
        for li in li_tags:
            # li > div.view_wrap > div.detail_box > div.title_area > a 태그를 찾습니다.
            a_tag = li.select_one(
                'div.view_wrap > div.detail_box > div.title_area > a')
            if a_tag is not None:
                # href와 inner HTML 데이터를 가져옵니다.
                href = a_tag.get('href')
                # 결과에 추가합니다.
                results.append(href)

    if results.__len__() == 0:
        print("fail cant find3")
        return []

    # 2, 3, 4는 실제 백엔드에서 구현해야함.
    # 5. 해당 url로 scraper api 호출
    # 최근꺼 2개만 조회하자    
    for result in results[:3]:
        parsed_url = urlparse(result)

        body = ""
        # 도메인 다음에 오는 데이터
        path = parsed_url.path
        # '/'를 기준으로 데이터를 분리해서 리스트에 담기
        path_list = path.split('/')[1:]

        # 각 데이터를 변수에 담기
        try:
            var1, var2 = path_list
        except ValueError:
            print("parse error..!")
            continue

        new_naver_url = f"https://m.blog.naver.com/PostView.naver?blogId={var1}&logNo={var2}&proxyReferer=https%3A%2F%2Fm.search.naver.com%2F"

        # # blog_result에 이미 수집한 url인 경우 넘어감
        if new_naver_url in url_result:
            print("already parsed")
            continue

        r = requests.post(url='https://async.scraperapi.com/jobs',
                          json={'apiKey': 'REDACTED_SCRAPERAPI_KEY', 'urls': [new_naver_url]})

        # 주어진 JSON 응답
        response_json = json.loads(r.text)

        # statusUrl 값 추출
        status_url = response_json[0]['statusUrl']

        # statusUrl로 HTTP 요청 보내기
        # 완료가 될때 까지 반복
        # 아직 진행 중이면 2초 후 다시 요청
        # 최대 5번까지 실행
        for i in range(5):
            response = requests.get(status_url)
            response_json = json.loads(response.text)
            if response_json['status'] == 'finished':
                body = response_json['response']['body']
                break
            else:
                time.sleep(2)

        if body != "":
            # body를 BeautifulSoup으로 파싱
            soup = BeautifulSoup(body, 'html.parser')
            # div._postView > div.post_ct > div.se-viewer > div.se-main-container > div.se-text 만 전부 추출
            target_tags = soup.select(
                'div._postView > div.post_ct > div.se-viewer > div.se-main-container > div.se-text')
            # html 태그와 주석 처리를 모두 제거하고 텍스트만 남긴다음, 텍스트를 하나로 합침
            result = ' '.join([tag.text.strip() for tag in target_tags])
            blog_result.append(result)
            url_result.append(new_naver_url)
        time.sleep(3)
    # blog_result의 중복값 제거
    blog_result = list(set(blog_result))
    return blog_result, url_result


def format_docs(result):
    # blog_result를 개행문자를 넣어서 하나로 합침
    # document formatting
    return "\\n\\n".join(result)  # 개행라인 추가


@app.get("/bot/blog/agent")
async def get_blog_info_agent(keyword: str = Query(..., title="keyword"), x_custom_header: str = Depends(check_header_value)):
    if not keyword:
        raise HTTPException(status_code=403, detail="No keyword provided")
    # naver crawling
    result, url_result = crawling_naver_blog(keyword=keyword)

    if result == []:
        raise HTTPException(status_code=404, detail="No Crawling result")

    agent = initialize_agent(
        llm=llm,
        handle_parsing_erros=True,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        # 시스템 프롬프트 변경
        agent_kwargs={
            "system_message": SystemMessage(
                content="""
                당신은 사용자들에게 여행지 정보를 요약해주는 도우미입니다.
                아래는 {keyword}를(을) 다녀온 블로거들이 작성한 블로그 글을 모아놓은 데이터입니다.                                         
                
                제공된 데이터를 사용자가 이해하기 쉽도록 최소 3개에서 최대 10개까지로 요약해주세요.
                중복된 내용은 모두 제거해주세요.
                제공된 데이터에 없는 내용은 지어내지 말아주세요.   
                볼로거의 가족들에 관한 이야기는 빼고, {keyword}에 관한 내용만 사용해주세요.
                
                각 특징은 구분할 수 있게, 개행문자로 나눠주세요.                
                문장의 끝은 해요체로 끝나야 합니다.
                                
                ------------
                {context}
                ------------
                """,
            )
        },
    )
    summary_result = agent.invoke({
        "keyword": keyword,
        "context": format_docs(result),
    })
    return {"result": summary_result["output"], "urls": url_result}
```
'''

In [33]:
# Create the LLM clients
# Both chat models talk to the same Ollama endpoint and use temperature 0;
# only the model name differs.
llama_llm = ChatOllama(model="llama3.1", base_url="http://localhost:11434", temperature=0)
mistral_llm = ChatOllama(model="mistral", base_url="http://localhost:11434", temperature=0)


In [34]:
# Build the prompts
# Review prompt: the system message lists the analysis topics and output rules;
# the human message is filled from the "prompt" input variable.
# NOTE(review): this system message asks for Korean output, while user_prompt asks
# for an English document and a separate translation prompt follows below —
# confirm which language the review step is meant to produce.
code_review_messages = ChatPromptTemplate.from_messages([
    ("system", """
    You're a very good software analyst. From now on, users will show you the entire committed source code. Take a look at it, analyze it, and tell us what you find. 
    After looking at the source code, present an optimized version of the code, including performance improvements. 

    As you analyze the source code, keep the following topics in mind as you do so
    - Analysis summary: Summarize your code review findings in one or two lines.
    - Key features: Analyze and describe the key features of each file.
    - Prerequisite checks: Verify that the function or method has the necessary variable states or value ranges to function correctly.
    - Runtime error checking: Inspect your code for possible runtime errors and identify other potential risks.
    - Optimization: Scans code patches for optimization points and recommends optimized code if it appears to be degrading performance. 
    - Security issues: Scan your code to see if it uses modules with serious security flaws or contains security vulnerabilities.
    - Evaluation: Evaluate your work comprehensively. Considers the quality, functionality, and maintainability of the code.

    Return a response in markdown so that your analysis is easy to parse.
    The topics above are the same as the subheadings in your final analysis. In particular, be sure to write the entire code in the form of code blocks in Markdown for optimizations.
    The Markdown documentation must be written in Korean.
    Do not write any additional text other than the response values in Markdown format.
    """),
    ("human", "{prompt}")
])
# Translation prompt: the human message is filled from the "review_result" input
# variable; the system message asks for a Korean translation that leaves code
# inside code blocks untouched apart from its comments.
translate_messages = ChatPromptTemplate.from_messages([
    ("system", """
    You are an excellent translator. Please translate the markdown written by the user into Korean. 
    At this time, for the content in the code block, only the annotation should be translated, and the code should be left as it is.
    """),
    ("human", "{review_result}")
])

In [35]:
# chain 생성
# 1. 코드 분석 실행. 어떤 코드고, 리팩토링하면 좋은 것 까지.
review_chain = code_review_messages | llama_llm

In [36]:
# 2. Conversion step: feed the translation prompt into the mistral model so the
#    analysis is rewritten into the target format.
translate_chain = translate_messages.pipe(mistral_llm)

In [37]:
# Compose the full pipeline: review first, then translate.
# review_chain ends in a chat model, so it yields a message object rather than
# text. Interpolating that object into the "{review_result}" placeholder would
# insert its string representation (content plus metadata) instead of the
# markdown itself, so the message is reduced to plain text with StrOutputParser
# before it reaches the translation prompt.
final_chain = {"review_result" : review_chain | StrOutputParser()} | translate_chain

In [38]:
# Run the pipeline on the prompt and keep the returned message
result = final_chain.invoke({"prompt": user_prompt})

In [39]:
# Write the result to a markdown file in the current directory
file_dir, file_name = "./", "example.md"
file_info = save_md_to_file(
    markdown_str=result.content,
    directory=file_dir,
    file_name=file_name,
)