# In [None]:  (Jupyter notebook cell marker left over from export)
import os
import json
import openai
import docx
from docx import Document
import time
import re
from typing import List, Dict, Optional

class KnowledgeGraphGenerator:
    """Knowledge Graph Generator - Converts natural language to Neo4j Cypher code and extracts entities.

    The pipeline (see ``run``) loads a canonical mapping JSON file, reads
    sentences from a Word document, sends each sentence to the Deepseek chat
    API, parses the JSON answer into Cypher code plus an entity list, and
    writes the results to two Word documents and one JSON file.
    """

    # Matches a leading list number such as "12. " at the start of a paragraph.
    _NUMBER_PREFIX_RE = re.compile(r"^\d+\.\s+")
    # Matches Markdown code-fence markers, with or without a "json" tag.
    _CODE_FENCE_RE = re.compile(r"```(?:json)?", re.IGNORECASE)
    # Body of a JSON string literal: plain characters or backslash escapes.
    # Backslashes are excluded from the plain-character class so that an
    # escaped quote (\") is never mistaken for the closing quote.
    _STRING_BODY = r'[^"\\]*(?:\\.[^"\\]*)*'
    _CYPHER_FIELD_RE = re.compile(r'"cypher"\s*:\s*"(' + _STRING_BODY + r')"', re.DOTALL)
    _ENTITIES_FIELD_RE = re.compile(r'"entities"\s*:\s*\[(.*?)\]', re.DOTALL)
    _QUOTED_STRING_RE = re.compile(r'"(' + _STRING_BODY + r')"', re.DOTALL)

    def __init__(self):
        """Initialize generator, make sure an API key is available and create the API client."""
        self.setup_api_key()
        self.client = openai.OpenAI(
            api_key=os.environ["DEEPSEEK_API_KEY"],
            base_url="https://api.deepseek.com/v1"
        )
        self.canonical_mapping = {}

    def setup_api_key(self):
        """Setup Deepseek API key.

        Prompts on stdin when ``DEEPSEEK_API_KEY`` is missing or blank and
        stores the answer in ``os.environ`` for the rest of the process.
        """
        # A blank value is as unusable as a missing one, so prompt in both cases.
        if not os.environ.get("DEEPSEEK_API_KEY", "").strip():
            api_key = input("Enter your Deepseek API Key: ").strip()
            os.environ["DEEPSEEK_API_KEY"] = api_key

    def load_canonical_mapping(self, mapping_file_path: str) -> Dict:
        """Load canonical mapping file for entity standardization.

        Args:
            mapping_file_path: Path of the JSON mapping file. When the file does
                not exist an empty mapping file is created at this path.

        Returns:
            The loaded mapping (also stored on ``self.canonical_mapping``).
            An empty dict is used when the file is missing, unreadable, or
            does not contain a JSON object.
        """
        try:
            if os.path.exists(mapping_file_path):
                with open(mapping_file_path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    self.canonical_mapping = loaded
                    print(f"✅ Canonical mapping loaded: {len(self.canonical_mapping)} entries")
                else:
                    # The prompt expects a JSON object; anything else is unusable.
                    print(f"⚠️ Canonical mapping is not a JSON object, ignoring: {mapping_file_path}")
                    self.canonical_mapping = {}
            else:
                print(f"⚠️ Canonical mapping file not found: {mapping_file_path}")
                print("📝 Creating empty mapping file...")
                self.canonical_mapping = {}
                with open(mapping_file_path, 'w', encoding='utf-8') as f:
                    json.dump({}, f, indent=2)
        except Exception as e:
            # Best-effort: the pipeline can still run without a mapping.
            print(f"❌ Error loading canonical mapping: {e}")
            self.canonical_mapping = {}

        return self.canonical_mapping

    @classmethod
    def _strip_number_prefix(cls, text: str) -> str:
        """Remove a leading list number such as ``"3. "``; leave other text untouched."""
        return cls._NUMBER_PREFIX_RE.sub("", text, count=1)

    def load_sentences_from_docx(self, doc_path: str) -> List[str]:
        """Load sentences from Word document.

        Every non-empty paragraph becomes one sentence. A leading list number
        ("1. ", "12. ") is removed; text that merely starts with a digit
        (e.g. "3D printing ...") is kept intact.

        Args:
            doc_path: Path of the ``.docx`` file to read.

        Returns:
            The list of sentences, or an empty list when the document cannot
            be read.
        """
        try:
            doc = docx.Document(doc_path)
            sentences = []

            for para in doc.paragraphs:
                text = para.text.strip()
                if text:
                    sentences.append(self._strip_number_prefix(text))

            print(f"✅ Successfully loaded {len(sentences)} sentences")
            return sentences

        except Exception as e:
            print(f"❌ Failed to read document: {e}")
            return []

    def get_prompt_template(self) -> str:
        """Get prompt template with canonical mapping instructions.

        Returns:
            A ``str.format`` template whose only replacement field is
            ``{sentence}``. Every literal brace in the template, including
            those of the embedded canonical-mapping JSON, is doubled so that
            ``.format(sentence=...)`` renders it as a single brace.
        """
        mapping_str = json.dumps(self.canonical_mapping or {}, indent=2, ensure_ascii=False)
        # The mapping is JSON and therefore full of braces; escape them so the
        # later str.format call does not treat them as replacement fields.
        mapping_str = mapping_str.replace("{", "{{").replace("}", "}}")

        return """You are an expert in knowledge graph construction.
Your task is twofold for the given natural language statement:
1. Convert the statement into Neo4j Cypher code, primarily using **MERGE** statements to represent the nodes and relationships. Focus on accurately capturing the information within this single sentence.
2. Extract and list all unique **entity names** that you used as node identifiers (e.g., the value in `{{name: 'EntityName'}}`) within the generated Cypher code.

Follow these instructions strictly:

## Canonical Entity Standardization
This procedure standardizes node names in Cypher code so that the graph stores a single, canonical representation of every concept.

**Input:**
1. A canonical-mapping file (JSON format below)
2. The initial Cypher script with MERGE statements

**Canonical Mapping:**
""" + mapping_str + """

**Method:**
1. Sequentially inspect every MERGE statement in the script and record the string assigned to each name property.
2. For every recorded name, consult the canonical-mapping file:
   a) If the name appears in a variant list, substitute the corresponding canonical term in the Cypher statement.
   b) If no mapping is found, preserve the original wording on the assumption that it is already canonical.
3. Retain without alteration the variable identifiers, node labels, relationship types, directions and the overall ordering of statements; only the literal contents of the name properties are subject to change.
4. Assemble the modified statements into a single, syntactically valid Cypher block.

## Output Format (Strict JSON)
- You **MUST** return the output ONLY as a single, valid JSON object.
- Do NOT include any text, explanations, or markdown formatting before or after the JSON object.
- The JSON object must contain exactly two keys: `"cypher"` and `"entities"`.
- The value for the `"cypher"` key must be a single string containing all the generated Neo4j Cypher code for the input sentence. Use `\\n` for newlines within the Cypher string. Use `MERGE` statements.
- The value for the `"entities"` key must be a JSON array of strings. This array should list all unique entity names used as the `name` property value within the generated Cypher code.

## Avoid These Common Mistakes

### 1. Ambiguity in Relationships
- Relationships must be clearly defined.
- ✅ Example:  
  MERGE (glass:Material {{name: "Glass"}})-[:REDUCES]->(internalFriction:Property {{name: "Internal Friction"}})
- ❌ Avoid unclear chains like:  
  MERGE (glass:Material {{name: "Glass"}})-[:REDUCES]->(internalFriction:Property {{name: "Internal Friction"}})-[:AFFECTS]->(flow:Property {{name: "Flow"}})
- Specify whether the effect is increasing or decreasing.

### 2. Property Assignment Errors
- Do not assign incorrect properties to entities.
- ✅ Example:  
  MERGE (uhpc:Product {{name: "UHPC"}})-[:HAS_PROCESSING_METHOD]->(hotCuring:Process {{name: "Hot Curing"}})
  MERGE (hotCuring)-[:HAS]->(curingTemperature:Property {{name: "Curing Temperature"}})
- ❌ Incorrect:  
  MERGE (uhpc:Product {{name: "UHPC"}})-[:HAS_PROPERTY]->(curingTemperature:Property {{name: "Curing Temperature"}})

### 3. Redundancy
- Avoid multiple nodes for the same concept (e.g., `"Waste Glass"`, `"Glass Particles"`, `"Fine Glass"` → should all be `"Glass"`).
- Use a **consistent naming convention** for entities.

### 4. Inconsistency
- Use **consistent labels** and **relationship types** for the same entity across different sentences.

## Example
**Input:**  
"Glass sand can be efficiently used to produce UHPC and eliminate the need for quartz sand, yielding a cost-effective and environmentally friendly solution."

**Expected Output:**
{{
  "cypher": "MERGE (glassSand:Material {{name: \\"Glass Sand\\"}})\\nMERGE (quartzSand:Material {{name: \\"Quartz Sand\\"}})\\nMERGE (uhpc:Product {{name: \\"UHPC\\"}})\\nMERGE (costEffectiveness:Benefit {{name: \\"Cost-Effective\\"}})\\nMERGE (environmentalBenefit:Benefit {{name: \\"Environmentally Friendly\\"}})\\n\\nMERGE (glassSand)-[:USED_IN]->(uhpc)\\nMERGE (glassSand)-[:REPLACES]->(quartzSand)\\nMERGE (glassSand)-[:YIELDS]->(costEffectiveness)\\nMERGE (glassSand)-[:YIELDS]->(environmentalBenefit)",
  "entities": ["Glass Sand", "Quartz Sand", "UHPC", "Cost-Effective", "Environmentally Friendly"]
}}

Now, please process the following sentence and provide ONLY a valid JSON object with cypher code and entities list:

"{sentence}"
"""

    @staticmethod
    def _unescape_json_string(raw: str) -> str:
        """Decode the escapes of a JSON string body; return it unchanged if it is not decodable."""
        try:
            # strict=False tolerates raw control characters (e.g. real newlines).
            return json.loads(f'"{raw}"', strict=False)
        except json.JSONDecodeError:
            return raw

    def parse_response(self, response_text: str, original_sentence: str) -> Optional[Dict]:
        """Parse API response to extract Cypher code and entities.

        Three strategies are tried in order: parse the whole (fence-stripped)
        text as JSON, parse the outermost ``{...}`` span as JSON, and finally
        pull the ``"cypher"`` and ``"entities"`` fields out with regular
        expressions so that slightly malformed JSON is still usable.

        Args:
            response_text: Raw text returned by the model.
            original_sentence: Sentence the response belongs to; stored under
                the ``"sentence"`` key of the result.

        Returns:
            A dict with ``"sentence"``, ``"cypher"`` and ``"entities"`` keys,
            or ``None`` when nothing usable could be extracted.
        """
        # Remove code block markers, tagged ("```json") or bare ("```").
        cleaned_text = self._CODE_FENCE_RE.sub("", response_text).strip()

        # Strategies 1 and 2: the full text, then the outermost brace span.
        candidates = [cleaned_text]
        json_match = re.search(r'\{.*\}', cleaned_text, re.DOTALL)
        if json_match:
            candidates.append(json_match.group())

        for candidate in candidates:
            try:
                data = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if self.validate_response(data):
                data["sentence"] = original_sentence
                return data

        # Strategy 3: extract cypher and entities separately.
        cypher_match = self._CYPHER_FIELD_RE.search(cleaned_text)
        if cypher_match:
            entities = []
            entities_match = self._ENTITIES_FIELD_RE.search(cleaned_text)
            if entities_match:
                entities = [
                    self._unescape_json_string(raw_entity)
                    for raw_entity in self._QUOTED_STRING_RE.findall(entities_match.group(1))
                ]

            return {
                "sentence": original_sentence,
                "cypher": self._unescape_json_string(cypher_match.group(1)),
                "entities": entities
            }

        print(f"⚠️ Failed to parse response: {response_text[:100]}...")
        return None

    def validate_response(self, data: Dict) -> bool:
        """Validate response data.

        Returns:
            ``True`` only when ``data`` is a dict whose ``"cypher"`` value is a
            string and whose ``"entities"`` value is a list of strings.
        """
        return (
            isinstance(data, dict)
            and isinstance(data.get("cypher"), str)
            and isinstance(data.get("entities"), list)
            # Downstream code joins and sorts entities, so they must be strings.
            and all(isinstance(entity, str) for entity in data["entities"])
        )

    def process_sentence(self, sentence: str, retry_count: int = 3) -> Optional[Dict]:
        """Process single sentence to generate Cypher code and entities.

        Args:
            sentence: Natural-language sentence to convert.
            retry_count: Maximum number of API attempts.

        Returns:
            The parsed result dict, or a dict with an empty ``"cypher"`` string
            and empty ``"entities"`` list when every attempt failed.
        """
        prompt = self.get_prompt_template().format(sentence=sentence)

        for attempt in range(retry_count):
            try:
                response = self.client.chat.completions.create(
                    model="deepseek-chat",
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0.2,
                    max_tokens=4000,
                    timeout=60
                )

                # Treat a missing message body as an empty (unparseable) answer.
                response_content = (response.choices[0].message.content or "").strip()
                result = self.parse_response(response_content, sentence)

                if result:
                    return result
                print(f"⚠️ Attempt {attempt + 1} parsing failed")

            except Exception as e:
                print(f"⚠️ Attempt {attempt + 1} API call failed: {e}")
                if attempt < retry_count - 1:
                    time.sleep(5)  # Wait before retry

        return {"sentence": sentence, "cypher": "", "entities": []}

    def process_all_sentences(self, sentences: List[str]) -> List[Dict]:
        """Process all sentences.

        Args:
            sentences: Sentences to convert, processed in order.

        Returns:
            One result dict per input sentence, in the same order.
        """
        results = []
        total = len(sentences)

        print(f"🚀 Starting to process {total} sentences...")

        for i, sentence in enumerate(sentences, 1):
            print(f"📝 Processing sentence {i}/{total}: {sentence[:50]}...")

            result = self.process_sentence(sentence)
            results.append(result)

            # Show processing result
            if result and result.get('cypher') and result.get('entities'):
                print(f"✅ Sentence {i} processed successfully")
            else:
                print(f"❌ Sentence {i} processing failed")

            # API rate limiting
            if i < total:
                time.sleep(2)

        # Statistics
        successful = sum(1 for r in results if r.get('cypher') and r.get('entities'))
        # Guard against an empty input list.
        success_rate = (successful / total) * 100 if total else 0.0
        print(f"📊 Processing complete: {successful}/{total} ({success_rate:.1f}%)")

        return results

    @staticmethod
    def _json_output_path(cypher_path: str) -> str:
        """Derive the JSON results path from the Cypher document path.

        Only the final extension is swapped for ``.json``. If that would give
        back the input path itself, ``.json`` is appended instead so the JSON
        dump never overwrites the Cypher document.
        """
        root, ext = os.path.splitext(cypher_path)
        if ext.lower() == ".json":
            return cypher_path + ".json"
        return root + ".json"

    def save_results(self, results: List[Dict], cypher_path: str, entities_path: str):
        """Save results to Word documents and a JSON file next to the Cypher document."""
        self.save_cypher_results(results, cypher_path)
        self.save_entities_results(results, entities_path)
        self.save_json_results(results, self._json_output_path(cypher_path))

    def save_cypher_results(self, results: List[Dict], path: str):
        """Save Cypher code results to Word document.

        Args:
            results: Result dicts as produced by ``process_all_sentences``.
            path: Destination ``.docx`` path.
        """
        doc = Document()
        doc.add_heading("Cypher Code Generation Results", level=1)

        # Add statistics
        total = len(results)
        successful = sum(1 for r in results if r.get('cypher'))
        success_rate = successful / total * 100 if total else 0.0
        doc.add_paragraph(f"Total sentences: {total}")
        doc.add_paragraph(f"Successfully generated: {successful}")
        doc.add_paragraph(f"Success rate: {success_rate:.1f}%")
        doc.add_paragraph("=" * 50)

        # Add each result
        for i, result in enumerate(results, 1):
            doc.add_heading(f"Sentence {i}", level=2)
            doc.add_paragraph(f"Original: {result.get('sentence', '')}")

            cypher = result.get('cypher', '')
            if cypher:
                # Turn literal backslash-n sequences into real line breaks.
                formatted_cypher = cypher.replace('\\n', '\n')
                doc.add_paragraph("Cypher Code:")
                doc.add_paragraph(formatted_cypher, style='Normal')
            else:
                doc.add_paragraph("Cypher Code: [Generation failed]")

            doc.add_paragraph("-" * 30)

        doc.save(path)
        print(f"✅ Cypher results saved to: {path}")

    def save_entities_results(self, results: List[Dict], path: str):
        """Save entities results to Word document.

        Writes summary counts, the entities of each sentence, and a sorted
        list of all unique entities.

        Args:
            results: Result dicts as produced by ``process_all_sentences``.
            path: Destination ``.docx`` path.
        """
        doc = Document()
        doc.add_heading("Entity Extraction Results", level=1)

        # Collect all entities
        all_entities = [
            entity
            for result in results
            for entity in result.get('entities', [])
        ]
        unique_entities = sorted(set(all_entities))

        # Add statistics
        doc.add_paragraph(f"Total sentences: {len(results)}")
        doc.add_paragraph(f"Total extracted entities: {len(all_entities)}")
        doc.add_paragraph(f"Unique entities: {len(unique_entities)}")
        doc.add_paragraph("=" * 50)

        # Show entities by sentence
        doc.add_heading("Entities by Sentence", level=2)
        for i, result in enumerate(results, 1):
            doc.add_paragraph(f"Sentence {i}: {result.get('sentence', '')}")
            entities = result.get('entities', [])
            if entities:
                doc.add_paragraph(f"Entities: {', '.join(entities)}")
            else:
                doc.add_paragraph("Entities: [None]")
            doc.add_paragraph("")

        # Unique entities list
        doc.add_heading("All Unique Entities", level=2)
        for i, entity in enumerate(unique_entities, 1):
            doc.add_paragraph(f"{i}. {entity}")

        doc.save(path)
        print(f"✅ Entity results saved to: {path}")

    def save_json_results(self, results: List[Dict], path: str):
        """Save complete results as JSON (UTF-8, non-ASCII characters kept as-is)."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        print(f"✅ Complete results saved to: {path}")

    def run(self, input_doc_path: str, cypher_output_path: str, entities_output_path: str,
            canonical_mapping_path: str = "canonical_mapping.json"):
        """Run complete pipeline.

        Args:
            input_doc_path: Word document holding the input sentences.
            cypher_output_path: Word document to write the Cypher code to; the
                JSON dump is written next to it with a ``.json`` extension.
            entities_output_path: Word document to write the entities to.
            canonical_mapping_path: JSON file with the canonical entity mapping.
        """
        print("🔄 Starting Knowledge Graph Generation Pipeline...")

        # 1. Load canonical mapping
        self.load_canonical_mapping(canonical_mapping_path)

        # 2. Load sentences
        sentences = self.load_sentences_from_docx(input_doc_path)
        if not sentences:
            print("❌ No sentences loaded, terminating program")
            return

        # 3. Process all sentences
        results = self.process_all_sentences(sentences)

        # 4. Save results
        self.save_results(results, cypher_output_path, entities_output_path)

        print("🎉 Knowledge Graph Generation Pipeline Complete!")

def main():
    """Main function: configure the file paths and run the full pipeline."""
    # Configure file paths.
    # NOTE: the original values were missing from this file; the names below
    # are placeholders and must be adjusted to the actual locations.
    input_doc = "input_sentences.docx"  # Input sentences document
    cypher_output = "cypher_output.docx"  # Cypher code output
    entities_output = "entities_output.docx"  # Entities output
    canonical_mapping = "canonical_mapping.json"  # Canonical mapping file

    # Create generator and run
    generator = KnowledgeGraphGenerator()
    generator.run(input_doc, cypher_output, entities_output, canonical_mapping)

# Run the pipeline only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()