<a href="https://colab.research.google.com/github/apatils-fti/AI_ML_Automation/blob/main/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
XRay Data Extraction Script - FINAL VERSION WITH HARDCODED KEYS
Extracts all test executions and test cases from XRay/Jira
Run this in Google Colab or locally
"""

import time
import pandas as pd
import requests

# ============================================================================
# API CREDENTIALS AND ENDPOINT SETTINGS
# ============================================================================
# SECURITY: the client id/secret used to be hardcoded in this cell. Any value
# that was committed must be treated as leaked and rotated in Xray. The
# credentials are now read from Colab secrets when running in Colab, otherwise
# from the XRAY_CLIENT_ID / XRAY_CLIENT_SECRET environment variables.
import os


def _load_xray_credential(name):
    """Return credential *name* from Colab secrets or the environment.

    Args:
        name: Name of the Colab secret / environment variable to read.

    Returns:
        The stripped value, or an empty string when it is not configured.
    """
    try:
        from google.colab import userdata
        value = userdata.get(name)
    except Exception:
        # Not running in Colab, or the secret is missing / access not granted:
        # fall back to the process environment.
        value = None
    return (value or os.environ.get(name, "")).strip()


XRAY_CLIENT_ID = _load_xray_credential("XRAY_CLIENT_ID")
XRAY_CLIENT_SECRET = _load_xray_credential("XRAY_CLIENT_SECRET")
if not (XRAY_CLIENT_ID and XRAY_CLIENT_SECRET):
    print("WARNING: XRAY_CLIENT_ID / XRAY_CLIENT_SECRET are not set; authentication will fail.")

XRAY_GRAPHQL_URL = 'https://xray.cloud.getxray.app/api/v2/graphql'
REQUEST_DELAY = 0.5  # seconds to pause between consecutive API requests

def get_xray_token(timeout=30):
    """Authenticate against Xray Cloud and return the token used for GraphQL calls.

    Posts the module-level ``XRAY_CLIENT_ID`` / ``XRAY_CLIENT_SECRET`` to the
    v2 ``authenticate`` endpoint.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the notebook forever.

    Returns:
        The decoded JSON body of the response on HTTP 200 (callers place it in
        the ``Authorization: Bearer`` header); ``None`` on any other status, on
        a network failure, or when the body is not valid JSON.
    """
    headers = {"Content-Type": "application/json"}
    payload = {"client_id": XRAY_CLIENT_ID, "client_secret": XRAY_CLIENT_SECRET}
    try:
        response = requests.post('https://xray.cloud.getxray.app/api/v2/authenticate',
                                 json=payload, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Authentication request failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"Authentication failed: HTTP {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError:
        # A 200 response with a non-JSON body is treated like a failed login.
        print("Authentication failed: response body was not valid JSON")
        return None

def execute_graphql(graphql, token, max_retries=10):
    """POST a GraphQL query to Xray, retrying on rate limits and transient errors.

    Args:
        graphql: The GraphQL query text.
        token: Bearer token obtained from ``get_xray_token``.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The decoded JSON response on HTTP 200, otherwise
        ``{"error": "Max retries exceeded"}`` once all attempts are used up.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    for attempt in range(max_retries):
        try:
            response = requests.post(XRAY_GRAPHQL_URL, json={"query": graphql},
                                     headers=headers, timeout=60)
            if response.status_code == 200:
                return response.json()
            if response.status_code == 429:
                # Rate limited: back off for a full minute before retrying.
                wait_seconds = 60
            else:
                print(f"  HTTP {response.status_code} from Xray (attempt {attempt + 1}/{max_retries})")
                wait_seconds = 2
        except (requests.RequestException, ValueError) as e:
            # Network failure or an undecodable body: short pause, then retry.
            print(f"  Request failed (attempt {attempt + 1}/{max_retries}): {e}")
            wait_seconds = 2

        # Sleeping after the final attempt would only delay the error result.
        if attempt < max_retries - 1:
            time.sleep(wait_seconds)
    return {"error": "Max retries exceeded"}

print("üöÄ XRAY DATA EXTRACTION - GET ALL TEST EXECUTIONS & TEST CASES")
print("=" * 60)

# Load mapping: the first data row is promoted to the column labels, that row
# is then dropped from the data, and finally the leading column is discarded.
mapping_df = pd.read_excel('App_to_Jira_Testing_Mapping_102925.xlsx', header=0)
mapping_df.columns = mapping_df.iloc[0]
mapping_df = mapping_df.iloc[1:].reset_index(drop=True)
first_column = mapping_df.columns[0]
mapping_df = mapping_df.drop(columns=[first_column])

print(f"‚úÖ Loaded {len(mapping_df)} apps from mapping")

# Create lookups for EVERY key in mapping.
# Each spec is (issue-key column, (initial, due, completion) date columns, phase).
# IT has no completion-date column, hence the None.
phase_specs = [
    ('QA Issue Key', ('QA Initial Date', 'QA Due Date', 'QA Date'), 'QA'),
    ('UAT Issue Key', ('UAT Intial Date', 'UAT Due Date', 'UAT Date'), 'UAT'),
    ('IT Issue Key', ('IT Intial Date', 'IT Due Date', None), 'IT')
]
key_to_info = {}

for _, mapping_row in mapping_df.iterrows():
    # Fields shared by every phase of this application.
    app_fields = {
        'app_name': mapping_row['App Name'],
        'app_priority': mapping_row.get('Priority?'),
        'app_status': mapping_row.get('Status'),
    }

    for key_col, (initial_col, due_col, completion_col), phase_type in phase_specs:
        raw_key = mapping_row[key_col]
        if pd.isna(raw_key):
            continue
        key_to_info[str(raw_key).strip()] = {
            **app_fields,
            'phase_type': phase_type,
            'initial_date': mapping_row.get(initial_col),
            'due_date': mapping_row.get(due_col),
            'completion_date': mapping_row.get(completion_col) if completion_col else None,
        }

print(f"‚úÖ Created lookups for {len(key_to_info)} keys from mapping")

xray_token = get_xray_token()
if not xray_token:
    # Without a token every GraphQL call below fails; stop here instead of
    # silently producing empty CSV files.
    print("‚ùå Authentication failed!")
    raise SystemExit(1)

projects = ['SBDS', 'SBIT', 'SBSR', 'SBSS']

# Get ALL work items
print(f"\n{'='*60}")
print(f"üìä GETTING ALL TEST EXECUTIONS")
print(f"{'='*60}\n")

all_work_items = []

for project in projects:
    start = 0
    limit = 100
    project_count = 0  # executions fetched for this project's query

    # Page through the project's test executions, `limit` items at a time.
    while True:
        graphql = f"""{{
            getTestExecutions(jql: "project = {project}", start: {start}, limit: {limit}) {{
                total
                results {{
                    issueId
                    testEnvironments
                    jira(fields: ["key", "summary", "project", "status", "created", "updated", "parent"])
                }}
            }}
        }}"""

        response = execute_graphql(graphql, xray_token)

        if 'errors' in response or 'data' not in response:
            # Report the failure so a truncated export is not mistaken for a full one.
            reason = response.get('errors') or response.get('error')
            print(f"  WARNING: {project}: request failed at offset {start}: {reason}")
            break

        # `data` or `getTestExecutions` may be null in a GraphQL response.
        page_data = (response['data'] or {}).get('getTestExecutions') or {}
        results = page_data.get('results') or []
        total = page_data.get('total') or 0

        all_work_items.extend(results)
        project_count += len(results)

        # Stop on a short page or once the reported total has been reached.
        if len(results) < limit or start + len(results) >= total:
            break

        start += limit
        time.sleep(REQUEST_DELAY)

    print(f"üîç {project}: {project_count} work items")

print(f"\n‚úÖ Total: {len(all_work_items)} work items")

# Tag every work item with the mapping key it corresponds to: its own issue
# key when that appears in the mapping, otherwise its parent's key, else None.
print(f"\nüìã Matching work items to mapping...")
matched_count = 0

for work_item in all_work_items:
    jira_fields = work_item.get('jira', {})
    own_key = jira_fields.get('key')
    parent_issue = jira_fields.get('parent', {})
    parent_issue_key = parent_issue.get('key') if parent_issue else None

    if own_key in key_to_info:
        # The work item's own key takes precedence.
        hit = own_key
    elif parent_issue_key and parent_issue_key in key_to_info:
        hit = parent_issue_key
    else:
        hit = None

    work_item['_matched_key'] = hit
    if hit is not None:
        matched_count += 1

print(f"‚úÖ Matched {matched_count} work items to mapping")

# Process ALL work items (matched or not)
print(f"\n{'='*60}")
print(f"üìä EXTRACTING TEST CASES")
print(f"{'='*60}\n")


def _field(mapping, *path, default=None):
    """Walk nested dicts along *path*; return *default* if a level is missing or null."""
    current = mapping
    for key in path:
        if not isinstance(current, dict):
            return default
        current = current.get(key)
    return default if current is None else current


detailed_data = []   # one row per test run
summary_data = []    # one row per work item that has at least one test run
processed = 0

