### Library Imports

In [159]:
import json
import sys

from pyverilog.vparser.parser import parse
from pyverilog.vparser.ast import (
    # Node types we'll explicitly handle
    Source,
    Description,
    ModuleDef,
    Always,
    SensList,
    IfStatement,
    Assign,
    BlockingSubstitution,
    NonblockingSubstitution,
    Eq,
    Or,
    GreaterThan,
    Identifier,
    IntConst,
    Cond,
    Block,
    Lvalue,
    Rvalue
)
from pyverilog.vparser.ast import *  # Import Always block
from pyverilog.ast_code_generator.codegen import ASTCodeGenerator

In [160]:
import os

from langchain.chains.qa_generation.prompt import CHAT_PROMPT
#from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import DocArrayInMemorySearch
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_core.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
from groq import Groq
from langchain_openai import ChatOpenAI
from langchain.llms.base import LLM
from langchain_community.llms import OpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain.chains.conversation.memory import ConversationSummaryMemory
from langchain.chains import RetrievalQA
import json

from operator import itemgetter
from typing import List

from langchain_openai.chat_models import ChatOpenAI

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field
from langchain_core.runnables import (
    RunnableLambda,
    ConfigurableFieldSpec,
    RunnablePassthrough,
)
from langchain_core.runnables.history import RunnableWithMessageHistory
# Name of the sentence-transformers model; used as the default `model_name`
# argument of embed_text().
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

### OpenAI setup

In [210]:
# Step 1: Set up the OpenAI chat model (LangChain wrapper).
# The API key is read from the OPENAI_API_KEY environment variable so that no
# secret has to be pasted into (and saved with) the notebook. When the variable
# is unset the key falls back to the empty string this cell used before.
llm_OAI = ChatOpenAI(
    model="gpt-4.1-mini", # "gpt-4" or "gpt-3.5-turbo"
    # model="gpt-4o-mini-2024-07-18",
    # model="gpt-3.5-turbo",
    openai_api_key=os.environ.get("OPENAI_API_KEY", ""),
    # temperature=1,  # Adjust the creativity level
    # max_tokens=1000,   # Set the maximum output token limit
    verbose=True
)

In [211]:
from openai import OpenAI
# Raw OpenAI SDK client. NOTE: the `from openai import OpenAI` line just above
# rebinds the name `OpenAI` that was imported earlier from
# langchain_community.llms.
# The key is taken from the OPENAI_API_KEY environment variable instead of a
# literal in the notebook; it falls back to the previous empty string.
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY", ""),
    organization=None
)

### Open-source LLaMA setup (via Groq)

In [249]:
class ChatGroqLLM(LLM):
    """
    Minimal LangChain ``LLM`` wrapper around the Groq chat-completions client.

    Every prompt is sent as a single ``user`` message and the text of the
    first returned choice is used as the completion.

    The Groq client and the model name are kept on the instance. Previously
    the client was a discarded local of ``__init__`` and ``_call`` read the
    module-level names ``client`` / ``model_name`` instead, i.e. whatever
    unrelated globals happened to exist in the notebook.
    """

    # Declared as fields because the LangChain ``LLM`` base is a pydantic
    # model, which does not accept assignment of undeclared attributes.
    groq_client: object = None  # Groq SDK client created in __init__
    groq_model: str = ""        # model identifier sent with each request

    def __init__(self, groq_api_key, model_name):
        """
        Args:
            groq_api_key: API key used to construct the Groq client.
            model_name: Name of the Groq-hosted model to query.
        """
        super().__init__(
            groq_client=Groq(api_key=groq_api_key),
            groq_model=model_name,
        )

    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs) -> str:
        """
        Send ``prompt`` to the Groq chat-completions endpoint and return the reply text.

        ``stop``, ``run_manager`` and any extra keyword arguments are accepted
        for compatibility with the LangChain calling convention but are not
        forwarded to Groq (``stop`` was ignored by the original code as well).
        """
        chat_completion = self.groq_client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": prompt,
                }
            ],
            model=self.groq_model,
        )
        # Only the first choice is used.
        return chat_completion.choices[0].message.content

    @property
    def _llm_type(self) -> str:
        """Identifier reported to LangChain for this LLM implementation."""
        return "chat_groq"
# The Groq key is read from the GROQ_API_KEY environment variable so it is not
# stored in the notebook; it falls back to the previous empty string.
groq_api_key = os.environ.get("GROQ_API_KEY", "")
# Initialize Groq Langchain chat object and conversation
groq_chat = ChatGroq(
        groq_api_key=groq_api_key,
        model_name="llama-3.3-70b-versatile"
        # model_name="meta-llama/llama-4-scout-17b-16e-instruct"
)

### AST Parser

In [164]:
class NodeVisitor:
    """
    Generic tree walker with ``visit_<ClassName>`` dispatch.

    It tolerates three shapes for the value returned by ``node.children()``:
      - a list/tuple of ``(attribute_name, child_node)`` pairs,
      - a list/tuple of child nodes,
      - a single object that is not a list/tuple.
    """

    def visit(self, node):
        """Call ``visit_<NodeClassName>`` if it exists, else ``generic_visit``."""
        handler = getattr(self, f"visit_{node.__class__.__name__}", self.generic_visit)
        return handler(node)

    def generic_visit(self, node):
        """Default handler: visit every child of ``node``; always returns None."""
        list_children = getattr(node, "children", None)
        if not callable(list_children):
            # Objects without a callable .children have nothing to descend into.
            return

        kids = list_children()
        if not isinstance(kids, (list, tuple)):
            # A lone object is treated as a single child.
            kids = [kids]

        for kid in kids:
            # A 2-tuple is interpreted as (attr_name, child_node); anything
            # else is assumed to be the child node itself.
            is_named_pair = isinstance(kid, tuple) and len(kid) == 2
            self.visit(kid[1] if is_named_pair else kid)
        return
