In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
import re
import os, json

from textwrap import dedent
from pprint import pprint

import warnings
warnings.filterwarnings("ignore")

In [3]:
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

embeddings_model = OpenAIEmbeddings(
    model="text-embedding-3-small",
)

In [5]:
from typing import TypedDict, Annotated, Sequence, Dict, Any
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langgraph.graph import Graph, StateGraph
from langchain.tools import tool
import operator
from typing import List, Dict, Any
import requests
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
import streamlit as st
import time

In [6]:
# 상태 정의
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    memory: Dict[str, Any]
    checkpoints: List[Dict]
    next: str


In [None]:
# 웹 스크래핑 도구
@tool
def search_web(query: str) -> str:
    """인터넷에서 자세 교정 관련 최신 정보를 검색합니다"""
    try:
        results = ddg(query, max_results=3)
        return "\n".join([f"제목: {r['title']}\n내용: {r['body']}\n링크: {r['link']}\n" for r in results])
    except Exception as e:
        return f"검색 중 오류 발생: {str(e)}"

@tool
def scrape_posture_info(url: str) -> str:
    """특정 웹사이트의 내용을 스크랩하여 Chroma DB에 저장하고 관련 정보를 검색합니다"""
    try:
        # 웹 스크래핑
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        
        title = soup.title.string if soup.title else "제목 없음"
        content = soup.find('article') or soup.find('main') or soup.find('body')
        
        # 이미지 URL 추출
        images = []
        if content:
            for img in content.find_all('img'):
                src = img.get('src', '')
                alt = img.get('alt', '이미지 설명 없음')
                if src and src.startswith(('http://', 'https://')):
                    images.append({'url': src, 'alt': alt})
                elif src:  # 상대 경로를 절대 경로로 변환
                    full_url = requests.compat.urljoin(url, src)
                    images.append({'url': full_url, 'alt': alt})

        # 동영상 URL 추출
        videos = []
        if content:
            # YouTube iframes
            for iframe in content.find_all('iframe'):
                src = iframe.get('src', '')
                if 'youtube' in src or 'vimeo' in src:
                    videos.append({'url': src, 'type': 'embed'})
            
            # Video tags
            for video in content.find_all('video'):
                src = video.get('src', '')
                if src:
                    if not src.startswith(('http://', 'https://')):
                        src = requests.compat.urljoin(url, src)
                    videos.append({'url': src, 'type': 'video'})

        # 텍스트 추출 및 청크 분할
        text = content.get_text(strip=True) if content else ""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = text_splitter.split_text(text)
        
        # 각 청크에 미디어 정보 포함하여 저장
        docs = []
        for i, chunk in enumerate(chunks):
            # 청크별로 관련 미디어 메타데이터 포함
            metadata = {
                "source": url,
                "title": title,
                "images": images[i:i+2] if images else [],  # 각 청크당 최대 2개 이미지
                "videos": videos[i:i+1] if videos else []   # 각 청크당 최대 1개 동영상
            }
            docs.append(Document(page_content=chunk, metadata=metadata))

        # Chroma DB 중복 체크 추가
        vector_store = Chroma(
        collection_name="posture_info",
        embedding_function=embeddings_model,
        persist_directory="./chroma_db"
        )
    
        # URL 기반 중복 체크
        existing_docs = vector_store.get(
        where={"source": url}
        )
    
        if not existing_docs:
            vector_store.add_documents(docs)
        
        # 관련 정보 검색
        results = vector_store.similarity_search(
            "자세 교정 방법과 팁",
            k=3
        )
        
        # 검색 결과 포맷팅 (미디어 포함)
        response = f"웹사이트 제목: {title}\nURL: {url}\n\n관련 정보:\n"
        
        for i, doc in enumerate(results, 1):
            response += f"\n{i}. {doc.page_content[:300]}...\n"
            
            # 이미지 정보 추가
            if doc.metadata.get('images'):
                response += "\n관련 이미지:\n"
                for img in doc.metadata['images']:
                    response += f"![{img['alt']}]({img['url']})\n"
            
            # 동영상 정보 추가
            if doc.metadata.get('videos'):
                response += "\n관련 동영상:\n"
                for video in doc.metadata['videos']:
                    if video['type'] == 'embed':
                        response += f"임베드 동영상: {video['url']}\n"
                    else:
                        response += f"비디오 링크: {video['url']}\n"
            
            response += "\n---\n"
            
        return response
        
    except Exception as e:
        return f"스크래핑 및 검색 중 오류 발생: {str(e)}"

