In [None]:
import collections

import requests
import os
from dotenv import load_dotenv
from pathlib import Path
import time
import datetime
from openai import OpenAI
import logging
import pprint
import datetime
import csv
import collections
import concurrent.futures
import openai

env_path = Path("..") / ".env"
load_dotenv(dotenv_path=env_path)
TESTRAIL_BASE_URL = os.getenv("TESTRAIL_BASE_URL")
TESTRAIL_USERNAME = os.getenv("TESTRAIL_USERNAME")
TESTRAIL_API_KEY = os.getenv("TESTRAIL_API_KEY")
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
MODEL='gpt-4o-mini'

### log debug to file

In [None]:
now = datetime.datetime.now(datetime.UTC)
formated_date = now.strftime("%Y%m%d")
logging.basicConfig(
    level=logging.DEBUG,  # Log level
    format='%(asctime)s [%(levelname)s] %(message)s',  # Log format
    filename='logs/{formated_date}-app.log',  # Log file name
    filemode='a'  # 'a' for append mode, 'w' to overwrite
)

###  helper to convert datestrings

In [None]:
def get_unix_time(date_str: str="01/01/2023"):
    """
    convert a mm/dd/yyyy format to unix time
    Args:
        date_str: mm/dd/yyyy format

    Returns: seconds since epoch
    """
    date_obj = datetime.datetime.strptime(date_str, "%m/%d/%Y")
    return int(time.mktime(date_obj.timetuple()))

def get_human_time(seconds: str) -> str:
    """
    takes in a string of unix seconds (generally an int, but we are converting here)
    Args:
        seconds:

    Returns:

    """
    date = datetime.datetime.fromtimestamp(seconds)
    return date.strftime("%m/%d/%Y")

### updated functions, a helper or two maybe

In [None]:
def validate_env() -> bool:
    """
    validate environment variables
    Returns: boolean
    """
    if not TESTRAIL_BASE_URL or not TESTRAIL_USERNAME or not TESTRAIL_API_KEY:
        print("Error: One or more environment variables are missing. Check your .env file.")
        return False
    else:
        return True

In [None]:
def fetch_projects(limit=250) -> list:
    """
    fetch testrails projects
    Returns: json/dict of projects
    """

    # Construct the API URL
    try:
        # Make the API request
        url = f"{TESTRAIL_BASE_URL}/index.php?/api/v2/get_projects&limit={limit}"
        response = requests.get(url, auth=(TESTRAIL_USERNAME, TESTRAIL_API_KEY))
        return response.json()['projects']
    except requests.exceptions.RequestException as e:
        print(f"Error connecting to TestRail API: {e}")


In [None]:
def fetch_suites(project_id) -> list:
    """
    fetch testrails suites
    Returns: list of suites
    """
    try:
        # Make the API request
        url = f"{TESTRAIL_BASE_URL}/index.php?/api/v2/get_suites/{project_id}"
        response = requests.get(url, auth=(TESTRAIL_USERNAME, TESTRAIL_API_KEY))
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error connecting to TestRail API: {e}")

In [None]:
def fetch_test_cases(project_id, suite_id, date="01/01/2022", limit=250, wait=0) -> list:
    """
    fetch testrails test cases
    Args:
        project_id:
        suite_id:
        date: optional, mm/dd/yyyy format. default 01/01/2022
        limit: optional, default 250, set to whatever number we want to pull at once
        wait: optional, number of seconds to wait between requests

    Returns:

    """

    if not TESTRAIL_USERNAME or not TESTRAIL_API_KEY or not TESTRAIL_API_KEY:
        raise Exception("Error: Missing TestRail configuration in environment variables.")

    date_filter = "created_after" # alternatey we could try "updated_after"
    url = f"{TESTRAIL_BASE_URL}/index.php?/api/v2/get_cases/{project_id}&suite_id={suite_id}&{date_filter}={get_unix_time(date)}&limit={limit}"
    output = []
    while url is not None:
        print(url)
        try:
            response = requests.get(url, auth=(TESTRAIL_USERNAME, TESTRAIL_API_KEY))
            payload = response.json()
            output.extend(payload['cases'])
            next = payload['_links']['next']
            if next:
                url = f"{TESTRAIL_BASE_URL}/index.php?{next}"
            else :
                url = None
            if wait > 0:
                time.sleep(wait)
        except Exception as e:
            print(e)
            pass
    return output


