In [None]:
import asyncio
import json
import logging
import os
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import numpy as np

# Core dependencies
import openai
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
from langchain.retrievers import VectorStoreRetriever
from langchain.chains import RetrievalQA
import fitz  # PyMuPDF for PDF processing
import docx  # python-docx for Word documents

In [None]:
# Configure logging: set the root logger to INFO and create the module-level
# `logger` that the classes and functions in this notebook log through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
@dataclass
class LegalQuery:
    """A single legal question submitted to the assistant, with its metadata.

    `query_text`, `query_type`, `jurisdiction` and `urgency` are interpolated
    into the prompt built by `LexSyntheiaAgent`; `query_id` is copied onto the
    resulting `LegalResponse`.
    """
    query_id: str  # caller-chosen identifier, echoed back on the response
    user_id: str  # identifier of the requesting user
    query_text: str  # the question itself; also used as the retrieval query
    query_type: str  # contract_analysis, legal_research, compliance_check, etc.
    jurisdiction: str  # free-form jurisdiction label shown to the model
    urgency: str  # low, medium, high, urgent
    timestamp: datetime  # supplied by the caller when the query is created
    context: Optional[Dict[str, Any]] = None  # optional extra data; defaults to None, not {}

@dataclass
class LegalResponse:
    """Result returned by `LexSyntheiaAgent.process_legal_query` for one query."""
    query_id: str  # copied from the originating LegalQuery
    response_text: str  # raw text produced by the language model
    confidence_score: float  # heuristic in [0.0, 1.0] derived from the retrieved documents
    sources: List[Dict[str, str]]  # one dict per retrieved chunk: source, document_type, jurisdiction
    legal_citations: List[str]  # citations pulled from response_text by regex
    recommendations: List[str]  # recommendation snippets pulled from response_text (at most 5)
    timestamp: datetime  # when the response object was created
    processing_time: float  # elapsed wall-clock time in seconds