###############################################################################
# Extractor
###############################################################################
class VerilogInfoExtractor(NodeVisitor):
    """
    Walks the AST, collecting:
      - always blocks
      - continuous assign statements
    """

    def __init__(self):
        """Start with empty result lists; the visit_* methods append to them."""
        super().__init__()
        self.module_instantiations = []  # one dict per module instance
        self.assign_statements = []      # one dict per continuous assign
        self.always_blocks = []          # one dict per always block

    # ------------------------------------
    # MODULE INSTANTIATIONS
    # ------------------------------------
    def visit_InstanceList(self, node):
        """
        Record each instance held by an InstanceList node
        (e.g., u_submodule, u_another_module).
        """
        record_instance = self.visit_Instance
        for single_instance in node.instances:
            record_instance(single_instance)

    def visit_Instance(self, node):
        """
        Record one module instance: the instantiated module, the instance
        name and, for every entry of the port list, the port name together
        with the stringified expression connected to it.
        """
        instantiated_module = node.module  # module being instantiated
        instance_label = node.name         # e.g. u_submodule

        wiring = [
            {
                "port": link.portname,
                "signal": self.stringify_condition(link.argname),
            }
            for link in node.portlist
        ]

        self.module_instantiations.append({
            "module_name": instantiated_module,
            "instance_name": instance_label,
            "port_connections": wiring,
        })

        # Keep descending in case deeper nodes have their own visit_* handlers.
        self.generic_visit(node)


    # ---------------------------------------------------
    # Existing logic for Always, IfStatement, etc. omitted
    # ---------------------------------------------------

    def extract_case_statement(self, casenode):
        """
        Turn a case statement node into a plain dict.

        Shape of the result:
        {
          "type": "case_statement",
          "case_expr": "opcode",            # stringified casenode.comp
          "cases": [
            {"conditions": ["4'd0"],         "statements": [ ... ]},
            {"conditions": ["4'd1", "4'd2"], "statements": [ ... ]},
            {"conditions": ["default"],      "statements": [ ... ]}
          ]
        }
        An item whose ``cond`` is empty/None is reported as ["default"].
        """
        selector = self.stringify_condition(casenode.comp)

        def describe(item):
            # Labels first, then the statements, as two separate steps.
            if item.cond:
                labels = [self.stringify_condition(label) for label in item.cond]
            else:
                labels = ["default"]
            return {
                "conditions": labels,
                "statements": self.extract_case_item_statements(item.statement),
            }

        return {
            "type": "case_statement",
            "case_expr": selector,
            "cases": [describe(item) for item in casenode.caselist],
        }

    def extract_case_item_statements(self, stmt):
        """
        Describe the statement(s) under one case item as a flat list of dicts.

        Blocks are flattened recursively. Every other statement yields exactly
        one entry whose "type" is "if_statement", "assignment",
        "continuous_assign", "case_statement" or "unhandled". ``None`` yields
        an empty list.
        """
        if stmt is None:
            return []

        if isinstance(stmt, Block):
            flattened = []
            for inner in stmt.statements:
                flattened += self.extract_case_item_statements(inner)
            return flattened

        if isinstance(stmt, IfStatement):
            return [{
                "type": "if_statement",
                "info": self.extract_if_statement(stmt)
            }]

        if isinstance(stmt, (BlockingSubstitution, NonblockingSubstitution)):
            return [{
                "type": "assignment",
                "lhs": self.stringify_condition(stmt.left.var),
                "rhs": self.stringify_condition(stmt.right.var)
            }]

        if isinstance(stmt, Assign):
            # NOTE: left/right are passed on as-is here (no `.var` unwrapping),
            # unlike the substitution branch above.
            return [{
                "type": "continuous_assign",
                "lhs": self.stringify_condition(stmt.left),
                "rhs": self.stringify_condition(stmt.right)
            }]

        if isinstance(stmt, CaseStatement):
            # Nested case statement
            return [self.extract_case_statement(stmt)]

        # Anything else is only reported by class name.
        return [{
            "type": "unhandled",
            "node_class": stmt.__class__.__name__
        }]

    # ------------------------------------
    # ALWAYS BLOCKS
    # ------------------------------------
    def visit_Always(self, node):
        """
        Record one always block as a dict with:
          - "triggers": the parsed sensitivity list,
          - "logic": if/else, case and for-loop structures from the body,
          - "post_logic_assignments": the remaining assignments of the body.
        The dict is appended to ``self.always_blocks``.
        """
        triggers = self.extract_triggers(node.sens_list)
        structured, trailing = self.extract_logic(node.statement)

        self.always_blocks.append({
            "block_type": "always",
            "triggers": triggers,
            "logic": structured,
            "post_logic_assignments": trailing,
        })

        # Descend as well, so handlers for nested node types still run.
        self.generic_visit(node)

    ###########################################################################
    # Helper Methods
    ###########################################################################
    def extract_triggers(self, sens_list):
        """
        Parse the sensitivity list to get triggers like 'posedge clk' or 'negedge reset'.

        Args:
            sens_list: The always block's sensitivity list; anything that is
                not a ``SensList`` yields an empty result.

        Returns:
            list[str]: "<edge> <signal>" for edge-triggered items, or just
            "<signal>" otherwise. Items whose signal has no ``name`` are skipped.
        """
        triggers = []
        if not isinstance(sens_list, SensList):
            return triggers

        for sens_item in sens_list.list:
            sig = sens_item.sig
            sig_name = getattr(sig, 'name', None)
            if not sig_name:
                continue

            # Pyverilog stores the edge kind on the Sens item itself (`type`),
            # not on the signal. Reading only `sig.clockedge` therefore never
            # produced 'posedge'/'negedge'. The old attribute is still
            # consulted as a fallback.
            edge_type = getattr(sens_item, 'type', None)
            if edge_type not in ('posedge', 'negedge'):
                edge_type = getattr(sig, 'clockedge', None)

            triggers.append(f"{edge_type} {sig_name}" if edge_type else sig_name)
        return triggers

    def extract_logic(self, statement):
        """
        Split the body of an always block (or loop) into two lists:
          - structured logic: if/else chains, case statements, for loops,
          - plain assignments found outside of those structures.

        ``statement`` may be a Block (each direct child is classified) or a
        single statement (classified on its own).

        Returns:
            (logic_blocks, tail_assignments)
        """
        logic_blocks, tail_assignments = [], []

        # A Block contributes its direct children; anything else is treated
        # as a one-element sequence.
        if isinstance(statement, Block):
            candidates = statement.statements
        else:
            candidates = (statement,)

        for item in candidates:
            if isinstance(item, IfStatement):
                logic_blocks.extend(self.extract_if_statement(item))
            elif isinstance(item, CaseStatement):
                logic_blocks.append(self.extract_case_statement(item))
            elif isinstance(item, ForStatement):
                logic_blocks.append(self.visit_ForStatement(item))
            else:
                # Plain assignment (or a nested block of assignments).
                tail_assignments.extend(self.extract_assignments(item))

        return logic_blocks, tail_assignments

    def extract_if_statement(self, ifstmt):
        """
        Flatten an if / else-if / else chain into a list of branches:
        [
          {"condition": "<expression>", "assignments": [{"lhs": ..., "rhs": ...}, ...]},
          ...
          {"condition": "else", "assignments": [...]}   # only if a final else exists
        ]
        """
        branches = []
        current = ifstmt

        # Follow the else-if links iteratively instead of recursing.
        while True:
            branches.append({
                "condition": self.stringify_condition(current.cond),
                "assignments": self.extract_assignments(current.true_statement)
            })

            alternative = current.false_statement
            if isinstance(alternative, IfStatement):
                current = alternative
                continue

            if alternative is not None:
                # Final plain else (block or single statement).
                branches.append({
                    "condition": "else",
                    "assignments": self.extract_assignments(alternative)
                })
            return branches

    def extract_assignments(self, stmt):
        """
        Collect assignments as [{"lhs": "...", "rhs": "..."}, ...].

        Blocks are flattened recursively. Blocking/non-blocking substitutions
        and Assign nodes yield one entry each. Every other statement type
        (including a nested IfStatement) contributes nothing.
        """
        if stmt is None:
            return []

        if isinstance(stmt, Block):
            return [
                found
                for inner in stmt.statements
                for found in self.extract_assignments(inner)
            ]

        if isinstance(stmt, (NonblockingSubstitution, BlockingSubstitution)):
            target, source = stmt.left.var, stmt.right.var
        elif isinstance(stmt, Assign):
            # Continuous assignment: left/right are passed on unchanged.
            target, source = stmt.left, stmt.right
        else:
            # Includes IfStatement: nested ifs are not expanded here.
            return []

        return [{
            "lhs": self.stringify_condition(target),
            "rhs": self.stringify_condition(source),
        }]
    # ------------------------------------
    # CONTINUOUS ASSIGN STATEMENTS
    # ------------------------------------
    def visit_Assign(self, node):
        """
        Record a continuous 'assign lhs = rhs;' statement in
        ``self.assign_statements`` and keep visiting its children.
        """
        record = {
            "lhs": self.stringify_condition(node.left.var),
            "rhs": self.stringify_condition(node.right.var),
        }
        self.assign_statements.append(record)

        self.generic_visit(node)
        
    def stringify_condition(self, cond):
        """
        Convert an AST expression node into a readable string.

        The explicitly supported node types keep their word-based rendering
        (e.g. "(a AND b)", "x GREATER THAN y", "NOT (z)").

        Two cases previously fell through to ``str(cond)``, which for AST
        nodes is only an "<... object at 0x...>" repr:
          * ``Lvalue`` / ``Rvalue`` wrappers are now unwrapped via ``.var``;
          * any other AST node is rendered as Verilog source text with
            ``ASTCodeGenerator``. ``str(cond)`` remains the last resort.

        Args:
            cond: AST node, or None.

        Returns:
            str: The rendered expression; the literal "None" for ``None``.
        """
        if cond is None:
            return "None"

        # Assignment sides are wrapped; the real expression lives in `.var`.
        if isinstance(cond, (Lvalue, Rvalue)):
            return self.stringify_condition(cond.var)

        # Binary operators: (node type, output template). The output text is
        # unchanged from the previous if/elif chain.
        binary_templates = (
            (And, "({left} AND {right})"),
            (Land, "({left} ANDAND {right})"),
            (Or, "({left} OR {right})"),
            (Lor, "({left} OROR {right})"),
            (Plus, "({left} PLUS {right})"),
            (Minus, "({left} MINUS {right})"),
            (GreaterThan, "{left} GREATER THAN {right}"),
            (LessThan, "{left} LESS THAN {right}"),
            (Eq, "({left} EQUAL EQUAL {right})"),
        )
        for node_type, template in binary_templates:
            if isinstance(cond, node_type):
                left = self.stringify_condition(cond.left)
                right = self.stringify_condition(cond.right)
                return template.format(left=left, right=right)

        # Unary operators: the single operand is the first child.
        unary_words = ((Ulnot, "NOT"), (Unot, "NOT"), (Uand, "AND"))
        for node_type, word in unary_words:
            if isinstance(cond, node_type):
                operand_str = self.stringify_condition(cond.children()[0])
                return f"{word} ({operand_str})"

        if isinstance(cond, Identifier):
            return cond.name
        if isinstance(cond, IntConst):
            return cond.value
        if isinstance(cond, Cond):
            # Ternary: cond.cond ? cond.true_value : cond.false_value
            c = self.stringify_condition(cond.cond)
            t = self.stringify_condition(cond.true_value)
            f = self.stringify_condition(cond.false_value)
            return f"({c}) IF TRUE ({t}) OTHERWISE ({f})"
        if isinstance(cond, Pointer):
            base_str = self.stringify_condition(cond.var)  # 'b'
            ptr_str = self.stringify_condition(cond.ptr)  # 'i'
            return f"{base_str}[{ptr_str}]"
        if isinstance(cond, Partselect):
            # e.g. var = 'b', msb = IntConst('3'), lsb = IntConst('0')
            base_str = self.stringify_condition(cond.var)
            msb_str = self.stringify_condition(cond.msb)
            lsb_str = self.stringify_condition(cond.lsb)
            return f"{base_str}[{msb_str}:{lsb_str}]"
        if isinstance(cond, Concat):
            # Stringify each element and wrap the list in curly braces.
            elements = [self.stringify_condition(part) for part in cond.children()]
            return "{" + ", ".join(elements) + "}"

        # Fallback for node types without a dedicated rendering.
        if hasattr(cond, "children"):
            # The code generator is created lazily and reused.
            codegen = getattr(self, "_codegen", None)
            if codegen is None:
                codegen = self._codegen = ASTCodeGenerator()
            try:
                return codegen.visit(cond)
            except Exception:
                # Best effort only: fall back to the plain repr below if the
                # generator cannot render this node.
                pass
        return str(cond)
    def visit_ForStatement(self, node):
        """
        Extract details of a for loop.

        The parts are read from the node's named attributes (``pre``,
        ``cond``, ``post``, ``statement``) rather than by position in
        ``children()``: a positional lookup picks the wrong part, or raises
        IndexError, whenever one part is absent from the children tuple.

        Returns:
            dict with keys "type" ("for_loop"), "initialization" and
            "increment" (lists of {"lhs", "rhs"}), "condition" (str) and
            "body" (the ``(logic_blocks, tail_assignments)`` tuple returned
            by ``extract_logic``).
        """
        loop_info = {
            "type": "for_loop",
            "initialization": self.extract_assignments(node.pre),
            "condition": self.stringify_condition(node.cond),
            "increment": self.extract_assignments(node.post),
            "body": self.extract_logic(node.statement),
        }

        # The caller decides where the loop description is stored.
        return loop_info
    

