In [1]:
import torch 

torch.set_default_device("cuda")

In [2]:
!pip install scikit-learn

Defaulting to user installation because normal site-packages is not writeable


In [3]:
import sklearn

Обрабатываем SSTS файлы:

In [4]:
from docx2python import docx2python
import re

def parse_docx_text_SSTS(file_path):
    docx_content = docx2python(file_path)
    text_content = docx_content.text

    sections = {}
    section_index = None
    current_section = []
    pre_section_content = []

    for line in text_content.split('\n'):
        line = line.strip()
        if not line:
            continue

        pattern = r'\((?:[^()]*[&|][^()]*)+\)'
        pattern2 = r'\((?:[a-zA-Z](?:[|&][a-zA-Z])*)\)'
        match = re.search(pattern2, line)
        if match:
            if current_section and section_index is not None:
                sections[section_index] = ' '.join(current_section)
            elif not section_index:
                # If no section has been started yet, store pre-section content
                sections["pre_section"] = ' '.join(pre_section_content)
            section_index = line  # Use the entire line as the section index
            current_section = [line]
        else:
            if section_index is None:
                pre_section_content.append(line)
            else:
                current_section.append(line)

    if current_section and section_index is not None:
        sections[f"{section_index}."] = ' '.join(current_section)

    return sections


Заменяем алгебру логики: 


In [5]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

device = "cuda" # the device to load the model onto

model_name = "Qwen/Qwen2.5-14B-Instruct"
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="cuda",
    cache_dir="/home/dev/llm_weights"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)

Loading checkpoint shards:   0%|          | 0/8 [00:00<?, ?it/s]

In [6]:
def generate(prompt, system_prompt=None, max_new_tokens=1):
    messages = [
        {"role": "system", "content": "You are a helpful assistant." if system_prompt is None else system_prompt},
        {"role": "user", "content": prompt}
    ]
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(device)

    generated_ids = model.generate(
        model_inputs.input_ids,
        max_new_tokens=max_new_tokens,
        temperature=0.00000001,
    )
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    return response


def translate_logical_expression(expression):
    prompt = f"""EXAMPLES: 
         TASK: Convert (a|b) into natural english. RESPONSE: a or b should be true.
         TASK: Convert (a&b) into natural english. RESPONSE: a and b both should be true.
         TASK: Convert ((a&c)|(b&d)) into natural english. RESPONSE: both a and c should be true OR both b and d should be true.
         
         TASK: Convert the logical expression {expression} into natural English. Provide only the translation in a single clear sentence. RESPONSE:"""
    return generate(prompt, max_new_tokens=64)


import re

def replace_boolean_algebra_with_translation(text):
    # Regex pattern to match boolean algebra expressions
    pattern = r'\((?:[^()]*[&|][^()]*)+\)' 
    pattern2 = r'\((?:[a-zA-Z](?:[|&][a-zA-Z])*)\)'

    # Split the text into lines
    lines = text.split('\n')
    
    for i, line in enumerate(lines):
        # Find all boolean algebra expressions in the line
        matches = re.findall(pattern2, line)
        for match in matches:
            # Translate each expression and replace it in the line
            translation = translate_logical_expression(match)
            translation = "(" + translation + ")"
            line = line.replace(match, translation)
        lines[i] = line
    
    # Join the lines back into a single string
    return '\n'.join(lines)


Раскидываем текст по разным заголовкам

In [7]:
def structure_text_hmi(text):
    structured_text = {
        "preconditions": [],
        "main_scenario": [],
        "postconditions": []
    }

    current_section = None

    import string

    scenario_counter = 0
    for line in text.split("\n"):
        line = line.strip()
        if not line:
            continue
        from fuzzywuzzy import fuzz

        if fuzz.partial_ratio("Preconditions", line) > 95:
            current_section = "preconditions"
        elif fuzz.partial_ratio("Main Scenario", line) > 95:
            current_section = "main_scenario"
        elif fuzz.partial_ratio("Postconditions", line) > 95:
            current_section = "postconditions"
        elif fuzz.partial_ratio("Alternative Scenario", line) > 95:
            scenario_counter += 1
            current_section = f"alternative_scenario_{string.ascii_uppercase[scenario_counter - 1]}"
            structured_text[current_section] = []  # Start a new list for each alternative scenario
        elif current_section:
            structured_text[current_section].append(line.strip())

    return structured_text

