In [7]:
!pip install youtube-transcript-api langchain-community chromadb pydantic playwright google-generativeai==0.3.2 streamlit pandas pdfkit
!playwright install



import os
import json
import re
import time
import random
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from typing import List, Dict, Optional, Tuple
from pydantic import BaseModel
from youtube_transcript_api import YouTubeTranscriptApi
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
import google.generativeai as genai
from google.api_core import client_options as google_client_options
import pandas as pd
import pdfkit

class Config:
    YOUTUBE_URL = "https://www.youtube.com/watch?v=IK62Rk47aas"
    with open("gemapi.txt") as f:
        GEMINI_API_KEY = f.read().strip()
    
    
    TEST_ENVIRONMENTS = {
        'local': 'http://localhost:3000',
        'staging': 'https://staging.recruter.ai'
    }
    BROWSERS = ['chromium', 'firefox', 'webkit']
    MOBILE_VIEWPORTS = [
        {'name': 'iPhone', 'width': 375, 'height': 667},
        {'name': 'Pixel', 'width': 412, 'height': 732}
    ]
    
    
    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    TEST_CASE_OUTPUT_JSON = "test_cases.json"
    TEST_CASE_OUTPUT_MD = "test_cases.md"
    PLAYWRIGHT_SCRIPT_OUTPUT = "test_script.spec.ts"
    TEST_RESULTS_DIR = "test_results"
    SCREENSHOTS_DIR = os.path.join(TEST_RESULTS_DIR, "screenshots")
    VIDEOS_DIR = os.path.join(TEST_RESULTS_DIR, "videos")
    
    
    EMAIL_CONFIG = {
        'sender': 'qa@recruter.ai',
        'recipients': ['team@recruter.ai'],
        'smtp_server': 'smtp.recruter.ai',
        'smtp_port': 587
    }
    
     
    os.makedirs(TEST_RESULTS_DIR, exist_ok=True)
    os.makedirs(SCREENSHOTS_DIR, exist_ok=True)
    os.makedirs(VIDEOS_DIR, exist_ok=True)


class QAPersonality:
    NAME = "QAgenie"
    MISSION = "Ensure flawless user experiences on Recruter.ai"
    DESCRIPTION = """You are QAgenie — a calm, thorough AI QA assistant.
 Your mission is to ensure flawless user experiences on Recruter.ai.
 You carefully read help documents and watch training videos to understand user flows, edge cases, and expected UI behaviors.
 You automatically generate complete, accurate, and maintainable frontend test cases in Playwright.
 You run tests systematically, capture results, and summarize findings clearly with actionable insights.
 You never skip edge cases and always consider accessibility, cross-browser compatibility, and user error handling.
 You escalate ambiguous flows with clear context for clarification rather than guessing"""
    TRAITS = {
        "calm": 0.9,
        "thorough": 1.0,
        "systematic": 0.95,
        "accessible": 0.85,
        "precise": 0.95
    }


genai.configure(
    api_key=Config.GEMINI_API_KEY,
    transport="rest",
    client_options=google_client_options.ClientOptions(
        api_endpoint="generativelanguage.googleapis.com"
    )
)
model = genai.GenerativeModel('gemini-1.5-flash')


class TestStep(BaseModel):
    step_number: int
    action: str
    expected_result: str
    element_locator: Optional[str] = None
    input_data: Optional[str] = None
    notes: Optional[str] = None

class TestCase(BaseModel):
    title: str
    description: str
    category: str
    steps: List[TestStep]
    tags: List[str]
    priority: str
    generated_by: str = QAPersonality.NAME
    testing_approach: str = "Systematic validation"