class DeepSeekClient:
    """Async-friendly client for a DeepSeek chat-completions endpoint.

    The endpoint is reached through the OpenAI SDK (`openai.OpenAI`) pointed
    at `base_url`.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com",
                 model: str = "deepseek-r1"):
        """Create the underlying SDK client.

        Args:
            api_key: API key sent with every request.
            base_url: Root URL of the OpenAI-compatible endpoint.
            model: Model identifier sent with every request.
                NOTE(review): DeepSeek's hosted API documents its reasoning
                model as "deepseek-reasoner"; confirm that the default
                "deepseek-r1" is accepted by the endpoint in use.
        """
        self.client = openai.OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model

    async def generate_response(self, prompt: str, system_prompt: Optional[str] = None, **kwargs) -> str:
        """Generate a completion for `prompt`.

        Args:
            prompt: User message content.
            system_prompt: Optional system message placed before the user
                message; omitted when empty or None.
            **kwargs: Optional sampling overrides. Recognised keys are
                `temperature` (default 0.1), `max_tokens` (default 2000)
                and `top_p` (default 0.9); other keys are ignored.

        Returns:
            The text of the first choice, or "" if the API returned no content.

        Raises:
            Exception: Any error raised by the SDK is logged and re-raised.
        """
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        def _request():
            return self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=kwargs.get('temperature', 0.1),
                max_tokens=kwargs.get('max_tokens', 2000),
                top_p=kwargs.get('top_p', 0.9)
            )

        try:
            # The SDK client is synchronous; calling it directly inside a
            # coroutine would stall the event loop for the whole HTTP
            # round-trip, so the request runs on the default executor.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, _request)

            content = response.choices[0].message.content
            # The SDK types `content` as optional; normalise to a string so
            # callers can run text processing on the result unconditionally.
            return content if content is not None else ""
        except Exception as e:
            logger.error(f"Error generating response with DeepSeek: {e}")
            raise

class LegalDocumentProcessor:
    """Read legal documents from disk and split them into chunked Documents.

    Handles PDF (through PyMuPDF), Word (through python-docx) and plain-text
    files.
    """

    def __init__(self):
        # 1000-character chunks with 200 characters of overlap; the splitter
        # tries the separators in order, from paragraph breaks down to
        # single characters.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the concatenated text of every page in a PDF.

        Best-effort: any failure is logged and "" is returned.
        """
        try:
            doc = fitz.open(pdf_path)
            try:
                return "".join(page.get_text() for page in doc)
            finally:
                # Release the file handle even when a page fails to parse.
                doc.close()
        except Exception as e:
            logger.error(f"Error extracting text from PDF: {e}")
            return ""

    def extract_text_from_docx(self, docx_path: str) -> str:
        """Return the paragraph text of a Word document, one paragraph per line.

        Best-effort: any failure is logged and "" is returned.
        """
        try:
            doc = docx.Document(docx_path)
            return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)
        except Exception as e:
            logger.error(f"Error extracting text from DOCX: {e}")
            return ""

    def process_document(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[Document]:
        """Load one file and return it as a list of chunk Documents.

        Args:
            file_path: Path to a .pdf, .docx, .doc or .txt file.
            metadata: Extra key/value pairs merged into every chunk's
                metadata; they override the built-in keys on collision.

        Returns:
            One Document per chunk, each carrying `source`, `chunk_id` and
            `file_type` metadata. Empty when no text could be extracted.

        Raises:
            ValueError: If the file extension is not supported.
        """
        path = Path(file_path)
        suffix = path.suffix.lower()

        if suffix == '.pdf':
            text = self.extract_text_from_pdf(str(path))
        elif suffix in ('.docx', '.doc'):
            # NOTE(review): python-docx targets the .docx format; a legacy
            # binary .doc file will most likely fail to parse and come back
            # as empty text (the failure is logged by the extractor).
            text = self.extract_text_from_docx(str(path))
        elif suffix == '.txt':
            with open(path, 'r', encoding='utf-8') as f:
                text = f.read()
        else:
            raise ValueError(f"Unsupported file format: {path.suffix}")

        if not text.strip():
            logger.warning(f"No text extracted from {path}")
            return []

        # Split text into chunks
        chunks = self.text_splitter.split_text(text)

        # Create documents with metadata
        extra_metadata = metadata or {}
        return [
            Document(
                page_content=chunk,
                metadata={
                    "source": str(path),
                    "chunk_id": index,
                    "file_type": suffix,
                    **extra_metadata,
                },
            )
            for index, chunk in enumerate(chunks)
        ]

In [None]:
class RAGSystem:
    """Retrieval-Augmented Generation system for legal knowledge.

    Builds (or loads) a FAISS vector store of document chunks and answers
    similarity queries against it.
    """

    # File extensions picked up by build_knowledge_base.
    SUPPORTED_SUFFIXES = ('.pdf', '.docx', '.doc', '.txt')
    # Number of chunks the stored retriever returns per query.
    DEFAULT_TOP_K = 5

    # Helpers for breaking a filename into lowercase words.
    _WORD_SPLIT = re.compile(r"[\W_]+")
    _CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")

    def __init__(self, embeddings_model: str = "text-embedding-ada-002"):
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)
        self.vector_store = None
        self.retriever = None
        self.doc_processor = LegalDocumentProcessor()

    def build_knowledge_base(self, documents_path: str):
        """Build the vector store from every supported file under a directory.

        Files are discovered recursively. A file that fails to process is
        logged and skipped. If nothing could be processed the existing
        vector store is left untouched and a warning is logged.
        """
        logger.info("Building legal knowledge base...")

        documents = []
        documents_path = Path(documents_path)

        # Process all legal documents
        for file_path in documents_path.rglob("*"):
            if not (file_path.is_file() and file_path.suffix.lower() in self.SUPPORTED_SUFFIXES):
                continue
            try:
                # Metadata is inferred from the filename only; `last_updated`
                # records when the file was ingested.
                metadata = {
                    "document_type": self._classify_document_type(file_path.name),
                    "jurisdiction": self._extract_jurisdiction(file_path.name),
                    "last_updated": datetime.now().isoformat()
                }

                docs = self.doc_processor.process_document(str(file_path), metadata)
                documents.extend(docs)
                logger.info(f"Processed {len(docs)} chunks from {file_path.name}")

            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")

        # Create vector store
        if documents:
            self.vector_store = FAISS.from_documents(documents, self.embeddings)
            self._refresh_retriever()
            logger.info(f"Knowledge base built with {len(documents)} document chunks")
        else:
            logger.warning("No documents were processed for the knowledge base")

    def _refresh_retriever(self) -> None:
        """Rebuild `self.retriever` from the current vector store."""
        self.retriever = self.vector_store.as_retriever(
            search_kwargs={"k": self.DEFAULT_TOP_K}
        )

    @classmethod
    def _filename_has_keyword(cls, filename: str, keywords: Tuple[str, ...]) -> bool:
        """Return True if any keyword occurs in `filename` as a whole word.

        The filename is split on camelCase boundaries and on
        non-alphanumeric characters, so "us" matches "US_tax_code.pdf" but
        not "trust_agreement.pdf". A trailing plural "s" is tolerated
        ("contracts" matches "contract"). Keywords containing "_" must match
        consecutive words. Non-ASCII keywords fall back to substring
        matching because such text is not reliably word-delimited.
        """
        lowered = filename.lower()
        spaced = cls._CAMEL_BOUNDARY.sub(" ", filename).lower()
        words = [word for word in cls._WORD_SPLIT.split(spaced) if word]

        for keyword in keywords:
            if not keyword.isascii():
                if keyword in lowered:
                    return True
                continue

            parts = keyword.split("_")
            span = len(parts)
            for start in range(len(words) - span + 1):
                window = words[start:start + span]
                if window[:-1] == parts[:-1] and window[-1] in (parts[-1], parts[-1] + "s"):
                    return True
        return False

    def _classify_document_type(self, filename: str) -> str:
        """Classify document type based on filename.

        Returns one of 'contract', 'statute', 'case_law', 'regulation' or
        'general'; the first category with a matching keyword wins.
        """
        categories = (
            ('contract', ('contract', 'agreement', 'terms')),
            ('statute', ('statute', 'law', 'code', 'act')),
            ('case_law', ('case', 'court', 'judgment', 'ruling')),
            ('regulation', ('regulation', 'rule', 'policy')),
        )
        for label, keywords in categories:
            if self._filename_has_keyword(filename, keywords):
                return label
        return 'general'

    def _extract_jurisdiction(self, filename: str) -> str:
        """Extract jurisdiction from filename.

        Returns 'federal', 'state', 'china' or 'unknown'; the first
        jurisdiction with a matching keyword wins.
        """
        jurisdictions = (
            ('federal', ('federal', 'us', 'usa', 'united_states')),
            ('state', ('state', 'california', 'texas', 'new_york')),
            ('china', ('china', 'chinese', '中国')),
        )
        for label, keywords in jurisdictions:
            if self._filename_has_keyword(filename, keywords):
                return label
        return 'unknown'

    def retrieve_relevant_documents(self, query: str, k: int = 5) -> List[Document]:
        """Return up to `k` chunks most similar to `query`.

        Returns an empty list when no knowledge base is loaded, when `k` is
        not positive, or when the lookup fails (the error is logged).
        """
        if k <= 0:
            return []

        try:
            if self.vector_store is not None:
                # Query the store directly so that `k` is honoured; the
                # stored retriever is fixed at DEFAULT_TOP_K results.
                return self.vector_store.similarity_search(query, k=k)
            if self.retriever is not None:
                return self.retriever.get_relevant_documents(query)[:k]
            return []
        except Exception as e:
            logger.error(f"Error retrieving documents: {e}")
            return []

    def save_knowledge_base(self, path: str):
        """Save the vector store to disk; logs a warning if none is loaded."""
        if self.vector_store:
            self.vector_store.save_local(path)
        else:
            logger.warning("No knowledge base to save")

    def load_knowledge_base(self, path: str):
        """Load the vector store from disk.

        Failures are logged rather than raised, leaving the current state
        unchanged.
        """
        try:
            # NOTE(review): FAISS.load_local deserialises pickled data; only
            # load directories produced by a trusted save_knowledge_base call.
            self.vector_store = FAISS.load_local(path, self.embeddings)
            self._refresh_retriever()
            logger.info("Knowledge base loaded successfully")
        except Exception as e:
            logger.error(f"Error loading knowledge base: {e}")
