In [None]:
import os
import psycopg
from psycopg.rows import dict_row
#from config import AppConfig
#cfg =AppConfig()
import os


database_url=""
def get_connection():
    return psycopg.connect(
        database_url,
        row_factory=dict_row
    )


In [4]:
from tools.save_job import save_scheduled_job
#from rq_config import test_generation_queue
from tools.extract_rows import extract_test_cases
from tools.priority_summary import summarize_test_case_priorities
from psycopg.rows import dict_row
from db import get_connection as get_db

def get_job_results(job_id: str):
    # Fetch job from mock DBasync def create_project(req: CreateProjectRequest, request: Request):
    conn = get_connection()
    print(f"Fetching results for job_id: {job_id}")
    try:
        with conn.cursor() as cursor:
            # 1️⃣ Fetch functional test cases for the job
            cursor.execute(
                """
                SELECT
                    ftc.test_case_id,
                    ftc.result
                FROM function_test_cases ftc
                WHERE ftc.job_id = %s
                """,
                (job_id,),
            )
            functional_rows = cursor.fetchall()

            # 2️⃣ Extract test cases
            #test_cases = extract_test_cases(functional_rows)
            #print("Extracted Test Cases:", test_cases)
            results_payloads = [row["result"] for row in functional_rows]
            test_cases = extract_test_cases(results_payloads)
            # 4️⃣ Summarize priorities
            summary = summarize_test_case_priorities(test_cases)

            # 5️⃣ Fetch automation scripts for the job
            cursor.execute(
                """
                SELECT
                    ascr.script
                FROM automation_scripts ascr
                JOIN user_stories us ON ascr.user_story_id = us.user_story_id
                WHERE us.job_id = %s
                """,
                (job_id,),
            )
            automation_rows = cursor.fetchall()

            # 6️⃣ Process automation scripts
            # dict comprehension to store scripts by their 

            automation_scripts = automation_rows[0]["script"] if automation_rows else {}



            # automation_scripts = {
            #     row["automation_id"]: row["script"] for row in automation_rows
            # }
            # automation_scripts = automation_scripts[]
   
            # 3️⃣ Fetch job info
            cursor.execute(
                """
                SELECT
                    job_id,
                    project_name,
                    description,
                    status,
                    submitted_at
                FROM scheduled_jobs
                WHERE job_id = %s
                """,
                (job_id,),
            )   
            job = cursor.fetchone() 
            STATUS_MAP = {
                "IN_QUEUE": "In Queue",
                "IN_PROGRESS": "In Progress",
                "COMPLETED": "Completed",
                "FAILED": "Failed"
            }
            
            job_info= {
                "job_id": job_id,
                "project_name": job["project_name"],
                "description": job["description"],
                "status": STATUS_MAP.get(job["status"], "In Queue"),
                "submitted_at": job["submitted_at"],
                "test_count": len(test_cases)
            }

            return {
                "high_priority_count": summary.get("high", 0),
                "medium_priority_count": summary.get("medium", 0),
                "low_priority_count": summary.get("low", 0),
                "test_cases": test_cases,
                "automation_scripts": automation_scripts,
                "job_info": job_info
            }

    finally:
        conn.close()

In [5]:
ans=get_job_results("23")

Fetching results for job_id: 23
Priority Summary - High: 6, Medium: 4, Low: 0


In [6]:
ans

