In [12]:
pip install groq -q

In [13]:
import requests
import json
import time
import pandas as pd
from collections import deque
import datetime
import sys
import re

In [None]:
from google.colab import userdata

api_key = "***"


## function for extract Sections

In [15]:
def extract_sections(text, sections_to_extract):
    extracted_sections = {}
    current_section = None
    lines = text.split('\n')

    for line in lines:
        for section in sections_to_extract:
            # (# Section) and (**Section:**)
            pattern = r'^\s*((?:#\s+)|(?:\*\*))' + re.escape(section) + r'\s*[:]?(\*\*?|$)'
            if re.match(pattern, line, re.IGNORECASE):
                current_section = section
                extracted_sections[current_section] = []
                break
        if current_section and line.strip() != '':
            if not re.match(r'^\s*((?:#\s+)|(?:\*\*))', line):
                extracted_sections[current_section].append(line.strip())

    return extracted_sections

In [16]:
import time
from collections import deque

class RateLimiter:
    """Manages rate limits for the Capgemini API"""

    def __init__(self, requests_per_minute=60, window_size=60):
        """
        Initializes the RateLimiter with a specified number of requests per window.

        :param requests_per_minute: The maximum number of requests allowed per window.
                                     Default is set to 60 based on a common assumption.
                                     Adjust this value based on Capgemini's actual rate limits.
        :param window_size: The size of the rate limiting window in seconds. Default is 60 seconds.
        """
        self.requests_per_window = requests_per_minute
        self.window_size = window_size
        self.request_timestamps = deque(maxlen=self.requests_per_window)

    def wait_if_needed(self):
        """
        Waits if necessary to respect the rate limit.
        This method should be called before making a request to the API.
        """
        current_time = time.time()

        # Remove timestamps that are outside the current window
        while self.request_timestamps and current_time - self.request_timestamps[0] > self.window_size:
            self.request_timestamps.popleft()

        if len(self.request_timestamps) >= self.requests_per_window:
            # Calculate the time until the next window
            time_since_oldest = current_time - self.request_timestamps[0]
            wait_time = self.window_size - time_since_oldest + 0.1  # Adding 0.1 seconds to ensure the wait is sufficient
            print(f"Rate limit reached, waiting for {wait_time:.2f} seconds...")
            time.sleep(wait_time)

        # Add the current timestamp to the deque
        self.request_timestamps.append(current_time)

    def handle_rate_limit_error(self, error_message):
        """
        Handles rate limit errors based on the error message from the API.

        :param error_message: The error message received from the API.
        """
        if "rate limit" in error_message.lower():
            # Implement logic to handle rate limit errors, such as waiting and retrying
            print("Rate limit error encountered. Implementing backoff strategy...")
            # Example: Wait for the remaining time in the current window
            if self.request_timestamps:
                oldest_request_time = self.request_timestamps[0]
                time_since_oldest = time.time() - oldest_request_time
                wait_time = self.window_size - time_since_oldest + 0.1
                print(f"Waiting for {wait_time:.2f} seconds before retrying...")
                time.sleep(wait_time)
        else:
            # Handle other types of errors
            print(f"Error: {error_message}")

    def update_rate_limit(self, new_rate, new_window=None):
        """
        Updates the rate limit and window size dynamically.

        :param new_rate: The new number of requests allowed per window.
        :param new_window: The new window size in seconds. If None, the current window size is retained.
        """
        self.requests_per_window = new_rate
        if new_window:
            self.window_size = new_window
        self.request_timestamps = deque(maxlen=self.requests_per_window)
        print(f"Rate limit updated to {self.requests_per_window} requests per {self.window_size} seconds.")

## Function of Generating Use Case Diagram

