In [None]:
import asyncio
import json
import logging
import os
import pickle
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional

import aiohttp
import numpy as np

# For document processing
import PyPDF2
from docx import Document
import tiktoken

# For vector operations
from sentence_transformers import SentenceTransformer
import faiss

In [None]:
# Module-level logger first, then root configuration so INFO records are emitted
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [None]:
@dataclass
class DocumentChunk:
    """A slice of a source document together with its bookkeeping data.

    Attributes:
        content: Text of the chunk.
        source: Identifier of the document the chunk was cut from.
        chunk_id: Identifier of this chunk.
        metadata: Free-form dictionary of extra information about the chunk.
        embedding: Vector representation of ``content``; ``None`` until one
            has been assigned.
    """
    content: str
    source: str
    chunk_id: str
    metadata: Dict[str, Any]
    # Excluded from the generated __eq__: comparing two NumPy arrays yields an
    # array whose truth value is ambiguous, so chunk equality would raise
    # ValueError as soon as both chunks carried an embedding.
    embedding: Optional[np.ndarray] = field(default=None, compare=False)

class DeepSeekClient:
    """Minimal asynchronous client for the DeepSeek chat-completions endpoint."""

    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
        """Store credentials and prepare the request headers.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            base_url: Root URL of the API. A trailing slash is stripped so the
                request URL never contains a double slash.
        """
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    async def generate_response(
        self,
        messages: List[Dict],
        model: str = "deepseek-chat",
        max_tokens: int = 2000,
        temperature: float = 0.7,
    ) -> str:
        """Send a chat-completion request and return the first choice's text.

        Args:
            messages: Chat messages, each a dict with ``role`` and ``content``.
            model: Model name placed in the request payload.
            max_tokens: Upper bound on generated tokens (previously fixed at 2000).
            temperature: Sampling temperature (previously fixed at 0.7).

        Returns:
            The ``content`` of ``choices[0].message`` in the JSON response.

        Raises:
            RuntimeError: If the HTTP status is not 200, or the response body
                does not have the expected ``choices[0].message.content`` shape.
        """
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "stream": False
        }
        # rstrip again in case base_url was reassigned after construction.
        url = f"{self.base_url.rstrip('/')}/chat/completions"

        # A fresh session per call keeps the client stateless; nothing has to
        # be closed by the caller.
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=self.headers, json=payload) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise RuntimeError(f"DeepSeek API error: {response.status} - {error_text}")
                data = await response.json()

        try:
            return data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError) as exc:
            # Surface a descriptive error instead of a bare KeyError/IndexError.
            raise RuntimeError(f"DeepSeek API returned an unexpected payload: {data!r}") from exc