for work_item in all_work_items:
    exec_id = work_item['issueId']
    exec_jira = work_item.get('jira') or {}
    work_item_key = _field(exec_jira, 'key', default='Unknown')
    work_item_summary = _field(exec_jira, 'summary', default='Unknown')
    matched_key = work_item.get('_matched_key')

    if processed % 20 == 0:
        print(f"üìã Progress: {processed}/{len(all_work_items)} - {len(detailed_data)} test cases")

    try:
        project_key = _field(exec_jira, 'project', 'key', default='Unknown')
        environments = work_item.get('testEnvironments') or []
        env_name = environments[0] if environments else 'Unknown'

        work_item_status = _field(exec_jira, 'status', 'name', default='Unknown')
        work_item_created = exec_jira.get('created')
        work_item_updated = exec_jira.get('updated')
        parent_key = _field(exec_jira, 'parent', 'key')

        # Get info from mapping if matched, otherwise use placeholder values.
        info = key_to_info[matched_key] if matched_key else {
            'app_name': 'Not in Mapping',
            'app_priority': None,
            'app_status': None,
            'phase_type': 'Unknown',
            'initial_date': None,
            'due_date': None,
            'completion_date': None,
        }
        app_name = info['app_name']
        app_priority = info['app_priority']
        app_status = info['app_status']
        phase_type = info['phase_type']
        phase_initial_date = info['initial_date']
        phase_due_date = info['due_date']
        phase_completion_date = info['completion_date']

        # Get test cases, paging `test_limit` runs at a time.
        all_test_runs = []
        test_start = 0
        test_limit = 100

        while True:
            graphql_tests = f"""{{
                getTestRuns(testExecIssueIds: ["{exec_id}"], start: {test_start}, limit: {test_limit}) {{
                    total
                    results {{
                        status {{ name }}
                        startedOn
                        finishedOn
                        test {{
                            jira(fields: ["key", "summary", "labels", "created", "creator", "status", "assignee", "priority", "components", "updated"])
                        }}
                    }}
                }}
            }}"""

            resp = execute_graphql(graphql_tests, xray_token)

            runs_page = (resp.get('data') or {}).get('getTestRuns')
            if 'errors' in resp or not runs_page:
                print(f"  WARNING: could not fetch test runs for {work_item_key} "
                      f"at offset {test_start}; its results may be incomplete")
                break

            test_results = runs_page.get('results') or []
            test_total = runs_page.get('total') or 0

            all_test_runs.extend(test_results)

            if len(test_results) < test_limit or len(all_test_runs) >= test_total:
                break

            test_start += test_limit
            time.sleep(REQUEST_DELAY)

        counts = {'PASSED': 0, 'FAILED': 0, 'BLOCKED': 0, 'EXECUTING': 0, 'TO DO': 0, 'PASS_THROUGH': 0}

        # Columns shared by every test row of this work item (order = CSV order).
        work_item_fields = {
            'app_name': app_name,
            'app_key': project_key,
            'app_mapped_key': matched_key if matched_key else 'Not Mapped',
            'app_priority': app_priority,
            'app_status': app_status,
            'test_phase_type': phase_type,
            'phase_initial_date': phase_initial_date,
            'phase_due_date': phase_due_date,
            'phase_completion_date': phase_completion_date,
            'work_item_id': exec_id,
            'work_item_key': work_item_key,
            'work_item_summary': work_item_summary,
            'work_item_parent': parent_key if parent_key else 'No Parent',
            'work_item_environment': env_name,
            'work_item_status': work_item_status,
            'work_item_created': work_item_created,
            'work_item_updated': work_item_updated,
        }

        # Rows are collected locally and committed together so that a failure
        # part-way through never leaves detailed rows without a summary row.
        work_item_rows = []

        for test in all_test_runs:
            test_jira = _field(test, 'test', 'jira', default={})
            status = _field(test, 'status', 'name')

            if status in counts:
                counts[status] += 1

            work_item_rows.append({
                **work_item_fields,
                'test_key': _field(test_jira, 'key'),
                'test_summary': _field(test_jira, 'summary'),
                'test_labels': ", ".join(_field(test_jira, 'labels', default=[])),
                'test_created_date': _field(test_jira, 'created', default=''),
                'test_updated_date': _field(test_jira, 'updated', default=''),
                'test_created_by': _field(test_jira, 'creator', 'displayName', default='Unknown'),
                'test_status': _field(test_jira, 'status', 'name', default='Unknown'),
                'test_execution_status': status,
                'test_assignee': _field(test_jira, 'assignee', 'displayName', default='Unassigned'),
                'test_priority': _field(test_jira, 'priority', 'name', default='Unknown'),
                'test_components': ", ".join(
                    _field(c, 'name', default='') for c in _field(test_jira, 'components', default=[])
                ),
                'execution_started_date': _field(test, 'startedOn', default=''),
                'execution_finished_date': _field(test, 'finishedOn', default=''),
            })

        detailed_data.extend(work_item_rows)

        if all_test_runs:
            summary_data.append({
                'app_name': app_name,
                'app_mapped_key': matched_key if matched_key else 'Not Mapped',
                'app_priority': app_priority,
                'test_phase_type': phase_type,
                'phase_initial_date': phase_initial_date,
                'phase_due_date': phase_due_date,
                'phase_completion_date': phase_completion_date,
                'work_item_key': work_item_key,
                'work_item_parent': parent_key if parent_key else 'No Parent',
                'work_item_environment': env_name,
                'passed': counts['PASSED'],
                'failed': counts['FAILED'],
                'blocked': counts['BLOCKED'],
                'executing': counts['EXECUTING'],
                'todo': counts['TO DO'],
                'pass_through': counts['PASS_THROUGH'],
                'total_tests': len(all_test_runs)
            })

    except Exception as e:
        # Best effort: keep exporting the remaining work items, but say which
        # one was lost instead of dropping it silently.
        print(f"  WARNING: error processing {work_item_key}: {e}")

    # Count the item even when it failed so progress and token refresh advance.
    processed += 1

    if processed % 50 == 0:
        # Refresh the token periodically, but never replace a working token
        # with the None returned by a failed refresh.
        refreshed_token = get_xray_token()
        if refreshed_token:
            xray_token = refreshed_token

    time.sleep(REQUEST_DELAY)

# Build the two result tables: V1 = per-work-item summary, V2 = per-test detail.
df_v1 = pd.DataFrame(summary_data)
df_v2 = pd.DataFrame(detailed_data)

print(f"\n{'='*60}")
print(f"‚úÖ COMPLETE!")
print(f"{'='*60}")
print(f"V1: {len(df_v1)} work items")
print(f"V2: {len(df_v2)} test cases")

if not df_v2.empty:
    print(f"\nüìä Mapped vs Not Mapped:")
    print(df_v2['app_mapped_key'].value_counts().head(20))

    matched_df = df_v2[df_v2['app_mapped_key'] != 'Not Mapped']
    print(f"\n‚úÖ {len(matched_df)} test cases matched to mapping")
    print(f"‚ùå {len(df_v2) - len(matched_df)} test cases NOT in mapping")

filename_v1 = 'v1_summary_ALL_WORK_ITEMS.csv'
filename_v2 = 'v2_detailed_ALL_WORK_ITEMS.csv'

df_v1.to_csv(filename_v1, index=False)
df_v2.to_csv(filename_v2, index=False)

print(f"\nüíæ Files:")
print(f"   {filename_v1}")
print(f"   {filename_v2}")

# Try to download files (works in Colab). `except Exception` rather than a
# bare `except` so Ctrl-C / SystemExit are not swallowed here.
try:
    from google.colab import files
    files.download(filename_v1)
    files.download(filename_v2)
    print("\nüì• Files downloaded!")
except Exception:
    # Not running in Colab, or the download failed; the CSVs are on disk either way.
    print("\nüí° Files saved to current directory")

print("\nüéâ DATA EXTRACTED!")
print("   Filter in Power BI: app_mapped_key != 'Not Mapped' to see your apps")

In [None]:
"""
XRay Data Extraction Script - FINAL VERSION WITH BETTER RATE LIMITING
Uses JQL query to get only Test Executions with parents
Structure: Parent App (SBSS-8) ‚Üí Work Item (SBSS-180) ‚Üí Tests (SBSS-245, etc.)
"""

import time
import pandas as pd
import requests
from datetime import datetime

# ============================================================================
# API CREDENTIALS AND RATE-LIMIT SETTINGS
# ============================================================================
# SECURITY: the client id/secret used to be hardcoded in this cell. Any value
# that was committed must be treated as leaked and rotated in Xray. The
# credentials are now read from Colab secrets when running in Colab, otherwise
# from the XRAY_CLIENT_ID / XRAY_CLIENT_SECRET environment variables.
import os


def _load_xray_credential(name):
    """Return credential *name* from Colab secrets or the environment.

    Args:
        name: Name of the Colab secret / environment variable to read.

    Returns:
        The stripped value, or an empty string when it is not configured.
    """
    try:
        from google.colab import userdata
        value = userdata.get(name)
    except Exception:
        # Not running in Colab, or the secret is missing / access not granted:
        # fall back to the process environment.
        value = None
    return (value or os.environ.get(name, "")).strip()


XRAY_CLIENT_ID = _load_xray_credential("XRAY_CLIENT_ID")
XRAY_CLIENT_SECRET = _load_xray_credential("XRAY_CLIENT_SECRET")
if not (XRAY_CLIENT_ID and XRAY_CLIENT_SECRET):
    print("WARNING: XRAY_CLIENT_ID / XRAY_CLIENT_SECRET are not set; authentication will fail.")

XRAY_GRAPHQL_URL = 'https://xray.cloud.getxray.app/api/v2/graphql'

# Rate limiting settings
BASE_DELAY = 1.0  # 1 second between requests
RETRY_DELAY = 10  # 10 seconds on first retry
MAX_RETRIES = 5