In [24]:
def generate_usecase_diagram(description_dict, rate_limiter):
    url = "https://api.generative.engine.capgemini.com/POST/v2/llm/invoke"


    prompt = f"""You are an expert in UML modeling, specializing in sequence diagrams for automotive systems, particularly ADAS (Advanced Driver Assistance Systems).

    TASK:
    1. Carefully analyze the technical description below
    2. Extract all necessary elements for a complete UML use case diagram:
       - Actors (driver, other road users, external systems)
       - Use cases (main features of the system)
       - Relationships between actors and use cases (associations)
       - Relationships between use cases (include, extend, generalization)
       - System boundaries (system boundary)
    3. Generate valid and complete PlantUML code that accurately represents the system's use cases

    TECHNICAL DESCRIPTION:

    RESPONSE FORMAT:

    1. Extracted structure:
       - Complete list of identified actors
       - List of identified use cases
       - Identified relationships between actors and use cases
       - Identified relationships between use cases
    2. Valid and complete PlantUML code for a use case diagram (including @startuml and @enduml)
    3. Self-assessment: indicate your confidence level in the generated diagram (high/medium/low) and the assumptions you had to make
    """


    payload = {
        "model": "mistral.mistral-large-2402-v1:0",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.2,
        "max_tokens": 2048
    }

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    rate_limiter.wait_if_needed()

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = requests.post(url, headers=headers, data=json.dumps(payload))
            response.raise_for_status()

            print(f"Status code: {response.status_code}")
            response_json = response.json()

            if "choices" in response_json and len(response_json["choices"]) > 0:
                uml_code = response_json["choices"][0]["message"]["content"]
                return uml_code
            else:
                print(f"Unusual response: {response_json}")
                if "error" in response_json:
                    print(f"Error: {response_json['error']}")
                    rate_limiter.handle_rate_limit_error(response_json['error'].get('message', ''))
                return f"UML generation error: {response_json}"

        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt+1} failed: {e}")
            if attempt < max_retries - 1:
                if "403" in str(e):
                    print("Authentication failed. Please check your API key.")
                    break
                elif "429" in str(e):
                    wait_time = 60 + 2 ** attempt
                    print(f"Rate limit error. Waiting for {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    wait_time = 2 ** attempt
                    print(f"Retrying in {wait_time} seconds...")
                    rate_limiter.wait_if_needed()
            else:
                return f"Failed after {max_retries} attempts: {str(e)}"

## Process The Output

In [25]:
def process_csv_batch(file_path, batch_size=20, start_from=0):
    """Processes the CSV in batches without creating a progression file"""

    try:
        df = pd.read_csv(file_path)
        total_rows = len(df)

        print(f"File loaded successfully: {total_rows} entries found")
        rate_limiter = RateLimiter(requests_per_minute=25)
        start_idx = start_from
        end_idx = min(start_idx + batch_size, total_rows)
        print(f"Processing lines {start_idx} to {end_idx-1}")
        results = []
        output_file = f"uml_usecase_diagrams.csv"

        for index in range(start_idx, end_idx):
            row = df.iloc[index]
            try:
                print(f"Processing line {index}...")
                if 'Combined_Paragraph' not in row:
                    print(f"Error: 'Combined_Paragraph' column not found in line {index}")
                    print(f"Available columns: {row.index.tolist()}")
                    continue

                description = row['Combined_Paragraph']
                if pd.isna(description) or not description.strip():
                    print(f"Line {index}: empty or invalid description, skipped")
                    continue

                sections = extract_sections(description, sections_to_extract=[
                    "Technical Details",
                    "Operational Steps",
                    "Interactions with Components and Systems",
                    "User Interface and Controls",
                    "Fault Detection and Management",
                    "Regulatory Compliance",
                    "Performance Metrics"
                ])

                structured_description = ""
                for section, content in sections.items():
                    structured_description += f"# {section}\n"
                    for line in content:
                        structured_description += f"{line}\n"
                    structured_description += "\n"

                uml_content = generate_usecase_diagram(sections, rate_limiter)
                uml_structure = ""
                uml_code = ""

                if "```" in uml_content:
                    parts = uml_content.split("```")
                    uml_structure = parts[0].strip()
                    if len(parts) > 2:
                        uml_code = parts[1].strip()
                else:
                    uml_structure = uml_content

                result_dict = {
                    'index': index,
                    'description': structured_description.strip(),
                    'uml_code': uml_code
                }

                if 'ID' in row:
                    result_dict['ID'] = row['ID']

                results.append(result_dict)
                if index % 5 == 0 or index == end_idx - 1:
                    temp_df = pd.DataFrame(results)
                    temp_df.to_csv(output_file, index=False)
                    print(f"Progress saved: {len(results)}/{end_idx-start_idx} lines processed")

            except Exception as e:
                print(f"Error processing line {index}: {str(e)}")
                with open("errors_processing.log", "a") as log:
                    log.write(f"{datetime.datetime.now()} - Error line {index}: {str(e)}\n")
                continue

        if results:
            output_df = pd.DataFrame(results)
            output_df.to_csv(output_file, index=False)
            print(f"Results saved in '{output_file}'")
        else:
            print("No results to save")

        return end_idx, total_rows

    except Exception as e:
        print(f"Critical error processing the CSV: {str(e)}")
        return start_from, 0

In [26]:
if __name__ == "__main__":
    FILE_PATH = '/content/adas_systems_step-02.csv'
    BATCH_SIZE = 3
    api_key = "OpDgCFH07T4waIo2R1WzV1iQOkMT28a72n4h3RdC"  # Replace with your actual API key

    start_from = 0
    if len(sys.argv) > 1:
        try:
            start_from = int(sys.argv[1])
        except ValueError:
            print(f"Invalid argument for starting point: {sys.argv[1]}, using 0 by default.")

    rate_limiter = RateLimiter(requests_per_minute=60, window_size=60)

    current_position, total = process_csv_batch(FILE_PATH, BATCH_SIZE, start_from)

    if total > 0:
        print(f"\nProcessing completed: {current_position}/{total} entries ({current_position/total*100:.1f}%)")

        if current_position < total:
            print(f"\nTo continue processing from the next entry, run:")
            print(f"python {sys.argv[0]} {current_position}")

Invalid argument for starting point: -f, using 0 by default.
File loaded successfully: 10 entries found
Processing lines 0 to 2
Processing line 0...
Attempt 1 failed: 403 Client Error: Forbidden for url: https://api.generative.engine.capgemini.com/POST/v2/llm/invoke
Authentication failed. Please check your API key.
Error processing line 0: argument of type 'NoneType' is not iterable
Processing line 1...
Attempt 1 failed: 403 Client Error: Forbidden for url: https://api.generative.engine.capgemini.com/POST/v2/llm/invoke
Authentication failed. Please check your API key.
Error processing line 1: argument of type 'NoneType' is not iterable
Processing line 2...
Attempt 1 failed: 403 Client Error: Forbidden for url: https://api.generative.engine.capgemini.com/POST/v2/llm/invoke
Authentication failed. Please check your API key.
Error processing line 2: argument of type 'NoneType' is not iterable
No results to save

Processing completed: 3/10 entries (30.0%)

To continue processing from the ne