In [43]:
# Importing libraries for data retrieval
from dotenv import load_dotenv
import requests
import zipfile
import shutil

# Importing libraries for data processing
import os
import xml.etree.ElementTree as ET
from collections import defaultdict
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [51]:
# Load environment variables from .env file
load_dotenv()

TOKEN = os.getenv("TOKEN")

if TOKEN is None:
    raise ValueError("TOKEN environment variable is not set. Please set it in the .env file.")

OWNER = os.getenv("OWNER")
REPO = os.getenv("REPO")
START_TIME = os.getenv("START_TIME")
END_TIME = os.getenv("END_TIME")
DESTINATION_DIR_ZIPS = os.getenv("DESTINATION_DIR_ZIPS")
DESTINATION_DIR_DATA = os.getenv("DESTINATION_DIR_DATA")
WORKFLOW_ID = os.getenv("WORKFLOW_ID")

# Create the destination directories if they don't exist
os.makedirs(DESTINATION_DIR_ZIPS, exist_ok=True)
os.makedirs(DESTINATION_DIR_DATA, exist_ok=True)

# Ensure the destination directories are writable
os.chmod(DESTINATION_DIR_ZIPS, 0o777)
os.chmod(DESTINATION_DIR_DATA, 0o777)

# Fetch all workflow runs for the specified workflow
url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/workflows/{WORKFLOW_ID}/runs"
headers = {"Authorization": f"token {TOKEN}"}
response = requests.get(url, headers=headers)
response.raise_for_status()  # Raise an exception for failed requests

# Extract run IDs from the JSON response, filtering by the time range
runs = response.json()["workflow_runs"]
run_ids = [run["id"] for run in runs if START_TIME <= run["created_at"] <= END_TIME]

# Check if any runs are found in the time range
if not run_ids:
    print("No runs found in the specified time range.")
    exit(1)

In [17]:
# Parse the data
def parse_test_report(file_path: str) -> ET.Element:
    tree = ET.parse(file_path)
    return tree.getroot()

# Create a test data object for each file
def extract_test_data(root: ET.Element, test_type: str) -> list:
    test_data = []
    for testcase in root.findall('testcase'):
        name = testcase.attrib.get('name')
        classname = testcase.attrib.get('classname')
        time = float(testcase.attrib.get('time', 0))
        failure = testcase.find('failure') is not None
        error = testcase.find('error') is not None
        test_data.append({
            'name': name,
            'classname': classname,
            'test_type': test_type,
            'time': time,
            'failure': failure,
            'error': error
        })
    return test_data

# Extracts the data from all files and returns in list form for analysis
def collect_test_data(report_dir: str, test_type: str) -> list:
    test_data = []
    for root, _, files in os.walk(report_dir):
        for file_name in files:
            if file_name.endswith('.xml'):
                file_path = os.path.join(root, file_name)
                root_element = parse_test_report(file_path)
                data = extract_test_data(root_element, test_type)
                test_data.append(data)
    return test_data

In [18]:
# Retrieves the relevant info from the test data and creates a dataframe
def analyze_test_data(test_data: list) -> pd.DataFrame:
    test_runs = defaultdict(lambda: {'test_class': '','runs': 0, 'failures': 0, 'total_time': 0.0, 'test_type': ''})

    for run_data in test_data:
        for test in run_data:
            test_name = f"{test['name']}"
            test_runs[test_name]['runs'] += 1
            test_runs[test_name]['test_class'] = test['classname']
            test_runs[test_name]['test_type'] = test['test_type']
            test_runs[test_name]['total_time'] += test['time']
            if test['failure'] or test['error']:
                test_runs[test_name]['failures'] += 1

    # Create a DataFrame
    test_data_df = pd.DataFrame([
        {'test_name': test_name, 'test_class': counts['test_class'], 'test_type': counts['test_type'], 'runs': counts['runs'], 'failures': counts['failures'], 'total_time': counts['total_time']}
        for test_name, counts in test_runs.items()
    ])

    # Calculate failure percentage for each test
    test_data_df['failure_percentage'] = (test_data_df['failures'] / test_data_df['runs']) * 100

    # Threshold percentage for flakiness
    threshold_percentage = 5  # Adjust as needed

    # Mark a test as flakey if failure percentage is > threshold
    test_data_df['flakey'] = test_data_df['failure_percentage'] > threshold_percentage
    test_data_df['avg_time'] = test_data_df['total_time'] / test_data_df['runs']

    return test_data_df


In [52]:
# Retrieving data
test_data = []

# Loop through each workflow run
for run_id in run_ids:
    print(f"Downloading artifacts from run {run_id}...")

    # Get artifacts for the current run
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs/{run_id}/artifacts"
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # Raise an exception for failed requests
    
    # Extract artifact URLs and names from the JSON response
    artifacts = response.json()["artifacts"]
    for artifact in artifacts:
        if "Test Results" in artifact["name"]:
            artifact_name = artifact["name"]
            artifact_url = artifact["archive_download_url"]

            # Download the artifact to the zips folder
            filename = f"{artifact_name}.zip"
            zip_path = os.path.join(DESTINATION_DIR_ZIPS, filename)
            response = requests.get(artifact_url, headers=headers)
            response.raise_for_status()  # Raise an exception for failed requests
            with open(zip_path, "wb") as f:
                f.write(response.content)

            # Create a folder for unzipped artifact
            artifact_dir = os.path.join(DESTINATION_DIR_DATA, artifact_name)
            os.makedirs(artifact_dir, exist_ok=True)

            # Ensure the directory is writable
            os.chmod(artifact_dir, 0o777)

            # Unzip the artifact
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(artifact_dir)

            # Load data to list
            test_data.append(collect_test_data(report_dir="./temp", test_type=artifact_name[:30]))
            
            # Delete the zip and extracted files after unzipping to prevent storage from filling up
            os.remove(zip_path)
            shutil.rmtree(artifact_dir)