def get_xray_token(timeout=30):
    """Authenticate against Xray Cloud and return the token used for GraphQL calls.

    Posts the module-level ``XRAY_CLIENT_ID`` / ``XRAY_CLIENT_SECRET`` to the
    v2 ``authenticate`` endpoint.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the notebook forever.

    Returns:
        The decoded JSON body of the response on HTTP 200 (callers place it in
        the ``Authorization: Bearer`` header); ``None`` on any other status, on
        a network failure, or when the body is not valid JSON.
    """
    headers = {"Content-Type": "application/json"}
    payload = {"client_id": XRAY_CLIENT_ID, "client_secret": XRAY_CLIENT_SECRET}
    try:
        response = requests.post('https://xray.cloud.getxray.app/api/v2/authenticate',
                                 json=payload, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        print(f"  ‚ö† Authentication request failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"  ‚ö† Authentication failed: HTTP {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError:
        # A 200 response with a non-JSON body is treated like a failed login.
        print("  ‚ö† Authentication failed: response body was not valid JSON")
        return None

def execute_graphql(graphql, token, max_retries=MAX_RETRIES):
    """Execute a GraphQL query against Xray with retries and backoff.

    HTTP 429 responses are retried with exponential backoff
    (``RETRY_DELAY * 2 ** attempt``); other HTTP errors and request failures
    are retried after a fixed ``RETRY_DELAY``. No sleep is performed after the
    final attempt, since nothing would be retried afterwards.

    Args:
        graphql: The GraphQL query text.
        token: Bearer token obtained from ``get_xray_token``.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The decoded JSON response on HTTP 200, otherwise
        ``{"error": "Max retries exceeded"}``.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            response = requests.post(XRAY_GRAPHQL_URL, json={"query": graphql}, headers=headers, timeout=60)

            if response.status_code == 200:
                return response.json()

            if response.status_code == 429:
                # Exponential backoff for rate limiting
                wait_time = RETRY_DELAY * (2 ** attempt)  # 10s, 20s, 40s, 80s, ...
                if is_last_attempt:
                    print(f"  ‚ö† Rate limited (attempt {attempt + 1}/{max_retries}), giving up")
                else:
                    print(f"  ‚ö† Rate limited (attempt {attempt + 1}/{max_retries}), waiting {wait_time}s...")
                    time.sleep(wait_time)
                continue

            print(f"  ‚ö† HTTP {response.status_code}: {response.text[:200]}")

        except (requests.RequestException, ValueError) as e:
            # Network failure, timeout, or a 200 response with an undecodable body.
            print(f"  ‚ö† Attempt {attempt + 1} failed: {e}")

        if not is_last_attempt:
            time.sleep(RETRY_DELAY)

    return {"error": "Max retries exceeded"}

print("üöÄ XRAY DATA EXTRACTION - OPTIMIZED WITH RATE LIMITING")
print("=" * 80)
print("Structure: Parent App ‚Üí Work Item (Test Execution) ‚Üí Tests")
print("=" * 80)

xray_token = get_xray_token()
if not xray_token:
    print("‚ùå Authentication failed!")
    # `exit()` is the interactive helper injected by the `site` module and is
    # not meant for programs; raise SystemExit directly instead.
    raise SystemExit(1)

print("‚úÖ Authenticated")

# JQL Query - only get Test Executions with parents.
# The inner quotes are backslash-escaped because the JQL text is embedded
# inside a double-quoted GraphQL string argument further down.
JQL_QUERY = 'project in (SBB, SBDS, SBD, SBERP, SBIT, SBSR, SBSS) AND issuetype = \\"Test Execution\\" AND parent != null'

print(f"\nüìã JQL Query: {JQL_QUERY}")

# Step 1: page through every Test Execution matched by JQL_QUERY.
print(f"\n{'='*80}")
print(f"üìä STEP 1: GETTING TEST EXECUTIONS WITH PARENTS")
print(f"{'='*80}\n")

all_work_items = []
start = 0
limit = 100
page = 1

while True:
    first_item, last_item = start + 1, start + limit
    print(f"  Fetching page {page} (items {first_item}-{last_item})...")

    graphql = f"""{{
        getTestExecutions(jql: "{JQL_QUERY}", start: {start}, limit: {limit}) {{
            total
            results {{
                issueId
                testEnvironments
                jira(fields: [
                    "key",
                    "summary",
                    "project",
                    "status",
                    "created",
                    "updated",
                    "duedate",
                    "resolutiondate",
                    "parent",
                    "assignee",
                    "priority",
                    "labels",
                    "components"
                ])
            }}
        }}
    }}"""

    response = execute_graphql(graphql, xray_token)

    # Bail out on a GraphQL-level error or a response carrying no data.
    if 'errors' in response:
        print(f"  ‚úó GraphQL Error: {response['errors']}")
        break
    if 'data' not in response:
        print(f"  ‚úó No data in response")
        break

    data = response['data']['getTestExecutions']
    results = data.get('results', [])
    total = data.get('total', 0)

    if page == 1:
        print(f"  Total Test Executions found: {total}")

    all_work_items.extend(results)
    print(f"  ‚úì Fetched {len(results)} items (total so far: {len(all_work_items)})")

    # Continue only after a full page that still leaves items to fetch.
    has_more = len(results) >= limit and start + len(results) < total
    if not has_more:
        break

    start += limit
    page += 1
    time.sleep(BASE_DELAY)  # Rate limiting between pages

print(f"\n‚úÖ Total Work Items: {len(all_work_items)}")

if all_work_items:
    # Count work items per Jira project key.
    projects_found = {}
    for item in all_work_items:
        proj = item.get('jira', {}).get('project', {}).get('key', 'Unknown')
        projects_found.setdefault(proj, 0)
        projects_found[proj] += 1

    print(f"\nüìä Breakdown by project:")
    for proj, count in sorted(projects_found.items()):
        print(f"   {proj}: {count} work items")

    # Show the parent of each of the first five work items that has one.
    print(f"\nüìã Sample parent app structure:")
    for item in all_work_items[:5]:
        sample_jira = item.get('jira', {})
        sample_parent = sample_jira.get('parent', {})
        if not sample_parent:
            continue
        print(f"   {sample_jira.get('key', 'Unknown')} ‚Üí Parent: {sample_parent.get('key', 'N/A')}")

# Extract ALL test cases from ALL work items
print(f"\n{'='*80}")
print(f"üìä STEP 2: EXTRACTING ALL TEST CASES")
print(f"{'='*80}")
print(f"This will take time due to rate limiting (~{len(all_work_items)} work items)")
print(f"Estimated time: {len(all_work_items) * 2 / 60:.1f} minutes\n")

detailed_data = []   # one row per test run
summary_data = []    # one row per work item that has at least one test run
processed = 0
start_time = datetime.now()

for work_item in all_work_items:
    exec_id = work_item['issueId']
    # Jira fields may come back as null; `or {}` / `or []` keeps one malformed
    # issue from aborting the whole export.
    exec_jira = work_item.get('jira') or {}

    # Work Item details
    work_item_key = exec_jira.get('key', 'Unknown')
    work_item_summary = exec_jira.get('summary', 'Unknown')
    project_key = (exec_jira.get('project') or {}).get('key', 'Unknown')

    # Work Item dates
    work_item_status = (exec_jira.get('status') or {}).get('name', 'Unknown')
    work_item_created = exec_jira.get('created')
    work_item_updated = exec_jira.get('updated')
    work_item_due_date = exec_jira.get('duedate')
    work_item_resolution_date = exec_jira.get('resolutiondate')

    # Work Item other fields
    work_item_assignee = (exec_jira.get('assignee') or {}).get('displayName', 'Unassigned')
    work_item_priority = (exec_jira.get('priority') or {}).get('name', 'Unknown')
    work_item_labels = ', '.join(exec_jira.get('labels') or [])
    work_item_components = ', '.join(c.get('name', '') for c in (exec_jira.get('components') or []))

    # Parent/App info (SBSS-8, etc.)
    parent = exec_jira.get('parent')
    if parent and isinstance(parent, dict):
        parent_app_key = parent.get('key', 'No Parent')
        # Prefer a top-level summary and fall back to fields.summary. The old
        # expression `(a or b if cond else '')` parsed as `(a or b) if cond
        # else ''`, discarding the top-level summary when 'fields' was absent.
        parent_app_summary = (parent.get('summary')
                              or (parent.get('fields') or {}).get('summary')
                              or '')
    else:
        parent_app_key = 'No Parent'
        parent_app_summary = ''

    # Environment
    environments = work_item.get('testEnvironments') or []
    env_name = environments[0] if environments else 'Unknown'

    # Progress tracking
    if processed % 10 == 0:
        elapsed = (datetime.now() - start_time).total_seconds()
        rate = processed / elapsed if elapsed > 0 else 0
        remaining = len(all_work_items) - processed
        eta_seconds = remaining / rate if rate > 0 else 0
        eta_minutes = eta_seconds / 60

        print(f"üìã Progress: {processed}/{len(all_work_items)} ({processed/len(all_work_items)*100:.1f}%) - "
              f"{len(detailed_data)} tests - ETA: {eta_minutes:.1f}min")

    try:
        # Get all test cases for this work item, paging `test_limit` at a time.
        all_test_runs = []
        test_start = 0
        test_limit = 100

        while True:
            graphql_tests = f"""{{
                getTestRuns(testExecIssueIds: ["{exec_id}"], start: {test_start}, limit: {test_limit}) {{
                    total
                    results {{
                        status {{ name color }}
                        startedOn
                        finishedOn
                        test {{
                            issueId
                            jira(fields: [
                                "key",
                                "summary",
                                "labels",
                                "created",
                                "updated",
                                "creator",
                                "status",
                                "assignee",
                                "priority",
                                "components",
                                "duedate",
                                "resolutiondate"
                            ])
                        }}
                    }}
                }}
            }}"""

            resp = execute_graphql(graphql_tests, xray_token)

            runs_page = (resp.get('data') or {}).get('getTestRuns')
            if 'errors' in resp or not runs_page:
                print(f"  ‚ö† Could not fetch test runs for {work_item_key} "
                      f"at offset {test_start}; its results may be incomplete")
                break

            test_results = runs_page.get('results') or []
            test_total = runs_page.get('total') or 0

            all_test_runs.extend(test_results)

            if len(test_results) < test_limit or len(all_test_runs) >= test_total:
                break

            test_start += test_limit
            time.sleep(BASE_DELAY)  # Rate limiting between test pages

        # Count test statuses (statuses not listed here are added on first sight).
        counts = {'PASSED': 0, 'FAILED': 0, 'BLOCKED': 0, 'EXECUTING': 0, 'TO DO': 0, 'PASS_THROUGH': 0, 'ABORTED': 0}

        # Columns shared by every test row of this work item (order = CSV order).
        work_item_fields = {
            # Parent App level (SBSS-8, etc.)
            'parent_app_key': parent_app_key,
            'parent_app_summary': parent_app_summary,

            # Project
            'project': project_key,

            # Work Item (Test Execution) level
            'work_item_id': exec_id,
            'work_item_key': work_item_key,
            'work_item_summary': work_item_summary,
            'work_item_status': work_item_status,
            'work_item_created': work_item_created,
            'work_item_updated': work_item_updated,
            'work_item_due_date': work_item_due_date,
            'work_item_resolution_date': work_item_resolution_date,
            'work_item_assignee': work_item_assignee,
            'work_item_priority': work_item_priority,
            'work_item_labels': work_item_labels,
            'work_item_components': work_item_components,
            'work_item_environment': env_name,
        }

        # Rows are collected locally and committed together so that a failure
        # part-way through never leaves detailed rows without a summary row.
        work_item_rows = []

        for test in all_test_runs:
            test_info = test.get('test') or {}
            test_jira = test_info.get('jira') or {}
            run_status = test.get('status') or {}
            status = run_status.get('name')

            counts[status] = counts.get(status, 0) + 1

            work_item_rows.append({
                **work_item_fields,

                # Test Case level
                'test_id': test_info.get('issueId'),
                'test_key': test_jira.get('key'),
                'test_summary': test_jira.get('summary'),
                'test_status': (test_jira.get('status') or {}).get('name', 'Unknown'),
                'test_execution_status': status,
                'test_execution_color': run_status.get('color', ''),
                'test_created_date': test_jira.get('created', ''),
                'test_updated_date': test_jira.get('updated', ''),
                'test_due_date': test_jira.get('duedate', ''),
                'test_resolution_date': test_jira.get('resolutiondate', ''),
                'test_created_by': (test_jira.get('creator') or {}).get('displayName', 'Unknown'),
                'test_assignee': (test_jira.get('assignee') or {}).get('displayName', 'Unassigned'),
                'test_priority': (test_jira.get('priority') or {}).get('name', 'Unknown'),
                'test_labels': ", ".join(test_jira.get('labels') or []),
                'test_components': ", ".join(c.get('name', '') for c in (test_jira.get('components') or [])),

                # Execution dates
                'execution_started_date': test.get('startedOn', ''),
                'execution_finished_date': test.get('finishedOn', ''),
            })

        detailed_data.extend(work_item_rows)

        # Add summary row for this work item
        if all_test_runs:
            summary_data.append({
                'parent_app_key': parent_app_key,
                'parent_app_summary': parent_app_summary,
                'project': project_key,
                'work_item_key': work_item_key,
                'work_item_summary': work_item_summary,
                'work_item_status': work_item_status,
                'work_item_created': work_item_created,
                'work_item_updated': work_item_updated,
                'work_item_due_date': work_item_due_date,
                'work_item_resolution_date': work_item_resolution_date,
                'work_item_environment': env_name,
                'passed': counts.get('PASSED', 0),
                'failed': counts.get('FAILED', 0),
                'blocked': counts.get('BLOCKED', 0),
                'executing': counts.get('EXECUTING', 0),
                'todo': counts.get('TO DO', 0),
                'pass_through': counts.get('PASS_THROUGH', 0),
                'aborted': counts.get('ABORTED', 0),
                'total_tests': len(all_test_runs)
            })

    except Exception as e:
        # Best effort: report the failure and keep exporting the other items.
        print(f"  ‚ö† Error processing {work_item_key}: {e}")

    # Count the item even when it failed so progress/ETA and the token refresh
    # schedule keep advancing.
    processed += 1

    # Refresh token every 50 work items, but never replace a working token
    # with the None returned by a failed refresh.
    if processed % 50 == 0:
        print(f"  üîÑ Refreshing authentication token...")
        refreshed_token = get_xray_token()
        if refreshed_token:
            xray_token = refreshed_token

    # Rate limiting between work items
    time.sleep(BASE_DELAY)

# Create dataframes: one row per work item (summary) and one per test run (detailed).
df_summary = pd.DataFrame(summary_data)
df_detailed = pd.DataFrame(detailed_data)

total_time = (datetime.now() - start_time).total_seconds() / 60

print(f"\n{'='*80}")
print(f"‚úÖ EXTRACTION COMPLETE!")
print(f"{'='*80}")
print(f"‚è±Ô∏è  Total time: {total_time:.1f} minutes")
print(f"üìä Summary: {len(df_summary)} work items")
print(f"üìä Detailed: {len(df_detailed)} test cases")

# Show breakdown
if not df_summary.empty:
    print(f"\nüìà Breakdown by Project:")
    print(df_summary.groupby('project')['work_item_key'].count())

    print(f"\nüìà Top Parent Apps:")
    parent_counts = df_summary['parent_app_key'].value_counts().head(10)
    print(parent_counts)

    print(f"\nüìà Test Status Summary:")
    status_summary = df_summary[['passed', 'failed', 'blocked', 'todo']].sum()
    print(status_summary)

# Save files
filename_summary = 'COMPLETE_summary_all_work_items.csv'
filename_detailed = 'COMPLETE_detailed_all_tests.csv'

df_summary.to_csv(filename_summary, index=False)
df_detailed.to_csv(filename_detailed, index=False)

print(f"\nüíæ Files saved:")
print(f"   1. {filename_summary} ({len(df_summary)} rows, {len(df_summary.columns)} columns)")
print(f"   2. {filename_detailed} ({len(df_detailed)} rows, {len(df_detailed.columns)} columns)")

# Show sample
if not df_detailed.empty:
    print(f"\nüìã Sample data:")
    sample = df_detailed[['parent_app_key', 'work_item_key', 'test_key', 'test_execution_status']].head(10)
    print(sample.to_string(index=False))

# Try to download (Colab). `except Exception` rather than a bare `except` so
# Ctrl-C / SystemExit are not swallowed here.
try:
    from google.colab import files
    files.download(filename_summary)
    files.download(filename_detailed)
    print("\nüì• Files downloaded!")
except Exception:
    # Not running in Colab, or the download failed; the CSVs are on disk either way.
    print("\nüí° Files saved to current directory")

print("\nüéâ COMPLETE!")

üöÄ XRAY DATA EXTRACTION - OPTIMIZED WITH RATE LIMITING
Structure: Parent App ‚Üí Work Item (Test Execution) ‚Üí Tests
‚úÖ Authenticated

üìã JQL Query: project in (SBB, SBDS, SBD, SBERP, SBIT, SBSR, SBSS) AND issuetype = \"Test Execution\" AND parent != null

üìä STEP 1: GETTING TEST EXECUTIONS WITH PARENTS

  Fetching page 1 (items 1-100)...
  Total Test Executions found: 107
  ‚úì Fetched 100 items (total so far: 100)
  Fetching page 2 (items 101-200)...
  ‚úì Fetched 7 items (total so far: 107)

‚úÖ Total Work Items: 107

üìä Breakdown by project:
   SBB: 27 work items
   SBD: 9 work items
   SBIT: 14 work items
   SBSR: 2 work items
   SBSS: 55 work items

üìã Sample parent app structure:
   SBIT-84 ‚Üí Parent: SBIT-2
   SBD-133 ‚Üí Parent: SBD-1
   SBD-131 ‚Üí Parent: SBD-2
   SBD-130 ‚Üí Parent: SBD-2
   SBSS-179 ‚Üí Parent: SBSS-8

üìä STEP 2: EXTRACTING ALL TEST CASES
This will take time due to rate limiting (~107 work items)
Estimated time: 3.6 minutes

üìã Progress: 0

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


üì• Files downloaded!

üéâ COMPLETE!


In [None]:
"""
Create Power BI IDs for linking tables
Maps parent_app_key to Application Acronym from Clone & Convey Apps
"""

import pandas as pd
import re

print("üîó CREATING POWER BI IDs FOR TABLE LINKING", "=" * 80, sep="\n")

# Test-case export written by the extraction cell.
print("\nüìÅ Loading test data...")
df_tests = pd.read_csv('COMPLETE_detailed_all_tests.csv')
unique_parent_count = df_tests['parent_app_key'].nunique()
print(f"   Loaded {len(df_tests)} test cases")
print(f"   Unique parent apps: {unique_parent_count}")

# Application reference sheet; keep only rows that carry an ID or an acronym.
print("\nüìÅ Loading apps reference data...")
df_apps = pd.read_excel('Clone_and_Convey_Apps.xlsx')
has_identifier = df_apps['Application ID'].notna() | df_apps['Application Acronym'].notna()
df_apps = df_apps[has_identifier]
print(f"   Loaded {len(df_apps)} applications")

print(
    "\n" + "=" * 80,
    "IDENTIFIER CHOICE: Application Acronym",
    "=" * 80,
    "‚úÖ Application Acronym is unique, readable, and user-friendly",
    "‚úÖ Will be used as the primary ID for Power BI relationships",
    sep="\n",
)

# Create mapping from parent_app_summary to Application Acronym
print("\n" + "=" * 80, "CREATING MAPPING", "=" * 80, sep="\n")

# One row per distinct (parent key, parent summary) pair found in the tests.
parent_columns = ['parent_app_key', 'parent_app_summary']
parent_apps = df_tests[parent_columns].drop_duplicates()
print(f"\nParent apps in test data: {len(parent_apps)}")

# Try to match by app name (fuzzy matching)
def extract_app_name_from_summary(summary):
    """Pull the application name out of a parent-app summary string.

    Summaries are expected to look like "Category Clone: APP NAME" or
    "Category: APP NAME", optionally followed by a parenthesised suffix.
    The "Clone:" form is tried first, then the generic ":" form.

    Returns None for a missing summary, the captured name (stripped) when
    one of the patterns matches, and the stripped summary otherwise.
    """
    if pd.isna(summary):
        return None

    # Ordered from most to least specific; the first hit wins.
    candidate_patterns = (
        r'Clone:\s*(.+?)(?:\s*\(|$)',
        r':\s*(.+?)(?:\s*\(|$)',
    )
    attempts = (re.search(pattern, summary) for pattern in candidate_patterns)
    first_hit = next((hit for hit in attempts if hit), None)

    if first_hit is None:
        # No colon-delimited name: fall back to the whole summary.
        return summary.strip()
    return first_hit.group(1).strip()

# Add extracted app name
parent_apps['extracted_name'] = parent_apps['parent_app_summary'].apply(extract_app_name_from_summary)

print(f"\nüìã Parent apps with extracted names:")
print(parent_apps[['parent_app_key', 'parent_app_summary', 'extracted_name']].head(10).to_string(index=False))

# Create manual mapping for the ones we can identify
manual_mapping = {
    'Lambda Test': 'ACCELQ',  # Lambda Test is likely ACCELQ
    'SFS': 'SFS',  # Dispatch Clone: SFS
    'QF Mobile': 'QFMOBILE',  # QF Mobile
    'Splunk': 'QF-SPLUNK',
    'WALLARM': 'WALLARM',
    # Add more as needed based on the data
}

# Try to match automatically
print("\n" + "=" * 80)
print("MATCHING PARENT APPS TO APPLICATION ACRONYMS")
print("=" * 80)

# Create a mapping dictionary: parent_app_key -> Application Acronym, or an
# "UNMAPPED_<key>" placeholder when neither the reference sheet nor the
# manual mapping knows the app.
app_id_mapping = {}

# Only reference rows that actually carry an acronym can supply an app_id;
# a NaN acronym would otherwise end up in the mapping and break the later
# string checks on its values.
has_acronym = df_apps['Application Acronym'].notna()

for idx, row in parent_apps.iterrows():
    parent_key = row['parent_app_key']
    extracted = row['extracted_name']

    acronym = None
    # Skip the substring search for missing or blank names: a None pattern
    # makes str.contains raise, and an empty pattern matches every row.
    if isinstance(extracted, str) and extracted.strip():
        # Escape the name so it is matched literally; str.contains treats
        # its pattern as a regular expression by default.
        literal = re.escape(extracted)
        name_hit = df_apps['Application Name'].str.contains(literal, case=False, na=False)
        acronym_hit = df_apps['Application Acronym'].str.contains(literal, case=False, na=False)
        match = df_apps[(name_hit | acronym_hit) & has_acronym]
        if len(match) > 0:
            # Several rows may match; the first one in sheet order is used.
            acronym = match.iloc[0]['Application Acronym']

    if acronym is not None:
        app_id_mapping[parent_key] = acronym
    elif extracted in manual_mapping:
        app_id_mapping[parent_key] = manual_mapping[extracted]
    else:
        # No match found - will use parent_key as fallback
        app_id_mapping[parent_key] = f"UNMAPPED_{parent_key}"

print(f"\n‚úÖ Created mappings for {len(app_id_mapping)} parent apps")

# Show the mappings
print(f"\nüìã Mapping Results:")
mapped_count = sum(1 for v in app_id_mapping.values() if not v.startswith('UNMAPPED_'))
unmapped_count = len(app_id_mapping) - mapped_count
print(f"   Mapped to acronyms: {mapped_count}")
print(f"   Unmapped (need manual review): {unmapped_count}")

print(f"\nüìã Sample mappings:")
for i, (key, value) in enumerate(list(app_id_mapping.items())[:15]):
    parent_summary = parent_apps[parent_apps['parent_app_key'] == key]['parent_app_summary'].iloc[0]
    status = "‚úÖ" if not value.startswith('UNMAPPED_') else "‚ö†Ô∏è"
    print(f"   {status} {key} ‚Üí {value}")
    print(f"      ({parent_summary})")

# Add the app_id to the test data
print("\n" + "=" * 80)
print("ADDING APP_ID TO TEST DATA")
print("=" * 80)

df_tests['app_id'] = df_tests['parent_app_key'].map(app_id_mapping)

print(f"\n‚úÖ Added app_id column to test data")
print(f"   Rows with app_id: {df_tests['app_id'].notna().sum()}")

# Show sample
print(f"\nüìã Sample of updated data:")
sample = df_tests[['parent_app_key', 'app_id', 'work_item_key', 'test_key']].head(10)
print(sample.to_string(index=False))

# Save the updated file
output_file = 'COMPLETE_detailed_all_tests_WITH_APP_ID.csv'
df_tests.to_csv(output_file, index=False)
print(f"\nüíæ Saved: {output_file}")

# Create a reference table for Power BI
print("\n" + "=" * 80)
print("CREATING APP REFERENCE TABLE FOR POWER BI")
print("=" * 80)

# One reference record per mapped parent app. The identifying fields are
# always filled from the test data; the descriptive fields come from the apps
# sheet when the app_id is found there and are placeholders otherwise.
app_reference = []
for parent_key, app_id in app_id_mapping.items():
    parent_row = parent_apps.loc[parent_apps['parent_app_key'] == parent_key].iloc[0]
    entry = {
        'app_id': app_id,
        'parent_app_key': parent_key,
        'parent_app_summary': parent_row['parent_app_summary'],
    }

    # Exact acronym match against the reference sheet
    reference_rows = df_apps.loc[df_apps['Application Acronym'] == app_id]

    if len(reference_rows) == 0:
        entry['application_name'] = 'Not Found in Reference'
        entry['application_id_sysgen'] = None
        entry['starburst_applicable'] = None
        entry['final_disposition'] = None
        entry['functional_lane'] = None
    else:
        # When several rows share the acronym, the first one is used.
        reference = reference_rows.iloc[0]
        entry['application_name'] = reference['Application Name']
        entry['application_id_sysgen'] = reference['Application ID']
        entry['starburst_applicable'] = reference['Starburst Applicable']
        entry['final_disposition'] = reference['Final Disposition']
        entry['functional_lane'] = reference['Functional Lane']

    app_reference.append(entry)

df_app_reference = pd.DataFrame(app_reference)

reference_file = 'APP_REFERENCE_TABLE.csv'
df_app_reference.to_csv(reference_file, index=False)
print(f"\nüíæ Saved: {reference_file}")

print(f"\nüìä App Reference Table:")
print(f"   Rows: {len(df_app_reference)}")
print(f"   Columns: {', '.join(df_app_reference.columns)}")

# Show unmapped apps that need manual review
unmapped = df_app_reference[df_app_reference['app_id'].str.startswith('UNMAPPED_', na=False)]
if len(unmapped) > 0:
    print("\n" + "=" * 80)
    print("‚ö†Ô∏è  UNMAPPED APPS - NEED MANUAL REVIEW")
    print("=" * 80)
    print(f"\n{len(unmapped)} apps could not be automatically mapped:")
    print(unmapped[['app_id', 'parent_app_key', 'parent_app_summary']].to_string(index=False))

print("\n" + "=" * 80)
print("‚úÖ COMPLETE!")
print("=" * 80)
print("\nüìÇ Output Files:")
print(f"   1. {output_file}")
print(f"      - Original test data with app_id column added")
print(f"      - Use app_id to link to app reference table")
print(f"\n   2. {reference_file}")
print(f"      - Master app reference table")
print(f"      - Links app_id to parent_app_key and application details")
print("\nüí° In Power BI:")
print("   - Link tables using 'app_id' field")
print("   - app_id is the primary key for relationships")

# Try to download (Colab). Outside Colab the import fails and the files simply
# stay in the working directory.
try:
    from google.colab import files
    files.download(output_file)
    files.download(reference_file)
    print("\nüì• Files downloaded!")
except Exception:
    # Best effort only, but do not swallow KeyboardInterrupt/SystemExit the
    # way a bare "except:" would.
    print("\nüí° Files saved to current directory")

üîó CREATING POWER BI IDs FOR TABLE LINKING

üìÅ Loading test data...
   Loaded 1368 test cases
   Unique parent apps: 32

üìÅ Loading apps reference data...


FileNotFoundError: [Errno 2] No such file or directory: 'Clone_and_Convey_Apps.xlsx'

In [None]:
"""
Transform Jira Testing Data for Power BI
- Fix all date columns
- Extract test phase type (QA, UAT, IT)
- Split columns by test phase
- Map to reference data
"""

import pandas as pd
import numpy as np
import re
from datetime import datetime

print("üîÑ TRANSFORMING JIRA TESTING DATA FOR POWER BI")
print("=" * 80)

# ============================================================================
# LOAD DATA
# ============================================================================

print("\nüìÅ Loading data...")
df_tests = pd.read_excel('Jira_Testing_Data.xlsx')
print(f"   Test data: {len(df_tests)} rows")

mapping_df = pd.read_excel('App to Jira Testing Mapping_102925.xlsx', header=1)
# Clean mapping file - remove empty rows
mapping_df = mapping_df[mapping_df['App Name'].notna()]
print(f"   Mapping data: {len(mapping_df)} apps")

# ============================================================================
# STEP 1: EXTRACT TEST PHASE TYPE
# ============================================================================

print("\n" + "=" * 80)
print("STEP 1: EXTRACTING TEST PHASE TYPE")
print("=" * 80)

def extract_test_phase(summary, work_item_key):
    """Classify a work item as 'QA', 'UAT', 'IT' or 'Unknown' from its summary.

    The summary is lower-cased and searched for a fixed set of marker
    substrings; phases are checked in the order QA, UAT, IT and the first
    phase with a matching marker wins. A missing summary, or one without any
    marker, yields 'Unknown'.

    work_item_key is accepted for signature compatibility but is not
    consulted.
    """
    if pd.isna(summary):
        return 'Unknown'

    text = str(summary).lower()

    # (phase, substrings that identify it) in priority order
    phase_markers = (
        ('QA', ('qa (', '- qa ', 'qa(')),
        ('UAT', ('uat (', '- uat ', 'uat(')),
        ('IT', ('it systems', '- it ')),
    )
    for phase, markers in phase_markers:
        if any(marker in text for marker in markers):
            return phase

    return 'Unknown'

# Row-wise classification: each row's work_item_summary (and work_item_key)
# is passed to extract_test_phase and the result stored in 'test_phase'.
df_tests['test_phase'] = df_tests.apply(
    lambda row: extract_test_phase(row['work_item_summary'], row['work_item_key']),
    axis=1
)

print(f"\nüìä Test phase distribution:")
print(df_tests['test_phase'].value_counts())

# ============================================================================
# STEP 2: MAP TO GET TEST PHASE FROM MAPPING FILE
# ============================================================================

print("\n" + "=" * 80)
print("STEP 2: MAPPING TO GET DEFINITIVE TEST PHASE")
print("=" * 80)

# Create mapping dictionaries from mapping file
qa_keys = {}
uat_keys = {}
it_keys = {}

# (phase, target dict, issue-key column, initial-date column, due-date column,
#  completion-date column or None when the phase has none)
phase_specs = (
    ('QA', qa_keys, 'QA Issue Key', 'QA Initial Date', 'QA Due Date', 'QA Date'),
    ('UAT', uat_keys, 'UAT Issue Key', 'UAT Intial Date', 'UAT Due Date', 'UAT Date'),
    ('IT', it_keys, 'IT Issue Key', 'IT Intial Date', 'IT Due Date', None),
)

# Index every non-empty issue key of every mapping row under its phase.
for idx, row in mapping_df.iterrows():
    app_name = row['App Name']

    for phase, bucket, key_col, initial_col, due_col, done_col in phase_specs:
        issue_key = row[key_col]
        if pd.isna(issue_key):
            continue
        bucket[str(issue_key).strip()] = {
            'app_name': app_name,
            'test_phase': phase,
            # row.get() yields None when the sheet lacks the column
            'initial_date': row.get(initial_col),
            'due_date': row.get(due_col),
            'completion_date': None if done_col is None else row.get(done_col),
        }

# Combine all mapping; on a key collision IT overrides UAT overrides QA.
all_phase_keys = {}
for bucket in (qa_keys, uat_keys, it_keys):
    all_phase_keys.update(bucket)

print(f"   QA keys: {len(qa_keys)}")
print(f"   UAT keys: {len(uat_keys)}")
print(f"   IT keys: {len(it_keys)}")

# Map both parent_app_key AND work_item_key to get test phase and dates
def map_phase_info(parent_key, work_item_key):
    """Look up phase and dates for a row in the module-level all_phase_keys.

    The parent key is tried first, then the work item key; each is converted
    to a stripped string before the lookup and missing keys are skipped.

    Returns a pd.Series with mapped_test_phase, mapped_initial_date,
    mapped_due_date and mapped_completion_date, all None when neither key is
    present in the mapping.
    """
    candidates = [
        str(key).strip()
        for key in (parent_key, work_item_key)
        if pd.notna(key)
    ]
    info = next(
        (all_phase_keys[key] for key in candidates if key in all_phase_keys),
        None,
    )

    if info is None:
        return pd.Series({
            'mapped_test_phase': None,
            'mapped_initial_date': None,
            'mapped_due_date': None,
            'mapped_completion_date': None,
        })

    return pd.Series({
        'mapped_test_phase': info['test_phase'],
        'mapped_initial_date': info['initial_date'],
        'mapped_due_date': info['due_date'],
        'mapped_completion_date': info['completion_date'],
    })

# Apply mapping - check both parent_app_key and work_item_key.
# map_phase_info returns a Series per row, so the result is a DataFrame with
# the four mapped_* columns.
phase_info = df_tests.apply(
    lambda row: map_phase_info(row['parent_app_key'], row['work_item_key']),
    axis=1
)
# Append the mapped_* columns side by side with the existing test columns.
df_tests = pd.concat([df_tests, phase_info], axis=1)

# Use mapped phase if available, otherwise use extracted
df_tests['test_phase_final'] = df_tests['mapped_test_phase'].fillna(df_tests['test_phase'])

print(f"\nüìä Final test phase distribution:")
print(df_tests['test_phase_final'].value_counts())

# ============================================================================
# STEP 3: FIX ALL DATE COLUMNS
# ============================================================================

print("\n" + "=" * 80)
print("STEP 3: CONVERTING ALL DATES TO PROPER DATE FORMAT")
print("=" * 80)

date_columns = [
    'work_item_created', 'work_item_updated', 'work_item_due_date', 'work_item_resolution_date',
    'test_created_date', 'test_updated_date', 'test_due_date', 'test_resolution_date',
    'execution_started_date', 'execution_finished_date',
    'mapped_initial_date', 'mapped_due_date', 'mapped_completion_date'
]

def convert_to_date(value):
    """Convert a single cell value to a timezone-naive pandas Timestamp.

    Missing values, the empty string and the literal string 'NaT' give None.
    Anything else is parsed with pd.to_datetime(errors='coerce'); values that
    cannot be parsed also give None. Timezone-aware results have their
    timezone dropped (wall-clock time kept) so they can be written to Excel.
    """
    if pd.isna(value) or value == '' or value == 'NaT':
        return None

    try:
        # Try parsing as datetime
        dt = pd.to_datetime(value, errors='coerce')
        if pd.notna(dt):
            # Remove timezone info for Excel compatibility
            if dt.tzinfo is not None:
                dt = dt.tz_localize(None)
            return dt
    except Exception:
        # Unsupported input types still count as "no date", but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        pass

    return None

for col in date_columns:
    if col in df_tests.columns:
        print(f"   Converting {col}...")
        df_tests[col] = df_tests[col].apply(convert_to_date)
        non_null = df_tests[col].notna().sum()
        print(f"      ‚úì {non_null} non-null dates")

# Additional safety: strip timezone from all datetime columns.
# The dtype is checked by type rather than by matching 'datetime64[ns,' in its
# string form, so tz-aware columns with a non-nanosecond resolution
# (e.g. datetime64[us, UTC]) are handled as well.
print("\n   Ensuring all dates are timezone-naive for Excel...")
for col in df_tests.columns:
    if isinstance(df_tests[col].dtype, pd.DatetimeTZDtype):
        df_tests[col] = df_tests[col].dt.tz_localize(None)

# ============================================================================
# STEP 4: CREATE PHASE-SPECIFIC COLUMNS
# ============================================================================

print("\n" + "=" * 80)
print("STEP 4: CREATING PHASE-SPECIFIC COLUMNS")
print("=" * 80)

# Phase-specific column suffix -> source column whose value is copied into it.
# Dict order is the order in which the columns are created for each phase.
phase_column_sources = {
    'initial_date': 'mapped_initial_date',
    'due_date': 'mapped_due_date',
    'completion_date': 'mapped_completion_date',
    'work_item_key': 'work_item_key',
    'work_item_created': 'work_item_created',
    'work_item_updated': 'work_item_updated',
    'execution_started': 'execution_started_date',
    'execution_finished': 'execution_finished_date',
}
known_phases = ['QA', 'UAT', 'IT']

# Initialize phase-specific columns (all empty to start with)
for phase in known_phases:
    for suffix in phase_column_sources:
        df_tests[f'{phase.lower()}_{suffix}'] = None

# Populate phase-specific columns: each row fills only the column group of
# its own final phase; rows with any other phase value stay empty.
for idx, row in df_tests.iterrows():
    phase = row['test_phase_final']
    if pd.isna(phase) or phase not in known_phases:
        continue

    prefix = phase.lower()
    for suffix, source in phase_column_sources.items():
        df_tests.at[idx, f'{prefix}_{suffix}'] = row[source]

print("\n‚úÖ Created phase-specific columns:")
print("   QA: initial_date, due_date, completion_date, work_item_key, etc.")
print("   UAT: initial_date, due_date, completion_date, work_item_key, etc.")
print("   IT: initial_date, due_date, completion_date, work_item_key, etc.")

# ============================================================================
# STEP 5: REORDER AND SELECT FINAL COLUMNS
# ============================================================================

print("\n" + "=" * 80)
print("STEP 5: ORGANIZING FINAL COLUMN STRUCTURE")
print("=" * 80)

final_columns = [
    # Identifiers
    'Application Acronym',
    'parent_app_key',
    'parent_app_summary',
    'project',
    'test_phase_final',

    # QA Phase
    'qa_work_item_key',
    'qa_initial_date',
    'qa_due_date',
    'qa_completion_date',
    'qa_work_item_created',
    'qa_work_item_updated',
    'qa_execution_started',
    'qa_execution_finished',

    # UAT Phase
    'uat_work_item_key',
    'uat_initial_date',
    'uat_due_date',
    'uat_completion_date',
    'uat_work_item_created',
    'uat_work_item_updated',
    'uat_execution_started',
    'uat_execution_finished',

    # IT Phase
    'it_work_item_key',
    'it_initial_date',
    'it_due_date',
    'it_completion_date',
    'it_work_item_created',
    'it_work_item_updated',
    'it_execution_started',
    'it_execution_finished',

    # Work Item Details (current phase)
    'work_item_id',
    'work_item_key',
    'work_item_summary',
    'work_item_status',
    'work_item_assignee',
    'work_item_priority',
    'work_item_environment',

    # Test Case Details
    'test_id',
    'test_key',
    'test_summary',
    'test_status',
    'test_execution_status',
    'test_execution_color',
    'test_assignee',
    'test_priority',
    'test_labels',
    'test_components',
]

# Select only columns that exist
final_columns_exist = [col for col in final_columns if col in df_tests.columns]
df_final = df_tests[final_columns_exist].copy()

print(f"\n‚úÖ Final dataset:")
print(f"   Rows: {len(df_final)}")
print(f"   Columns: {len(df_final.columns)}")

# ============================================================================
# STEP 6: DATA QUALITY SUMMARY
# ============================================================================

print("\n" + "=" * 80)
print("DATA QUALITY SUMMARY")
print("=" * 80)

print(f"\nüìä Test Phase Coverage:")
for phase in ['QA', 'UAT', 'IT']:
    count = (df_final['test_phase_final'] == phase).sum()
    pct = count / len(df_final) * 100
    print(f"   {phase}: {count} tests ({pct:.1f}%)")

unknown = (df_final['test_phase_final'] == 'Unknown').sum()
print(f"   Unknown: {unknown} tests ({unknown/len(df_final)*100:.1f}%)")

print(f"\nüìä Date Field Population:")
for phase in ['QA', 'UAT', 'IT']:
    phase_lower = phase.lower()
    due_count = df_final[f'{phase_lower}_due_date'].notna().sum()
    complete_count = df_final[f'{phase_lower}_completion_date'].notna().sum()
    exec_count = df_final[f'{phase_lower}_execution_started'].notna().sum()
    print(f"   {phase}:")
    print(f"      Due dates: {due_count}")
    print(f"      Completion dates: {complete_count}")
    print(f"      Execution dates: {exec_count}")

# ============================================================================
# STEP 7: SAVE OUTPUT
# ============================================================================

output_file = 'Jira_Testing_Data_PowerBI_Ready.xlsx'

print("\n" + "=" * 80)
print("SAVING OUTPUT")
print("=" * 80)

# Save to Excel with proper date formatting
with pd.ExcelWriter(output_file, engine='openpyxl', datetime_format='yyyy-mm-dd') as writer:
    df_final.to_excel(writer, sheet_name='Testing Data', index=False)

print(f"\n‚úÖ Saved: {output_file}")

# Create a data dictionary
print("\nüìã Creating data dictionary...")

# Human-readable description for every column of the final dataset, grouped
# the same way as the output columns.
_COLUMN_DESCRIPTIONS = {}

# Identifiers
_COLUMN_DESCRIPTIONS.update({
    'Application Acronym': 'Application identifier/acronym',
    'parent_app_key': 'Parent app Jira key (e.g., SBSS-8)',
    'parent_app_summary': 'Parent app name/description',
    'project': 'Jira project key',
    'test_phase_final': 'Test phase: QA, UAT, or IT',
})

# QA phase
_COLUMN_DESCRIPTIONS.update({
    'qa_work_item_key': 'QA work item (Test Execution) key',
    'qa_initial_date': 'QA testing initial/start date',
    'qa_due_date': 'QA testing target due date',
    'qa_completion_date': 'QA testing actual completion date',
    'qa_work_item_created': 'QA work item created date',
    'qa_work_item_updated': 'QA work item last updated date',
    'qa_execution_started': 'QA test execution start date',
    'qa_execution_finished': 'QA test execution completion date',
})

# UAT phase
_COLUMN_DESCRIPTIONS.update({
    'uat_work_item_key': 'UAT work item (Test Execution) key',
    'uat_initial_date': 'UAT testing initial/start date',
    'uat_due_date': 'UAT testing target due date',
    'uat_completion_date': 'UAT testing actual completion date',
    'uat_work_item_created': 'UAT work item created date',
    'uat_work_item_updated': 'UAT work item last updated date',
    'uat_execution_started': 'UAT test execution start date',
    'uat_execution_finished': 'UAT test execution completion date',
})

# IT phase
_COLUMN_DESCRIPTIONS.update({
    'it_work_item_key': 'IT work item (Test Execution) key',
    'it_initial_date': 'IT testing initial/start date',
    'it_due_date': 'IT testing target due date',
    'it_completion_date': 'IT testing actual completion date',
    'it_work_item_created': 'IT work item created date',
    'it_work_item_updated': 'IT work item last updated date',
    'it_execution_started': 'IT test execution start date',
    'it_execution_finished': 'IT test execution completion date',
})

# Work item details
_COLUMN_DESCRIPTIONS.update({
    'work_item_id': 'Work item internal ID',
    'work_item_key': 'Current work item Jira key',
    'work_item_summary': 'Work item description',
    'work_item_status': 'Work item status (Open, Done, etc.)',
    'work_item_assignee': 'Person assigned to work item',
    'work_item_priority': 'Work item priority level',
    'work_item_environment': 'Testing environment',
})

# Test case details
_COLUMN_DESCRIPTIONS.update({
    'test_id': 'Test case internal ID',
    'test_key': 'Individual test case key',
    'test_summary': 'Test case description',
    'test_status': 'Test case status',
    'test_execution_status': 'Test result: PASSED, FAILED, BLOCKED, etc.',
    'test_execution_color': 'Color code for test status',
    'test_assignee': 'Person assigned to test case',
    'test_priority': 'Test case priority level',
    'test_labels': 'Test case labels/tags',
    'test_components': 'Test case components',
})


def get_column_description(col_name):
    """Return the data-dictionary description for col_name.

    Unknown column names yield an empty string.
    """
    try:
        return _COLUMN_DESCRIPTIONS[col_name]
    except KeyError:
        return ''

def _summarize_column(column_name):
    """Build the data-dictionary entry for one column of df_final."""
    series = df_final[column_name]
    row_total = len(df_final)
    filled = series.notna().sum()
    missing_share = (row_total - filled) / row_total * 100
    return {
        'Column Name': column_name,
        'Data Type': str(series.dtype),
        'Non-Null Count': filled,
        'Null %': f"{missing_share:.1f}%",
        'Description': get_column_description(column_name),
    }


# One entry per output column, in column order
data_dict = [_summarize_column(name) for name in df_final.columns]

df_dict = pd.DataFrame(data_dict)

dict_file = 'Data_Dictionary.xlsx'
df_dict.to_excel(dict_file, index=False)
print(f"‚úÖ Saved: {dict_file}")

print("\n" + "=" * 80)
print("‚úÖ TRANSFORMATION COMPLETE!")
print("=" * 80)

print("\nüìÇ Output Files:")
print(f"   1. {output_file}")
print(f"      - Ready for Power BI import")
print(f"      - All dates properly formatted")
print(f"      - Separate columns for QA, UAT, IT")
print(f"\n   2. {dict_file}")
print(f"      - Data dictionary with column descriptions")

print("\nüí° Next Steps:")
print("   1. Import into Power BI")
print("   2. Create date table for time intelligence")
print("   3. Build relationships using parent_app_key")
print("   4. Create measures for pass rates, completion %, etc.")

# Try to download (Colab). Outside Colab the import fails and the files simply
# stay in the working directory.
try:
    from google.colab import files
    files.download(output_file)
    files.download(dict_file)
    print("\nüì• Files downloaded!")
except Exception:
    # Best effort only, but do not swallow KeyboardInterrupt/SystemExit the
    # way a bare "except:" would.
    print("\nüí° Files saved to current directory")

print("\nüéâ DONE!")

üîÑ TRANSFORMING JIRA TESTING DATA FOR POWER BI

üìÅ Loading data...
   Test data: 1368 rows
   Mapping data: 52 apps

STEP 1: EXTRACTING TEST PHASE TYPE

üìä Test phase distribution:
test_phase
QA         645
UAT        634
Unknown     84
IT           5
Name: count, dtype: int64

STEP 2: MAPPING TO GET DEFINITIVE TEST PHASE
   QA keys: 52
   UAT keys: 52
   IT keys: 1

üìä Final test phase distribution:
test_phase_final
QA         645
UAT        634
Unknown     84
IT           5
Name: count, dtype: int64

STEP 3: CONVERTING ALL DATES TO PROPER DATE FORMAT
   Converting work_item_created...
      ‚úì 1368 non-null dates
   Converting work_item_updated...
      ‚úì 1368 non-null dates
   Converting work_item_due_date...
      ‚úì 1030 non-null dates
   Converting work_item_resolution_date...
      ‚úì 55 non-null dates
   Converting test_created_date...
      ‚úì 1368 non-null dates
   Converting test_updated_date...
      ‚úì 1368 non-null dates
   Converting test_due_date...
     

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


üì• Files downloaded!

üéâ DONE!


üöÄ XRAY DATA EXTRACTION + DEADLINE FLAGS - COMPLETE PIPELINE

üìÅ Loading mapping file...
✅ Loaded 52 apps from mapping
✅ Created lookups for 105 keys from mapping

üìä EXTRACTING ALL TEST EXECUTIONS FROM XRAY

üîç SBB: 28 work items
üîç SBDS: 1 work items
üîç SBD: 9 work items
üîç SBERP: 0 work items
üîç SBIT: 14 work items
üîç SBSR: 2 work items
üîç SBSS: 53 work items

✅ Total: 107 work items

📋 Matching work items to mapping...
✅ Matched 67 work items to mapping

üìä EXTRACTING TEST CASES FROM EACH WORK ITEM

üìã Progress: 0/107 - 0 test cases
üìã Progress: 20/107 - 47 test cases
üìã Progress: 40/107 - 485 test cases
üìã Progress: 60/107 - 625 test cases
üìã Progress: 80/107 - 721 test cases
üìã Progress: 100/107 - 1228 test cases

‚úÖ Extracted 1265 test cases from 107 work items

PROCESSING DATA AND MAPPING TEST PHASES

üìä Test phase distribution:
test_phase_final
Unknown    485
UAT        451
QA         329
Name: count, dtype: int64

CONVERTING ALL

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


üì• Files downloaded!

üéâ DONE!


In [3]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

print("üöÄ ADDING DEADLINE FLAGS TO JIRA TEST REPORTING")
print("=" * 80)



print("\nüìÅ Loading existing file...")
df = pd.read_excel('Jira_Test_Reporting.xlsx')
print(f"   Loaded {len(df)} rows with {len(df.columns)} columns")


today = pd.Timestamp(datetime.now().date())
print(f"\nüìÖ Today's date: {today.strftime('%Y-%m-%d')}")



print("\n" + "=" * 80)
print("CREATING DEADLINE FLAGS")
print("=" * 80)

# Initialize new columns
df['days_until_due'] = None
df['nearest_due_date'] = None
df['due_date_phase'] = None
df['deadline_status'] = None
df['is_reopened'] = 'No'
df['is_completed'] = 'No'

print("\n   Calculating flags for each row...")

# Process each row: set the reopened/completed flags, then classify the row
# against its earliest phase due date unless it is already completed.
due_date_columns = (('QA', 'qa_due_date'), ('UAT', 'uat_due_date'), ('IT', 'it_due_date'))

for idx, row in df.iterrows():

    if idx % 100 == 0:
        print(f"   Progress: {idx}/{len(df)} rows processed...")

    test_status = row['test_status']
    if pd.notna(test_status) and str(test_status).lower() == 'reopened':
        df.at[idx, 'is_reopened'] = 'Yes'

    # A closed work item or a PASSED execution both count as completed, and
    # completed rows skip the deadline logic entirely.
    item_status = row['work_item_status']
    is_closed = pd.notna(item_status) and str(item_status).lower() == 'closed'
    if is_closed or (
        pd.notna(row['test_execution_status'])
        and str(row['test_execution_status']).upper() == 'PASSED'
    ):
        df.at[idx, 'is_completed'] = 'Yes'
        df.at[idx, 'deadline_status'] = 'Completed'
        continue

    # Collect the phase due dates that are actually filled in for this row
    dated_phases = [
        (phase, row[column])
        for phase, column in due_date_columns
        if pd.notna(row[column])
    ]

    if not dated_phases:
        df.at[idx, 'deadline_status'] = 'No Deadline'
        continue

    # Earliest due date wins; ties keep QA -> UAT -> IT order
    nearest_phase, nearest_date = min(dated_phases, key=lambda pair: pair[1])

    df.at[idx, 'nearest_due_date'] = nearest_date
    df.at[idx, 'due_date_phase'] = nearest_phase

    # Negative means the due date is already in the past
    days_diff = (nearest_date - today).days
    df.at[idx, 'days_until_due'] = days_diff

    if days_diff < 0:
        status = 'Past Due'
    elif days_diff <= 3:
        status = 'Deadline Approaching'
    else:
        status = 'On Track'
    df.at[idx, 'deadline_status'] = status

print(f"\n‚úÖ All {len(df)} rows processed!")


print("\n" + "=" * 80)
print("SUMMARY STATISTICS")
print("=" * 80)

print("\nüìä Deadline Status Distribution:")
print(df['deadline_status'].value_counts().to_string())

print("\nüìä Is Reopened Distribution:")
print(df['is_reopened'].value_counts().to_string())

print("\nüìä Is Completed Distribution:")
print(df['is_completed'].value_counts().to_string())

print("\nüìä Due Date Phase Distribution (for items with due dates):")
phase_counts = df['due_date_phase'].value_counts()
if len(phase_counts) > 0:
    print(phase_counts.to_string())
else:
    print("   No items with due dates")

# Show some examples
print("\nüìã Sample rows with 'Deadline Approaching':")
approaching = df[df['deadline_status'] == 'Deadline Approaching']
if len(approaching) > 0:
    sample_cols = ['work_item_key', 'test_key', 'nearest_due_date', 'days_until_due', 'deadline_status']
    print(approaching[sample_cols].head(5).to_string())
else:
    print("   No items with approaching deadlines")

print("\nüìã Sample rows with 'Past Due':")
past_due = df[df['deadline_status'] == 'Past Due']
if len(past_due) > 0:
    sample_cols = ['work_item_key', 'test_key', 'nearest_due_date', 'days_until_due', 'deadline_status']
    print(past_due[sample_cols].head(5).to_string())
else:
    print("   No past due items")

# ============================================================================
# STEP 5: SAVE OUTPUT
# ============================================================================

output_file = 'Jira_Test_Reporting_With_Deadline_Flags.xlsx'

print("\n" + "=" * 80)
print("SAVING OUTPUT")
print("=" * 80)

# Save to Excel with proper date formatting
with pd.ExcelWriter(output_file, engine='openpyxl', datetime_format='yyyy-mm-dd') as writer:
    df.to_excel(writer, sheet_name='Testing Data', index=False)

print(f"\n‚úÖ Saved: {output_file}")
print(f"   Rows: {len(df)}")
print(f"   Columns: {len(df.columns)}")

print("\nüìä New Columns Added:")
print("   1. days_until_due - Days until nearest due date (negative = overdue)")
print("   2. nearest_due_date - The actual nearest due date")
print("   3. due_date_phase - Which phase (QA/UAT/IT) has the nearest deadline")
print("   4. deadline_status - Past Due, Deadline Approaching, On Track, Completed, No Deadline")
print("   5. is_reopened - Yes if test_status is Reopened")
print("   6. is_completed - Yes if work_item closed or test passed")

# Create quick summary
print("\n" + "=" * 80)
print("‚úÖ COMPLETE!")
print("=" * 80)

print("\nüí° Power BI Next Steps:")
print("   ‚Ä¢ Import the new file into Power BI")
print("   ‚Ä¢ Use 'deadline_status' column for your donut charts")
print("   ‚Ä¢ Filter by 'is_reopened' to highlight reopened items")
print("   ‚Ä¢ Use 'days_until_due' for custom thresholds")
print("   ‚Ä¢ Filter by 'is_completed' to exclude completed items")

# Try to download (Colab). Outside Colab the import fails and the file simply
# stays in the working directory.
try:
    from google.colab import files
    files.download(output_file)
    print("\nüì• File downloaded!")
except Exception:
    # Best effort only, but do not swallow KeyboardInterrupt/SystemExit the
    # way a bare "except:" would.
    print("\nüí° File saved to current directory")

print("\nüéâ DONE!")

üöÄ ADDING DEADLINE FLAGS TO JIRA TEST REPORTING

üìÅ Loading existing file...
   Loaded 1368 rows with 46 columns

üìÖ Today's date: 2025-11-05

CREATING DEADLINE FLAGS

   Calculating flags for each row...
   Progress: 0/1368 rows processed...
   Progress: 100/1368 rows processed...
   Progress: 200/1368 rows processed...
   Progress: 300/1368 rows processed...
   Progress: 400/1368 rows processed...
   Progress: 500/1368 rows processed...
   Progress: 600/1368 rows processed...
   Progress: 700/1368 rows processed...
   Progress: 800/1368 rows processed...
   Progress: 900/1368 rows processed...
   Progress: 1000/1368 rows processed...
   Progress: 1100/1368 rows processed...
   Progress: 1200/1368 rows processed...
   Progress: 1300/1368 rows processed...

‚úÖ All 1368 rows processed!

SUMMARY STATISTICS

üìä Deadline Status Distribution:
deadline_status
No Deadline    823
On Track       487
Completed       58

üìä Is Reopened Distribution:
is_reopened
No     1362
Yes       6


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


üì• File downloaded!

üéâ DONE!
