In [1]:
import json
# Read the merged requirements explicitly as UTF-8 (the encoding JSON files
# are expected to use) instead of relying on the platform's locale default,
# matching load_requirements() further down in this notebook.
with open("merged_requirements.json", "r", encoding="utf-8") as f:
    requirements = json.load(f)

# Requirement ids in file order; used as the reference ordering by later cells.
req_ids = [req['req_id'] for req in requirements]

In [2]:
def trace_full_upstream_iterative(req_id,requirements):
    """
    Collect every requirement that feeds, directly or transitively, into ``req_id``.

    A requirement is upstream of another when it appears earlier in
    ``requirements`` and at least one of its outputs is among the other's inputs.

    Args:
        req_id: Identifier of the requirement to start from.
        requirements: Ordered list of dicts, each with a ``req_id`` key and
            optional ``inputs`` / ``outputs`` lists.

    Returns:
        The upstream req_ids, without duplicates, in the order they appear in
        ``requirements``. The starting requirement is never included. An empty
        list is returned when ``req_id`` is not present.
    """
    req_index = {req['req_id']: idx for idx, req in enumerate(requirements)}
    if req_id not in req_index:
        return []

    # Build every output set once instead of on each pass of the worklist.
    # `.get` keeps requirements without an "outputs"/"inputs" key from raising,
    # consistent with find_downstream_outputs_only.
    output_sets = [set(req.get('outputs', [])) for req in requirements]

    traced = set()           # all upstream req_ids found so far
    to_check = [req_id]      # worklist of req_ids whose own upstream is pending

    while to_check:
        current_idx = req_index[to_check.pop()]
        current_inputs = set(requirements[current_idx].get('inputs', []))
        if not current_inputs:
            continue  # nothing consumed, so nothing can be upstream

        # Only requirements listed before the current one are candidates.
        for idx in range(current_idx):
            candidate_id = requirements[idx]['req_id']
            if candidate_id in traced:
                continue
            if not current_inputs.isdisjoint(output_sets[idx]):
                traced.add(candidate_id)
                to_check.append(candidate_id)

    traced.discard(req_id)  # the starting requirement is not its own upstream

    # Emit in requirement order so the result is deterministic across runs
    # (iterating a set of strings is not).
    ordered = dict.fromkeys(
        req['req_id'] for req in requirements if req['req_id'] in traced
    )
    return list(ordered)



In [3]:
def find_downstream_outputs_only(requirements, start_req_id):
    """
    List the requirements that depend, directly or transitively, on ``start_req_id``.

    Walks the requirements that come after the starting one, in order. A later
    requirement is downstream when any of its inputs is an output of the
    starting requirement or of an already collected downstream requirement.

    Returns the downstream req_ids in list order, or ``[]`` when
    ``start_req_id`` is not found.
    """
    # Position of the first requirement carrying the requested id.
    start_pos = next(
        (pos for pos, entry in enumerate(requirements)
         if entry["req_id"] == start_req_id),
        None,
    )
    if start_pos is None:
        return []

    # Signals produced so far along the chain.
    live_signals = set(requirements[start_pos].get("outputs", []))
    affected = []

    for later in requirements[start_pos + 1:]:
        consumed = set(later.get("inputs", []))
        if not (live_signals & consumed):
            continue
        affected.append(later["req_id"])
        # This requirement's outputs can in turn drive later requirements.
        live_signals.update(set(later.get("outputs", [])))

    return affected

In [4]:
def flatten_list(lst):
    """
    Return a new flat list with the items of ``lst`` in depth-first order.

    Only nested ``list`` instances are expanded; any other item (tuples,
    strings, dicts, ...) is kept as a single element.
    """
    flat = []
    # Stack of iterators: the top one is the list currently being walked.
    pending = [iter(lst)]
    while pending:
        for element in pending[-1]:
            if isinstance(element, list):
                # Descend; the parent iterator resumes once this one is done.
                pending.append(iter(element))
                break
            flat.append(element)
        else:
            pending.pop()
    return flat
def reorder_list(reference, output):
    """
    Return the items of ``reference`` that also occur in ``output``,
    keeping the order (and any repetition) they have in ``reference``.
    """
    ordered = []
    for candidate in reference:
        if candidate in output:
            ordered.append(candidate)
    return ordered

In [5]:
def get_upstream_downstream_ech_req(requirements):
    """
    Map every req_id to its complete trace chain.

    The chain of a requirement is made of its upstream requirements, the
    requirement itself and its downstream requirements, listed in the order
    the ids appear in ``requirements``.

    Returns a dict ``{req_id: [req_id, ...]}``.
    """
    ordered_ids = [entry['req_id'] for entry in requirements]

    chains = {}
    for current in ordered_ids:
        upstream = trace_full_upstream_iterative(current, requirements)
        downstream = find_downstream_outputs_only(requirements, current)
        # The downstream list is nested on purpose and then flattened away.
        members = flatten_list(upstream + [current, downstream])
        chains[current] = reorder_list(ordered_ids, members)
    return chains