{'high_priority_count': 6,
 'medium_priority_count': 4,
 'low_priority_count': 0,
 'test_cases': [{'ID': 'C1001',
   'Type': 'positive',
   'Steps': "1. Enter 'testuser@example.com' into the email field.\n2. Enter 'Password123!' into the password field.\n3. Click the 'Login' button.",
   'Title': 'Verify successful login with valid credentials',
   'Priority': 'High',
   'TestData': {'Inputs': {'email': 'testuser@example.com',
     'password': 'Password123!'},
    'DB_Mock': {'users': {'email': 'testuser@example.com',
      'password_hash': 'hashed_password_of_Password123!'}},
    'API_Payload': {'email': 'testuser@example.com',
     'password': 'Password123!'}},
   'Preconditions': "A registered user 'testuser@example.com' with password 'Password123!' exists in the system. User is on the login page.",
   'Expected Results': 'User is successfully authenticated and redirected to the dashboard page. The dashboard header or a specific dashboard element is visible.'},
  {'ID': 'C1002',
   

In [7]:
def get_all_jobs():
    jobs_list = []

    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT
                    sj.job_id,
                    sj.project_name,
                    sj.description,
                    sj.status,
                    sj.submitted_at,
                    COUNT(ftc.test_case_id) AS test_count
                FROM scheduled_jobs sj
                LEFT JOIN function_test_cases ftc
                    ON sj.job_id = ftc.job_id
                GROUP BY
                    sj.job_id,
                    sj.project_name,
                    sj.description,
                    sj.status,
                    sj.submitted_at
                ORDER BY sj.submitted_at DESC
                """
            )

            rows = cursor.fetchall()
            STATUS_MAP = {
                "IN_QUEUE": "In Queue",
                "IN_PROGRESS": "In Progress",
                "COMPLETED": "Completed",
                "FAILED": "Failed"
            }

            for row in rows:
                jobs_list.append(
                    {
                        "id": row["job_id"],  # UI uses this for the list
                        "project": row["project_name"],
                        "description": row["description"],
                        "status": STATUS_MAP.get(row["status"], "In Queue"),
                        # PostgreSQL returns datetime objects already
                        "submitted": row["submitted_at"].strftime("%b %d, %I:%M %p"),
                        "tests": row["test_count"],
                    }
                )

        return jobs_list

    finally:
        conn.close()

In [None]:
def get_all_jobs():
    jobs_list = []

    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                    SELECT
                        sj.job_id,
                        sj.project_name,
                        sj.description,
                        sj.status,
                        sj.submitted_at,
                        COALESCE(
                            SUM(
                                CASE
                                    WHEN jsonb_typeof(ftc.result) = 'array'
                                    AND jsonb_typeof(ftc.result->0) = 'array'
                                    THEN jsonb_array_length(ftc.result->0)
                                    WHEN jsonb_typeof(ftc.result) = 'array'
                                    THEN jsonb_array_length(ftc.result)
                                    ELSE 0
                                END
                            ),
                            0
                        ) AS test_count
                    FROM scheduled_jobs sj
                    LEFT JOIN function_test_cases ftc
                        ON sj.job_id = ftc.job_id
                    GROUP BY
                        sj.job_id,
                        sj.project_name,
                        sj.description,
                        sj.status,
                        sj.submitted_at
                    ORDER BY sj.submitted_at DESC;
                """
            )

            rows = cursor.fetchall()
            STATUS_MAP = {
                "IN_QUEUE": "In Queue",
                "IN_PROGRESS": "In Progress",
                "COMPLETED": "Completed",
                "FAILED": "Failed"
            }

            for row in rows:
                jobs_list.append(
                    {
                        "id": row["job_id"],
                        "project": row["project_name"],
                        "description": row["description"],
                        "status": STATUS_MAP.get(row["status"], "In Queue"),
                        "submitted": row["submitted_at"].strftime("%b %d, %I:%M %p"),
                        "tests": row["test_count"],
                    }
                )

        return jobs_list

    finally:
        conn.close()


In [11]:
r=get_all_jobs()

InvalidParameterValue: cannot get array length of a non-array

In [9]:
r

[{'id': 23,
  'project': 'Amazon',
  'description': 'Fun1',
  'status': 'Completed',
  'submitted': 'Feb 09, 08:10 AM',
  'tests': 1},
 {'id': 22,
  'project': 'Blinkit',
  'description': 'Functional test request for validating the user login flow.',
  'status': 'Completed',
  'submitted': 'Feb 09, 07:56 AM',
  'tests': 1},
 {'id': 21,
  'project': 'Amazon',
  'description': 'Functional test request for validating the user login flow',
  'status': 'Completed',
  'submitted': 'Feb 09, 07:33 AM',
  'tests': 1},
 {'id': 20,
  'project': 'Blinkit',
  'description': 'Functional test request for validating the user login flow',
  'status': 'Failed',
  'submitted': 'Feb 09, 07:09 AM',
  'tests': 0},
 {'id': 18,
  'project': 'Amazon',
  'description': 'amazon ui test',
  'status': 'Completed',
  'submitted': 'Feb 08, 08:17 AM',
  'tests': 0},
 {'id': 17,
  'project': 'Blinkit',
  'description': 'blinkit test ',
  'status': 'Completed',
  'submitted': 'Feb 07, 04:53 PM',
  'tests': 0},
 {'id': 