In [None]:
# ReAct 에이전트 정의 수정
def create_agent():
    llm = ChatOpenAI(model="gpt-4o-mini")
    tools = [search_web, scrape_posture_info]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", """당신은 자세 교정 전문가입니다. 사용자의 자세 문제를 분석하고 개선 방법을 제안합니다.
        필요한 경우 다음 도구들을 사용할 수 있습니다:
        1. search_web: 최신 자세 교정 정보 검색
        2. scrape_posture_info: 특정 URL에서 자세한 정보 추출
        
        검색이나 스크래핑 결과를 바탕으로 사용자 친화적인 전문적인 조언을 제공하세요."""),
        ("user", "{input}")
    ])
    
    return llm.bind(prompt)

In [None]:
# 메모리 체크포인트 관리
def save_checkpoint(state: AgentState) -> AgentState:
    # 현재 시간 추가
    from datetime import datetime
    
    checkpoint = {
        "timestamp": datetime.now().isoformat(),
        "conversation": state["messages"],
        "context": state["memory"]
    }
    state["checkpoints"].append(checkpoint)
    return state


In [None]:
# 그래프 구성
def create_graph():
    workflow = StateGraph(AgentState)
    
    # 노드 추가
    workflow.add_node("agent", create_agent())
    workflow.add_node("checkpoint", save_checkpoint)
    
    # 엣지 연결
    workflow.add_edge("agent", "checkpoint")
    workflow.add_edge("checkpoint", "agent")
    
    workflow.set_entry_point("agent")
    return workflow.compile()

In [None]:
#streamlit
def create_streamlit_app():
    st.set_page_config(
        page_title="자세 교정 AI 트레이너",
        page_icon="🧘‍♀️",
        layout="wide"
    )
    
    st.title("자세 교정 AI 트레이너 🧘‍♀️")
    st.markdown("자세 교정 관련 URL을 공유하시면 해당 내용을 분석하여 조언해드립니다.")

    # 세션 상태 초기화
    if "messages" not in st.session_state:
        st.session_state.messages = []
        st.session_state.graph = create_graph()

    # 채팅 히스토리 표시
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            if message.get("images") or message.get("videos"):
                # 텍스트 먼저 표시
                st.markdown(message["content"])
                
                # 이미지 표시
                if message.get("images"):
                    cols = st.columns(len(message["images"]))
                    for idx, img in enumerate(message["images"]):
                        with cols[idx]:
                            st.image(img["url"], caption=img["alt"])
                
                # 비디오 표시
                if message.get("videos"):
                    for video in message["videos"]:
                        if video["type"] == "embed":
                            st.video(video["url"])
                        else:
                            st.markdown(f"[비디오 보기]({video['url']})")
            else:
                st.markdown(message["content"])

    # 사용자 입력
    if prompt := st.chat_input("메시지를 입력하세요"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        
        with st.chat_message("user"):
            st.markdown(prompt)

        # URL 확인 및 처리
        with st.chat_message("assistant"):
            with st.spinner("답변 생성 중..."):
                state = {
                    "messages": [HumanMessage(content=prompt)],
                    "memory": {},
                    "checkpoints": [],
                    "next": "agent"
                }
                
                url_pattern = r'https?://[^\s]+'
                urls = re.findall(url_pattern, prompt)
                
                if urls:
                    for url in urls:
                        info = scrape_posture_info(url)
                        # 스크랩 정보에서 미디어 메타데이터 추출
                        media_info = extract_media_from_response(info)
                        response = process_response(info, media_info)
                else:
                    result = st.session_state.graph.invoke(state)
                    response = {
                        "content": result["messages"][-1].content,
                        "images": [],
                        "videos": []
                    }

                st.markdown(response["content"])
                
                # 이미지 표시
                if response.get("images"):
                    cols = st.columns(len(response["images"]))
                    for idx, img in enumerate(response["images"]):
                        with cols[idx]:
                            st.image(img["url"], caption=img["alt"])
                
                # 비디오 표시
                if response.get("videos"):
                    for video in response["videos"]:
                        if video["type"] == "embed":
                            st.video(video["url"])
                        else:
                            st.markdown(f"[비디오 보기]({video['url']})")

                st.session_state.messages.append({
                    "role": "assistant",
                    "content": response["content"],
                    "images": response.get("images", []),
                    "videos": response.get("videos", [])
                })

def extract_media_from_response(response: str) -> dict:
    """스크래핑 응답에서 미디어 정보 추출"""
    images = []
    videos = []
    
    # 이미지 URL 추출
    img_pattern = r'!\[(.*?)\]\((.*?)\)'
    for alt, url in re.findall(img_pattern, response):
        images.append({"url": url, "alt": alt})
    
    # 비디오 URL 추출
    video_lines = [line for line in response.split('\n') 
                  if '임베드 동영상:' in line or '비디오 링크:' in line]
    for line in video_lines:
        url = line.split(': ')[1].strip()
        videos.append({
            "url": url,
            "type": "embed" if '임베드' in line else "video"
        })
    
    return {"images": images, "videos": videos}

def process_response(response: str, media_info: dict) -> dict:
    """응답 처리 및 포맷팅"""
    return {
        "content": response,
        "images": media_info["images"],
        "videos": media_info["videos"]
    }

if __name__ == "__main__":
    create_streamlit_app()

streamlit run app.py

상태 관리


메모리 체크포인트

그래프 구조

LangGraph를 사용하여 에이전트와 체크포인트 노드를 연결합니다.
Gradio 인터페이스

채팅 인터페이스를 제공하여 사용자와 상호작용합니다.
실제 구현 시에는 다음 사항들을 추가로 구현해야 합니다:

실제 웹 스크래핑 로직
자세한 프롬프트 엔지니어링
에러 처리
메모리 관리 최적화
보안 관련 기능