In [None]:
cases = []
if validate_env():
    projects = fetch_projects()
    project_ids = [project['id'] for project in projects]
    for pid in project_ids:
        suite_ids = [_['id'] for _ in fetch_suites(pid)]
        for sid in suite_ids:
            c = fetch_test_cases(pid, sid, date="06/01/2024")
            for case in c:
                case['project_id'] = pid
                case['suite_id'] = sid
            cases.extend(c)

## we now have all our test cases in a list.

In [None]:
pprint.pprint(cases[0])  # just to see what the output looks like
print()
print(get_human_time(cases[0]['created_on']))
print(get_human_time(cases[0]['updated_on']))


In [None]:
print(len(cases))

In [None]:
# This is just the same thing from evaluate with the model as an optional argument
def evaluate_test_case(test_case):
    """
    Sends a test case to OpenAI for evaluation based on QA best practices.

    Parameters:
        test_case (dict): A dictionary containing the test case details.

    Returns:
        str: OpenAI's evaluation of the test case, including a grade and recommendations.

    Raises:
        Exception: If the OpenAI API request fails.
    """

    # Safely extract fields with fallbacks
    description = test_case.get('description') or test_case.get('custom_testcase_description', 'N/A')
    preconditions = test_case.get('preconditions') or test_case.get('custom_preconds', 'N/A')

    # Handle 'steps'
    steps_separated = test_case.get('custom_steps_separated', [])
    if isinstance(steps_separated, list):
        steps = "\n".join([step.get('content', '') for step in steps_separated])
    else:
        steps = test_case.get('custom_steps', 'N/A')

    expected_result = test_case.get('expected_result') or test_case.get('custom_expected', 'N/A')
    created_by = test_case.get('created_by', 'Unknown Creator')

    # Build the prompt from the test case details
    prompt = (
        f"Evaluate the following test case based on QA best practices."
        f"Grade the test case quality with one of the following categories: "
        f"'No improvement needed,' 'Needs slight improvement,' or 'Needs major improvement.' "
        f"Additionally, review the test case for adherence to US mental healthcare standards and best practices, "
        f"and identify any gaps or improvements that would ensure the system meets compliance, user needs, and clinical effectiveness:\n\n"
    f"ID: {test_case.get('id', 'N/A')}\n"
    f"Title: {test_case.get('title', 'N/A')}\n"
    f"Description: {description}\n"
    f"Preconditions: {preconditions}\n"
    f"Steps: {steps}\n"
    f"Expected Result: {expected_result}\n"
    f"Created By: {created_by}\n"
    )

    try:
        # Create a chat completion
        completion = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": "system","content": "You are an expert in software testing, quality assurance, and electronic medical records (EMRs), especially in mental healthcare systems. Your goal is to evaluate test cases not only for QA best practices but also for their adherence to mental healthcare standards, compliance requirements, and clinical usability in an EMR system."},
                {"role": "user", "content": prompt}
            ],
        )
        # Assuming 'response' is the variable holding the API response
        message_content = completion.choices[0].message.content
        return(message_content)
        # return(completion)

    except openai.APIError as e:
        # Handle API errors (e.g., invalid requests or server errors)
        print(f"OpenAI API returned an API Error: {e}")
        return f"Error: API Error - {e}"
    except openai.APIConnectionError as e:
        # Handle connection errors (e.g., network issues)
        print(f"Failed to connect to OpenAI API: {e}")
        return f"Error: Connection Error - {e}"
    except openai.RateLimitError as e:
        # Handle rate limit errors (e.g., too many requests)
        print(f"OpenAI API request exceeded rate limit: {e}")
        return f"Error: Rate Limit Exceeded - {e}"
    except Exception as e:
        # Handle unexpected exceptions
        print(f"An unexpected error occurred: {e}")
        return f"Error: Unexpected Error - {e}"