def structure_text_to_dict_hmi(text_content):
    structured_text = structure_text_hmi(text_content)
    section_dict = {}

    
    for section, lines in structured_text.items():
        if section == "alternative_scenarios":
            section_text = []
            for scenario in lines:
                scenario_text = "\n".join(scenario)
                section_text.append(scenario_text)
            section_dict[section] = section_text
        else:
            section_text = "\n".join(lines)
            section_dict[section] = section_text

    return section_dict

Пример использования:

In [8]:
class Fact:
    def __init__(self, text, doc_name, section_name):
        self.text = text  # текст утверждения
        self.doc_name = doc_name  # название дока
        self.section_name = section_name  # название секции внутри которой находится факт
    
    def __repr__(self):
        return f"Text: {self.text}, DocName: {self.doc_name}, SectionName: {self.section_name}"


def process_docx_file_SSTS(file_path):
    docx_content = docx2python(file_path)
    text_content = docx_content.text
    first_line = text_content.split('\n', 1)[0]

    sections = parse_docx_text_SSTS(file_path)
    facts = []
    for section, content in sections.items():
        translated_content = replace_boolean_algebra_with_translation(content)
        fact = Fact(text=translated_content, doc_name=first_line, section_name=section)
        facts.append(fact)
    return facts


def process_file_hmi(file_path):
    docx_content = docx2python(file_path)
    text_content = docx_content.text
    first_line = text_content.split('\n', 1)[0]

    sections_dict = structure_text_to_dict_hmi(text_content)
    facts = []
    
    for section, text in sections_dict.items():
        translated_text = replace_boolean_algebra_with_translation(text)
        fact = Fact(text=translated_text, doc_name=first_line, section_name=section)
        facts.append(fact)
    
    return facts

In [9]:
def tell_about_fact(fact: Fact, is_rule=True):
    l = "A" if is_rule else "B"
    return f"Statement {l} is taken from document with name {fact.doc_name}, section with name {fact.section_name}. Statement {l} text: {fact.text}"

In [10]:
hmi_prefix = "/home/dev/case_data/train Атом/train data/HMI/UC-"
ssts_prefix = "/home/dev/case_data/train Атом/train data/SSTS/SSTS-"

doc_id = 28561

hmi_file = f"{hmi_prefix}{doc_id}.docx"
ssts_file = f"{ssts_prefix}{doc_id}.docx"

rules_data = process_file_hmi(hmi_file)
impl_data = process_docx_file_SSTS(ssts_file)

fewshot_examples = []

fewshot_examples.append([tell_about_fact(rules_data[0]), tell_about_fact(impl_data[0]), "No"])

fewshot_examples.append([tell_about_fact(rules_data[0]), tell_about_fact(impl_data[2]), "Yes"])


The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


In [11]:
def build_prompt(factA: Fact, factB: Fact):
    system_prompt = "You are a compliance analyst tasked with verifying whether an implementation complies with a given regulation."
    user_prompt = f"""
        Logical statement A about car component:
        {factA}

        Logical statement B about car component:
        {factB}

        If you think that dismatch between A and B can lead to some unpleasant accidents, for example car crash, print "Yes"
        Otherwise print "No"

        Write only one word "Yes" or "No"
        Answer: 
    """
    return system_prompt, user_prompt


def build_prompt_description(factA: Fact, factB: Fact):
    system_prompt = "You are a compliance analyst tasked with verifying whether an implementation complies with a given regulation."
    user_prompt = f"""
        You will be provided with two statements:

        - Statement A (Regulation): Contains information from a regulation, policy, or specification outlining conditions and requirements.

        - Statement B (Implementation): Contains a description of an implementation that may or may not comply with the regulation specified in Statement A.


        Determine whether there are any discrepancies or inconsistencies between the regulation and the implementation.

        Also try to make your answer concise, write only the main diff and do not any other information

        Statement A text:
        {factA.text}

        Statement B text:
        {factB.text}

        Your concise discrepancy: 
    """
    return system_prompt, user_prompt