In [None]:
class LexSyntheiaAgent:
    """Main AI Legal Assistant Agent.

    Combines document retrieval (`RAGSystem`) with text generation
    (`DeepSeekClient`) to answer `LegalQuery` objects.
    """

    # Maximum number of recommendation snippets returned per response.
    MAX_RECOMMENDATIONS = 5

    # Simple regex patterns for common legal citations
    _CITATION_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'\d+\s+U\.S\.C\.?\s+§?\s*\d+',  # USC citations
        r'\d+\s+F\.\d+d?\s+\d+',          # Federal court cases
        r'\d+\s+S\.Ct\.\s+\d+',           # Supreme Court cases
        r'[A-Z][a-z]+\s+v\.\s+[A-Z][a-z]+',  # Case names
    ))

    # Line-level recommendation indicators, tried in this order.
    _RECOMMENDATION_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'(?i)recommend[a-z]*:?\s*(.+)',
        r'(?i)suggest[a-z]*:?\s*(.+)',
        r'(?i)advise[a-z]*:?\s*(.+)',
        r'^\s*[-•]\s*(.+)$',  # Bullet points
    ))

    def __init__(self, deepseek_api_key: str, openai_api_key: Optional[str] = None):
        """Create the generation client and the retrieval system.

        Args:
            deepseek_api_key: Key for the DeepSeek endpoint.
            openai_api_key: Optional key for OpenAI embeddings. When given it
                is exported as OPENAI_API_KEY for the current process.
        """
        # Set OpenAI API key for embeddings. This must happen before
        # RAGSystem() is constructed, because the embeddings client it
        # creates looks the key up at construction time.
        if openai_api_key:
            os.environ["OPENAI_API_KEY"] = openai_api_key

        self.deepseek_client = DeepSeekClient(deepseek_api_key)
        self.rag_system = RAGSystem()

        # Legal-specific system prompt
        self.system_prompt = """
        You are LexSyntheia, an advanced AI legal assistant designed to help lawyers and legal professionals with various legal tasks. Your capabilities include:

        1. Legal Research: Analyze statutes, case law, and regulations
        2. Contract Analysis: Review and analyze legal agreements
        3. Compliance Checking: Assess regulatory compliance
        4. Document Drafting: Assist with legal document preparation
        5. Case Strategy: Provide insights for legal strategies

        Guidelines:
        - Always provide accurate, well-researched legal information
        - Cite relevant sources and legal precedents
        - Clearly distinguish between legal facts and opinions
        - Highlight potential risks and considerations
        - Provide practical recommendations
        - Use clear, professional language
        - Always include appropriate disclaimers about seeking professional legal advice

        Remember: You are an AI assistant and do not replace professional legal counsel.
        """

    def initialize_knowledge_base(self, documents_path: str):
        """Initialize the RAG knowledge base"""
        self.rag_system.build_knowledge_base(documents_path)

    def load_knowledge_base(self, path: str):
        """Load existing knowledge base"""
        self.rag_system.load_knowledge_base(path)

    async def process_legal_query(self, legal_query: LegalQuery) -> LegalResponse:
        """Process a legal query and return a comprehensive response.

        Retrieves up to five relevant chunks, builds a prompt around them,
        asks the model for an answer and post-processes that answer into
        citations, recommendations and a confidence score.

        Raises:
            Exception: Any failure is logged and re-raised to the caller.
        """
        start_time = datetime.now()

        try:
            # Retrieve relevant documents
            relevant_docs = self.rag_system.retrieve_relevant_documents(
                legal_query.query_text, k=5
            )

            # Prepare context from retrieved documents
            context = self._prepare_context(relevant_docs)

            # Create enhanced prompt
            enhanced_prompt = self._create_enhanced_prompt(
                legal_query, context
            )

            # Generate response using DeepSeek R1
            response_text = await self.deepseek_client.generate_response(
                enhanced_prompt,
                self.system_prompt,
                temperature=0.1,
                max_tokens=2000
            )

            # Extract legal citations and recommendations
            legal_citations = self._extract_legal_citations(response_text)
            recommendations = self._extract_recommendations(response_text)

            # Prepare sources information
            sources = [
                {
                    "source": doc.metadata.get("source", "Unknown"),
                    "document_type": doc.metadata.get("document_type", "general"),
                    "jurisdiction": doc.metadata.get("jurisdiction", "unknown")
                }
                for doc in relevant_docs
            ]

            # Calculate confidence score
            confidence_score = self._calculate_confidence_score(
                relevant_docs, legal_query.query_text
            )

            processing_time = (datetime.now() - start_time).total_seconds()

            return LegalResponse(
                query_id=legal_query.query_id,
                response_text=response_text,
                confidence_score=confidence_score,
                sources=sources,
                legal_citations=legal_citations,
                recommendations=recommendations,
                timestamp=datetime.now(),
                processing_time=processing_time
            )

        except Exception as e:
            logger.error(f"Error processing legal query: {e}")
            raise

    def _prepare_context(self, relevant_docs: List[Document]) -> str:
        """Format retrieved chunks into a numbered context section.

        Returns a fixed placeholder sentence when nothing was retrieved.
        """
        if not relevant_docs:
            return "No relevant legal documents found in the knowledge base."

        context_parts = []
        for i, doc in enumerate(relevant_docs, 1):
            source = doc.metadata.get("source", "Unknown Source")
            doc_type = doc.metadata.get("document_type", "general")

            context_parts.append(
                f"[Document {i} - {doc_type.title()}]\n"
                f"Source: {source}\n"
                f"Content: {doc.page_content}\n"
            )

        return "\n".join(context_parts)

    def _create_enhanced_prompt(self, legal_query: LegalQuery, context: str) -> str:
        """Create an enhanced prompt with context and query details"""
        return f"""
        Legal Query Analysis Request:
        
        Query Type: {legal_query.query_type}
        Jurisdiction: {legal_query.jurisdiction}
        Urgency: {legal_query.urgency}
        
        Query: {legal_query.query_text}
        
        Relevant Legal Context:
        {context}
        
        Please provide a comprehensive legal analysis that includes:
        1. Direct answer to the query
        2. Relevant legal principles and precedents
        3. Potential risks and considerations
        4. Practical recommendations
        5. Next steps or actions to consider
        
        Format your response with clear sections and cite specific sources where applicable.
        Include appropriate legal disclaimers.
        """

    def _extract_legal_citations(self, response_text: str) -> List[str]:
        """Extract legal citations from the response.

        Returns each distinct citation once, ordered by pattern and then by
        first appearance, so repeated calls on the same text give the same
        list.
        """
        if not response_text:
            return []

        citations = []
        for pattern in self._CITATION_PATTERNS:
            citations.extend(pattern.findall(response_text))

        # dict.fromkeys removes duplicates while keeping a stable order.
        return list(dict.fromkeys(citations))

    def _extract_recommendations(self, response_text: str) -> List[str]:
        """Extract recommendations from the response.

        Each line contributes at most one snippet, taken from the first
        indicator pattern that matches it. Duplicates and blank snippets
        are dropped, and at most MAX_RECOMMENDATIONS snippets are returned.
        """
        if not response_text:
            return []

        recommendations = []
        seen = set()

        for line in response_text.split('\n'):
            for pattern in self._RECOMMENDATION_PATTERNS:
                match = pattern.search(line)
                if not match:
                    continue
                snippet = match.group(1).strip()
                if snippet and snippet not in seen:
                    seen.add(snippet)
                    recommendations.append(snippet)
                # One snippet per line: a bullet that also contains
                # "recommend" must not be reported twice.
                break
            if len(recommendations) >= self.MAX_RECOMMENDATIONS:
                break

        return recommendations[:self.MAX_RECOMMENDATIONS]

    def _calculate_confidence_score(self, relevant_docs: List[Document], query: str) -> float:
        """Calculate confidence score based on retrieved documents relevance.

        A simple heuristic: 0.15 per retrieved chunk (capped at 0.75), plus
        0.05 for each statute or case-law chunk and 0.03 for each regulation
        chunk, with the total capped at 1.0. `query` is accepted for
        interface compatibility and is not used by the heuristic.
        """
        if not relevant_docs:
            return 0.0

        # Simple heuristic based on number and quality of retrieved documents
        base_score = min(len(relevant_docs) * 0.15, 0.75)  # Up to 0.75 for having docs

        # Boost for specific document types
        for doc in relevant_docs:
            doc_type = doc.metadata.get("document_type", "general")
            if doc_type in ["statute", "case_law"]:
                base_score += 0.05
            elif doc_type == "regulation":
                base_score += 0.03

        return min(base_score, 1.0)