def get_video_transcript(youtube_url: str) -> str:
    """Fetch transcript with network resilience"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            video_id = youtube_url.split("v=")[1].split("&")[0]
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            return " ".join([t['text'] for t in transcript])
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"QAgenie: Failed after {max_retries} attempts - {str(e)}")
                return ""
            time.sleep(2 ** attempt)  

def generate_test_cases(transcript: str) -> List[TestCase]:
    """Generate test cases with bulletproof JSON handling and QAgenie's meticulous approach"""
    print("\n QAgenie is meticulously analyzing the content...")
    time.sleep(1)  

    
    prompt = f"""
    {QAPersonality.DESCRIPTION}

    TASK: Generate exactly 6 test cases in perfect JSON format.

    REQUIREMENTS:
    1. Output MUST be a JSON array wrapped in ```json``` markers
    2. Each test case MUST have:
       - title (string)
       - description (string)
       - steps (array with at least 2 steps)
       - tags (array of strings)
       - priority (string: 'high', 'medium', or 'low')

    EXAMPLE OUTPUT:
    ```json
    [
        {{
            "title": "Login with valid credentials",
            "description": "Verify successful login with correct credentials",
            "steps": [
                {{
                    "step_number": 1,
                    "action": "Enter valid email",
                    "expected_result": "Email field accepts input",
                    "element_locator": "#email-field"
                }},
                {{
                    "step_number": 2,
                    "action": "Click login button",
                    "expected_result": "User is redirected to dashboard",
                    "element_locator": "#login-btn"
                }}
            ],
            "tags": ["authentication", "happy-path"],
            "priority": "high"
        }}
    ]
    ```

    CONTENT TO ANALYZE:
    {transcript[:3000]}  <!-- Reduced length for reliability -->
    """

    for attempt in range(3):
        try:
            print(f"\n QAgenie's Analysis Attempt {attempt + 1}/3")
            response = model.generate_content(prompt)
            
            
            json_match = None
            json_patterns = [
                r'```json\n(.*?)\n```',  
                r'```(.*?)```',          
                r'(\[.*?\])'             
            ]

            for pattern in json_patterns:
                match = re.search(pattern, response.text, re.DOTALL)
                if match:
                    json_str = match.group(1).strip()
                    if json_str.startswith('json'):
                        json_str = json_str[4:].strip()
                    break

            if not json_str:
                print(" QAgenie Note: No JSON found in response")
                continue


            test_data = json.loads(json_str)
            if not isinstance(test_data, list):
                test_data = [test_data]

            valid_cases = []
            for tc in test_data:
                try:
                    
                    if not all(k in tc for k in ['title', 'steps']):
                        print(f" QAgenie: Skipping incomplete test case")
                        continue

                   
                    normalized_steps = []
                    for i, step in enumerate(tc.get('steps', [])):
                        if not all(k in step for k in ['action', 'expected_result']):
                            continue
                        normalized_steps.append({
                            'step_number': i + 1,
                            'action': step['action'],
                            'expected_result': step['expected_result'],
                            'element_locator': step.get('element_locator', ''),
                            'input_data': step.get('input_data', '')
                        })

                    if not normalized_steps:
                        continue

                    
                    valid_case = {
                        'title': tc['title'],
                        'description': tc.get('description', ''),
                        'category': tc.get('category', 'functional'),
                        'steps': normalized_steps,
                        'tags': tc.get('tags', []),
                        'priority': tc.get('priority', 'medium')
                    }
                    valid_cases.append(TestCase(**valid_case))

                except Exception as e:
                    print(f" QAgenie: Refined test case - {str(e)}")

            if valid_cases:
                print(f"✅ QAgenie Success: Generated {len(valid_cases)} valid test cases")
                return valid_cases

        except json.JSONDecodeError as e:
            print(f" QAgenie: JSON parsing error - {str(e)}")
            print(" Tip: Trying alternative parsing approach...")
            
            try:
                clean_str = response.text.replace("'", '"').replace("None", "null")
                test_data = json.loads(clean_str)
                if isinstance(test_data, list):
                    print(" QAgenie: Recovered test cases from cleaned response")
                    return [TestCase(**tc) for tc in test_data]
            except:
                pass
        except Exception as e:
            print(f" QAgenie: Unexpected error - {str(e)}")

        if attempt < 2:
            print(" QAgenie is thoughtfully retrying...")
            time.sleep(2)

    print("\n QAgenie Critical Alert: Unable to generate valid test cases")
    print("Recommended Actions:")
    print("1. Verify the transcript contains clear instructions")
    print("2. Try a shorter transcript segment (under 3000 characters)")
    print("3. Check the AI model's response format")
    return []
    
    
    mandatory_cases = [
        {
            "title": "Network Offline Handling",
            "category": "edge_case",
            "tags": ["network", "error_handling"],
            "steps": [{
                "action": "Simulate offline network conditions",
                "expected_result": "App shows proper offline UI",
                "element_locator": ".network-status"
            }]
        },
        {
            "title": "Mobile View Validation",
            "category": "compatibility",
            "tags": ["mobile", "responsive"],
            "steps": [{
                "action": "Resize to mobile viewport",
                "expected_result": "UI adapts without horizontal scrolling",
                "element_locator": "body"
            }]
        }
    ]
    
    return test_cases + [TestCase(**case) for case in mandatory_cases]