def parse_verilog_always_blocks(filename):
    """
    Parse Verilog source and return the extracted structures as a dict.

    Args:
        filename: Path of one Verilog file (str or path-like), or an
            iterable of such paths. Pyverilog's ``parse`` expects a list of
            files, so a single path is wrapped in a list before the call.

    Returns:
        dict with the keys "always_blocks", "assign_statements" and
        "module_instantiations", each holding the list collected by
        ``VerilogInfoExtractor``.
    """
    if isinstance(filename, (str, os.PathLike)):
        filelist = [os.fspath(filename)]
    else:
        filelist = list(filename)

    ast, _ = parse(filelist)
    extractor = VerilogInfoExtractor()
    extractor.visit(ast)
    return {
        "always_blocks": extractor.always_blocks,
        "assign_statements": extractor.assign_statements,
        "module_instantiations": extractor.module_instantiations
    }

### Embedding

In [165]:
# Turn the extracted design description into text chunks for embedding
def preprocess_json_for_embedding(json_data):
    """
    Flatten the extractor output into a list of JSON strings ("chunks").

    Args:
        json_data: dict with optional keys "always_blocks",
            "assign_statements" and "module_instantiations".

    Returns:
        list[str]: one JSON string per assignment, for-loop header,
        continuous assign and module instantiation, in document order.

    Differences from the previous implementation, all of which only add
    chunks that used to be silently dropped:
      * nested for-loops inside a loop body are processed. They were
        unreachable before because a loop dict also has a "condition" key
        and was mistaken for an if-branch;
      * case statements and plain assignments inside a loop body are emitted;
      * "post_logic_assignments" of an always block are emitted, with an
        empty condition.
    """
    chunks = []

    def process_assignments(assignments, trigger="", condition=""):
        # One chunk per assignment, tagged with its trigger and condition.
        for assignment in assignments:
            chunks.append(json.dumps({
                "trigger": trigger,
                "condition": condition,
                "assignment": assignment
            }))

    def process_case_statement(case_statement, trigger=""):
        # Only entries of type "assignment" inside a case item produce chunks.
        case_expr = case_statement.get("case_expr", "Unknown Expression")

        for case in case_statement.get("cases", []):
            conditions = " | ".join(case.get("conditions", []))
            for statement in case.get("statements", []):
                if statement.get("type") == "assignment":
                    assignment = {
                        "lhs": statement.get("lhs", "Unknown LHS"),
                        "rhs": statement.get("rhs", "Unknown RHS")
                    }
                    chunks.append(json.dumps({
                        "trigger": trigger,
                        "condition": conditions,
                        "case_expr": case_expr,
                        "assignment": assignment
                    }))

    def process_logic_element(element, trigger):
        # The "type" tag is checked first: for-loop dicts also carry a
        # "condition" key and must not be treated as if-branches.
        kind = element.get("type")
        if kind == "for_loop":
            process_for_loop(element, trigger)
        elif kind == "case_statement":
            process_case_statement(element, trigger)
        elif "condition" in element:
            process_assignments(
                element.get("assignments", []),
                trigger,
                element.get("condition", "None"),
            )
        elif "lhs" in element and "rhs" in element:
            # Plain assignment that is not guarded by any condition.
            process_assignments([element], trigger)

    def process_for_loop(for_loop, trigger=""):
        initialization = "; ".join(
            f"{init['lhs']} = {init['rhs']}" for init in for_loop.get("initialization", [])
        )
        condition = for_loop.get("condition", "None")
        increment = "; ".join(
            f"{inc['lhs']} = {inc['rhs']}" for inc in for_loop.get("increment", [])
        )

        # Chunk summarizing the loop header.
        chunks.append(json.dumps({
            "trigger": trigger,
            "type": "for_loop",
            "initialization": initialization,
            "condition": condition,
            "increment": increment
        }))

        # "body" holds sequences (structured logic, then plain assignments);
        # anything that is not a list/tuple is ignored, as before.
        for body_part in for_loop.get("body", []):
            if isinstance(body_part, (list, tuple)):
                for element in body_part:
                    process_logic_element(element, trigger)

    # Process always blocks
    for always_block in json_data.get("always_blocks", []):
        triggers = ", ".join(always_block.get("triggers", []))
        for logic in always_block.get("logic", []):
            process_logic_element(logic, triggers)
        process_assignments(always_block.get("post_logic_assignments", []), triggers)

    # Process assign statements
    for assign in json_data.get("assign_statements", []):
        chunks.append(json.dumps(assign))

    # Process module instantiations
    for module_inst in json_data.get("module_instantiations", []):
        chunks.append(json.dumps(module_inst))

    return chunks
def embed_text(chunks, model_name=EMBEDDING_MODEL_NAME):
    """
    Build an in-memory vector store from text chunks.

    The chunks are embedded with a ``HuggingFaceEmbeddings`` model named
    ``model_name`` and indexed via ``DocArrayInMemorySearch.from_texts``;
    the resulting store is returned.
    """
    return DocArrayInMemorySearch.from_texts(
        chunks,
        embedding=HuggingFaceEmbeddings(model_name=model_name),
    )

In [166]:
# Fuzzy lookup of a signal's pre-computed trace path in a JSON file
import json
import difflib