In [12]:
def get_facts(doc):
        content = docx2python(doc).text
        splitting_prompt = f"""You will see a document with list of rules and instructions. I need you to split into many little logical fragments. 
            Basically, I want you to give me a list of facts and list of keywords from a whole document. Output should be like:
            1) <fact 1>
            2) <fact 2>
            3) <fact 3>
            etc...
            Write numbers and facts and only them
            My document to split: {content}
        """
        res = [x for x in generate(splitting_prompt, max_new_tokens=1000).split("\n")]
        return res

def find_disrepancies(rule_doc, impl_doc, calc_facts=False):

    if calc_facts:
        impl_data = get_facts(impl_doc)
        rules_data = get_facts(rule_doc)
        
    else:
        rules_data = process_file_hmi(rule_doc)
        impl_data = process_docx_file_SSTS(impl_doc)

    print("RULES LIST")

    for rule in rules_data:
        print(rule)
    
    print("IMPL LIST")

    for impl in impl_data:
        print(impl)
    
    res = []
    for rule in rules_data:
        for impl in impl_data:
            system_prompt, user_prompt = build_prompt(rule, impl)
            ans = generate(user_prompt, max_new_tokens=1)
            print(ans.lower())
            if ans.lower() == "yes":
                system_prompt_descr, user_prompt_descr = build_prompt_description(rule, impl)
                descr = generate(user_prompt_descr, system_prompt_descr, max_new_tokens=100).strip().strip("\n")
                # system_prompt_severity, user_prompt_severity = build_prompt_severeness(descr)
                # severe = generate(user_prompt_severity, system_prompt_severity, max_new_tokens=1)

                res.append(
                    
                    impl
                )

    return res

In [13]:
res = find_disrepancies(hmi_file, ssts_file, calc_facts=True)

RULES LIST
1) Users can modify the name and password of the vehicle hotspot according to their needs.
2) The use-case title is "Setting Hotspot name & password".
3) The scope of the use-case is SWP.
4) There is no trigger for this use-case.
5) The actors involved are Driver and Owner.
6) The precondition is that the IVI system must start up on SWP Android.
7) In the main scenario, the user navigates to the hotspot page on SWP Android.
8) The user clicks on the hotspot name or password to go to the modification page.
9) The user modifies the name or password and clicks 'save'.
10) The postcondition is that the hotspot name or password will be modified successfully.
11) The requirements state that the hotspot name and password will be saved in the vehicle profile.
12) After changing the password, previously connected devices need to enter the new password to reconnect.
IMPL LIST
1) Users can modify the name and password of the vehicle hotspot when the vehicle is in a stopped state.
2) En

In [14]:
res

[]

In [22]:
# impl_data = get_facts(hmi_file)
# rules_data = get_facts(ssts_file)



In [15]:
print(impl_data[:50])

[Text: hotspot settings Functional Description Users can modify the name and password of the vehicle hotspot according to their needs. This function can only be used when the vehicle is in a stopped state., DocName: hotspot settings, SectionName: pre_section, Text: Enabling conditions (a, b, and c all should be true.): IVI system startup; User can operate SWP; IVI hotspot is enabled., DocName: hotspot settings, SectionName: Enabling conditions (a&b&c):, Text: Trigger conditions (a should be true.): Users set the hotspot name and password in SWP, and IVI_IFT sends vehicle/{VIN}/hardware/interfaces/hotspotNameCommand and vehicle/{VIN}/hardware/interfaces/hotspotPasswordCommand to SGW. Users set hotspot name and password on the SWP, IVI_IFT send vehicle/{VIN}/hardware/interfaces/hotspotNameCommand and vehicle/{VIN}/hardware/interfaces/hotspotPasswordCommand to SGW., DocName: hotspot settings, SectionName: Trigger conditions (a):, Text: Execution output (a, b, c, and d should all be true.)

In [18]:
print(rules_data[:50])

[Text: , DocName: [I-28561]  Setting Hotspot name & password, SectionName: preconditions, Text: 1)		The user guides to the hotspot page out_2. SWP Android.
2)		Click in_2. SWP Android the hotspot name or password to enter the modification page.
3)		The user modifies the name or password and clicks 'save' in_2. SWP Android.
Postcondition:
1)		The hotspot name or password will be modified successfully.
Requirements:
1)		The hotspot name and password will be saved in the vehicle profile.
2)		After changing the password, the previous device needs to enter the new password to reconnect., DocName: [I-28561]  Setting Hotspot name & password, SectionName: main_scenario, Text: , DocName: [I-28561]  Setting Hotspot name & password, SectionName: postconditions]


