# Installing Required Libraries

!pip install -qU langchain-text-splitters langchain-community langgraph langchain-google-genai langchain-huggingface langchain-chroma glob2 pandas numpy pypdf sentence-transformers scikit-learn pyMuPDF rank_bm25

In [None]:
import pandas as pd
import numpy as np
import re
import os
import json
import glob
import time
import fitz
from google import genai
from pathlib import Path
from typing_extensions import List, TypedDict
from pydantic import BaseModel, Field

# Langchain
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain.prompts import PromptTemplate
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain_core.documents import Document
from langgraph.graph import START, StateGraph

# Components

### Gemini & LangSmith API Keys

In [None]:
# Environment configuration for Gemini and LangSmith.
# The two API-key values are empty placeholders here — presumably they are
# meant to be filled in before the notebook is run.
for env_name, env_value in (
    ("GOOGLE_API_KEY", ""),
    ("LANGSMITH_TRACING", "true"),
    ("LANGSMITH_API_KEY", ""),
):
    os.environ[env_name] = env_value

In [None]:
# Instantiate the google-genai SDK client with no explicit arguments
# (presumably it reads GOOGLE_API_KEY from the environment — confirm).
# NOTE(review): `client` is not referenced anywhere else in the visible
# notebook; the pipeline below talks to Gemini through `llm` instead.
client = genai.Client()

### Chat Model: ***ChatGoogleGenerativeAI***

In [None]:
# Chat model shared by the query-analysis node and all three generation nodes.
# temperature=0 asks for the least random sampling; the token limit and the
# timeout are left unset (None), and failed calls are retried up to 2 times.
_llm_settings = {
    "model": "gemini-2.0-flash",
    "temperature": 0,
    "max_tokens": None,
    "timeout": None,
    "max_retries": 2,
}
llm = ChatGoogleGenerativeAI(**_llm_settings)

### Embeddings Model: ***BAAI/bge-small-en*** from HuggingFace

In [None]:
# Folder handed to HuggingFaceEmbeddings as its cache location.
embeddings_cache_path = "/content/embeddings_cache"

# Sentence-embedding model used by the vector store; embeddings are
# requested in normalized form via encode_kwargs.
embeddings = HuggingFaceEmbeddings(
    cache_folder=embeddings_cache_path,
    model_name="BAAI/bge-small-en",
    encode_kwargs=dict(normalize_embeddings=True),
)

### Vector Store: Chroma

In [None]:
# On-disk location of the Chroma database.
chroma_db_cache_path = "/content/chroma_db"

# Vector store that embeds documents with `embeddings` and persists them
# under `chroma_db_cache_path`.
vector_store = Chroma(
    persist_directory=chroma_db_cache_path,
    embedding_function=embeddings,
)

# Indexing

### Loading Documents

In [None]:
pdf_folder = r"/content/Web Scraped Solder Bridging"

def load_pdfs_from_folder(folder_path):
    """Load every PDF directly inside *folder_path* as one Document per page.

    Pages whose extracted text is empty after stripping are skipped.

    Args:
        folder_path: Directory to scan for ``*.pdf`` files (not recursive).

    Returns:
        A list of ``Document`` objects whose ``page_content`` is the stripped
        page text and whose metadata holds ``source`` (file name) and
        ``page`` (1-based page number). Empty when no PDF is found.
    """
    # Sort so that document order (and therefore chunk order) is the same
    # on every run; glob order is otherwise filesystem-dependent.
    pdf_paths = sorted(Path(folder_path).glob("*.pdf"))
    print(pdf_paths)
    docs = []
    for path in pdf_paths:
        # Context manager closes the PDF handle even if text extraction fails.
        with fitz.open(str(path)) as pdf:
            for page_num, page in enumerate(pdf, start=1):
                text = page.get_text().strip()
                if text:
                    docs.append(Document(
                        page_content=text,
                        metadata={"source": path.name, "page": page_num}
                    ))
    return docs

In [None]:
# Each entry of `docs` is one non-empty PDF page, not one file.
docs = load_pdfs_from_folder(pdf_folder)

total_chars = sum(len(doc.page_content) for doc in docs)
# Count distinct source files instead of hard-coding the number of PDFs.
source_count = len({doc.metadata.get("source") for doc in docs})
print(
    f"Total characters in {source_count} documents "
    f"({len(docs)} pages) combined: {total_chars}"
)

### Chunking Documents

In [None]:
def chunk_by_page(doc):
    """
    Turn ONE page-level Document into a one-element list holding its chunk.

    The chunk keeps the page text unchanged and carries metadata built from
    the input's ``source`` and ``page`` entries (read with .get(), falling
    back to "unknown" and 0), plus a ``section`` label, the ``splitter``
    name and an ``id`` of the form ``<source>_p<page>_s0``.
    """
    meta = doc.metadata
    source = meta.get("source", "unknown")
    page_num = meta.get("page", 0)

    page_chunk = Document(
        page_content=doc.page_content,
        metadata={
            "source": source,
            "page": page_num,
            "section": f"page_{page_num}",
            "splitter": "page",
            "id": f"{source}_p{page_num}_s0",
        },
    )
    # A list is returned so callers can extend() a running list of chunks.
    return [page_chunk]


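# Usage (chunk_by_page takes ONE Document and returns a one-element list):
# page_chunks = [chunk for doc in docs for chunk in chunk_by_page(doc)]
# for chunk in page_chunks:
#     print(chunk.metadata.get('source'), chunk.metadata.get('page'))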

In [None]:
# Flatten the per-page results of chunk_by_page into a single list.
chunks = []
for doc in docs:
    chunks += chunk_by_page(doc)

In [None]:
# Notebook display: number of chunks collected in `chunks`.
len(chunks)

### Inserting Chunks in Vector Store

In [None]:
# Batch Insert Approach
# Chunks are added `batch_size` at a time so progress can be reported.
batch_size = 10
document_ids = []

for i in range(0, len(chunks), batch_size):
    batch = chunks[i : i + batch_size]
    # Reuse the deterministic per-page id stored in each chunk's metadata so
    # that re-running this cell against the persisted store overwrites the
    # same entries instead of inserting every page a second time.
    batch_ids = vector_store.add_documents(
        documents=batch,
        ids=[chunk.metadata["id"] for chunk in batch],
    )
    document_ids.extend(batch_ids)
    print(
        f"Inserted {min(i + batch_size, len(chunks))}/{len(chunks)} chunks"
    )

print("Done. Example IDs: ", document_ids[:3])

# Retrieval and Generation

### Custom Prompts

In [None]:
# ─────────────────────────  EFFECTS  ──────────────────────────
# Prompt used by generate_effects: asks for at most five short effects of the
# failure mode, grounded only in the retrieved context, as a JSON object with
# the keys "process", "failure_mode" and "effects".
# {context} and {question} are the two substitution slots; doubled braces
# ({{ }}) render as literal braces in the final prompt.
custom_prompt_effects = PromptTemplate(
    input_variables=["context", "question"],
    # Raw string: the text below is sent to the model verbatim after slot filling.
    template=r"""
You are a highly concise assistant.

Task → List the **best five effects** (max) of the failure mode in the question, **using ONLY the context**.

Constraints
1. Each effect = 3-4 words.
2. No sentences, no extra text.
3. If zero effects are supported, answer exactly:
   {{"process":"", "failure_mode":"", "effects":[]}}

Context:
{context}

Question:
{question}

Respond **only** in this JSON schema:
{{
  "process": "<process name from question>",
  "failure_mode": "<failure mode from question>",
  "effects": ["effect 1", "effect 2", "effect 3", "effect 4", "effect 5"]
}}
""",
)

In [None]:
# ─────────────────────────  CAUSES  ───────────────────────────
# Prompt used by generate_causes: asks for at most five short causes of the
# failure mode, grounded only in the retrieved context, as a JSON object with
# the keys "process", "failure_mode" and "causes".
# {context} and {question} are the two substitution slots; doubled braces
# ({{ }}) render as literal braces in the final prompt.
custom_prompt_causes = PromptTemplate(
    input_variables=["context", "question"],
    # Raw string: the text below is sent to the model verbatim after slot filling.
    template=r"""
You are a highly concise assistant.

Task → List the **best five causes** (max) of the failure mode in the question, **using ONLY the context**.

Constraints
1. Each cause = 3-4 words.
2. No sentences, no extra text.
3. If zero causes are supported, answer exactly:
   {{"process":"", "failure_mode":"", "causes":[]}}

Context:
{context}

Question:
{question}

Respond **only** in this JSON schema:
{{
  "process": "<process name from question>",
  "failure_mode": "<failure mode from question>",
  "causes": ["cause 1", "cause 2", "cause 3", "cause 4", "cause 5"]
}}
""",
)

In [None]:
# ─────────────────────  ACTIONS / REMEDIES  ─────────────────────
# Prompt used by generate_actions: asks for at most five short corrective
# actions for the failure mode, grounded only in the retrieved context, as a
# JSON object with the keys "process", "failure_mode" and "actions".
# {context} and {question} are the two substitution slots; doubled braces
# ({{ }}) render as literal braces in the final prompt.
custom_prompt_actions = PromptTemplate(
    input_variables=["context", "question"],
    # Raw string: the text below is sent to the model verbatim after slot filling.
    template=r"""
You are a highly concise assistant.

Task → Suggest the **best five corrective actions / remedies** (max) that mitigate or fix the failure mode mentioned in the question, **using ONLY the context**.

Constraints
1. Each action = 3-4 words.
2. No sentences, no extra text.
3. If zero actionable fixes are supported, answer exactly:
   {{"process":"", "failure_mode":"", "actions":[]}}

Context:
{context}

Question:
{question}

Respond **only** in this JSON schema:
{{
  "process": "<process name from question>",
  "failure_mode": "<failure mode from question>",
  "actions": ["action 1", "action 2", "action 3", "action 4", "action 5"]
}}
""",
)

In [None]:
# Hybrid retrieval: keyword-based BM25 over the page chunks combined with
# similarity search over the vector store, weighted equally.
# The chunk list built earlier in this notebook is named `chunks`;
# `smart_chunks` was never defined and raised NameError here.
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 5  # return up to 5 keyword matches per query

ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_store.as_retriever(search_kwargs={"k": 5})],
    weights=[0.5, 0.5],
)

### LangGraph Graphs

In [None]:
# Structured-output schema: analyze_query asks the LLM to answer in this
# shape, and retrieve() reads the resulting `.query` text.
# NOTE(review): documented with `#` comments rather than a class docstring on
# purpose — a docstring would presumably become part of the schema
# description sent to the model; confirm before adding one.
class Search(BaseModel):
    # Text that is handed to the ensemble retriever.
    query: str = Field(description="Search query to run.")

In [None]:
# Shared state passed between the graph nodes; each node returns a partial
# dict that fills in one of these keys.
class State(TypedDict):
    # Original user question supplied to graph.invoke().
    question: str
    # Structured search query produced by analyze_query.
    query: Search
    # Documents returned by retrieve().
    context: List[Document]
    # Raw model reply produced by one of the generate_* nodes.
    answer: str

In [None]:
def analyze_query(state: State):
    """Turn the user question into a structured ``Search`` query.

    Args:
        state: Graph state; only ``state["question"]`` is read.

    Returns:
        A partial state update ``{"query": Search}``. When the model yields
        no usable query text, the original question is used as the query so
        that the downstream retrieval node always receives a searchable
        string.
    """
    structured_llm = llm.with_structured_output(Search)
    query = structured_llm.invoke(state["question"])

    # Structured output may come back as None (or with a blank query) when
    # the model does not follow the schema; retrieve() would then crash on
    # `query.query`, so fall back to the raw question.
    search_text = getattr(query, "query", None)
    if not isinstance(search_text, str) or not search_text.strip():
        query = Search(query=state["question"])

    return {"query": query}

In [None]:
def retrieve(state: State):
    """Fetch context documents for the structured query held in the state.

    Runs the ensemble retriever on ``state["query"].query``, keeps at most
    the first 8 results, prints how many were kept and returns them as the
    ``context`` state update.
    """
    search_text = state["query"].query
    ranked_docs = ensemble_retriever.invoke(search_text)
    top_docs = ranked_docs[:8]
    print("No. of docs retrieved:", len(top_docs))
    return {"context": top_docs}

In [None]:
def generate_effects(state: State):
    """Ask the LLM for the effects of the failure mode in the question.

    The page texts of ``state["context"]`` are joined with blank lines,
    stripped, and inserted into ``custom_prompt_effects`` together with the
    question. The raw model reply is returned as the ``answer`` update.
    """
    page_texts = [doc.page_content for doc in state["context"]]
    context_text = "\n\n".join(page_texts).strip()
    filled_prompt = custom_prompt_effects.invoke(
        {"context": context_text, "question": state["question"]}
    )
    reply = llm.invoke(filled_prompt)
    return {"answer": reply.content}

In [None]:
def generate_causes(state: State):
    """Ask the LLM for the causes of the failure mode in the question.

    The page texts of ``state["context"]`` are joined with blank lines,
    stripped, and inserted into ``custom_prompt_causes`` together with the
    question. The raw model reply is returned as the ``answer`` update.
    """
    page_texts = [doc.page_content for doc in state["context"]]
    context_text = "\n\n".join(page_texts).strip()
    filled_prompt = custom_prompt_causes.invoke(
        {"context": context_text, "question": state["question"]}
    )
    reply = llm.invoke(filled_prompt)
    return {"answer": reply.content}

In [None]:
def generate_actions(state: State):
    """Ask the LLM for corrective actions for the failure mode in the question.

    The page texts of ``state["context"]`` are joined with blank lines,
    stripped, and inserted into ``custom_prompt_actions`` together with the
    question. The raw model reply is returned as the ``answer`` update.
    """
    page_texts = [doc.page_content for doc in state["context"]]
    context_text = "\n\n".join(page_texts).strip()
    filled_prompt = custom_prompt_actions.invoke(
        {"context": context_text, "question": state["question"]}
    )
    reply = llm.invoke(filled_prompt)
    return {"answer": reply.content}

In [None]:
# ---------------- EFFECTS ----------------
# Linear pipeline: analyze_query -> retrieve -> generate_effects.
effects_graph_builder = StateGraph(State)
effects_graph_builder.add_sequence([analyze_query, retrieve, generate_effects])
# Entry point: the first node is addressed by its function name.
effects_graph_builder.add_edge(START, "analyze_query")
graph_effects = effects_graph_builder.compile()

In [None]:
# ---------------- CAUSES -----------------
# Linear pipeline: analyze_query -> retrieve -> generate_causes.
causes_graph_builder = StateGraph(State)
causes_graph_builder.add_sequence([analyze_query, retrieve, generate_causes])
# Entry point: the first node is addressed by its function name.
causes_graph_builder.add_edge(START, "analyze_query")
graph_causes = causes_graph_builder.compile()

In [None]:
# ---------------- ACTIONS / REMEDIES -----
# Linear pipeline: analyze_query -> retrieve -> generate_actions.
actions_graph_builder = StateGraph(State)
actions_graph_builder.add_sequence([analyze_query, retrieve, generate_actions])
# Entry point: the first node is addressed by its function name.
actions_graph_builder.add_edge(START, "analyze_query")
graph_actions = actions_graph_builder.compile()

# Testing

In [None]:
# Test Recommended Action generation: run the actions graph on one sample question.
question = "What is the solution for NWO/HIP ?"
response = graph_actions.invoke({"question": question})
print(response["answer"])  # raw model reply from generate_actions

# Documents that were retrieved and passed to the model as context.
print(response["context"])

In [None]:
# Test EFFECTS generation: run the effects graph on one sample question.
question = "What are the potential effects of SOLDER BEADING ?"
response = graph_effects.invoke({"question": question})
print(response["answer"])  # raw model reply from generate_effects

In [None]:
# Show the retrieved Documents behind the most recent `response`.
print(response["context"])

In [None]:
# Test CAUSES generation: run the causes graph on one sample question.
# The question now asks for causes; it previously asked for "effects", which
# did not match the graph being exercised.
question = "What are the causes of Overheated joints?"
response = graph_causes.invoke({"question": question})
print(response["answer"])  # raw model reply from generate_causes

In [None]:
# Show the retrieved Documents behind the most recent `response`.
print(response["context"])

# Populating the FMEA

In [None]:
def safe_invoke(graph, question, retries=2, wait=60):
    """Invoke *graph* with the question, retrying after failures.

    Args:
        graph: Object exposing ``invoke(dict)``.
        question: Text sent as ``{"question": question}``.
        retries: How many additional attempts are allowed after a failure.
        wait: Seconds to sleep before each retry.

    Returns:
        Whatever ``graph.invoke`` returns on the first successful attempt.

    Raises:
        Exception: The last error is re-raised once retries are used up.
    """
    attempts_left = retries
    while True:
        try:
            return graph.invoke({"question": question})
        except Exception as err:
            # Out of retries: let the most recent error propagate unchanged.
            if attempts_left <= 0:
                raise
            print(f"Retrying after error: {err}")
            time.sleep(wait)
            attempts_left -= 1

In [None]:
def parse_json_response(response_text):
    """Extract the first JSON object embedded in a model reply.

    The reply may wrap the JSON in prose or Markdown code fences. Each ``{``
    in the text is tried in turn as the start of a JSON value, and the first
    one that decodes to a dict is returned. Text after the object is ignored,
    so trailing commentary that itself contains braces no longer makes
    parsing fail.

    Args:
        response_text: Raw model output. Non-string values are treated as
            "nothing to parse".

    Returns:
        The decoded dict, or ``{}`` when no JSON object could be decoded.
    """
    if not isinstance(response_text, str):
        return {}

    decoder = json.JSONDecoder()
    last_error = None
    for brace in re.finditer(r"\{", response_text):
        try:
            # raw_decode stops at the end of the first complete JSON value,
            # unlike a greedy regex that runs to the last "}" in the text.
            parsed, _ = decoder.raw_decode(response_text, brace.start())
        except json.JSONDecodeError as e:
            last_error = e
            continue
        if isinstance(parsed, dict):
            return parsed

    if last_error is not None:
        print(f"JSON parsing failed: {last_error}")
    return {}

In [None]:
def format_context_attributes(context_list):
    """Describe where each context document came from, one line per document.

    Every line has the form ``Doc <i>: {page: <page>, doc: <source>}`` where
    ``<i>`` is the zero-based position in *context_list* and the two values
    are read from the document's metadata (``None`` when absent). Lines are
    joined with newlines; an empty input yields an empty string.
    """
    lines = []
    for position, item in enumerate(context_list):
        meta = item.metadata
        lines.append(
            "Doc {}: {{page: {}, doc: {}}}".format(
                position, meta.get("page"), meta.get("source")
            )
        )
    return "\n".join(lines)

In [None]:
def log_update(row, effects, causes):
    """Print a short progress report for one FMEA row.

    Prints a separator, the row's "SMT Process" and "Failure Mode" values,
    then the effects and causes text. When either text is empty, a
    "No ... retrieved." notice is printed in its place. Ends with a blank
    line.
    """
    print(f"x{'-' * 64}x")
    for label in ("SMT Process", "Failure Mode"):
        print(f"{label}: {row[label]}")

    sections = (
        ("Effects:", "No effects retrieved.", effects),
        ("Causes:", "No causes retrieved.", causes),
    )
    for heading, empty_notice, value in sections:
        if value:
            print(heading)
            print(value)
        else:
            print(empty_notice)
    print()

In [None]:
def _ensure_text_columns(df, columns):
    """Make sure every output column exists and can hold strings."""
    for column in columns:
        if column in df.columns:
            # A column that is entirely empty in the CSV is typically loaded
            # with a numeric dtype; object dtype accepts the text written below.
            df[column] = df[column].astype(object)
        else:
            df[column] = ""


def _fill_fmea_field(
    df, index, graph, question, answer_key, value_column, attr_column
):
    """Run one graph for one row and store its answer list and source trace."""
    result = safe_invoke(graph, question)
    context = result["context"]

    if not context:
        df.at[index, value_column] = ""
        df.at[index, attr_column] = ""
        return

    items = parse_json_response(result["answer"]).get(answer_key) or []
    # The model may return a bare string (or another scalar) instead of a
    # list; wrap it so join() does not split a string into characters.
    if not isinstance(items, (list, tuple)):
        items = [items]

    df.at[index, value_column] = ", ".join(str(item) for item in items)
    df.at[index, attr_column] = format_context_attributes(context)


def fill_effects_and_causes(
    df, graph_effects, graph_causes, graph_actions, sleep_between=6
):
    """Populate effects, causes and recommended actions for every FMEA row.

    For each row the "Failure Mode" value is inserted into three questions,
    each sent to its graph. The parsed answers are written (comma-joined)
    to "Potential Effects", "Potential Causes" and "Recommended Actions";
    the source/page trace of the retrieved context goes to the matching
    ``*_Attr`` columns. Rows whose graph run returned no context get empty
    strings.

    Args:
        df: DataFrame with "SMT Process" and "Failure Mode" columns. It is
            modified in place.
        graph_effects: Compiled graph answering effects questions.
        graph_causes: Compiled graph answering causes questions.
        graph_actions: Compiled graph answering corrective-action questions.
        sleep_between: Seconds to pause between consecutive graph calls.

    Returns:
        The same DataFrame object, for convenience.
    """
    steps = (
        (
            graph_effects,
            "What are the effects of {} ?",
            "effects",
            "Potential Effects",
            "Effects_Attr",
        ),
        (
            graph_causes,
            "What are the causes of {} ?",
            "causes",
            "Potential Causes",
            "Causes_Attr",
        ),
        (
            graph_actions,
            "What are the recommended actions for {} ?",
            "actions",
            "Recommended Actions",
            "Recommended_Actions_Attr",
        ),
    )

    _ensure_text_columns(
        df, [column for step in steps for column in step[3:]]
    )

    for index, row in df.iterrows():
        failure_mode = row["Failure Mode"]

        for step_number, step in enumerate(steps):
            graph, template, answer_key, value_column, attr_column = step
            # Pause before every call except the first one of the row; the
            # pause after the row's last call is issued below. This also
            # throttles the causes -> actions hop, which had no pause before.
            if step_number:
                time.sleep(sleep_between)
            _fill_fmea_field(
                df,
                index,
                graph,
                template.format(failure_mode),
                answer_key,
                value_column,
                attr_column,
            )

        # Optional logging
        log_update(
            row, df.at[index, "Potential Effects"], df.at[index, "Potential Causes"]
        )

        time.sleep(sleep_between)

    return df

In [None]:
# Load the FMEA skeleton (processes and failure modes).
# `index=False` is a to_csv option; read_csv does not accept it and raised
# TypeError, so it is dropped here.
fmea_df = pd.read_csv("/content/FMEA_with_Processes_and_FMs.csv")

In [None]:
# Run all three graphs for every row of the FMEA table.
# fill_effects_and_causes writes into the DataFrame it is given and returns
# that same object, so `result_df` and `fmea_df` refer to the same data.
result_df = fill_effects_and_causes(fmea_df, graph_effects, graph_causes, graph_actions)

In [None]:
# Notebook display: preview the first rows of the populated table.
result_df.head()

In [None]:
# Save the populated FMEA table. to_csv is a DataFrame method; the pandas
# module has no top-level to_csv, so the previous call raised AttributeError.
result_df.to_csv("Final Generated FMEA.csv", index=False)