# Trace chain (upstream + self + downstream) for every requirement, keyed by req_id.
req_flat_dict=get_upstream_downstream_ech_req(requirements)

In [6]:
import json
import faiss
from sentence_transformers import SentenceTransformer
import numpy as np

# Read the requirements JSON file from disk
def load_requirements(file_path):
    """Return the parsed JSON content of ``file_path``, read as UTF-8."""
    with open(file_path, mode='r', encoding='utf-8') as source:
        parsed = json.load(source)
    return parsed

# Describe each requirement as one line of text and embed those lines
def embed_requirements(requirements, model):
    """
    Encode every requirement into an embedding.

    Each requirement is rendered as a single string combining its "Content"
    with the comma-joined "inputs", "outputs", "actors", "targets", "verbs"
    and "nouns" lists (missing keys render as empty), and the list of strings
    is passed to ``model.encode`` with ``convert_to_numpy=True`` and
    ``show_progress_bar=True``. Whatever ``model.encode`` returns is returned.
    """
    list_fields = ("inputs", "outputs", "actors", "targets", "verbs", "nouns")

    def _describe(record):
        # One text line per requirement; list-valued fields are comma-joined.
        content = record.get("Content", "")
        inputs, outputs, actor, target, verbs, nouns = (
            ", ".join(record.get(field, [])) for field in list_fields
        )
        return f"Requirement: {content} | Inputs: {inputs} | Outputs: {outputs} | actor:{actor} | target:{target} | verbs:{verbs}| noun:{nouns}"

    sentences = [_describe(record) for record in requirements]
    return model.encode(sentences, convert_to_numpy=True, show_progress_bar=True)

# Build Faiss index
def build_faiss_index(embeddings):
    """
    Create a ``faiss.IndexFlatL2`` holding ``embeddings`` and return it.

    The vector dimension is taken from ``embeddings.shape[1]``, so
    ``embeddings`` must be two-dimensional (one row per vector).
    """
    dim = embeddings.shape[1]
    index = faiss.IndexFlatL2(dim)  # L2 distance
    index.add(embeddings)
    return index

def save_index(index, path):
    """Write the FAISS ``index`` to the file at ``path``."""
    faiss.write_index(index, path)

def save_mapping(requirements, path):
    """
    Write the list of req_ids, in the order of ``requirements``, to ``path``
    as UTF-8 JSON indented by two spaces.

    The position of an id in the saved list is the position of its
    requirement in ``requirements``.
    """
    ordered_ids = []
    for entry in requirements:
        ordered_ids.append(entry["req_id"])

    with open(path, mode="w", encoding="utf-8") as sink:
        json.dump(ordered_ids, sink, indent=2)

def main():
    """
    Build the requirements vector store.

    Loads "merged_requirements.json", embeds every requirement with the
    'all-MiniLM-L6-v2' sentence-transformer, writes the FAISS index to
    "requirements.index" and the position-to-req_id list to "id_map.json",
    then prints a one-line summary.
    """
    requirements = load_requirements("merged_requirements.json")
    encoder = SentenceTransformer('all-MiniLM-L6-v2')

    # One vector per requirement, built from its content, inputs and outputs
    vectors = embed_requirements(requirements, encoder)

    save_index(build_faiss_index(vectors), "requirements.index")
    save_mapping(requirements, "id_map.json")

    print(f"Vector DB created with {len(requirements)} requirements using context-based embeddings.")


if __name__ == "__main__":
    main()


  from .autonotebook import tqdm as notebook_tqdm
Batches: 100%|██████████| 1/1 [00:00<00:00,  2.10it/s]

Vector DB created with 23 requirements using context-based embeddings.





In [7]:
from sentence_transformers import SentenceTransformer
def load_index(path="requirements.index"):
    """Read and return the FAISS index stored at ``path``."""
    return faiss.read_index(path)

def load_id_map(path="id_map.json"):
    """Return the JSON content of ``path`` (read as UTF-8): the saved req_id list."""
    with open(path, mode="r", encoding="utf-8") as source:
        id_list = json.load(source)
    return id_list