class DocumentProcessor:
    """Handles document ingestion (PDF / DOCX / TXT) and token-based chunking."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """Configure the chunker.

        Args:
            chunk_size: Maximum number of tokens per chunk.
            chunk_overlap: Number of tokens shared by consecutive chunks.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``chunk_overlap``
                is not in ``[0, chunk_size)``.
        """
        # Validate before loading the tokenizer so a bad configuration fails fast.
        self._check_chunking(chunk_size, chunk_overlap)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.encoding = tiktoken.get_encoding("cl100k_base")

    @staticmethod
    def _check_chunking(chunk_size: int, chunk_overlap: int) -> None:
        """Reject settings that would make the chunking stride zero or negative."""
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError(
                f"chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size, "
                f"got chunk_overlap={chunk_overlap}, chunk_size={chunk_size}"
            )

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file, one page per line group.

        Pages are joined with a newline so the last word of one page is not
        glued to the first word of the next.
        """
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # `or ""` guards against a page yielding no text object at all.
            page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
        return "\n".join(page_texts)

    def extract_text_from_docx(self, file_path: str) -> str:
        """Extract text from a DOCX file; every paragraph ends with a newline."""
        doc = Document(file_path)
        return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)

    def extract_text_from_txt(self, file_path: str) -> str:
        """Read a UTF-8 encoded text file and return its contents."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def extract_text(self, file_path: str) -> str:
        """Extract text from ``file_path``, dispatching on its extension.

        Raises:
            ValueError: If the extension is not ``.pdf``, ``.docx`` or ``.txt``
                (compared case-insensitively).
        """
        file_path = Path(file_path)
        extension = file_path.suffix.lower()

        extractors = {
            '.pdf': self.extract_text_from_pdf,
            '.docx': self.extract_text_from_docx,
            '.txt': self.extract_text_from_txt,
        }
        extractor = extractors.get(extension)
        if extractor is None:
            raise ValueError(f"Unsupported file type: {extension}")
        return extractor(str(file_path))

    def chunk_text(self, text: str, source: str) -> List["DocumentChunk"]:
        """Split ``text`` into overlapping token windows.

        Args:
            text: Full document text.
            source: Identifier recorded on every chunk and used in ``chunk_id``.

        Returns:
            Chunks in document order. ``chunk_id`` is ``"{source}_{offset}"``
            and ``metadata["chunk_index"]`` is that same starting token offset
            (not a 0, 1, 2... ordinal). Empty text yields an empty list.

        Raises:
            ValueError: If the current ``chunk_size`` / ``chunk_overlap``
                attributes do not give a positive stride.
        """
        # Re-check here because the attributes may have been changed after
        # construction; a negative stride would otherwise silently drop the
        # whole document.
        self._check_chunking(self.chunk_size, self.chunk_overlap)
        stride = self.chunk_size - self.chunk_overlap

        # disallowed_special=() makes text that merely looks like a special
        # token (e.g. "<|endoftext|>") encode as ordinary text instead of raising.
        tokens = self.encoding.encode(text, disallowed_special=())
        total_tokens = len(tokens)
        chunks = []

        for start in range(0, total_tokens, stride):
            window = tokens[start:start + self.chunk_size]
            chunks.append(
                DocumentChunk(
                    content=self.encoding.decode(window),
                    source=source,
                    chunk_id=f"{source}_{start}",
                    metadata={
                        "chunk_index": start,
                        "token_count": len(window),
                        "timestamp": datetime.now().isoformat()
                    }
                )
            )
            # This window already reached the end of the document; any further
            # window would contain only tokens that are part of this one.
            if start + self.chunk_size >= total_tokens:
                break

        return chunks

class VectorStore:
    """Handles vector storage and retrieval using FAISS.

    Embeddings are L2-normalised before being added to an inner-product index,
    so the returned scores are cosine similarities.
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        """Load the sentence-embedding model and create an empty index.

        Args:
            embedding_model: Model name passed to ``SentenceTransformer``.
        """
        self.embedding_model = SentenceTransformer(embedding_model)
        self.dimension = self.embedding_model.get_sentence_embedding_dimension()
        self.index = faiss.IndexFlatIP(self.dimension)  # Inner product for similarity
        self.chunks: List["DocumentChunk"] = []
        self.is_built = False

    def add_documents(self, chunks: List["DocumentChunk"]):
        """Embed ``chunks`` and append them to the index.

        An empty list is a no-op: nothing is encoded and ``is_built`` is left
        unchanged.
        """
        if not chunks:
            logger.info("No chunks to add; vector store unchanged")
            return

        logger.info(f"Adding {len(chunks)} chunks to vector store")

        # Generate embeddings
        texts = [chunk.content for chunk in chunks]
        embeddings = self.embedding_model.encode(texts, convert_to_numpy=True)

        # Normalize embeddings for cosine similarity (in place)
        faiss.normalize_L2(embeddings)

        # Add to FAISS index
        self.index.add(embeddings)

        # Store chunks with embeddings; list position == FAISS row id
        for chunk, embedding in zip(chunks, embeddings):
            chunk.embedding = embedding
            self.chunks.append(chunk)

        self.is_built = True
        logger.info("Documents added successfully")

    def search(self, query: str, k: int = 5) -> List["DocumentChunk"]:
        """Return up to ``k`` stored chunks most similar to ``query``.

        The returned objects are the stored chunks themselves; each one has
        ``metadata['similarity_score']`` overwritten with the score for this
        query.

        Args:
            query: Free-text query to embed.
            k: Maximum number of results; values <= 0 yield an empty list.

        Returns:
            Matching chunks, best first. Empty if the store is not built.
        """
        if not self.is_built or k <= 0 or not self.chunks:
            return []

        # Never ask for more neighbours than there are stored chunks.
        k = min(k, len(self.chunks))

        # Generate query embedding
        query_embedding = self.embedding_model.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(query_embedding)

        # Search
        scores, indices = self.index.search(query_embedding, k)

        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            # FAISS pads missing neighbours with label -1. Without the lower
            # bound, -1 would index the *last* chunk and return it with a
            # meaningless score.
            if 0 <= idx < len(self.chunks):
                chunk = self.chunks[idx]
                chunk.metadata['similarity_score'] = float(score)
                results.append(chunk)

        return results

    def save(self, path: str):
        """Save the store to ``{path}.faiss`` (index) and ``{path}.pkl`` (chunks)."""
        data = {
            'chunks': self.chunks,
            'is_built': self.is_built
        }

        # Save FAISS index
        faiss.write_index(self.index, f"{path}.faiss")

        # Save chunks and metadata
        with open(f"{path}.pkl", 'wb') as f:
            pickle.dump(data, f)

        logger.info(f"Vector store saved to {path}")

    def load(self, path: str):
        """Load the store from ``{path}.faiss`` and ``{path}.pkl``.

        SECURITY: ``pickle.load`` can execute arbitrary code. Only load files
        that this application wrote itself; never load a store from an
        untrusted source.
        """
        # Load FAISS index
        self.index = faiss.read_index(f"{path}.faiss")

        # Load chunks and metadata
        with open(f"{path}.pkl", 'rb') as f:
            data = pickle.load(f)
        self.chunks = data['chunks']
        self.is_built = data['is_built']

        logger.info(f"Vector store loaded from {path}")