In [None]:
# Decode and print the response
response = generate(prompt)
print(response)

In [23]:
import timeit

hmi_prefix = "/home/dev/case_data/train Атом/train data/HMI/UC-"
ssts_prefix = "/home/dev/case_data/train Атом/train data/SSTS/SSTS-"

doc_ids = [
        6583,
        # 8604,
        8692,
        # 8800,
        # 11467,
        # 25957,
        # 26160,
        # 26161,
        # 26771,
        # 28561,
        30371,
        # 31523
    ]

answer = []
total_time = 0

for doc_id in doc_ids:
    
    hmi_file = f"{hmi_prefix}{doc_id}.docx"
    ssts_file = f"{ssts_prefix}{doc_id}.docx"
    impl_data = get_facts(hmi_file)[:50]
    rules_data = get_facts(ssts_file)[:50]
    prompt = f"""
Evaluate the compliance of the document with the requirements using the rating system below:

**Compliance Categories:**
- FC (Fully Compliant): Perfect! Nothing can be improved.
- LC (Largely Compliant): Generally correct. Some improvements may be needed (described in comments). No need for review.
- PC (Partially Compliant): Major deviations. Improvements needed (described in comments). After improvement, review is required.
- NC (Non-Compliant): Not compliant. Needs to be re-done and re-reviewed. Directions for update shown in comments.
- NA (Not Applicable): Not applicable. Reason for non-applicability is described in comments.

### Instructions:
1. Review the document content in "File" and compare it to the "REQUIREMENTS" provided.
2. Based on this comparison, assign the appropriate two-letter compliance code (e.g., FC, LC, PC, NC, or NA).

### Notice (few-shot):

LC is returned if the notifications for battery status are slightly delayed, if the sound alerts for seatbelt reminders are missing, if the user interface occasionally lags for less than 1 second, if some minor glitches occur in the navigation system.

PC is returned if the regenerative braking system fails to engage properly at lower speeds, affecting braking consistency, if the navigation system misinterprets an address occasionally,

NC is returned if the emergency braking system fails to activate when necessary, posing a direct safety risk, if the battery management system reports inaccurate charge levels, function can only be used when the vehicle is in a stopped state.

NA is returned if the test involves an audio system, if IVI_IFT sends a video stream to SWP to notify the user

### Data:
File: {impl_data}
REQUIREMENTS: {rules_data}

**Output Format**: Return ONLY ONE two-letter compliance code (e.g., FC, LC, PC, NC, or NA) as the final compliance rating. START YOUR ANSWER WITH THE CODE:
"""

    # Measure the time taken to generate the response
    start_time = timeit.default_timer()
    response = generate(prompt)
    elapsed_time = timeit.default_timer() - start_time
    total_time += elapsed_time
    print(f"Time taken for doc_id {doc_id}: {elapsed_time:.4f} seconds")

    answer.append(response)

average_time = total_time / len(doc_ids)
print(f"Average time taken per document: {average_time:.4f} seconds")

Time taken for doc_id 6583: 3.7890 seconds
Time taken for doc_id 8692: 3.3809 seconds
Time taken for doc_id 30371: 3.0284 seconds
Average time taken per document: 3.3994 seconds


In [24]:
print(answer)

['PC', 'PC', 'LC']