print("All artifacts downloaded and extracted successfully.")

Downloading artifacts from run 9230076215...
Test {'id': 1536400860, 'node_id': 'MDg6QXJ0aWZhY3QxNTM2NDAwODYw', 'name': '(_lib-cache_integrationTest) - 20240524212101-dae9c99072-PTR-43 Test Results', 'size_in_bytes': 18026, 'url': 'https://api.github.com/repos/ecobee/op-ts-server-core/actions/artifacts/1536400860', 'archive_download_url': 'https://api.github.com/repos/ecobee/op-ts-server-core/actions/artifacts/1536400860/zip', 'expired': False, 'created_at': '2024-05-24T21:30:00Z', 'updated_at': '2024-05-24T21:30:00Z', 'expires_at': '2024-05-31T21:20:54Z', 'workflow_run': {'id': 9230076215, 'repository_id': 297339652, 'head_repository_id': 297339652, 'head_branch': 'PTR-43-auth-logging-no-filter', 'head_sha': 'dae9c990725f611aa3c14399a378ba3ea7e65db9'}}...
Test {'id': 1536403054, 'node_id': 'MDg6QXJ0aWZhY3QxNTM2NDAzMDU0', 'name': '(communicator_integrationTest) - 20240524212101-dae9c99072-PTR-43 Test Results', 'size_in_bytes': 92361, 'url': 'https://api.github.com/repos/ecobee/op-ts-se

In [None]:
# Processing and analyzing data
all_test_data = []
for test_data_run in test_data:
    all_test_data.extend(test_data_run)

test_data_df = analyze_test_data(all_test_data)
# Display the DataFrame
display(test_data_df)

In [None]:
# Seeing all parts of repo which contains tests
def get_class_mapping(row):
    classname = row['test_class'].split('.')[2]
    return classname
test_data_df['class_location'] = test_data_df.apply(get_class_mapping, axis=1)
display(test_data_df['class_location'].unique())


In [49]:
# Adding github link for each test

folder_class_map = { # Did some for now
    "foundation": "foundation/test/source",
    # "cache": "lib-cache",
    "communicator": "communicator/test/source",
    "ests": "ests-pubsub/src/test/java",
    "events": "lib-events/test/java",
    "reportprocessor": "report-processor/src/test/java",
    "webapp": "gui/test/src",
    "contractorservice": "libs/communicator/contractor-service-client",
}

# Base URL for the GitHub repository
base_github_url = f"https://github.com/{OWNER}/{REPO}/tree/main"

def create_github_link(row):
    location = row['test_class'].split('.')[2]
    # Replace dots in the class name with slashes to form the path
    class_path = row['test_class'].replace('.', '/')
    # Construct the GitHub URL
    return f"{base_github_url}/{folder_class_map.get(location)}/{class_path}.java"

# Add a column for GitHub links 
test_data_df['github_link'] = test_data_df.apply(create_github_link, axis=1)

# NOTE
Not all github links will work, just did the quickest approach to get the majority of links for ease of searching up

In [None]:
top_time_tests = test_data_df.sort_values(by='avg_time', ascending=False).head(50)

display(top_time_tests)

In [None]:
flakey_tests = test_data_df[test_data_df['flakey']].sort_values(by='failure_percentage', ascending=False)
display(flakey_tests)

In [None]:
# Calculate the percentage of flakey tests per class_location
flakey_counts = test_data_df.groupby('class_location')['flakey'].sum().reset_index(name='flakey_count')
total_counts = test_data_df.groupby('class_location')['flakey'].count().reset_index(name='total_count')

flakey_percentage_per_class = pd.merge(flakey_counts, total_counts, on='class_location')
flakey_percentage_per_class['flakey_percentage'] = (flakey_percentage_per_class['flakey_count'] / flakey_percentage_per_class['total_count']) * 100

# Calculate the average time per class_location
avg_time_per_class = test_data_df.groupby('class_location')['avg_time'].mean().reset_index()

# Merge the two DataFrames
class_metrics_df = pd.merge(flakey_percentage_per_class, avg_time_per_class, on='class_location')

# Plotting
fig, ax = plt.subplots(2, 1, figsize=(12, 10))

# Plot flakey percentage
sns.barplot(x='flakey_percentage', y='class_location', data=class_metrics_df.sort_values('flakey_percentage', ascending=False), ax=ax[0])
ax[0].set_title('Class Locations with Highest Percentage of Flakey Tests')
ax[0].set_xlabel('Flakey Percentage')
ax[0].set_ylabel('Class Location')

# Plot average time
sns.barplot(x='avg_time', y='class_location', data=class_metrics_df.sort_values('avg_time', ascending=False), ax=ax[1])
ax[1].set_title('Class Locations with Highest Average Time')
ax[1].set_xlabel('Average Time (s)')
ax[1].set_ylabel('Class Location')

# Rotate class location labels for better readability
for axis in ax:
    for label in axis.get_yticklabels():
        label.set_rotation(0)

plt.tight_layout()
plt.show()

This above is just a quick attempt to see if there is a way to graph the data and see which areas are the biggest pain points right now in terms of time and flakeyness (probably needs to be better processed)

In [39]:
# Export data
test_data_df.to_csv('test_data.csv', index=False)