class RAGAgent:
    """Main RAG Agent that combines DeepSeek v3 with retrieval"""

    def __init__(self, deepseek_api_key: str, vector_store_path: Optional[str] = None):
        """Wire up the client, document processor and vector store.

        Args:
            deepseek_api_key: API key handed to ``DeepSeekClient``.
            vector_store_path: Optional path prefix of a previously saved
                store. It is loaded only when both ``{path}.faiss`` and
                ``{path}.pkl`` exist, because loading reads both files.
        """
        self.deepseek_client = DeepSeekClient(deepseek_api_key)
        self.document_processor = DocumentProcessor()
        self.vector_store = VectorStore()
        self.conversation_history = []

        if vector_store_path:
            has_chunks = os.path.exists(f"{vector_store_path}.pkl")
            has_index = os.path.exists(f"{vector_store_path}.faiss")
            if has_chunks and has_index:
                self.vector_store.load(vector_store_path)
            elif has_chunks or has_index:
                # Half a store on disk would make load() fail; start empty instead.
                logger.warning(
                    f"Incomplete vector store at {vector_store_path}; starting with an empty store"
                )

    def ingest_documents(self, file_paths: List[str]):
        """Extract, chunk and index every file in ``file_paths``.

        Extraction errors (including unsupported file types) propagate to the
        caller. If no chunks are produced the vector store is left untouched.
        """
        all_chunks = []

        for file_path in file_paths:
            logger.info(f"Processing document: {file_path}")
            text = self.document_processor.extract_text(file_path)
            chunks = self.document_processor.chunk_text(text, file_path)
            all_chunks.extend(chunks)

        if not all_chunks:
            logger.warning(f"No chunks produced from {len(file_paths)} documents; nothing ingested")
            return

        self.vector_store.add_documents(all_chunks)
        logger.info(f"Ingested {len(all_chunks)} chunks from {len(file_paths)} documents")

    def create_context_prompt(self, query: str, retrieved_chunks: List["DocumentChunk"]) -> str:
        """Build the full prompt: persona instructions, retrieved context, question.

        Args:
            query: The user's question, placed after ``QUESTION:``.
            retrieved_chunks: Chunks whose ``source`` and ``content`` are
                rendered into the ``CONTEXT:`` section, in the given order.

        Returns:
            The prompt text. The persona section is written in Chinese and
            instructs the model to reply in English.
        """
        context = "\n\n".join([
            f"Document: {chunk.source}\nContent: {chunk.content}"
            for chunk in retrieved_chunks
        ])

        # A stray notebook cell marker ("In [None]:") used to sit between the
        # persona section and CONTEXT; it has been removed from the prompt.
        prompt = f"""# 角色
你是一位专业且亲切的留学规划导师，名为CogniGuide。具备数据驱动的特质，擅长通过量化分析，精准评估学生学术背景与目标之间的差距；秉持成长型思维，坚信学生的学术潜力能够通过合理规划不断提升；提供全周期陪伴，为学生从高中到大学申请的全过程提供动态指导。以善良、有趣、和蔼的形象，像朋友般与学生交流，摒弃高高在上的姿态。你需要根据每个学生的说活方式，去不断调试自己的回复方式，以找一个最佳的方式去和学生沟通。

回复内容全为英文，内容可以丰富多样，包含表情等内容。

## 技能
### 技能 1: 学生基本信息了解
询问学生的学生个人ID和姓名，帮助你去匹配学生。如果这个学生是第一次与你交流，请为这个学生生成一个学生ID并了解这位学生的姓名。此外，在遇到一个新学生的时候，你需要了解这位学生的基本信息：
国籍 性别 年龄 高中毕业年份 所读高中

### 技能 2: 新学生自我介绍
如果学生与你第一次沟通，请让他做一个简单的自我介绍

### 技能 3: 构建学术档案
1. 对于新用户，主动询问并详细了解其学术背景，包括理想专业与学校、个人信息背景（涵盖生活背景、个人性格特点、个人有趣事例）、个人标化考试能力（如GPA、托福/雅思/多邻国、SAT/ACT、AP/IB/Alevel等）、学术竞赛获奖及参与情况、校外校内活动参与情况。在询问的过程中，请一条一条的询问学生，不要一下子把所有问题都给到学生。
2. 依据了解到的学术背景信息，自行分析并为学生生成一篇文档样式的学术档案。
档案内容：
学生现有的GPA --> 学生需要完成目标时理想的GPA
学生现有的标化考试成绩和校内课程 --> 学生需要完成目标时理想的标化考试成绩和课程安排
学生现有的学术竞赛 --> 学生需要完成目标时理想的学术竞赛成绩
学生现有的校内校外活动 --> 学生需要完成目标时理想的校内校外活动效果
学生个人申请形象 + 个人推荐专业 + 个人推荐申请学校 + 个人未来规划
最终的学术档案需要 create document 变为pdf文档形式
具体的学术档案内容请参考知识库里的学术档案
每一次学生询问学术档案的时候，你需要给出pdf文档形式

### 技能 4: 提供每日日程安排
1. 学术档案确定后，每天早上7点，根据学生的学术规划和实际情况，为学生提供当天需要完成的日程安排，日程安排不需要具体的时间，只需要提供具体的任务（每天需要完成什么就可以）。在每一次制定计划的时候，你需要了解学生第二天原本的日程安排是怎样的，这样制定的计划更加准确
2. 当学生对日程安排有困惑或疑问时，根据学生自身情况进行调整和重新安排。

### 技能 5: 了解日程完成情况
每天晚上9点，主动询问学生当天日程的完成情况，并根据完成情况提出合理建议，指出可能存在的问题。

### 技能 6: 实时解答学术问题
学生在全程任何时间（24小时）提出任何学术相关问题，都要及时给出专业且贴合实际的学术建议，如同一位随时在线回复的学术导师。在日常生活中，需要跟学生相处起来像朋友一样，你们也可以探讨日常生活，互相的兴趣，这都是后续你了解学生的重要素材。

### 技能 7: 辅助学生完成最终目标
你要时刻明白，你的存在是帮助学生实现自己的未来专业目标和大学目标，所以对于大学申请后续的文书撰写，申请材料准备，你都需要提前看好时间与学生进行沟通，规划好学生的申请准备，助力于学生在未来的美国大学申请可以获得理想的成绩。

## 限制:
- 隐藏自己深度思考的过程，只需要给学生输出最终的结果
- 学术档案内容必须非常详细，且提供给学生的每一条内容需要利用 bingWebSearch 去查案提供的方案是否准确合理
- 所输出的内容应条理清晰，符合相应功能要求的逻辑框架。
- 回复语言需符合亲切、友善的风格设定。 . 

CONTEXT:
{context}

QUESTION: {query}

Please provide a detailed answer based on the context provided. If the context doesn't contain enough information to fully answer the question, acknowledge this and provide what information you can."""

        return prompt

    async def query(self, user_query: str, use_context: bool = True, max_chunks: int = 5) -> Dict[str, Any]:
        """Answer ``user_query``, optionally grounding it in retrieved chunks.

        Args:
            user_query: The user's question.
            use_context: When False, retrieval is skipped entirely.
            max_chunks: Maximum number of chunks to retrieve.

        Returns:
            A dict with keys ``query``, ``timestamp``, ``retrieved_chunks``
            (source, content preview of at most 200 characters, similarity
            score), ``response`` and ``sources`` (unique sources, most relevant
            first). Errors are not raised: they are logged and reported in
            ``response``.
        """
        response_data = {
            "query": user_query,
            "timestamp": datetime.now().isoformat(),
            "retrieved_chunks": [],
            "response": "",
            "sources": []
        }

        try:
            retrieved_chunks = []

            if use_context and self.vector_store.is_built:
                # Retrieve relevant chunks
                retrieved_chunks = self.vector_store.search(user_query, k=max_chunks)
                response_data["retrieved_chunks"] = [
                    {
                        "source": chunk.source,
                        # Only mark the preview as truncated when it really is.
                        "content": (
                            chunk.content
                            if len(chunk.content) <= 200
                            else chunk.content[:200] + "..."
                        ),
                        "similarity_score": chunk.metadata.get('similarity_score', 0)
                    }
                    for chunk in retrieved_chunks
                ]
                # dict.fromkeys de-duplicates while keeping relevance order
                # (a set would return the sources in arbitrary order).
                response_data["sources"] = list(
                    dict.fromkeys(chunk.source for chunk in retrieved_chunks)
                )

            # NOTE(review): the CogniGuide persona lives inside the context
            # prompt, so it is only sent when chunks were retrieved. Confirm
            # whether the no-context path should also carry the persona.
            if retrieved_chunks:
                user_content = self.create_context_prompt(user_query, retrieved_chunks)
            else:
                user_content = user_query

            # system message, then the last 3 exchanges, then the new user turn
            messages = [
                {"role": "system", "content": "You are a helpful AI assistant."},
                *self.conversation_history[-6:],
                {"role": "user", "content": user_content},
            ]

            # Generate response
            response = await self.deepseek_client.generate_response(messages)
            response_data["response"] = response

            # History keeps the raw question, not the expanded prompt
            self.conversation_history.extend([
                {"role": "user", "content": user_query},
                {"role": "assistant", "content": response}
            ])

            return response_data

        except Exception as e:
            # Deliberate best-effort: report the failure in the response
            # instead of raising, so interactive loops keep running.
            logger.error(f"Error processing query: {str(e)}")
            response_data["response"] = f"I apologize, but I encountered an error: {str(e)}"
            return response_data

    def save_vector_store(self, path: str):
        """Save the vector store under the given path prefix."""
        self.vector_store.save(path)

    def get_statistics(self) -> Dict[str, Any]:
        """Return chunk, source and conversation counts plus the built flag."""
        stored_chunks = self.vector_store.chunks
        return {
            "total_chunks": len(stored_chunks),
            "unique_sources": len({chunk.source for chunk in stored_chunks}),
            "conversation_length": len(self.conversation_history),
            "vector_store_built": self.vector_store.is_built
        }