In [None]:
# Usage Example and Testing
async def main():
    """Example usage of the LexSyntheia Legal Assistant.

    Credentials are read from the environment: DEEPSEEK_API_KEY (required)
    and OPENAI_API_KEY (optional if it is already exported for the
    embeddings client).

    Raises:
        RuntimeError: If DEEPSEEK_API_KEY is not set.
    """
    # SECURITY: API keys must never be committed to a notebook. Any key that
    # was previously hardcoded here has to be treated as leaked and revoked.
    deepseek_api_key = os.environ.get("DEEPSEEK_API_KEY")
    if not deepseek_api_key:
        raise RuntimeError(
            "DEEPSEEK_API_KEY is not set; export it before running this example."
        )

    # Initialize the agent
    agent = LexSyntheiaAgent(
        deepseek_api_key=deepseek_api_key,
        openai_api_key=os.environ.get("OPENAI_API_KEY")
    )

    # Initialize or load knowledge base (uncomment to build and persist one)
    # agent.initialize_knowledge_base("./legal_documents")
    # agent.rag_system.save_knowledge_base("./knowledge_base")

    # Create a sample legal query
    query = LegalQuery(
        query_id="query_001",
        user_id="lawyer_123",
        query_text="What are the key elements required for a valid contract under US law?",
        query_type="legal_research",
        jurisdiction="federal",
        urgency="medium",
        timestamp=datetime.now()
    )

    # Process the query
    try:
        response = await agent.process_legal_query(query)

        print("=== LexSyntheia Legal Assistant Response ===")
        print(f"Query ID: {response.query_id}")
        print(f"Confidence Score: {response.confidence_score:.2f}")
        print(f"Processing Time: {response.processing_time:.2f}s")
        print(f"\nResponse:\n{response.response_text}")
        print(f"\nSources: {len(response.sources)} documents referenced")
        print(f"Legal Citations: {response.legal_citations}")
        print(f"Recommendations: {response.recommendations}")

    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # Plain script execution: no event loop exists yet, so start one.
        asyncio.run(main())
    else:
        # A Jupyter/IPython kernel already runs an event loop, and
        # asyncio.run() refuses to start a second one in the same thread.
        # Schedule main() on the existing loop instead; the reference is
        # kept so the task is not garbage-collected before it finishes.
        main_task = running_loop.create_task(main())