def find_fuzzy_trace_path(filepath, signal_query, module_query, cutoff=0.6):
    """
    Look up the trace path of the entry that best matches a signal/module pair.

    The JSON file must contain a list of objects; their "signal" and
    "module" values are combined into "<signal>::<module>" strings and the
    query is fuzzy-matched against them with ``difflib.get_close_matches``.

    Args:
        filepath: Path of the JSON file.
        signal_query: Signal name to look for (approximate match allowed).
        module_query: Module name to look for (approximate match allowed).
        cutoff: Minimum similarity in [0, 1] for a candidate to count.

    Returns:
        The matched entry's "trace_path" (``[]`` if the entry has none), or
        the string "Trace path not found." when nothing is similar enough.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        signals = json.load(f)

    # Map each formatted key straight to its entry. The earlier version
    # split the matched string on "::" again, which raised as soon as a
    # signal or module name itself contained "::". For duplicate pairs the
    # last entry wins, as before.
    lookup = {}
    for entry in signals:
        formatted = f"{entry.get('signal', '')}::{entry.get('module', '')}"
        lookup[formatted] = entry

    query_string = f"{signal_query}::{module_query}"
    matches = difflib.get_close_matches(query_string, list(lookup), n=1, cutoff=cutoff)

    if matches:
        return lookup[matches[0]].get("trace_path", [])

    return "Trace path not found."


In [167]:
def recursive_trace_json(node, depth=0, visited=None, max_depth=4, history=None, flatten_parts=None, decoder_stage=None):
    """
    Walk an expression tree and record how its leaf signals are driven.

    For every leaf (a node without children that has a ``name``), the
    module-level ``binddict`` is searched for keys ending with the leaf's
    name (keys containing "atomic" or "fpu" are skipped). Each binding with
    an expression tree is recorded and then traced recursively. Tracing
    stops once ``max_depth`` is exceeded or a binding located in module
    "mor1kx_decode" has been recorded.

    NOTE(review): ``binddict`` is not defined in this cell; it is presumably
    the binding dictionary of a Pyverilog dataflow analysis created
    elsewhere in the notebook - confirm.

    Args:
        node: Tree node exposing ``children()`` (and ``tocode()`` on nodes
            that should be followed).
        depth: Current recursion depth.
        visited: Set of leaf names already expanded (shared across calls).
        max_depth: Depth beyond which tracing stops.
        history: List receiving {"module", "assign"} / {"module", "port_map"}
            records (shared across calls).
        flatten_parts: List receiving ``(leaf_signal, expression)`` tuples.
            It is now forwarded to *every* recursive call; previously the
            recursion into a driver expression dropped it, so pairs found
            deeper in the trace never reached the caller's list.
        decoder_stage: One-key dict {"found": bool} used as a shared stop flag.

    Returns:
        (history, status) where status is "DEPTH_LIMIT" when this call
        stopped early and None otherwise.
    """
    if visited is None:
        visited = set()
    if history is None:
        history = []
    if flatten_parts is None:
        flatten_parts = []
    if decoder_stage is None:
        decoder_stage = {"found": False}  # mutable container shared by all calls
    if decoder_stage["found"]:
        return history, "DEPTH_LIMIT"
    if node is None or depth > max_depth:
        return history, "DEPTH_LIMIT"

    children = node.children()
    for i, child in enumerate(children):
        # The second child of a DFBranch is not traced.
        # NOTE(review): presumably this is the branch condition - confirm.
        if node.__class__.__name__ == "DFBranch" and i == 1:
            continue
        if hasattr(child, "tocode"):
            recursive_trace_json(child, depth + 1, visited, max_depth, history, flatten_parts, decoder_stage=decoder_stage)

    # Leaf node reached
    if len(children) == 0 and hasattr(node, "name"):
        leaf_signal = str(node.name)
        if leaf_signal in visited:
            return history, None
        visited.add(leaf_signal)

        # Check binddict for drivers of this leaf signal
        for k in binddict:
            key_text = str(k)
            if key_text.endswith(leaf_signal) and "atomic" not in key_text and "fpu" not in key_text:
                # Second-to-last dotted component of the key names the module.
                module = key_text.split('.')[-2]
                for expr in binddict[k]:
                    if expr.tree:
                        expr_str = expr.tree.tocode()
                        if module == "mor1kx_decode":
                            decoder_stage["found"] = True  # stop further tracing
                        # Add to structured history
                        history.append({
                            "module": module,
                            "assign": f"{leaf_signal} = {expr_str}"
                        })
                        flatten_parts.append((leaf_signal, expr_str))
                        # Recurse on the newly found expression tree
                        recursive_trace_json(expr.tree, depth + 1, visited, max_depth, history, flatten_parts, decoder_stage=decoder_stage)

                    else:
                        history.append({
                            "module": module,
                            "port_map": f"{leaf_signal} is a port (no logic in this module)"
                        })

    return history, None

In [168]:
from pyverilog.vparser.parser import parse
from pyverilog.dataflow.dataflow_analyzer import VerilogDataflowAnalyzer
from pyverilog.utils.scope import ScopeChain
import json

In [169]:
import json
import re

# Load the JSON file
# with open("HIGH_LEVEL_EVENTS_CTRL.json", "r") as f:
#     data = json.load(f)

# data = trace
# Name of the top-level module; simplify_signal() removes the prefixes
# "<top_module>_" and "<top_module>." from signal names.
top_module = "mor1kx_cpu_cappuccino"

# Simplify signal names: remove top-level prefixes and collapse hierarchy
def simplify_signal(signal):
    """
    Shorten a hierarchical signal name.

    Every occurrence of the top-module prefix ("<top_module>_" and
    "<top_module>.") is removed; if the result is still dotted, only the
    last two components ("<module>.<signal>") are kept.
    """
    for separator in ("_", "."):
        signal = signal.replace(top_module + separator, "")
    # Joining the last (at most) two components leaves undotted names as-is.
    return ".".join(signal.split(".")[-2:])

# Replace logical operators in expressions
def replace_logical_operators(expr):
    """
    Spell out operator characters as words: '|' -> ' OR ', '&' -> ' AND ',
    '~' and '!' -> ' NOT '. Each character is replaced individually.
    """
    word_for_symbol = str.maketrans({
        "|": " OR ",
        "&": " AND ",
        "~": " NOT ",
        "!": " NOT ",
    })
    return expr.translate(word_for_symbol)

# Simplify each token in the RHS expression (handles OR, AND, NOT)
def simplify_rhs_expression(rhs):
    """
    Apply simplify_signal() to the pieces of a right-hand-side expression.

    The expression is split around the words OR / AND / NOT and around
    parentheses (separators are kept). Every piece that, once stripped,
    starts with a word character is passed through simplify_signal(); other
    pieces are kept verbatim. Empty pieces are dropped and the rest is
    joined with single spaces.
    """
    starts_like_name = re.compile(r'\w[\w\d_.]*')
    pieces = []
    for raw in re.split(r'(\bOR\b|\bAND\b|\bNOT\b|\(|\))', rhs):
        trimmed = raw.strip()
        piece = simplify_signal(trimmed) if starts_like_name.match(trimmed) else raw
        if piece:
            pieces.append(piece)
    return ' '.join(pieces)
def strip_outer_parentheses(expr):
    """
    Remove pairs of parentheses that enclose the whole expression.

    Surrounding whitespace is stripped first. While the text starts with
    '(' and ends with ')', the pair is removed unless the nesting level
    returns to zero before the final character (i.e. the first '(' closes
    early, as in "(a) + (b)").
    """
    def first_paren_spans_all(text):
        # True unless the nesting depth drops back to 0 before the last char.
        depth = 0
        last = len(text) - 1
        for pos, ch in enumerate(text):
            depth += (ch == '(') - (ch == ')')
            if depth == 0 and pos != last:
                return False
        return True

    expr = expr.strip()
    while expr.startswith('(') and expr.endswith(')') and first_paren_spans_all(expr):
        expr = expr[1:-1].strip()
    return expr

# Recursively format ternary expressions into IF-THEN-ELSE blocks
def format_ternary(expr, indent=0):
    """Render a (possibly nested) ``cond ? a : b`` expression as IF/THEN/ELSE text.

    Parameters:
        expr:   expression text; wrapping parentheses are removed first.
        indent: nesting level, rendered as two spaces per level.

    Returns the formatted block as a string. Text without a top-level
    ternary is returned as a single indented line run through
    ``simplify_rhs_expression``.

    Fixes over the previous version:
      * the ``:`` that closes the first top-level ``?`` is found by counting
        nested ``?`` at the same level, so ``a ? b ? c : d : e`` is split
        after ``d`` and not after ``c``;
      * ``[]`` and ``{}`` are tracked like ``()``, so the colon of a slice
        such as ``x[7:0]`` is never mistaken for the ternary separator;
      * the condition loses only balanced wrapping parentheses (the old
        ``strip("() ")`` could turn ``((a) OR (b))`` into ``a) OR (b``);
      * the condition is simplified with ``simplify_rhs_expression`` so that
        compound conditions are handled name by name.
    """
    expr = strip_outer_parentheses(expr)
    spaces = "  " * indent

    # Base case: nothing that could be a ternary
    if '?' not in expr or ':' not in expr:
        return f"{spaces}{simplify_rhs_expression(expr)}"

    depth = 0      # nesting of (), [] and {}
    pending = 0    # nested '?' seen after the first one, still awaiting their ':'
    qmark_idx, colon_idx = None, None
    for idx, char in enumerate(expr):
        if char in '([{':
            depth += 1
        elif char in ')]}':
            depth -= 1
        elif depth != 0:
            continue
        elif char == '?':
            if qmark_idx is None:
                qmark_idx = idx
            else:
                pending += 1
        elif char == ':' and qmark_idx is not None:
            if pending == 0:
                colon_idx = idx
                break
            pending -= 1

    # No top-level '?' with a matching ':' -> treat as a plain expression
    if qmark_idx is None or colon_idx is None:
        return f"{spaces}{simplify_rhs_expression(expr)}"

    condition = strip_outer_parentheses(expr[:qmark_idx])
    true_expr = expr[qmark_idx+1:colon_idx].strip()
    false_expr = expr[colon_idx+1:].strip()

    # Both branches may themselves be ternaries, hence the recursion
    return (
        f"{spaces}IF {simplify_rhs_expression(condition)} == TRUE THEN\n"
        f"{format_ternary(true_expr, indent+1)}\n"
        f"{spaces}ELSE\n"
        f"{format_ternary(false_expr, indent+1)}"
    )



# Split true/false parts in a ternary expression
def split_ternary_parts(remaining):
    """Split the text after a ternary ``?`` into ``(true_expr, false_expr)``.

    The separator is the first ``:`` that sits outside every ``()``, ``[]``
    and ``{}`` pair and is not consumed by a nested ``?`` at the same level.
    Both halves are returned stripped of surrounding whitespace.

    If no such ``:`` exists the whole (stripped) text is returned as the true
    branch together with an empty false branch; the previous version raised
    ``TypeError`` in that case because it evaluated ``None + 1``.
    """
    depth = 0      # nesting of (), [] and {}
    pending = 0    # nested '?' still waiting for their own ':'
    for i, char in enumerate(remaining):
        if char in '([{':
            depth += 1
        elif char in ')]}':
            depth -= 1
        elif depth != 0:
            continue
        elif char == '?':
            pending += 1
        elif char == ':':
            if pending == 0:
                return remaining[:i].strip(), remaining[i+1:].strip()
            pending -= 1
    return remaining.strip(), ""

# Process and output
# target_signal = "ctrl_flag_clear"
# target_module = "mor1kx_ctrl_cappuccino"

# for entry in data:
#     if entry["signal"] == target_signal and entry["module"] == target_module:
#         print(f"Net name: {target_signal}")
#         print(f"Found in module: atomic_flag_clear")
#         print(f"Top level module: {top_module}")
#         print("Logical Tracing tree:")
# for step in entry["trace_path"]:
def generate_tracing_tree(data):
    """Turn a list of trace steps into a readable, newline-joined description.

    Each step is a mapping with a ``"module"`` name and an ``"assign"`` string
    of the form ``lhs = rhs``. The module and left-hand side are shortened
    with ``simplify_signal``; operators in the right-hand side are spelled
    out. A right-hand side containing both ``?`` and ``:`` is laid out by
    ``format_ternary`` on its own lines, anything else stays on the same line
    as the left-hand side.
    """
    def describe(step):
        module = simplify_signal(step["module"])
        lhs, rhs = map(str.strip, step["assign"].split('=', 1))
        lhs = simplify_signal(lhs)
        rhs = replace_logical_operators(rhs)
        if '?' not in rhs or ':' not in rhs:
            return f"In module {module},\n{lhs} = {simplify_rhs_expression(rhs)}"
        # Operators are replaced once more after the ternary layout
        formatted_rhs = replace_logical_operators(format_ternary(rhs))
        return f"In module {module},\n{lhs} =\n{formatted_rhs}"

    return "\n".join(describe(step) for step in data)


### semantic search

In [257]:
#def semantic_search_with_history(vector_store=None, queries, hiers, target_matching, llm):
import time
import re
pattern = re.compile(r"'total_tokens':\s*(\d+)")
def semantic_search_with_history(queries, hiers, llm, filepath, module, file_path, request_delay=10):
    """
    Ask ``llm`` for an architecture-level event description of every net in ``queries``.

    For each net the trace is looked up with
    ``find_fuzzy_trace_path(filepath, net, module)``, rendered with
    ``generate_tracing_tree`` and sent to the model together with the fixed
    instruction template below. Every response is printed, appended to
    ``file_path`` and collected in the returned dictionary.

    Parameters:
        queries:       net names, processed in order.
        hiers:         module hierarchy strings; ``hiers[i]`` is sent with ``queries[i]``.
        llm:           runnable placed after the prompt; its result must expose
                       ``.content`` and ``.usage_metadata``.
        filepath:      forwarded unchanged to ``find_fuzzy_trace_path``
                       (the notebook passes the trace JSON file here).
        module:        forwarded unchanged to ``find_fuzzy_trace_path``.
        file_path:     text file that is (re)created and receives each response
                       followed by a blank line (UTF-8).
        request_delay: seconds to sleep before every request except the first
                       (default 10, the value that used to be hard-coded).

    Returns:
        ``(results, store)`` where ``results[net] == [content, usage_metadata]``
        and ``store`` maps ``(user_id, conversation_id)`` to the in-memory
        message history filled by ``RunnableWithMessageHistory``.

    An empty ``queries`` now yields an empty output file and ``({}, {})``
    instead of an ``IndexError``. The first request used to be a hand-copied
    duplicate of the loop body; all requests now share one loop.

    NOTE(review): the prompt has no history placeholder (it is commented out
    below), so the recorded history does not appear in the prompt text built
    here. Confirm this is intended.
    """
    FINAL_TEMPLATE = """
        1. Role Assignment: 
        • You are an expert in computer architecture, specializing in abstracting hardware signal traces into high-level 
        architectural behaviors related to instruction execution.
    
        2. Context:
        • The trace comes from an OpenRISC-1000 implementation, but your output must be ISA- and core-agnostic (valid for 
        OR1K, RISC-V, ARM, x86).
        • Architectural *event* = a high-level phenomenon visible to ISA software (e.g. “pipeline stall on operand hazard”).
        • Ignore micro-architectural configuration options except In-Order execution (e.g. Atomic).
        3. Purpose:
        • Translate an OpenRISC-1000 RTL trace into an architecture-agnostic event description that software engineers can 
        use to craft C test programs which will trigger that architecture-agnostic event.
        4. Rule:
        ──────────────────────── CONTRACT START ──────────────────────
        ANALYSIS LADDER  (follow in order, no omissions)
        [L1] **Signal Normalization**  – simplify expression, fold constants, strip feature macros, trim widths 
        [L2] **Dependency Chase**      – list root signals that directly gate the target signal and their boolean relation  
        [L3] **Micro-Arch Role**       – describe what this signal is trying to achieve in the processor not how the implementation wires it.  
        [L4] **Architectural Event**   – translate L3 and net's purpose into a phrase in ISA-level terms
        [L5] **Test-Stimulus Hints**   – How software (C code with operations like arithmatic, memory access, flag-setting etc.) can provoke L4. 
        [L6] **Grouping Instructions** – Group related instructions into categories that influence this signal.
        
        MANDATORY QUESTIONS TO ANSWER
        Net: <signal_name>
        [Q1] High-Level Event: <one short sentence>
        [Q2] Logical Summary & Reasoning:<combining L2+L3>
        [Q3] Test-Stimulus Guidance:<L5>
        [Q4] Instruction Categories:<The types of relevant instructions that influence this signal and can be grouped into categories>
        GUIDELINES
        • Do **not** mention RTL module names, internal signal names (other than the one in “Net:”)
        • Identify the first pipeline stage where the net can influence architectural state
          and work from that perspective.  
        • Be as detailed or as brief as you feel appropriate—no word-count limits apply.
        ──────────────────────── CONTRACT END ────────────────────────
        5. Output exactly these five labelled sections. Anything other than this 5 labels - submission is rejected
        Net: <signal_name>
        
        High-Level Event: <text>
        
        Logical Summary & Reasoning:<text>
        
        Test-Stimulus Guidance:<text>
        
        Instruction Categories:<text>
    
        
        Again **DO NOT disclose and low level RTL details**.Breaking any clause and this rule ⇒ the submission is rejected automatically.
        """

    class InMemoryHistory(BaseChatMessageHistory, BaseModel):
        """In memory implementation of chat message history."""

        messages: List[BaseMessage] = Field(default_factory=list)

        def add_message(self, message: BaseMessage) -> None:
            """Add a self-created message to the store"""
            self.messages.append(message)

        def clear(self) -> None:
            self.messages = []

    # One history object per (user_id, conversation_id); returned to the caller
    store = {}

    def get_session_history(user_id: str, conversation_id: str) -> BaseChatMessageHistory:
        if (user_id, conversation_id) not in store:
            store[(user_id, conversation_id)] = InMemoryHistory()
        return store[(user_id, conversation_id)]

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "{instruction}\n\n"),
            # MessagesPlaceholder(variable_name="history"), # changed by MHS 03/06
            # ("ai", "{last_message}"), # inject last message here
            ("human", "Net name: {question}\n\nModule Hierarchy:{hier}\n\nJSON text fomat:\n{doc}\n\n"),
        ]
    )

    chain = prompt | llm

    with_message_history = RunnableWithMessageHistory(
        chain,
        get_session_history=get_session_history,
        input_messages_key="question",
        history_messages_key="history",
        history_factory_config=[
            ConfigurableFieldSpec(
                id="user_id",
                annotation=str,
                name="User ID",
                description="Unique identifier for the user.",
                default="",
                is_shared=True,
            ),
            ConfigurableFieldSpec(
                id="conversation_id",
                annotation=str,
                name="Conversation ID",
                description="Unique identifier for the conversation.",
                default="",
                is_shared=True,
            ),
        ],
    )

    user_id = "050725"
    conversation_id = "050725_cid"
    session_config = {
        "configurable": {"user_id": user_id, "conversation_id": conversation_id}
    }

    results = {}
    total = 0
    with open(file_path, "w", encoding="utf-8") as file:
        for position, query in enumerate(queries):
            if position:
                # Pause between consecutive requests (not before the first one)
                time.sleep(request_delay)
            hier = hiers[position]
            trace = find_fuzzy_trace_path(filepath, query, module)
            doc = generate_tracing_tree(trace)
            result = with_message_history.invoke(
                {"instruction": FINAL_TEMPLATE, "doc": doc, "question": query, "hier": hier},
                config=session_config,
            )
            results[query] = [result.content, result.usage_metadata]
            print("High level event:\n", result.content)
            print("Token Count:\n", result.usage_metadata)
            file.write(result.content + "\n\n")
            # Token total is read from the textual form of the usage metadata
            match = pattern.search(str(result.usage_metadata))
            if match:
                total += int(match.group(1))

    print("Toal Token Count:\n", total)
    return results, store

### Response

In [328]:
import json
# NOTE: `verilog_file`, `verilog_files`, `verilog_files_decoder` and
# `verilog_files_ctrl` are not passed to the analyzer below; only `filelist` is.
# `verilog_files_ctrl` is referenced solely by the commented-out
# "Previous Way" code at the end of this cell.
verilog_file = []
verilog_files = [
    "mor1kx-defines.v",
    "mor1kx-sprs.v",
    "mor1kx_fetch_cappuccino.v",  # Top-level module
    "mor1kx_icache.v",
    "mor1kx_immu.v",
    "mor1kx_simple_dpram_sclk.v",
    "mor1kx_true_dpram_sclk.v",
    "mor1kx_store_buffer.v",
    "mor1kx_cache_lru.v",
]
verilog_files_decoder = [
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_decode.v"
    ]
verilog_files_ctrl = [
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_ctrl_cappuccino.v"
]

# Source files handed to the dataflow analyzer.
# NOTE(review): these are absolute paths on one machine; the notebook will not
# run elsewhere without editing them.
# The defines / sprs / utils headers are listed but commented out.
filelist = [
    # "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx-defines.v",
    # "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx-sprs.v",
    # "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_utils.vh",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_cpu_cappuccino.v", # Top-level module
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_fetch_cappuccino.v",  
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_icache.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_dcache.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_immu.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_dmmu.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_simple_dpram_sclk.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_true_dpram_sclk.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_store_buffer.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_cache_lru.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_decode.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_decode_execute_cappuccino.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_branch_prediction.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_branch_predictor_simple.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_execute_alu.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_lsu_cappuccino.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_wb_mux_cappuccino.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_rf_cappuccino.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_execute_ctrl_cappuccino.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_ctrl_cappuccino.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_cfgrs.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_pic.v",
    "/home/m588h354/projects/Rare_net_analysis-repo/event_identification/mor1kx_verilogs/mor1kx_ticktimer.v",
    
]
# Name of the module the analyzer treats as the design root
topmodule = 'mor1kx_cpu_cappuccino'
# === STEP 1: Parse and Build Analyzer ===
analyzer = VerilogDataflowAnalyzer(filelist, topmodule)
analyzer.generate()
# `terms` and `binddict` are module-level results; later cells look signals up
# in `binddict` (see find_signal_key).
terms = analyzer.getTerms()
binddict = analyzer.getBinddict()

########### Previous Way #################################
# always_blocks = parse_verilog_always_blocks(verilog_files_ctrl)
# # Specify the output JSON file path
# output_file = "always_blocks_ctrl.json"
# # print(json.dumps(always_blocks, indent=2))
# # Write the data to a JSON file
# with open(output_file, "w") as json_file:
#     json.dump(always_blocks, json_file, indent=2)

# print(f"Always blocks saved to {output_file}")

Generating LALR tables


In [None]:
# Load the JSON file
# with open(output_file, "r") as file:
#     verilog_json = json.load(file)  # Load the JSON content as a Python dictionary
# chunks = preprocess_json_for_embedding(verilog_json)
# # Write each chunk on a separate line

# with open("always_blocks_chunks.txt", "w") as file:
#     for chunk in chunks:
#         file.write(chunk + "\n")
# vector_store = embed_text(chunks, model_name=EMBEDDING_MODEL_NAME)
# Show every binding whose scoped name mentions the LSU module
for key in binddict:
    if "mor1kx_lsu_cappuccino" not in str(key):
        continue
    print(key)

In [266]:
from collections import Counter

# Load the LSU net listing into memory
with open('/home/m588h354/projects/autophasew/openrisc/src/vcd_texts/lsu.txt', 'r') as f:
    lines = f.readlines()

# The net name is whatever follows the first two whitespace-separated columns;
# blank lines are ignored
net_names = [
    columns[2]
    for columns in (line.strip().split(None, 2) for line in lines)
    if columns
]

# Tally how often each net occurs, then order the tallies by net name
net_counts = Counter(net_names)
sorted_nets = sorted(net_counts.items())

# One "<net> <count>" line per distinct net
with open('/home/m588h354/projects/Rare_net_analysis-repo/event_identification/all_nets_lsu.txt', 'w') as f:
    f.writelines(f"{net} {count}\n" for net, count in sorted_nets)


In [351]:
def extract_net_names(file_path, start_line, end_line):
    """Read net names and their module hierarchy from a slice of a text file.

    Lines are numbered from 1; only lines ``start_line``..``end_line``
    (inclusive) are considered and reading stops after ``end_line``. The
    first whitespace-separated column of each non-blank line is taken as a
    dotted path: everything before the last dot becomes the hierarchy (dots
    rewritten as ``->``), the last component with any ``[...]`` suffix cut
    off becomes the net name.

    Returns ``(net_list, module_hier)``, two lists of equal length.
    """
    net_list, module_hier = [], []
    with open(file_path, "r", encoding="utf-8") as file:
        for lineno, text in enumerate(file, start=1):
            if lineno > end_line:
                break
            if lineno < start_line:
                continue
            fields = text.split()
            if not fields:
                continue
            scope, _, leaf = fields[0].rpartition(".")
            net_list.append(leaf.split("[")[0])
            module_hier.append(scope.replace(".", "->"))

    return net_list, module_hier
# Lines 60-90 of the sorted LSU net listing
nets, module_hier = extract_net_names(
    file_path="./all_nets_lsu.txt", start_line=60, end_line=90
)
#nets, module_hier = extract_net_names("./fetch_nets_rareness.txt", 1, 2)
print(nets, module_hier, sep="\n")

['cpu_err_o', 'cpu_req_i', 'cpu_we_i', 'current_lru', 'current_lru_history', 'dc_access_i', 'dc_dbus_err_i', 'dc_enable_i', 'hit', 'invalidate', 'invalidate_ack', 'invalidate_adr', 'next_lru_history', 'next_refill_adr', 'read', 'refill', 'refill_adr_i', 'refill_allowed_i', 'refill_dat_i', 'refill_done', 'refill_hit', 'refill_req_o', 'refill_valid', 'refill_we_i', 'snoop_adr_i', 'snoop_check', 'snoop_check_way_match(0)', 'snoop_check_way_match(1)', 'snoop_check_way_tag(0)', 'snoop_check_way_tag(1)', 'snoop_check_way_valid(0)']
['TOP->orpsoc_top->gencpu->mor1kx0->mor1kx_cpu->cappuccino->mor1kx_cpu->mor1kx_lsu_cappuccino->dcache_gen->mor1kx_dcache', 'TOP->orpsoc_top->gencpu->mor1kx0->mor1kx_cpu->cappuccino->mor1kx_cpu->mor1kx_lsu_cappuccino->dcache_gen->mor1kx_dcache', 'TOP->orpsoc_top->gencpu->mor1kx0->mor1kx_cpu->cappuccino->mor1kx_cpu->mor1kx_lsu_cappuccino->dcache_gen->mor1kx_dcache', 'TOP->orpsoc_top->gencpu->mor1kx0->mor1kx_cpu->cappuccino->mor1kx_cpu->mor1kx_lsu_cappuccino->dcache_

In [352]:
target_signal_list = []
nets_to_remove = []
def find_signal_key(target_signal, module):
    """Return the first ``binddict`` key that names ``target_signal`` inside ``module``.

    A key matches when the last dot-separated component of its string form
    equals ``target_signal`` and the second component equals ``module``. The
    match is printed before it is returned; ``None`` means nothing matched.
    """
    for key in binddict.keys():
        scope_parts = str(key).split('.')
        if scope_parts[-1] != target_signal:
            continue
        if scope_parts[1] != module:
            continue
        print(f"Matched: {str(key)}")
        return key
    return None
full_output = []
# signal_in_module = 'mor1kx_ctrl_cappuccino'
signal_in_module = 'mor1kx_lsu_cappuccino'

# Trace every requested net. Nets without a binding are reported and recorded
# in `nets_to_remove` so later cells can filter them out.
for net in nets:
    signal_to_trace = net
    scoped_key = find_signal_key(signal_to_trace, signal_in_module)
    if scoped_key is None:
        print(f"Net {signal_to_trace} is not in the AST!")
        nets_to_remove.append(signal_to_trace)
        continue

    # Only the first binding of the signal is traced
    tree = binddict[scoped_key][0].tree
    # Module that directly contains the signal. Kept in its own name:
    # this used to be assigned to the global `top_module`, which silently
    # replaced the top-level module name that `simplify_signal` strips.
    owner_module = str(scoped_key).split('.')[-2]
    history = [{
        "module": owner_module,
        "assign": f"{signal_to_trace} = {tree.tocode()}"
    }]
    trace_history, final_expr = recursive_trace_json(tree, max_depth=10, visited=set(), history=history)
    full_output.append({
        "signal": signal_to_trace,
        "module": owner_module,
        "trace_path": trace_history # it adds as a whole
    })

# The file is opened only after tracing succeeded, so a failure above no
# longer leaves an empty JSON file behind.
with open("HIGH_LEVEL_EVENTS_LSU.json", "w") as f:
    json.dump(full_output, f, indent=2)
print("JSON written to HIGH_LEVEL_EVENTS_LSU.json")

Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.cpu_err_o
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.cpu_req_i
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.cpu_we_i
Net current_lru is not in the AST!
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.current_lru_history
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.dc_access_i
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.dc_dbus_err_i
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.dc_enable_i
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.hit
Matched: mor1kx_cpu_cappuccino.mor1kx_lsu_cappuccino.md_generate17.ge_if21.dcache_gen.mor1kx_dcache.invalidate
Matched: m

In [353]:
# 
# Keep only the nets that were found in the bindings, preserving their order
filtered_net_list = [item for item in nets if item not in nets_to_remove]
filtered_net_list

['cpu_err_o',
 'cpu_req_i',
 'cpu_we_i',
 'current_lru_history',
 'dc_access_i',
 'dc_dbus_err_i',
 'dc_enable_i',
 'hit',
 'invalidate',
 'invalidate_ack',
 'invalidate_adr',
 'next_refill_adr',
 'read',
 'refill',
 'refill_adr_i',
 'refill_allowed_i',
 'refill_dat_i',
 'refill_done',
 'refill_hit',
 'refill_req_o',
 'refill_valid',
 'refill_we_i',
 'snoop_adr_i',
 'snoop_check']

In [354]:
# `semantic_search_with_history` pairs queries[i] with hiers[i]. The net list
# has had the untraceable nets removed, so the hierarchy list must be filtered
# with the same rule; passing the unfiltered `module_hier` shifted every
# hierarchy after the first removed net onto the wrong net.
filtered_module_hier = [
    hier for net_name, hier in zip(nets, module_hier) if net_name not in nets_to_remove
]
results, store = semantic_search_with_history(
                        #vector_store,
                         filtered_net_list,
                         filtered_module_hier,
                         # 10,
                         groq_chat,
                         "HIGH_LEVEL_EVENTS_LSU.json",
                         "mor1kx_lsu_cappuccino",
                         "./event_files/LLAMA_HIGH_LEVEL_EVENTS_LSU_60_90.txt"
                        )

# with open("./event_files/LLAMA_HIGH_LEVEL_EVENTS_ALU_61_90_tmp.txt", "w", encoding="utf-8") as file:
#     for v in results.values():
#         high_level_event = v[0]
#         print("High level event:\n", high_level_event)
#         print("Token Count:\n", v[1])

#         file.write(high_level_event + "\n\n")

High level event:
 Net: cpu_err_o

High-Level Event: Pipeline stall due to cache error or exception.

Logical Summary & Reasoning: The cpu_err_o signal is related to errors or exceptions occurring during the execution of instructions, specifically when accessing the cache. This signal is triggered when the processor encounters an issue while trying to fetch or store data, such as a cache miss, a protection violation, or an error in the cache hierarchy. The root cause of this signal can be attributed to the interaction between the processor's load/store unit and the cache, where the processor is attempting to access a memory location that is not valid or is protected. The boolean relation between the signals involved in this process can be described as a combination of cache access requests, memory protection checks, and error detection mechanisms.

Test-Stimulus Guidance: To provoke this signal, software engineers can write C test programs that perform memory accesses with varying patt

In [237]:
import re

def calculate_total_tokens(log_file_path):
    """Sum the ``'total_tokens': <n>`` values found in a log file.

    The file is scanned line by line; at most one value (the first match) is
    taken from each line. Returns the integer sum, 0 when nothing matches.
    """
    token_field = re.compile(r"'total_tokens':\s*(\d+)")

    with open(log_file_path, 'r') as file:
        hits = (token_field.search(line) for line in file)
        # Must be consumed while the file is still open
        return sum(int(hit.group(1)) for hit in hits if hit)

# Example usage
# Point this at the log to be summed
log_file_path = 'token_log.txt'  # Replace with your actual file path
total_tokens_sum = calculate_total_tokens(log_file_path=log_file_path)
print(f"Total 'total_tokens': {total_tokens_sum}")


Total 'total_tokens': 48525


In [355]:
import json
# File stem (no directory, no extension) used for both the text input and the
# JSON output of the conversion call at the bottom of this cell.
# NOTE(review): the search cell writes
# "./event_files/LLAMA_HIGH_LEVEL_EVENTS_LSU_60_90.txt", which carries a
# "LLAMA_" prefix that this stem lacks. Confirm which file should be converted.
EVENT_FILE = "HIGH_LEVEL_EVENTS_LSU_60_90"
# def convert_text_to_json(input_file, output_file):
#     with open(input_file, 'r') as f:
#         content = f.read()

#     # Split the file into blocks (separated by two newlines)
#     blocks = content.strip().split('\n\n')
#     result = []

#     for block in blocks:
#         event = {}
#         lines = block.strip().split('\n')
#         for line in lines:
#             if line.startswith('Net:'):
#                 event['Net'] = line.split(':', 1)[1].strip()
#             elif line.startswith('High-Level Event:'):
#                 event['High-Level Event'] = line.split(':', 1)[1].strip()
#             elif line.startswith('High-Level Summary:'):
#                 event['Logical Summary'] = line.split(':', 1)[1].strip()
#             elif line.startswith('Reasoning:'):
#                 event['Reasoning'] = line.split(':', 1)[1].strip()
#         if event:
#             result.append(event)

#     # Write to JSON file
#     with open(output_file, 'w') as f:
#         json.dump(result, f, indent=2)

#     print(f"✅ Converted {len(result)} entries to JSON and saved to {output_file}")
def convert_text_to_json(input_file, output_file):
    """Convert the labelled event text file into a JSON list of dictionaries.

    The input is split on the literal ``Net:``; each resulting block becomes
    one dictionary. The first line of a block is stored under ``'Net'``. A
    line that starts with one of the four section labels followed by ``:``
    opens that section; following lines are appended to it until the next
    label or the end of the block. Section lines are stripped and joined
    with single spaces. Sections missing from a block are simply absent from
    its dictionary.

    Changes over the previous version:
      * the input is read as UTF-8, matching how the search step writes it
        (the locale default could fail on the non-ASCII text models produce);
      * every section ends at *any* following label. Previously each section
        looked only for one specific successor, so a missing section made
        its predecessor swallow the following label lines;
      * all four sections are parsed by the same rule (a multi-line
        "High-Level Event" is no longer cut to its first line).

    Parameters:
        input_file:  path of the text file to read.
        output_file: path of the JSON file to write (indent=2).
    """
    section_labels = (
        'High-Level Event',
        'Logical Summary & Reasoning',
        'Test-Stimulus Guidance',
        'Instruction Categories',
    )

    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()

    result = []
    # Text before the first 'Net:' is not part of any entry
    for block in content.split('Net:')[1:]:
        lines = block.strip().split('\n')
        event = {'Net': lines[0].strip()}

        sections = {}
        current = None  # line list of the section being filled, if any
        for line in lines[1:]:
            label = next(
                (name for name in section_labels
                 if name not in sections and line.startswith(name + ':')),
                None,
            )
            if label is not None:
                current = sections[label] = [line.split(':', 1)[1].strip()]
            elif current is not None:
                current.append(line.strip())

        for name, parts in sections.items():
            event[name] = ' '.join(parts).strip()
        result.append(event)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)
    print(f"✅ Converted {len(result)} entries to JSON and saved to {output_file}")
# Example usage
convert_text_to_json(
    input_file=f'./event_files/{EVENT_FILE}.txt',
    output_file=f'./event_files/{EVENT_FILE}.json',
)

# import csv

# def json_to_libreoffice_csv(json_file, csv_file):
#     with open(json_file, 'r') as f:
#         data = json.load(f)

#     # Open CSV writer
#     with open(csv_file, 'w', newline='', encoding='utf-8') as f:
#         writer = csv.writer(f)
#         # Write headers
#         writer.writerow(['High-Level Event', 'Net', 'Triggered?', 'Comments'])

#         # Write data rows
#         for item in data:
#             writer.writerow([
#                 item.get('High-Level Event', ''),
#                 item.get('Net', ''),
#                 '',  # Empty Triggered?
#                 ''   # Empty Comments
#             ])

#     print(f"✅ CSV saved to '{csv_file}' with {len(data)} entries.")

# # Example usage
# json_to_libreoffice_csv(f'./event_files/{EVENT_FILE}.json', f'./event_files/{EVENT_FILE}.csv')



✅ Converted 24 entries to JSON and saved to ./event_files/HIGH_LEVEL_EVENTS_LSU_60_90.json


### Creating knowledge base for a given set of nets

In [None]:
# for key,value in store.items():
#     print(f"{key}: {value}")
#     print()
#     print()
# Print each recorded conversation history (the keys are not shown)
for _session_key, session_history in store.items():
    print(session_history)

In [None]:
import os
import time
# Function to perform semantic search and format the output
def search_and_save(vector_store, net_name, hier, rareness_value, num_results, llm, output_file):
    """Run ``semantic_search`` for one net and append the formatted result to a file.

    When the search returns ``None`` a message is printed and nothing is
    written. Otherwise the text from ``format_output`` is extended with an
    "Event Rareness" line carrying ``rareness_value`` and appended to
    ``output_file``, followed by an 80-dash separator line.
    """
    result = semantic_search(vector_store, net_name, hier, num_results, llm)
    if result is None:
        print("Net ", net_name, " has no logical expression in JSON!")
        return

    # Rareness value comes from the input text file, not from the search
    report = format_output(result) + (
        f"\nEvent Rareness: The net '{net_name}' has a rareness value of {rareness_value}.\n"
    )
    separator = "\n" + "-"*80 + "\n"

    with open(output_file, "a") as f:
        f.write(report)
        f.write(separator)

# Main processing loop
def process_nets_file(input_file, vector_store, num_results, llm, output_file):
    """Run ``search_and_save`` for every usable line of ``input_file``.

    An existing ``output_file`` is deleted first. Each input line is split on
    whitespace; lines with fewer than two columns are skipped. The first
    column is a dotted path whose last component is the net name and whose
    leading components, joined with ``->``, form the module hierarchy. The
    second column is passed on as the rareness value.
    """
    # Start from a clean output file
    if os.path.exists(output_file):
        os.remove(output_file)

    with open(input_file, "r") as f:
        for line in f:
            columns = line.strip().split()
            if len(columns) < 2:
                continue
            *scope, net_name = columns[0].split('.')
            search_and_save(
                vector_store,
                net_name,
                '->'.join(scope),
                columns[1],
                num_results,
                llm,
                output_file,
            )
            # time.sleep(20)

# Input and output file paths
# Shortlisted nets in, formatted search results out
input_file = "mor1kx_shortlisted_tmp.txt"
output_file = "semantic_search_results_openAI.txt"

# Ten results per search
process_nets_file(
    input_file=input_file,
    vector_store=vector_store,
    num_results=10,
    llm=groq_chat,
    output_file=output_file,
)

print(f"Processing complete. Results saved to {output_file}.")


### Creating test programs with the OpenAI Assistants API

In [None]:
from typing_extensions import override
from openai import AssistantEventHandler, OpenAI

class EventHandler(AssistantEventHandler):
    """Console-printing callbacks for a streamed Assistants run.

    Relies on a module-level ``client`` (used to look up cited files) that is
    not defined in this class.
    """

    @override
    def on_text_created(self, text) -> None:
        """Print the ``assistant > `` prompt when the assistant starts emitting text.

        The prompt is written without a trailing newline and flushed so it
        shows up immediately.
        """
        print("\nassistant > ", end="", flush=True)

    @override
    def on_tool_call_created(self, tool_call):
        """Announce a tool call by printing its type.

        For example, a file-search tool call is reported on its own line after
        ``assistant >> ``.
        """
        print(f"\nassistant >> {tool_call.type}\n", flush=True)

    @override
    def on_message_done(self, message) -> None:
        """Print the finished message with numbered citation markers.

        Each annotation's text in the first content item is replaced by
        ``[index]``; annotations that carry a ``file_citation`` also add an
        ``[index] filename`` line, printed after the message body.
        """
        body = message.content[0].text  # text payload of the first content item
        references = []
        for position, note in enumerate(body.annotations):
            marker = f"[{position}]"
            body.value = body.value.replace(note.text, marker)

            source = getattr(note, "file_citation", None)
            if source:
                # Resolve the cited file id to a human-readable file name.
                cited = client.files.retrieve(source.file_id)
                references.append(f"{marker} {cited.filename}")

        print(body.value)
        print("\n".join(references))

# Create a Run through the SDK's streaming helper and route its events to
# EventHandler, which prints the assistant's output to the console.
# client, thread and assistant are not defined in this cell; they must already
# exist in the kernel.

with client.beta.threads.runs.stream(
    thread_id=thread.id,
    assistant_id=assistant.id,
    # Optional per-run instructions, currently disabled:
    # instructions="Please address the user as Shuvo. The user has a premium account.",
    event_handler=EventHandler(),
) as stream:
    # Block until the run has finished streaming.
    stream.until_done()

In [None]:
# Use the %pip magic (not !pip) so the package is installed into the
# environment of the running kernel rather than whatever `pip` is first on PATH.
%pip install replicate

In [None]:
# `!export ...` only sets the variable inside a short-lived subshell, so the
# kernel process (and therefore the replicate client) never sees it.
# Set it on this process instead, and prompt for the value so the token is
# never stored in the notebook.
import os
from getpass import getpass

if not os.environ.get("REPLICATE_API_TOKEN"):
    os.environ["REPLICATE_API_TOKEN"] = getpass("Replicate API token: ")

In [None]:
import replicate

In [None]:
# Send one prompt to the pinned meta/codellama-70b-instruct model version on
# Replicate. The keys under `input` are forwarded to the model as-is; see the
# model's API page (linked below) for their meaning.
output = replicate.run(
    "meta/codellama-70b-instruct:a279116fe47a0f65701a8817188601e2fe8f4b9e04a518789655ea7b995851bf",
    input={
        "top_k": 10,
        "top_p": 0.95,
        "prompt": "In Bash, how do I list all text files in the current directory (excluding subdirectories) that have been modified in the last month?",
        "max_tokens": 500,
        "temperature": 0.8,
        "system_prompt": "",
        "repeat_penalty": 1.1,
        "presence_penalty": 0,
        "frequency_penalty": 0
    }
)

# The meta/codellama-70b-instruct model can stream output as it's running.
# The result is iterated piece by piece and printed as it arrives.
for item in output:
    # https://replicate.com/meta/codellama-70b-instruct/api#output-schema
    # end="" keeps the pieces on one continuous line of output.
    print(item, end="")

In [106]:
def find_net_for_event(file_path, target_event):
    """Return the net listed on the line directly above a matching event line.

    The file is scanned for the first line that starts with
    ``"High-Level Event:"`` and contains ``target_event`` as a substring, and
    whose immediately preceding line starts with ``"Net:"``. Matching event
    lines whose preceding line is not a ``Net:`` line are skipped and the scan
    continues.

    Args:
        file_path: Path of the event-details text file.
        target_event: Text to look for inside a ``High-Level Event:`` line.

    Returns:
        The net name with the leading ``Net:`` label and surrounding
        whitespace removed, or ``None`` when no qualifying pair of lines is
        found.
    """
    net_prefix = "Net:"
    event_prefix = "High-Level Event:"

    with open(file_path, 'r') as file:
        lines = [raw.strip() for raw in file]

    # Walk (previous line, current line) pairs; the first line has no predecessor.
    for prev_line, line in zip(lines, lines[1:]):
        if not (line.startswith(event_prefix) and target_event in line):
            continue
        if prev_line.startswith(net_prefix):
            # Strip only the leading label. The previous replace("Net: ", "")
            # left "Net:" attached when no space followed it and would also
            # delete any later "Net: " inside the line.
            return prev_line[len(net_prefix):].strip()
    return None

In [107]:
# Locations of the design sources and of the event-identification outputs.
# NOTE(review): these are absolute, machine-specific paths; the cell only runs
# as-is on a machine with this directory layout.
project_root = "/home/m588h354/projects/autophasew/openrisc/src"
event_root = "/home/m588h354/projects/Rare_net_analysis-repo/event_identification"
# Pipeline stage and base name of the high-level-events files for that stage.
stage = "ctrl"
hle_file_name = "HIGH_LEVEL_EVENTS_CTRL_11_30"
# Derived paths. arch_event_dir and json_file_retrieve are only defined here,
# not used in this cell.
arch_event_dir = f"{project_root}/architectural_events/{stage}_events"
event_details_file = f"{event_root}/event_files/{hle_file_name}.txt"
json_file_retrieve = f"{event_root}/event_files/{hle_file_name}.json"
# Look up which net the text file associates with this high-level event.
event = 'Conditional flag clearing during instruction execution'
net_name  = find_net_for_event(event_details_file, event)
print(f"Net name for event '{event}': {net_name}")

Net name for event 'Conditional flag clearing during instruction execution': ctrl_flag_clear