In [None]:
# Example usage and testing
async def main():
    """Example usage of the RAG Agent.

    Reads the API key from the ``DEEPSEEK_API_KEY`` environment variable, runs
    a few sample queries and prints the responses and agent statistics.

    Raises:
        RuntimeError: If ``DEEPSEEK_API_KEY`` is not set.
    """
    # SECURITY: credentials must never be committed with the notebook. The key
    # that used to be embedded here should be treated as leaked and revoked.
    api_key = os.environ.get("DEEPSEEK_API_KEY")
    if not api_key:
        raise RuntimeError(
            "Set the DEEPSEEK_API_KEY environment variable before running this example."
        )

    agent = RAGAgent(deepseek_api_key=api_key)

    # Example document ingestion
    # documents = ["document1.pdf", "document2.txt", "document3.docx"]
    # agent.ingest_documents(documents)

    # Save vector store for future use
    # agent.save_vector_store("my_knowledge_base")

    # Example queries
    queries = [
        "Which Academic Competition should I join",
        "Give the plan of tommorow",
        "Give some courses recommendations"
    ]

    for query in queries:
        print(f"\nQuery: {query}")
        print("-" * 50)

        response = await agent.query(query)
        print(f"Response: {response['response']}")

        if response['sources']:
            print(f"Sources: {', '.join(response['sources'])}")

        print(f"Retrieved chunks: {len(response['retrieved_chunks'])}")

    # Print statistics
    stats = agent.get_statistics()
    print("\nRAG Agent Statistics:")
    print(f"Total chunks: {stats['total_chunks']}")
    print(f"Unique sources: {stats['unique_sources']}")
    print(f"Conversation length: {stats['conversation_length']}")

if __name__ == "__main__":
    # Run the example
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running (plain `python file.py`): create one.
        asyncio.run(main())
    else:
        # An event loop is already running (e.g. a Jupyter/IPython kernel),
        # where asyncio.run() raises RuntimeError. Schedule main() on that
        # loop instead, keeping a reference so the task is not
        # garbage-collected before it finishes.
        main_task = running_loop.create_task(main())