def query_vector_db(query_text, model, index, id_map, top_k=20):
    """
    Return the requirements closest to ``query_text``.

    Args:
        query_text: Free text to search for.
        model: Object whose ``encode(list_of_texts, convert_to_numpy=True)``
            returns one embedding row per text.
        index: Index whose ``search(vectors, k)`` returns ``(distances, indices)``
            with one row per query vector.
        id_map: Sequence mapping an index position to its req_id.
        top_k: Maximum number of hits to return.

    Returns:
        A list of ``{"req_id": ..., "distance": float}`` dicts in the order
        reported by the index. It holds fewer than ``top_k`` entries when the
        index contains fewer vectors than requested.
    """
    query_vec = model.encode([query_text], convert_to_numpy=True)
    distances, indices = index.search(query_vec, top_k)
    results = []
    for dist, idx in zip(distances[0], indices[0]):
        # FAISS fills unused result slots with label -1 when fewer than top_k
        # vectors are available. Indexing id_map with -1 would silently report
        # the LAST requirement as a hit, so those slots are skipped.
        if idx < 0:
            continue
        results.append({"req_id": id_map[int(idx)], "distance": float(dist)})
    return results

# Reload the encoder and the persisted index / id list written by main().
# `model`, `index` and `id_map` are reused by the use-case matching cell below.
model = SentenceTransformer('all-MiniLM-L6-v2')
index = load_index("requirements.index")
id_map = load_id_map("id_map.json")

# Sanity check: query with a requirement-style sentence and print the two nearest hits.
query = "IF the theft alarm sound warning is available(AlarmSoundWarningAvailability = True)AND the horn is requested for theft alarm system(TheftAlarmStatus = Active Alarm State Horn)         THEN the warning audible signal shall be controlled at the frequency (HORN_ALARM_FREQ) with a (HORN_ALARM_DUTY_CYCLE) duty cycle.(SoundWarningCtrl = 100%)"
results = query_vector_db(query, model, index, id_map,top_k=2)
print(results)

[{'req_id': 'REQ-0939555 D', 'distance': 0.29784950613975525}, {'req_id': 'REQ-0986424 C', 'distance': 0.518606960773468}]


In [8]:
# Use-case titles; each one is matched against the vector DB in the next cell.
use_cases = [
    "Horn Control",
    "Sound Warning Management",
    "Manual Sound Warning",
    "Theft Alarm Sound Warning Control",
    "Panic Mode Sound Warning Control",
    "Thermal Runaway hazard sound warning Control",
    "FOTA Update Sound Warning inhibition",
]

In [9]:
# For every use-case title, take the requirement nearest to the title in the
# vector DB and use that requirement's trace chain as the use case's req list.
usecase = []
for use in use_cases:
    # Only the nearest hit is used, so a single neighbour is requested.
    results = query_vector_db(use, model, index, id_map, top_k=1)
    if results:
        # req_flat_dict values are already flat and in requirement order, so no
        # further flatten/reorder is needed. This also keeps the cell from
        # depending on the module-level `req_ids` still holding the master
        # order when the cell is re-run. Copy so later edits do not alias.
        matched_req_ids = list(req_flat_dict[results[0]['req_id']])
    else:
        # Empty index: nothing to attach to this use case.
        matched_req_ids = []
    usecase.append({
        "usecase": use,
        "req_ids": matched_req_ids,
    })

    

In [10]:
# Raw (un-merged) requirements. Read explicitly as UTF-8 instead of the
# platform's locale default, matching load_requirements() in this notebook.
with open("requirements.json", "r", encoding="utf-8") as f:
    raw_requirments = json.load(f)

In [11]:
import pandas as pd
# Flow table; get_External_internal() looks variables up in `df`.
df = pd.read_csv("flow.csv")

# Titles of every flow whose Status is "PLM Parameters" (NumPy array).
# A single .loc selection replaces the chained df[mask][col] indexing; the
# former mid-cell df.head(5) was dropped because it displayed nothing.
flow = df.loc[df['Status'] == "PLM Parameters", 'flowtitle'].values

In [12]:

def get_External_internal(var_name, flow_df=None):
    """
    Tag a variable name with its "P/C" (and, where applicable, "IJK") value
    from the flow table.

    Args:
        var_name: Value to look up in the ``flowtitle`` column.
        flow_df: Optional DataFrame with ``flowtitle``, ``P/C`` and ``IJK``
            columns. When omitted, the notebook-level ``df`` is used, which
            keeps existing single-argument calls working.

    Returns:
        ``"<name>[<P/C>][<IJK>]"`` when the first matching row has P/C equal
        to "P" or "C", ``"<name>[<P/C>]"`` for any other P/C value, and
        ``"<name>[NA]"`` when no row matches. Only the first matching row is
        considered.
    """
    table = df if flow_df is None else flow_df
    rows = table[table['flowtitle'] == var_name]

    if rows.empty:
        return f"{var_name}[NA]"

    pc_flag = rows['P/C'].iloc[0]
    if (pc_flag == "P") or (pc_flag == "C"):
        return f"{var_name}[{pc_flag}][{rows['IJK'].iloc[0]}]"
    return f"{var_name}[{pc_flag}]"