def save_outputs(test_cases: List[TestCase]):
    """Save results with QAgenie's clear documentation style (Unicode-safe)"""
    
    with open(Config.TEST_CASE_OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump([tc.dict() for tc in test_cases], f, indent=2, ensure_ascii=False)
    
    
    with open(Config.TEST_CASE_OUTPUT_MD, 'w', encoding='utf-8') as f:
        f.write(f"# {QAPersonality.NAME} Test Suite\n\n")
        f.write(f"*{QAPersonality.MISSION}*\n")
        f.write(f"*Generated on: {time.strftime('%Y-%m-%d %H:%M')}*\n\n")
        
        for tc in test_cases:
            f.write(f"## {tc.title}\n")
            f.write(f"**Purpose**: {tc.description}\n\n")
            f.write(f"- **Category**: {tc.category}\n")
            f.write(f"- **Priority**: {tc.priority}\n")
            f.write(f"- **Approach**: {tc.testing_approach}\n")
            
            for step in tc.steps:
                f.write(f"\n{step.step_number}. **{step.action}**")
                f.write(f"\n   → Expected: {step.expected_result}")
                if step.element_locator:
                    f.write(f"\n   → Element: `{step.element_locator}`")
                if step.notes:
                    f.write(f"\n   → Note: {step.notes}")
            
            f.write("\n\n---\n")

def generate_playwright_script(test_cases: List[TestCase]):
    """Generate cross-browser test scripts with proper indentation"""
    script = """// QAgenie Test Suite
import { test, expect } from '@playwright/test';

// Configure browsers
const browsers = [
    { name: 'Chromium', browserName: 'chromium' },
    { name: 'Firefox', browserName: 'firefox' },
    { name: 'WebKit', browserName: 'webkit' }
];

// Mobile viewports
const mobileDevices = [
    { name: 'iPhone', width: 375, height: 667 },
    { name: 'Pixel', width: 412, height: 732 }
];

// Test configurations
for (const browser of browsers) {
    test.describe(`Recruter.ai Tests - ${browser.name}`, () => {
        test.use({ browserName: browser.browserName });
        
        // Mobile tests
        test.describe('Mobile Tests', () => {
            for (const device of mobileDevices) {
                test.use({ viewport: { width: device.width, height: device.height } });
                
                test('${device.name} - Responsive Layout', async ({ page }) => {
                    await page.goto(process.env.TEST_URL);
                    await expect(page.locator('body')).not.toHaveCSS('overflow-x', 'visible');
                });
            }
        });
        
        // Generated Test Cases
        test.describe('Generated Test Cases', () => {\n"""

    
    for tc in test_cases:
        script += f"            test('{tc.title}', async ({{ page }}) => {{\n"
        for step in tc.steps:
            action = step.action.lower()
            
            
            if "click" in action:
                script += f"                await page.locator('{step.element_locator}').click();\n"
            elif any(x in action for x in ["type", "enter", "fill"]):
                script += f"                await page.locator('{step.element_locator}').fill('{step.input_data or 'testdata'}');\n"
            elif "navigate" in action or "go to" in action:
                script += f"                await page.goto('{step.input_data}');\n"
            
            # Add verification
            if "should see" in step.expected_result.lower():
                script += f"                await expect(page.locator('{step.element_locator}')).toBeVisible();\n"
            elif "should contain" in step.expected_result.lower():
                text = step.expected_result.split("contain")[-1].strip()
                script += f"                await expect(page.locator('{step.element_locator}')).toContainText('{text}');\n"
        
        script += "            });\n\n"
    
    
    script += "        });\n    });\n}\n"
    
    with open(Config.PLAYWRIGHT_SCRIPT_OUTPUT, 'w') as f:
        f.write(script)

def execute_tests():
    """Run tests with QAgenie's systematic approach"""
    print("\nQAgenie is running tests...")
    
    os.makedirs(Config.TEST_RESULTS_DIR, exist_ok=True)
    
    
    os.system(
        f"npx playwright test {Config.PLAYWRIGHT_SCRIPT_OUTPUT} "
        f"--reporter=json,html "
        f"--output={Config.TEST_RESULTS_DIR}/results.json "
        f"--project=chromium"
    )
    
    
    if os.path.exists("playwright-report"):
        os.system(f"mv playwright-report/* {Config.TEST_RESULTS_DIR}/")


def generate_reports():
    """Generate all report formats"""
    results_path = os.path.join(Config.TEST_RESULTS_DIR, 'results.json')
    
    if not os.path.exists(results_path):
        print(" QAgenie Note: No test results found")
        print("Possible reasons:")
        print("1. Tests haven't been executed yet")
        print("2. Playwright failed to generate results")
        print("3. File path is incorrect")
        return
    
    try:
        with open(results_path) as f:
            results = json.load(f)
        
        
        
    except Exception as e:
        print(f" QAgenie Error: Failed to process results - {str(e)}")
    
    
    send_email_report()

def send_email_report():
    """Send comprehensive email report"""
    msg = MIMEMultipart()
    msg['From'] = Config.EMAIL_CONFIG['sender']
    msg['To'] = ", ".join(Config.EMAIL_CONFIG['recipients'])
    msg['Subject'] = f"QAgenie Test Report - {time.strftime('%Y-%m-%d')}"
    
    
    with open(os.path.join(Config.TEST_RESULTS_DIR, 'results.md')) as f:
        msg.attach(MIMEText(f.read(), 'plain'))
    
    
    with open(os.path.join(Config.TEST_RESULTS_DIR, 'report.pdf'), 'rb') as f:
        msg.attach(MIMEApplication(f.read(), Name='QA_Report.pdf'))
    
    
    with open(os.path.join(Config.TEST_RESULTS_DIR, 'results.csv'), 'rb') as f:
        msg.attach(MIMEApplication(f.read(), Name='results.csv'))
    
    
    with smtplib.SMTP(Config.EMAIL_CONFIG['smtp_server'], Config.EMAIL_CONFIG['smtp_port']) as server:
        server.send_message(msg)


def run_dashboard():
    import streamlit as st
    
    st.set_page_config(layout="wide")
    st.title(f" {QAPersonality.NAME} Dashboard")
    
    
    with open(os.path.join(Config.TEST_RESULTS_DIR, 'results.json')) as f:
        results = json.load(f)
    
    
    col1, col2, col3 = st.columns(3)
    col1.metric("Total Tests", len(results['tests']))
    col2.metric("Passed", sum(1 for t in results['tests'] if t['status'] == 'passed'))
    col3.metric("Failed", sum(1 for t in results['tests'] if t['status'] != 'passed'))
    
    
    with st.expander("Test Details"):
        for test in results['tests']:
            status = "PASSED" if test['status'] == 'passed' else "FAILED"
            st.markdown(f"**{status} {test['title']}** ({test.get('browser', 'N/A')})")
            
            if test['status'] != 'passed':
                col1, col2 = st.columns(2)
                if os.path.exists(test.get('screenshot', '')):
                    col1.image(test['screenshot'], caption="Failure Screenshot")
                if os.path.exists(test.get('video', '')):
                    col2.video(test['video'])
                
                st.error(test.get('error', 'No error details available'))


def run_pipeline():
    """QAgenie's end-to-end testing workflow"""
    print(f"\n Initializing {QAPersonality.NAME}")
    print(f" Mission: {QAPersonality.MISSION}")
    
    # 1. Gather Training Materials
    transcript = get_video_transcript(Config.YOUTUBE_URL)
    if not transcript:
        return
    
    # 2. Generate Test Cases
    test_cases = generate_test_cases(transcript)
    if not test_cases:
        return
    
    # 3. Prepare Artifacts
    save_outputs(test_cases)
    generate_playwright_script(test_cases)
    
    # 4. Execute Tests
    execute_tests()
    
    # 5. Reporting
    print("\n QAgenie Report Summary:")
    print(f"- Test cases generated: {len(test_cases)}")
    print(f"- Results directory: {Config.TEST_RESULTS_DIR}")
    
if __name__ == "__main__":
    run_pipeline()


 Initializing QAgenie
 Mission: Ensure flawless user experiences on Recruter.ai

 QAgenie is meticulously analyzing the content...

 QAgenie's Analysis Attempt 1/3
 QAgenie: JSON parsing error - Expecting ',' delimiter: line 108 column 34 (char 3990)
 Tip: Trying alternative parsing approach...
 QAgenie is thoughtfully retrying...

 QAgenie's Analysis Attempt 2/3
✅ QAgenie Success: Generated 6 valid test cases

QAgenie is running tests...

 QAgenie Report Summary:
- Test cases generated: 6
- Results directory: test_results


In [8]:
import subprocess
import threading
import time
import webbrowser

def run_streamlit():
    try:
        subprocess.run([
            "streamlit", 
            "run", 
            "QA_frontend.py",
            "--server.port=8501",
            "--server.headless=true",
            "--browser.serverAddress=localhost",
            "--logger.level=error"
        ], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error running Streamlit: {e}")


thread = threading.Thread(target=run_streamlit, daemon=True)
thread.start()
time.sleep(3)
webbrowser.open("http://localhost:8501")

print("Dashboard running at: http://localhost:8501")
print("Keep this notebook running to maintain the connection")

Error running Streamlit: Command '['streamlit', 'run', 'QA_frontend.py', '--server.port=8501', '--server.headless=true', '--browser.serverAddress=localhost', '--logger.level=error']' returned non-zero exit status 1.
Dashboard running at: http://localhost:8501
Keep this notebook running to maintain the connection