In [None]:
def process_case(case_with_index):
    """
    Processes a single case and returns its evaluation with project/suite information
    """
    idx, total, case = case_with_index
    print(f"{idx}/{total} - {case['project_id']}:{case['suite_id']}:{case['id']}: {case['title']}")

    eval_result = evaluate_test_case(case)

    # Extract steps and expected results exactly as in your sequential code
    steps = []
    expected_results = []
    if case['custom_steps_separated'] is not None:
        steps = [step['content'] for step in case['custom_steps_separated'] if 'content' in step.keys()]
        expected_results = [step['expected'] for step in case['custom_steps_separated'] if 'expected' in step.keys()]

    if case['title'] is None:
        case['title'] = f"BAD TESTCASE: No Title"

    if case['custom_testcase_description'] is None:
        case['custom_testcase_description'] = f"BAD TESTCASE: No Description"

    if case['custom_preconds'] is None:
        case['custom_preconds'] = f"BAD TESTCASE: No Preconditions"

    return {
        'project_id': case['project_id'],
        'suite_id': case['suite_id'],
        'evaluation_data': {
            'id': case['id'],
            'title': case['title'],
            'description': case['custom_testcase_description'],
            'steps': steps,
            'expected_result': expected_results,
            'preconditions': case['custom_preconds'],
            'created_by': case['created_by'],
            'created_on': get_human_time(case['created_on']),
            'evaluation': eval_result
        }
    }

def parallel_evaluate_cases(cases, max_workers=10):
    """
    Evaluates test cases in parallel while maintaining the same output structure
    """
    evaluations = collections.defaultdict(dict)
    total = len(cases)

    # Prepare cases with their indices for progress tracking
    cases_with_index = [(i+1, total, case) for i, case in enumerate(cases)]

    # Process cases in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Using list() to force immediate execution and show progress
        results = list(executor.map(process_case, cases_with_index))

    # Organize results into the same structure as your sequential code
    for result in results:
        pid = result['project_id']
        sid = result['suite_id']

        if sid not in evaluations[pid]:
            evaluations[pid][sid] = []

        evaluations[pid][sid].append(result['evaluation_data'])

    return dict(evaluations)

In [None]:
evaluations = parallel_evaluate_cases(cases, 100)

import concurrent.futures

evaluations = collections.defaultdict(dict)
count = 0
for c in cases:
    count += 1
    print(f"{count}/{len(cases)} - {c['project_id']}:{c['suite_id']}:{c['id']}: {c['title']}")
    eval = evaluate_test_case(c)

    steps = []
    expected_results = []
    if c['custom_steps_separated'] is not None:
        steps = [step['content'] for step in c['custom_steps_separated']]
        expected_results = [step['expected'] for step in  c['custom_steps_separated']]


    pid = c['project_id']
    sid = c['suite_id']
    tid = c['id']

    if pid not in evaluations.keys():
        evaluations[pid] = {}
    if sid not in evaluations[pid].keys():
        evaluations[pid][sid] = []

    evaluations[pid][sid].append({
        'id' : tid,
        'title': c['title'],
        'description': c['custom_testcase_description'],
        'steps': steps,
        'expected_result': expected_results,
        'preconditions': c['custom_preconds'],
        'created_by': c['created_by'],
        'created_on': get_human_time(c['created_on']),
        'evaluation': eval
    })

In [None]:
count = 0
for k, v in evaluations.items():
    for kk, vv in v.items():
        print(f"project={k} suite={kk} count of cases = {len(vv)}")
        count += len(vv)
print(count)

In [None]:
for pid, suites in evaluations.items():
    for sid, tests in suites.items():
        now = datetime.datetime.now(datetime.UTC)
        formated_date = now.strftime("%Y%m%d-%H%M%S")
        filename = f"csv_output/{formated_date}-project_{pid}-suite_{sid}-evaluations.csv"
        fieldnames = tests[0].keys()
        try:
            # with open(filename, mode='w', newline='', encoding='utf-8') as csv_file:
            with open(filename, mode='w', newline='', encoding='utf-16') as csv_file:
                writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(tests)
            print(f"CSV file '{filename}' created successfully.")
        except Exception as e:
            print(e)