In [13]:
def get_coverage_req_ids(result,raw_requirments):
    """
    Measure how many raw requirements are referenced by at least one use case.

    Args:
        result: List of use-case dicts. Each may hold a ``Req_ids`` list of
            dicts with a ``req_id`` key; entries without ``Req_ids`` are ignored.
        raw_requirments: List of raw requirement dicts with a ``req_id`` key.
            The last space of each raw id is removed before comparison.

    Returns:
        ``(coverage, missing)`` where ``coverage`` is the percentage (0-100)
        of distinct raw ids that appear in ``result``. ``missing`` is ``" "``
        when every raw id is covered, otherwise the uncovered ids in the order
        they first occur in ``raw_requirments``. For an empty
        ``raw_requirments`` the result is ``(0.0, [])``.
    """
    grouped_ids = {
        item['req_id']
        for entry in result
        for item in entry.get('Req_ids', [])
    }

    # Drop the last space of each raw id ("REQ-1  A" -> "REQ-1 A"); ids
    # without any space are kept unchanged. dict.fromkeys removes duplicates
    # while preserving first-seen order.
    raw_ids = list(dict.fromkeys(
        "".join(raw['req_id'].rsplit(" ", 1)) for raw in raw_requirments
    ))
    if not raw_ids:
        return 0.0, []

    missing = [raw_id for raw_id in raw_ids if raw_id not in grouped_ids]
    # Count only ids that exist in the raw set, so ids present in `result`
    # but absent from the raw file cannot inflate the figure.
    coverage = (len(raw_ids) - len(missing)) / len(raw_ids) * 100
    if not missing:
        return coverage, " "
    return coverage, missing

In [14]:
# Assemble the final record of every use case: its inputs, its outputs, the
# PLM parameters among them and the raw text of each requirement involved.
result = []

# Group both requirement sets by id once instead of rescanning the lists for
# every requirement of every use case. Lists keep duplicate ids, if any.
merged_by_id = {}
for merged_req in requirements:
    merged_by_id.setdefault(merged_req['req_id'], []).append(merged_req)
raw_by_id = {}
for raw_req in raw_requirments:
    raw_by_id.setdefault(raw_req['req_id'], []).append(raw_req)

plm_flow_names = set(flow)

for usecase_entry in usecase:
    usecase_name = usecase_entry['usecase']
    # Deliberately not called `req_ids`: that module-level name holds the
    # master requirement order used by earlier cells and must not be rebound.
    usecase_req_ids = usecase_entry['req_ids']

    collected_inputs = []
    diversity = []
    for member_id in usecase_req_ids:
        for merged_req in merged_by_id.get(member_id, []):
            collected_inputs.extend(merged_req.get("inputs", []))

        # The raw file is looked up with two spaces before the last token
        # ("REQ-34545 A" -> "REQ-34545  A"); an id without a space is used as is.
        raw_key = "  ".join(member_id.rsplit(" ", 1))
        for raw_req in raw_by_id.get(raw_key, []):
            diversity.append({
                "req_id": member_id,
                "Content": raw_req['content'],
                "DiversityExpression": raw_req['diversi'],
            })

    # Only the last requirement of the chain contributes the use-case outputs.
    collected_outputs = []
    if usecase_req_ids:
        for merged_req in merged_by_id.get(usecase_req_ids[-1], []):
            collected_outputs.extend(merged_req.get("outputs", []))

    # Remove duplicates while keeping first-seen order, so the generated JSON
    # is identical from run to run (set iteration order is not).
    usecase_inputs = list(dict.fromkeys(collected_inputs))
    usecase_outputs = list(dict.fromkeys(collected_outputs))

    plm_parameters = (
        [name for name in usecase_inputs if name in plm_flow_names]
        + [name for name in usecase_outputs if name in plm_flow_names]
    )

    # Build usecase object
    usecase_obj = {
        "usecase": usecase_name,
        "inputs": [get_External_internal(name) for name in usecase_inputs],
        "outputs": [get_External_internal(name) for name in usecase_outputs],
        "PLM Parameters": plm_parameters,
        "Caliberation Parameters": "",
        "Req_ids": diversity,
    }

    result.append(usecase_obj)
    



In [15]:
# Drop a summary entry left by a previous run of this cell, so re-running it
# neither appends a second summary nor feeds the summary back into the
# coverage computation.
result = [entry for entry in result if "Coverage" not in entry]

coverage, missing_req_ids = get_coverage_req_ids(result, raw_requirments)
result.append({"Coverage": coverage, "Ungrouped_reqids": missing_req_ids})


In [16]:
# Persist the use-case records (plus the trailing coverage summary) as JSON.
with open("usecases_fina_test1.json", "w") as f:
    json.dump(result, f, indent=4)